Documentation
Index
- Constants
- func ClassifyNetworkError(err error) string
- func ClassifyReplayError(err error) string
- func Compare(a, b VClock) int
- func IsNetwork(reason string) bool
- func IsSchemaWidening(reason string) bool
- func PeerCortexIDKey(peerName string) string
- func PeerCursorKey(peerName string) string
- func PeerHealthKey(peerName string) string
- func PeerSeenKey(peerName string) string
- type Config
- type EventReplayer
- type PeerConfig
- type PeerError
- type PeerHealth
- type PeerState
- type PollError
- type State
- func (s *State) Delete(key string) error
- func (s *State) Get(key string) (string, error)
- func (s *State) GetClock() (VClock, error)
- func (s *State) GetPeerHealth(name string) (PeerHealth, error)
- func (s *State) GetPeerState(name, endpoint string) (PeerState, error)
- func (s *State) Set(key, value string) error
- func (s *State) SetClock(vc VClock) error
- func (s *State) SetPeerCortexID(name, cortexID string) error
- func (s *State) SetPeerCursor(name, eventID string) error
- func (s *State) SetPeerHealth(name string, h PeerHealth) error
- func (s *State) SetPeerSeen(name, timestamp string) error
- type Syncer
- type VClock
Constants
const (
	PeerModeSync   = "sync"
	PeerModePaused = "paused"
)
Peer mode constants, checked by the syncer to skip paused peers.
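A minimal sketch of how a poll pass might apply the mode check. `activePeers` is a hypothetical helper, and it assumes a peer that omits `mode` (the yaml tag marks it optional) behaves like `sync`.

```go
package main

// Mode values as documented for PeerConfig.Mode.
const (
	PeerModeSync   = "sync"
	PeerModePaused = "paused"
)

// PeerConfig is a trimmed copy of the documented struct (yaml tags omitted).
type PeerConfig struct {
	Name     string
	Endpoint string
	Mode     string
}

// activePeers is hypothetical: it drops peers whose mode is PeerModePaused
// and keeps everything else, including peers that leave Mode empty
// (assumed here to behave like PeerModeSync).
func activePeers(peers []PeerConfig) []PeerConfig {
	active := make([]PeerConfig, 0, len(peers))
	for _, p := range peers {
		if p.Mode == PeerModePaused {
			continue
		}
		active = append(active, p)
	}
	return active
}
```

Filtering before the poll keeps a paused peer's cursor and pinned identity untouched, since the syncer never contacts it.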
const (
	// Schema-widening reasons: these four signal that the peer is
	// running a binary that predates the event shapes being produced
	// elsewhere on the ring. `noema federation status` highlights
	// these specifically with an "upgrade this peer" hint.
	ReasonInvalidTraceID     = "invalid_trace_id"
	ReasonInvalidFrontmatter = "invalid_frontmatter"
	ReasonUnknownAction      = "unknown_action"
	ReasonUnknownType        = "unknown_type"

	// Network reasons mirror the syncer's existing categorizeError
	// tags (refused / timeout / dns / tls / reset / eof), prefixed
	// so the CLI can group them as "network problems" without
	// confusing them with replay failures.
	ReasonNetworkRefused = "network_refused"
	ReasonNetworkTimeout = "network_timeout"
	ReasonNetworkDNS     = "network_dns"
	ReasonNetworkTLS     = "network_tls"
	ReasonNetworkReset   = "network_reset"
	ReasonNetworkEOF     = "network_eof"

	// Auth / identity issues.
	ReasonAuth             = "auth"
	ReasonIdentityMismatch = "identity_mismatch"
	ReasonIdentityMissing  = "identity_missing"

	// Fallback bucket for anything we can't classify yet. The CLI
	// shows "other" verbatim and points the operator at the peer's
	// logs for detail.
	ReasonOther = "other"
)
Reason enum. Values are stable strings so old health records stay parseable when new values are added.
const DefaultInterval = 30 * time.Second
const MaxVClockEntries = 256
MaxVClockEntries caps the number of distinct cortex IDs in a merged vector clock. A legitimate federation ring rarely exceeds a handful of peers; a clock with hundreds of entries is a strong signal of a clock inflation attack (a malicious peer injecting synthetic cortex IDs into its event payloads). The cap prevents unbounded growth of the federation_state row that is serialized on every mutation.
Variables
This section is empty.
Functions
func ClassifyNetworkError (added in v0.9.1)
func ClassifyNetworkError(err error) string
ClassifyNetworkError maps connect/fetch-phase errors into the network_* reason set. Returns ReasonOther for anything that doesn't match a known pattern. Kept side-by-side with the syncer's categorizeError (which returns the short tag) to avoid a pointless second categorization.
func ClassifyReplayError (added in v0.9.1)
func ClassifyReplayError(err error) string
ClassifyReplayError maps an error returned from Cortex.ReplayEvent into the appropriate reason. The sentinel trace errors drive the schema-widening reasons; anything else falls to ReasonOther. Kept in a single function so additions to the trace package's error vocabulary have exactly one place to update here too.
func Compare
func Compare(a, b VClock) int
Compare returns the causal relationship between two clocks:
- -1 means a happened-before b
- 0 means concurrent (no causal relationship; potential conflict)
- +1 means b happened-before a
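A sketch of the comparison, assuming VClock is a map from cortex ID to counter in which a missing entry counts as zero. `compare` is a hypothetical stand-in; it also returns 0 for identical clocks, a case the contract above does not spell out.

```go
package main

// VClock is assumed to map cortex IDs to event counters.
type VClock map[string]uint64

// compare returns -1 if a happened-before b, +1 if b happened-before a,
// and 0 otherwise (concurrent, or equal).
func compare(a, b VClock) int {
	aBehind := false // some counter in a is smaller than in b
	bBehind := false // some counter in b is smaller than in a
	for id, av := range a {
		bv := b[id] // zero if b has no entry for id
		if av < bv {
			aBehind = true
		} else if av > bv {
			bBehind = true
		}
	}
	for id, bv := range b {
		if _, seen := a[id]; !seen && bv > 0 {
			aBehind = true // a is implicitly 0 for this ID
		}
	}
	switch {
	case aBehind && !bBehind:
		return -1
	case bBehind && !aBehind:
		return 1
	default:
		return 0
	}
}
```

For example, {a:2, b:1} versus {a:1, b:2} returns 0: each clock is ahead on one entry, so neither event could have observed the other.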
func IsNetwork (added in v0.9.1)
func IsNetwork(reason string) bool
IsNetwork reports whether a reason belongs to the network-problem family. Kept as a family so the CLI can render a shorter grouped hint rather than a bespoke line per tag.
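Because every network reason shares the `network_` prefix, the family check can be sketched as a prefix test. This is an assumption: the real IsNetwork might enumerate the six constants instead. `groupReasons` is a hypothetical illustration of how a CLI could fold the family into one grouped line.

```go
package main

import "strings"

// networkPrefix is the shared prefix of the network_* reasons.
const networkPrefix = "network_"

// isNetwork reports whether reason belongs to the network family.
func isNetwork(reason string) bool {
	return strings.HasPrefix(reason, networkPrefix)
}

// groupReasons folds every network_* reason into a single "network"
// bucket and keeps all other reasons as-is, counting occurrences.
func groupReasons(reasons []string) map[string]int {
	groups := make(map[string]int)
	for _, r := range reasons {
		if isNetwork(r) {
			r = "network"
		}
		groups[r]++
	}
	return groups
}
```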
func IsSchemaWidening (added in v0.9.1)
func IsSchemaWidening(reason string) bool
IsSchemaWidening reports whether a reason indicates that the peer binary predates schema changes that newer peers are producing. The CLI uses this to inject a specific "upgrade this peer" suggestion.
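Unlike the network family, the schema-widening reasons share no prefix, so a sketch has to list them. `isSchemaWidening` and `upgradeHint` are hypothetical; the hint wording is illustrative, not the CLI's actual text.

```go
package main

const (
	ReasonInvalidTraceID     = "invalid_trace_id"
	ReasonInvalidFrontmatter = "invalid_frontmatter"
	ReasonUnknownAction      = "unknown_action"
	ReasonUnknownType        = "unknown_type"
)

// isSchemaWidening enumerates the schema-widening reasons explicitly.
func isSchemaWidening(reason string) bool {
	switch reason {
	case ReasonInvalidTraceID, ReasonInvalidFrontmatter, ReasonUnknownAction, ReasonUnknownType:
		return true
	default:
		return false
	}
}

// upgradeHint returns an "upgrade this peer" suggestion, or "" when the
// reason does not point at an outdated peer binary.
func upgradeHint(peer, reason string) string {
	if !isSchemaWidening(reason) {
		return ""
	}
	return "upgrade " + peer + ": its binary predates event shapes produced elsewhere on the ring"
}
```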
func PeerCortexIDKey
func PeerCortexIDKey(peerName string) string
PeerCortexIDKey returns the federation_state key under which a peer's verified cortex ULID is pinned after the first successful identity handshake. The syncer refuses to talk to a peer whose advertised ID has changed from what is stored here — see docs/design/cortex-uuid-plan.md.
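A sketch of the pin-then-refuse rule described above. `checkIdentity` and `identityError` are hypothetical; mapping the two failure modes onto ReasonIdentityMissing and ReasonIdentityMismatch is an assumption based on the reason names, not something these docs state.

```go
package main

const (
	ReasonIdentityMismatch = "identity_mismatch"
	ReasonIdentityMissing  = "identity_missing"
)

// identityError carries a fixed reason instead of free-form text.
type identityError struct{ Reason string }

func (e *identityError) Error() string { return "federation: identity check failed: " + e.Reason }

// checkIdentity compares the pinned ID ("" if never pinned) with the ID the
// peer advertised. On success it returns the ID that should stay pinned.
func checkIdentity(pinned, advertised string) (toPin string, err error) {
	switch {
	case advertised == "":
		return "", &identityError{Reason: ReasonIdentityMissing}
	case pinned == "":
		return advertised, nil // first successful handshake: pin it
	case advertised != pinned:
		return "", &identityError{Reason: ReasonIdentityMismatch}
	default:
		return pinned, nil
	}
}
```

Only the first successful handshake writes the pin; every later handshake must reproduce it exactly.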
func PeerCursorKey
func PeerCursorKey(peerName string) string
PeerCursorKey returns the federation_state key for a peer's last_event cursor.
func PeerHealthKey (added in v0.9.1)
func PeerHealthKey(peerName string) string
PeerHealthKey returns the federation_state key holding the JSON-encoded PeerHealth for this peer. Deliberately separate from the other peer:* keys so health snapshots can be reset without touching the cursor or pinned identity.
func PeerSeenKey
func PeerSeenKey(peerName string) string
PeerSeenKey returns the federation_state key for a peer's last_seen time.
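The four key helpers can be sketched as one builder. Only the "peer:" prefix is implied by these docs; the `peer:<name>:<field>` layout and the field names are hypothetical. `resetPeerKeys` returns just the pin, cursor, and last_seen rows that the Delete documentation says `noema federation reset-peer` clears.

```go
package main

// peerKey builds a hypothetical "peer:<name>:<field>" key.
func peerKey(peerName, field string) string {
	return "peer:" + peerName + ":" + field
}

func peerCursorKey(peerName string) string   { return peerKey(peerName, "last_event") }
func peerSeenKey(peerName string) string     { return peerKey(peerName, "last_seen") }
func peerCortexIDKey(peerName string) string { return peerKey(peerName, "cortex_id") }
func peerHealthKey(peerName string) string   { return peerKey(peerName, "health") }

// resetPeerKeys lists the pin, cursor, and last_seen rows. Health lives
// under its own key (peerHealthKey), so it can be cleared independently.
func resetPeerKeys(peerName string) []string {
	return []string{peerCortexIDKey(peerName), peerCursorKey(peerName), peerSeenKey(peerName)}
}
```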
Types
type Config
type Config struct {
Mode string // federation-level mode: sync | publish | subscribe
Peers []PeerConfig `yaml:"peers,omitempty"`
Interval time.Duration // parsed from string
// SharedKey is the bearer token attached to every
// outbound request as "Authorization: Bearer <SharedKey>". Empty
// means open mode (no header sent). Populated at startup from
// cortex.LoadAccessKey; never surfaced in YAML, logs, or events.
SharedKey string `yaml:"-"`
}
Config holds federation settings from cortex.md.
func (Config) EffectiveInterval
EffectiveInterval returns the configured interval or the default.
type EventReplayer
EventReplayer materializes a remote event on the local cortex.
type PeerConfig
type PeerConfig struct {
Name string `yaml:"name"`
Endpoint string `yaml:"endpoint"`
CA string `yaml:"ca,omitempty"` // path to CA certificate for TLS verification
Mode string `yaml:"mode,omitempty"` // sync | paused
}
PeerConfig is a peer entry as declared in cortex.md.
type PeerError (added in v0.9.1)
type PeerError struct {
// Reason is one of the Reason* constants.
Reason string `json:"reason"`
// EventID is the ULID of the event that failed to replay, if
// applicable. Empty for failures that occurred before any event
// was fetched.
EventID string `json:"event_id,omitempty"`
// TraceID is the trace ID carried by the failing event, if any.
// Derived from titles via slugification but already exposed
// elsewhere in federation_state (peer cursors index the event
// log), so storing it here adds no new attack surface.
TraceID string `json:"trace_id,omitempty"`
// ObservedAt is the RFC3339 timestamp when the failure happened.
ObservedAt string `json:"observed_at"`
}
PeerError is a structured record of one poll failure. Unlike a raw Go error it carries no free-form error text — Reason is a fixed enum and the other fields are limited to stable references (event/trace IDs that already appear in federation_state anyway).
type PeerHealth (added in v0.9.1)
type PeerHealth struct {
// Version is the peer's binary version advertised via MCP
// initialize (serverInfo.Version). Updated on every successful
// connection, left blank until first contact.
Version string `json:"version,omitempty"`
// VersionObservedAt is the RFC3339 timestamp of the most recent
// successful MCP initialize where the peer reported Version.
VersionObservedAt string `json:"version_observed_at,omitempty"`
// LastSuccess is the RFC3339 timestamp of the most recent poll
// iteration that completed without any error (connect, identity
// check, fetch, and replay all succeeded).
LastSuccess string `json:"last_success,omitempty"`
// ConsecutiveFailures counts poll iterations since the last
// success that produced any error. Zero when the last poll
// succeeded.
ConsecutiveFailures int `json:"consecutive_failures,omitempty"`
// LastError is present when the most recent poll failed. Absent
// when LastSuccess is newer than the last failure.
LastError *PeerError `json:"last_error,omitempty"`
}
PeerHealth is the runtime diagnostic state recorded for each peer. It is persisted per-peer as JSON in the federation_state KV store and read by `noema federation status` to surface version skew and stalled-sync conditions.
Intentionally contains no free-form error strings: error classification collapses any replay/sync failure into a small fixed enum (PeerError.Reason) plus a few structured fields (event_id, trace_id), and the CLI turns the enum back into human-readable text at display time. See the design discussion in this commit's message for the sensitive-data reasoning.
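A sketch of the read-adjust-write cycle implied by the field comments: a failure increments ConsecutiveFailures and sets LastError, and a success records LastSuccess, zeroes the counter, and clears LastError. `recordSuccess` and `recordFailure` are hypothetical helpers, and the structs are trimmed copies without JSON tags.

```go
package main

type PeerError struct {
	Reason, EventID, TraceID, ObservedAt string
}

type PeerHealth struct {
	Version             string
	VersionObservedAt   string
	LastSuccess         string
	ConsecutiveFailures int
	LastError           *PeerError
}

// recordSuccess applies a fully successful poll iteration.
func recordSuccess(h PeerHealth, at string) PeerHealth {
	h.LastSuccess = at
	h.ConsecutiveFailures = 0
	h.LastError = nil // absent once LastSuccess is newer than the last failure
	return h
}

// recordFailure applies a poll iteration that produced any error.
func recordFailure(h PeerHealth, e PeerError) PeerHealth {
	h.ConsecutiveFailures++
	h.LastError = &e // e is a copy, so the caller's value is not aliased
	return h
}
```

Taking and returning PeerHealth by value fits the SetPeerHealth contract: the caller always writes back a complete snapshot.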
type PeerState ¶
type PeerState struct {
Name string
Endpoint string
LastSeen string // RFC3339, "" if never reached
LastEvent string // ULID of last synced event, "" if never synced
CortexID string // pinned ULID after first successful identity handshake
Health PeerHealth
}
PeerState is the runtime state of a known peer.
type PollError (added in v0.9.1)
PollError wraps an error with classification metadata. The syncer returns it from per-peer poll iterations so the outer loop can record the structured outcome without re-parsing the error text.
type State
type State struct {
// contains filtered or unexported fields
}
State provides read/write access to the federation_state table.
func (*State) Delete
func (s *State) Delete(key string) error
Delete removes a single key from federation_state. Missing keys are not an error — Delete is used by `noema federation reset-peer` to clear pin / cursor / last_seen rows that may or may not exist depending on whether the peer has ever been contacted, so the operation has to be idempotent.
func (*State) GetPeerHealth (added in v0.9.1)
func (s *State) GetPeerHealth(name string) (PeerHealth, error)
GetPeerHealth loads the structured health snapshot for a peer. A missing key or empty value both return an empty PeerHealth — callers treat that as "no data yet" identically to "first poll is about to happen". Malformed JSON is tolerated the same way: the snapshot is advisory, not load-bearing, and a parse error shouldn't block the CLI from rendering the rest of federation status.
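A sketch of just the decoding step, with the storage lookup left out. `decodePeerHealth` is a hypothetical helper and PeerHealth is trimmed to a few documented fields. On a parse error it discards any partially decoded fields, so the result is either a full snapshot or an empty one.

```go
package main

import "encoding/json"

// PeerHealth is trimmed to a few of the documented fields.
type PeerHealth struct {
	Version             string `json:"version,omitempty"`
	LastSuccess         string `json:"last_success,omitempty"`
	ConsecutiveFailures int    `json:"consecutive_failures,omitempty"`
}

// decodePeerHealth treats an empty value and malformed JSON the same way:
// both yield a zero PeerHealth ("no data yet") rather than an error.
func decodePeerHealth(raw string) PeerHealth {
	if raw == "" {
		return PeerHealth{}
	}
	var h PeerHealth
	if err := json.Unmarshal([]byte(raw), &h); err != nil {
		// Unmarshal may have filled some fields before failing; drop them.
		return PeerHealth{}
	}
	return h
}
```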
func (*State) GetPeerState
func (s *State) GetPeerState(name, endpoint string) (PeerState, error)
GetPeerState loads the runtime state for a peer.
func (*State) SetPeerCortexID
func (s *State) SetPeerCortexID(name, cortexID string) error
SetPeerCortexID pins a peer's verified cortex ULID. Should only be called after the cortex_identity handshake has succeeded once.
func (*State) SetPeerCursor
func (s *State) SetPeerCursor(name, eventID string) error
SetPeerCursor updates the last synced event cursor for a peer.
func (*State) SetPeerHealth (added in v0.9.1)
func (s *State) SetPeerHealth(name string, h PeerHealth) error
SetPeerHealth persists the snapshot for a peer. Callers build the full PeerHealth (usually by reading the previous value, adjusting, and writing back) so partial updates don't need their own API.
func (*State) SetPeerSeen
func (s *State) SetPeerSeen(name, timestamp string) error
SetPeerSeen updates the last seen time for a peer.
type Syncer
type Syncer struct {
// contains filtered or unexported fields
}
Syncer polls remote peers for new events and replays them locally.
type VClock
VClock is a vector clock: one counter per known peer.
func MergeCapped (added in v0.8.1)
MergeCapped is like Merge but returns an error when the merged clock would exceed MaxVClockEntries. Use this on the federation ingest path to prevent a malicious peer from inflating the local clock with synthetic cortex IDs.