federation

package federation (v0.9.1)
Warning

This package is not in the latest version of its module.

Published: Apr 20, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Constants

const (
	PeerModeSync   = "sync"
	PeerModePaused = "paused"
)

Peer mode constants. The syncer checks them to skip paused peers.
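A minimal sketch of how a poll cycle might apply these constants. The `peer` type and the `activePeers` helper are hypothetical stand-ins, not the package's API, and treating an empty mode as syncing is an assumption.

```go
package main

// Local copies of the documented peer-mode constants.
const (
	PeerModeSync   = "sync"
	PeerModePaused = "paused"
)

// peer is a hypothetical, trimmed stand-in for PeerConfig.
type peer struct {
	Name string
	Mode string
}

// activePeers returns the peers a poll cycle would visit. Peers in
// PeerModePaused are skipped; every other mode, including an empty
// string, is treated as syncing (an assumption of this sketch).
func activePeers(peers []peer) []peer {
	var out []peer
	for _, p := range peers {
		if p.Mode == PeerModePaused {
			continue
		}
		out = append(out, p)
	}
	return out
}
```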

const (
	// Schema-widening reasons: these signal that the peer is
	// running a binary that predates the event shapes being produced
	// elsewhere on the ring. `noema federation status` highlights
	// these specifically with an "upgrade this peer" hint.
	ReasonInvalidTraceID     = "invalid_trace_id"
	ReasonInvalidFrontmatter = "invalid_frontmatter"
	ReasonUnknownAction      = "unknown_action"
	ReasonUnknownType        = "unknown_type"

	// Network reasons mirror the syncer's existing categorizeError
	// tags (refused / timeout / dns / tls / reset / eof), prefixed
	// so the CLI can group them as "network problems" without
	// confusing them with replay failures.
	ReasonNetworkRefused = "network_refused"
	ReasonNetworkTimeout = "network_timeout"
	ReasonNetworkDNS     = "network_dns"
	ReasonNetworkTLS     = "network_tls"
	ReasonNetworkReset   = "network_reset"
	ReasonNetworkEOF     = "network_eof"

	// Auth / identity issues.
	ReasonAuth             = "auth"
	ReasonIdentityMismatch = "identity_mismatch"
	ReasonIdentityMissing  = "identity_missing"

	// Fallback bucket for anything we can't classify yet. The CLI
	// shows "other" verbatim and points the operator at the peer's
	// logs for detail.
	ReasonOther = "other"
)

Reason enum. Values are stable strings so old health records stay parseable when new values are added.

const DefaultInterval = 30 * time.Second
const MaxVClockEntries = 256

MaxVClockEntries caps the number of distinct cortex IDs in a merged vector clock. A legitimate federation ring rarely exceeds a handful of peers; a clock with hundreds of entries is a strong signal of a clock inflation attack (a malicious peer injecting synthetic cortex IDs into its event payloads). The cap prevents unbounded growth of the federation_state row that is serialized on every mutation.

Variables

This section is empty.

Functions

func ClassifyNetworkError added in v0.9.1

func ClassifyNetworkError(err error) string

ClassifyNetworkError maps connect/fetch-phase errors into the network_* reason set. Returns ReasonOther for anything that doesn't match a known pattern. Kept side-by-side with the syncer's categorizeError (which returns the short tag) to avoid a pointless second categorization.

func ClassifyReplayError added in v0.9.1

func ClassifyReplayError(err error) string

ClassifyReplayError maps an error returned from Cortex.ReplayEvent into the appropriate reason. The sentinel trace errors drive the schema-widening reasons; anything else falls to ReasonOther. Keeping this in a single function means additions to the trace package's error vocabulary have exactly one place to update here too.

func Compare

func Compare(a, b VClock) int

Compare returns the causal relationship between two clocks:

-1 means a happened-before b
 0 means concurrent (no causal relationship — potential conflict)
+1 means b happened-before a
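One way to compute this relationship, as a sketch: treat a missing entry as a zero counter, then check whether either clock is strictly behind the other in some component. `compare` is a hypothetical stand-in for Compare; this version also returns 0 for identical clocks, a case the description above does not specify.

```go
package main

// VClock mirrors the documented type: one counter per known peer.
type VClock map[string]uint64

// compare returns -1 if a happened-before b, +1 if b happened-before a,
// and 0 otherwise (concurrent, or identical in this sketch).
func compare(a, b VClock) int {
	aBehind, bBehind := false, false // aBehind: some counter in a is lower than in b
	for k, av := range a {
		bv := b[k] // missing key reads as 0
		if av < bv {
			aBehind = true
		} else if av > bv {
			bBehind = true
		}
	}
	for k, bv := range b {
		if _, seen := a[k]; !seen && bv > 0 {
			aBehind = true // a has an implicit 0 for this peer
		}
	}
	switch {
	case aBehind && !bBehind:
		return -1
	case bBehind && !aBehind:
		return 1
	default:
		return 0
	}
}
```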

func IsNetwork added in v0.9.1

func IsNetwork(reason string) bool

IsNetwork reports whether a reason belongs to the network-problem family. Kept as a family so the CLI can render a shorter grouped hint rather than a bespoke line per tag.

func IsSchemaWidening added in v0.9.1

func IsSchemaWidening(reason string) bool

IsSchemaWidening reports whether a reason indicates that the peer binary predates schema changes that newer peers are producing. The CLI uses this to inject a specific "upgrade this peer" suggestion.
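Both predicates reduce to set membership over the Reason constants. The sketch below assumes that membership (including `unknown_type` in the schema-widening family); `isNetwork`, `isSchemaWidening`, and the strings returned by `hint` are hypothetical and only approximate what the CLI prints.

```go
package main

// Local copies of the documented reasons used here.
const (
	ReasonInvalidTraceID     = "invalid_trace_id"
	ReasonInvalidFrontmatter = "invalid_frontmatter"
	ReasonUnknownAction      = "unknown_action"
	ReasonUnknownType        = "unknown_type"

	ReasonNetworkRefused = "network_refused"
	ReasonNetworkTimeout = "network_timeout"
	ReasonNetworkDNS     = "network_dns"
	ReasonNetworkTLS     = "network_tls"
	ReasonNetworkReset   = "network_reset"
	ReasonNetworkEOF     = "network_eof"
)

func isNetwork(reason string) bool {
	switch reason {
	case ReasonNetworkRefused, ReasonNetworkTimeout, ReasonNetworkDNS,
		ReasonNetworkTLS, ReasonNetworkReset, ReasonNetworkEOF:
		return true
	}
	return false
}

func isSchemaWidening(reason string) bool {
	switch reason {
	case ReasonInvalidTraceID, ReasonInvalidFrontmatter,
		ReasonUnknownAction, ReasonUnknownType:
		return true
	}
	return false
}

// hint renders one grouped line per family instead of one per tag.
func hint(reason string) string {
	switch {
	case isSchemaWidening(reason):
		return "upgrade this peer"
	case isNetwork(reason):
		return "network problem"
	default:
		return reason + ": see the peer's logs"
	}
}
```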

func PeerCortexIDKey

func PeerCortexIDKey(peerName string) string

PeerCortexIDKey returns the federation_state key under which a peer's verified cortex ULID is pinned after the first successful identity handshake. The syncer refuses to talk to a peer whose advertised ID has changed from what is stored here — see docs/design/cortex-uuid-plan.md.
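The pinning rule amounts to trust on first use. In this sketch a map stands in for federation_state, and `checkIdentity` is a hypothetical helper; returning `identity_missing` when a peer advertises no ID is an assumption about what that reason covers.

```go
package main

// Local copies of the documented identity reasons.
const (
	ReasonIdentityMismatch = "identity_mismatch"
	ReasonIdentityMissing  = "identity_missing"
)

// pins is a hypothetical in-memory stand-in for the rows written by
// SetPeerCortexID, keyed by peer name.
type pins map[string]string

// checkIdentity returns "" when the peer may be synced, otherwise a
// reason. The first successful handshake pins the advertised ID.
func checkIdentity(p pins, peer, advertised string) string {
	if advertised == "" {
		return ReasonIdentityMissing
	}
	pinned, ok := p[peer]
	if !ok || pinned == "" {
		p[peer] = advertised // first contact: pin it
		return ""
	}
	if pinned != advertised {
		return ReasonIdentityMismatch // refuse a changed identity
	}
	return ""
}
```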

func PeerCursorKey

func PeerCursorKey(peerName string) string

PeerCursorKey returns the federation_state key for a peer's last_event cursor.

func PeerHealthKey added in v0.9.1

func PeerHealthKey(peerName string) string

PeerHealthKey returns the federation_state key holding the JSON-encoded PeerHealth for this peer. Deliberately separate from the other peer:* keys so health snapshots can be reset without touching the cursor or pinned identity.

func PeerSeenKey

func PeerSeenKey(peerName string) string

PeerSeenKey returns the federation_state key for a peer's last_seen time.

Types

type Config

type Config struct {
	Mode     string        // federation-level mode: sync | publish | subscribe
	Peers    []PeerConfig  `yaml:"peers,omitempty"`
	Interval time.Duration // parsed from string

	// SharedKey is the MCP bearer token this syncer attaches to every
	// outbound request as "Authorization: Bearer <SharedKey>". Empty
	// means open mode (no header sent). Populated at startup from
	// cortex.LoadAccessKey; never surfaced in YAML, logs, or events.
	SharedKey string `yaml:"-"`
}

Config holds federation settings from cortex.md.

func (Config) EffectiveInterval

func (c Config) EffectiveInterval() time.Duration

EffectiveInterval returns the configured interval or the default.

type EventReplayer

type EventReplayer interface {
	ReplayEvent(event.Event) error
	MergeClock(VClock) error
}

EventReplayer materializes a remote event on the local cortex.

type PeerConfig

type PeerConfig struct {
	Name     string `yaml:"name"`
	Endpoint string `yaml:"endpoint"`
	CA       string `yaml:"ca,omitempty"`   // path to CA certificate for TLS verification
	Mode     string `yaml:"mode,omitempty"` // sync | paused
}

PeerConfig is a peer entry as declared in cortex.md.

type PeerError added in v0.9.1

type PeerError struct {
	// Reason is one of the Reason* constants.
	Reason string `json:"reason"`
	// EventID is the ULID of the event that failed to replay, if
	// applicable. Empty for failures that occurred before any event
	// was fetched.
	EventID string `json:"event_id,omitempty"`
	// TraceID is the trace ID carried by the failing event, if any.
	// Derived from titles via slugification but already exposed
	// elsewhere in federation_state (peer cursors index the event
	// log), so storing it here adds no new attack surface.
	TraceID string `json:"trace_id,omitempty"`
	// ObservedAt is the RFC3339 timestamp when the failure happened.
	ObservedAt string `json:"observed_at"`
}

PeerError is a structured record of one poll failure. Unlike a raw Go error, it carries no free-form error text: Reason is a fixed enum, and the other fields are limited to stable references (event/trace IDs that already appear in federation_state anyway).
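Because the optional fields are tagged `omitempty`, a failure that happened before any event was fetched serializes to just the reason and timestamp. The struct below copies the documented fields and tags; `encodePeerError` is a hypothetical helper.

```go
package main

import "encoding/json"

// Copy of the documented PeerError fields and JSON tags.
type PeerError struct {
	Reason     string `json:"reason"`
	EventID    string `json:"event_id,omitempty"`
	TraceID    string `json:"trace_id,omitempty"`
	ObservedAt string `json:"observed_at"`
}

// encodePeerError returns the JSON form stored inside PeerHealth.
func encodePeerError(e PeerError) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}
```

Empty EventID and TraceID drop out of the output entirely, while Reason and ObservedAt are always present.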

type PeerHealth added in v0.9.1

type PeerHealth struct {
	// Version is the peer's binary version advertised via MCP
	// initialize (serverInfo.Version). Updated on every successful
	// connection, left blank until first contact.
	Version string `json:"version,omitempty"`

	// VersionObservedAt is the RFC3339 timestamp of the most recent
	// successful MCP initialize where the peer reported Version.
	VersionObservedAt string `json:"version_observed_at,omitempty"`

	// LastSuccess is the RFC3339 timestamp of the most recent poll
	// iteration that completed without any error (connect, identity
	// check, fetch, and replay all succeeded).
	LastSuccess string `json:"last_success,omitempty"`

	// ConsecutiveFailures counts poll iterations since the last
	// success that produced any error. Zero when the last poll
	// succeeded.
	ConsecutiveFailures int `json:"consecutive_failures,omitempty"`

	// LastError is present when the most recent poll failed. Absent
	// when LastSuccess is newer than the last failure.
	LastError *PeerError `json:"last_error,omitempty"`
}

PeerHealth is the runtime diagnostic state recorded for each peer. It is persisted per-peer as JSON in the federation_state KV store and read by `noema federation status` to surface version skew and stalled-sync conditions.

Intentionally free of free-form error strings: error classification collapses any replay/sync failure into a small fixed enum (PeerError.Reason) plus a few structured fields (event_id, trace_id). The rendered output on the CLI turns the enum back into human-readable text at display time. See the design discussion in this commit's message for the sensitive-data reasoning.

type PeerState

type PeerState struct {
	Name      string
	Endpoint  string
	LastSeen  string // RFC3339, "" if never reached
	LastEvent string // ULID of last synced event, "" if never synced
	CortexID  string // pinned ULID after first successful identity handshake
	Health    PeerHealth
}

PeerState is the runtime state of a known peer.

type PollError added in v0.9.1

type PollError struct {
	Reason  string
	EventID string
	TraceID string
	Err     error
}

PollError wraps an error with classification metadata. The syncer returns it from per-peer poll iterations so the outer loop can record the structured outcome without re-parsing the error text.

func (*PollError) Error added in v0.9.1

func (e *PollError) Error() string

func (*PollError) Unwrap added in v0.9.1

func (e *PollError) Unwrap() error

type State

type State struct {
	// contains filtered or unexported fields
}

State provides read/write access to the federation_state table.

func NewState

func NewState(db *sql.DB) *State

func (*State) Delete

func (s *State) Delete(key string) error

Delete removes a single key from federation_state. Missing keys are not an error. `noema federation reset-peer` uses Delete to clear pin / cursor / last_seen rows that may or may not exist depending on whether the peer has ever been contacted, so the operation has to be idempotent.
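Idempotence comes naturally here: Go's built-in `delete` on a missing map key is a no-op, and a SQL `DELETE` that matches zero rows does not make `Exec` return an error. The sketch below uses a map in place of the table; the key formats and the `resetPeer` helper are hypothetical, since the real strings returned by PeerCortexIDKey, PeerCursorKey, and PeerSeenKey are not shown here.

```go
package main

// Hypothetical key formats, for illustration only.
func peerCortexIDKey(name string) string { return "peer:" + name + ":cortex_id" }
func peerCursorKey(name string) string   { return "peer:" + name + ":last_event" }
func peerSeenKey(name string) string     { return "peer:" + name + ":last_seen" }

// memState is a hypothetical in-memory stand-in for State.
type memState map[string]string

// Delete removes key; deleting a missing key is not an error.
func (m memState) Delete(key string) error {
	delete(m, key)
	return nil
}

// resetPeer clears the pin, cursor, and last_seen rows for a peer,
// leaving any other keys (such as a health snapshot) untouched.
func resetPeer(s memState, name string) error {
	for _, k := range []string{peerCortexIDKey(name), peerCursorKey(name), peerSeenKey(name)} {
		if err := s.Delete(k); err != nil {
			return err
		}
	}
	return nil
}
```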

func (*State) Get

func (s *State) Get(key string) (string, error)

func (*State) GetClock

func (s *State) GetClock() (VClock, error)

GetClock loads the vector clock from federation_state.

func (*State) GetPeerHealth added in v0.9.1

func (s *State) GetPeerHealth(name string) (PeerHealth, error)

GetPeerHealth loads the structured health snapshot for a peer. A missing key or empty value both return an empty PeerHealth — callers treat that as "no data yet" identically to "first poll is about to happen". Malformed JSON is tolerated the same way: the snapshot is advisory, not load-bearing, and a parse error shouldn't block the CLI from rendering the rest of federation status.
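A sketch of the tolerant decode, assuming the caller has already mapped a missing key to an empty string. The struct keeps only a few of PeerHealth's fields, and `decodePeerHealth` is a hypothetical name. Returning a fresh zero value on error matters because `json.Unmarshal` can leave its target partially filled when it hits a type mismatch.

```go
package main

import "encoding/json"

// Trimmed mirror of PeerHealth using the documented JSON tags.
type PeerHealth struct {
	Version             string `json:"version,omitempty"`
	LastSuccess         string `json:"last_success,omitempty"`
	ConsecutiveFailures int    `json:"consecutive_failures,omitempty"`
}

// decodePeerHealth treats empty and malformed values as "no data yet".
func decodePeerHealth(raw string) PeerHealth {
	if raw == "" {
		return PeerHealth{}
	}
	var h PeerHealth
	if err := json.Unmarshal([]byte(raw), &h); err != nil {
		return PeerHealth{} // discard any partially decoded fields
	}
	return h
}
```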

func (*State) GetPeerState

func (s *State) GetPeerState(name, endpoint string) (PeerState, error)

GetPeerState loads the runtime state for a peer.

func (*State) Set

func (s *State) Set(key, value string) error

func (*State) SetClock

func (s *State) SetClock(vc VClock) error

SetClock persists the vector clock to federation_state.

func (*State) SetPeerCortexID

func (s *State) SetPeerCortexID(name, cortexID string) error

SetPeerCortexID pins a peer's verified cortex ULID. Should only be called after the cortex_identity handshake has succeeded once.

func (*State) SetPeerCursor

func (s *State) SetPeerCursor(name, eventID string) error

SetPeerCursor updates the last synced event cursor for a peer.

func (*State) SetPeerHealth added in v0.9.1

func (s *State) SetPeerHealth(name string, h PeerHealth) error

SetPeerHealth persists the snapshot for a peer. Callers build the full PeerHealth (usually by reading the previous value, adjusting, and writing back) so partial updates don't need their own API.

func (*State) SetPeerSeen

func (s *State) SetPeerSeen(name, timestamp string) error

SetPeerSeen updates the last seen time for a peer.

type Syncer

type Syncer struct {
	// contains filtered or unexported fields
}

Syncer polls remote peers for new events and replays them locally.

func NewSyncer

func NewSyncer(replayer EventReplayer, state *State, cfg Config) *Syncer

func (*Syncer) Start

func (s *Syncer) Start()

func (*Syncer) Stop

func (s *Syncer) Stop()

type VClock

type VClock map[string]uint64

VClock is a vector clock: one counter per known peer.

func Merge

func Merge(a, b VClock) VClock

Merge returns a new VClock with the component-wise max of two clocks.

func MergeCapped added in v0.8.1

func MergeCapped(a, b VClock) (VClock, error)

MergeCapped is like Merge but returns an error when the merged clock would exceed MaxVClockEntries. Use this on the federation ingest path to prevent a malicious peer from inflating the local clock with synthetic cortex IDs.

func (VClock) Clone

func (vc VClock) Clone() VClock

Clone returns a deep copy of the vector clock.

func (VClock) Increment

func (vc VClock) Increment(peer string)

Increment bumps the counter for the given peer.
