consolidation

package
v0.10.3 Latest
Warning

This package is not in the latest version of its module.

Published: May 1, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package consolidation schedules and executes memory-tier consolidation passes inside `noema serve`. See docs/plans/consolidation-plan.md §4 in the Noema-design repo for the design. Phase 7 ships the infrastructure (agent lifecycle + cadence composition). The pass function itself is injected so later phases can populate it with candidate selection (Phase 8) and LLM distillation (Phase 9) without touching this package.

Index

Constants

View Source
const (
	FailReasonEndpointDown      = "endpoint_down"
	FailReasonLLMError          = "llm_error"
	FailReasonValidationFailed  = "validation_failed"
	FailReasonPeerOutranked     = "peer_outranked"
	FailReasonNoWinnerAtRecheck = "no_winner_at_recheck"
	FailReasonContextCanceled   = "context_canceled"
	FailReasonWatchdogExpired   = "watchdog_expired"
)

Fail-reason enum, matching the plan's normalized reason strings. Kept as string constants so callers can compose new reasons without a package change.

The three "preempted" reasons (PeerOutranked, NoWinnerAtRecheck, ContextCanceled) replace the older catch-all "aborted_by_peer_conflict" reason that was emitted by all three quiet-period exit paths. Telling them apart in the event log lets operators distinguish "ai-3 outranked us during the wait" from "everyone's rank entry expired" from "agent.Stop() interrupted us" — same outcome (no pass), wildly different operational meaning.

View Source
const (
	RankIneligible = 0
	RankMin        = 1
	RankMax        = 99
)

Consolidation-eligibility rank constants. Zero means this peer is not participating in the current window; a non-zero value is the peer's bid for leadership. The 1..99 range is deliberately small enough to read at a glance in status output — collisions are expected (birthday-paradox ~10% at 5 peers, ~37% at 10) and resolved by the cortex-ID tiebreak in ElectWinner. See consolidation-plan.md §14.
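The quoted collision odds follow from the standard birthday-paradox product over 99 equally likely rank values; a few lines verify them:

```go
package main

import "fmt"

// collisionProb returns the chance that at least two of n peers draw
// the same value from r equally likely ranks: 1 minus the product of
// "each successive peer misses all earlier picks".
func collisionProb(n, r int) float64 {
	noCollision := 1.0
	for i := 0; i < n; i++ {
		noCollision *= float64(r-i) / float64(r)
	}
	return 1 - noCollision
}

func main() {
	fmt.Printf("5 peers:  %.0f%%\n", 100*collisionProb(5, 99))  // ≈10%
	fmt.Printf("10 peers: %.0f%%\n", 100*collisionProb(10, 99)) // ≈37%
}
```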

Variables

This section is empty.

Functions

func ElectWinner

func ElectWinner(entries []federation.RankEntry, minAge time.Duration, now time.Time) string

ElectWinner returns the CortexID of the elected peer, or the empty string if no entry qualifies. Filters applied in order:

  1. Rank <= RankIneligible (peer is not participating this window).
  2. ObservedAt is empty or unparseable (defensive: don't elect a peer we don't have a timestamp for).
  3. ObservedAt is fresher than minAge (quiet-period guard; see §14 of the plan).

Survivors are sorted by rank descending, then CortexID descending. Lex-max CortexID breaks rank ties deterministically across peers.

func GenerateRank

func GenerateRank() (int, error)

GenerateRank returns a fresh random rank in [RankMin, RankMax]. Uses crypto/rand so two peers with a shared math/rand seed (unlikely in production but plausible in fuzz or replay testing) don't generate identical bids and defeat the tiebreak.
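The crypto/rand draw is a few lines; a minimal sketch of the same contract (uniform in [1, 99], no shared seed to defeat):

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// generateRank sketches GenerateRank: a uniform random rank in
// [RankMin, RankMax] drawn from the OS entropy source, so two peers
// can never replay identical bids off a shared math/rand seed.
func generateRank() (int, error) {
	n, err := rand.Int(rand.Reader, big.NewInt(99)) // uniform in 0..98
	if err != nil {
		return 0, err
	}
	return int(n.Int64()) + 1, nil // shift into 1..99
}

func main() {
	r, err := generateRank()
	if err != nil {
		panic(err)
	}
	fmt.Println(r >= 1 && r <= 99) // true
}
```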

func ProbeEndpoint

func ProbeEndpoint(ctx context.Context, endpoint string) bool

ProbeEndpoint returns true when the given OpenAI-compatible endpoint is reachable and responding on /models. It issues an unauthenticated GET {endpoint}/models and treats any 2xx as alive.

Auth headers are deliberately not attached. The check is a liveness signal for the coordination layer ("is this peer's LLM reachable at all"), not a full auth handshake. The actual consolidation pass performs its own authenticated request when the time comes; an endpoint that's up but rejecting our credentials will surface that at pass time, not here.

A nil context is treated as context.Background(). The returned bool captures "probe succeeded"; an underlying network error is not propagated because callers only need a yes/no signal to gate rank eligibility.

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

Agent runs cadence evaluation and dispatches passes to the PassFn. One Agent per cortex. Safe for concurrent Start/Stop, not safe to share across cortexes.

func New

func New(cx Cortex, cfg Config, pass PassFn, log func(format string, args ...any)) *Agent

New constructs an agent. Call Start to begin the loop. Passing a nil PassFn is allowed (the agent logs and returns without doing work); early phases use that to exercise cadence code in isolation.

func (*Agent) Start

func (a *Agent) Start()

Start kicks off the background loop. Call exactly once per Agent; subsequent calls after Stop require a fresh New.

func (*Agent) Stop

func (a *Agent) Stop()

Stop signals the loop to exit and blocks until it does. A pass in flight is allowed to finish; new passes will not start. Safe to call even if Start never ran (no-op).

type ClaimData

type ClaimData struct {
	WindowID string `json:"window_id"`
	CortexID string `json:"cortex_id"`
}

ClaimData is the payload for ActionConsolidationClaim.

type ClusterInput

type ClusterInput struct {
	Traces []TraceInput
}

ClusterInput is what a profile receives for each candidate group: the short-term traces the pipeline is asking the model to possibly distill into a single mid-term memory. The title+body pair is enough context for cohesion and distillation decisions; tags ride along so the distillation inherits a reasonable starting set.

type ClusterResult

type ClusterResult struct {
	IDs        []string      `json:"ids"`
	Bucket     string        `json:"bucket"` // groupKey() output, e.g. "note|2026-04-13"
	Profile    string        `json:"profile"`
	Outcome    string        `json:"outcome"` // distilled | rejected | skipped | fallback | error
	Title      string        `json:"title,omitempty"`
	Tags       []string      `json:"tags,omitempty"`
	Body       string        `json:"body,omitempty"`
	Confidence float64       `json:"confidence,omitempty"`
	Reason     string        `json:"reason,omitempty"`
	Sources    []SourceTrace `json:"sources,omitempty"` // only populated on --emit-json
}

ClusterResult is the per-cluster record for one pass. Marshals to JSON for --emit-json consumption.

type CompletionRequest

type CompletionRequest struct {
	Model           string
	Messages        []Message
	Temperature     float64
	MaxTokens       int
	DisableThinking bool
}

CompletionRequest is the provider-neutral shape the pipeline constructs. The HTTP client serializes it to the OpenAI chat-completions body; model-tier profiles vary the Temperature and MaxTokens but the envelope stays the same.

DisableThinking asks reasoning-capable local models (Qwen3, etc.) to skip their <think> block and answer directly. On llama.cpp's OpenAI-compatible server this sets chat_template_kwargs.enable_thinking=false; providers that don't recognize the field silently ignore it. Cohesion / template / confidence steps use this because thinking consumes the MaxTokens budget before any answer reaches the wire.

type Config

type Config struct {
	// Cron is the nightly trigger time in "HH:MM" (local clock). Empty
	// disables the cron trigger.
	Cron string
	// IdleMinutes fires a pass when no mutation has hit the event log
	// for this many minutes. Zero disables.
	IdleMinutes int
	// ThresholdShort fires a pass when the short-term tier count
	// exceeds this. Zero disables.
	ThresholdShort int
	// PollInterval controls how often the cadence state is re-evaluated.
	// Zero defaults to 60s — tight enough that cron fires within a
	// minute of the configured time, loose enough that the agent is
	// effectively free of background cost.
	PollInterval time.Duration
	// CronRetryWindow is how long to wait after a cron trigger fires
	// before checking whether it actually resulted in a consolidation
	// pass running (locally or on a peer). If no success event has
	// landed within this window, the trigger re-fires up to
	// CronMaxRetries times. Zero disables retries entirely — useful for
	// single-node cortexes whose passes don't emit success events, and
	// for tests that drive evaluateAndMaybeRun directly. Issue #56:
	// without this, a staggered-restart election can have every peer
	// defer to another, nobody runs, and the cron opportunity is burned
	// until tomorrow.
	CronRetryWindow time.Duration
	// CronMaxRetries bounds how many times a single cron fire will
	// re-attempt before the agent gives up and marks the day as failed.
	// Zero treated as "no retries", same as CronRetryWindow=0. Defaults
	// applied in cmd_serve, not here, so tests can probe the zero case.
	CronMaxRetries int
}

Config controls the agent's scheduling. All three triggers are composable; the agent fires on whichever activates first.

type Cortex

type Cortex interface {
	LastMutationTime() (time.Time, error)
	ShortTierCount() (int, error)
	// HasConsolidationSuccessAfter is queried by the cron retry path
	// to detect whether a fired trigger actually resulted in a pass
	// running (locally or on any peer that replayed a success event
	// back to us via federation). See checkCronRetry for the protocol.
	HasConsolidationSuccessAfter(cutoff time.Time) (bool, error)
}

Cortex is the subset of the cortex.Cortex surface the agent needs. Keeping it narrow lets tests fake idle time and tier counts without constructing a real cortex.

type Distillation

type Distillation struct {
	Cohesive   bool
	Title      string
	Body       string
	Tags       []string
	Confidence float64
}

Distillation is the result of a successful pass. Confidence is in [0,1]; when the profile doesn't produce a confidence signal (small profile's template step) the value is zero and the caller treats it as "unknown, not zero-confidence".

type Election

type Election struct {
	// contains filtered or unexported fields
}

Election evaluates rank advertisements into a skip/run decision and emits the coordination events that drive federation-wide consensus. One Election per Agent; safe for concurrent use because all state lives in federation_state rather than in the struct.

func NewElection

func NewElection(cfg ElectionConfig) *Election

NewElection constructs an evaluator with defaults filled in.

func (*Election) Claim

func (e *Election) Claim(windowID string) error

Claim records this peer's intent to run the pass for windowID. Must be emitted before pass execution starts so observers can watchdog off the event timestamp in future phases.

func (*Election) CortexID

func (e *Election) CortexID() string

CortexID returns the configured local cortex ID. Exposed so callers holding only an *Election can label log lines without plumbing the ID separately.

func (*Election) Decide

func (e *Election) Decide() Outcome

Decide evaluates the current rank view and returns whether this peer should proceed with a consolidation pass. No side effects — Claim / Success / Fail are emitted by the caller after observing the outcome, so tests can exercise the decision logic in isolation.

func (*Election) Fail

func (e *Election) Fail(windowID, reason string) error

Fail records an aborted or errored pass. Emitted by the winner when the inner pass returns an error, by the gate when a preemption is detected during the quiet-period recheck, and by future watchdog code when a winner fails to report within its expected duration.

func (*Election) QuietPeriod

func (e *Election) QuietPeriod() time.Duration

QuietPeriod returns the configured quiet-period duration. Exposed for the pass-gate wrapper to sleep between Claim and the recheck.

func (*Election) Success

func (e *Election) Success(windowID string, distillations, sourcesPromoted int) error

Success records a completed pass. Emitted only by the winner after the inner pass function returns without error.

type ElectionConfig

type ElectionConfig struct {
	// CortexID is this cortex's stable ULID. Decide compares it against
	// the winning CortexID to classify self-won vs. other-won vs. none.
	CortexID string

	// PeerNames lists federated peers whose ranks should be gathered
	// from federation_state when Decide is called. Pass an empty slice
	// for single-node cortexes; Decide degenerates to "we're the only
	// eligible peer" automatically.
	PeerNames []string

	// QuietPeriod is the minimum age (now - ObservedAt) for a rank
	// entry to count toward the election. Matches 2 × federation.interval
	// per the plan. Callers in tests may pass zero to disable.
	QuietPeriod time.Duration

	// Now is injected for tests; zero defaults to time.Now.
	Now func() time.Time

	// State provides read access to rank entries. Required.
	State *federation.State

	// Emitter is the target for Claim / Success / Fail events.
	// Required.
	Emitter EventEmitter

	// Log is the optional logger; nil is a safe no-op.
	Log func(format string, args ...any)
}

ElectionConfig is the runtime input for an Election evaluator.

type EligibilityConfig

type EligibilityConfig struct {
	// Enabled reflects cortex.md consolidation.enabled. When false the
	// loop advertises Rank=0 unconditionally so peers see this cortex as
	// opted out.
	Enabled bool

	// LLMEnabled reflects cortex.md consolidation.llm_enabled. When false
	// the loop also advertises Rank=0 — a cortex without an LLM cannot
	// run distillation passes and so cannot legitimately win a round.
	LLMEnabled bool

	// TriggersConfigured is true when the cortex has at least one
	// scheduling trigger (cron / idle_minutes / threshold_short) set.
	// startConsolidator returns nil without starting the agent when no
	// trigger is configured; if this loop kept advertising a non-zero
	// rank in that state, the peer would become a "phantom winner" that
	// other peers defer to forever — the ring would stall because the
	// elected leader never claims a window. Forcing Rank=0 here is the
	// signal to the ring that this peer cannot legitimately run a pass.
	TriggersConfigured bool

	// FederationMode is the effective federation mode of this cortex
	// (sync / publish / subscribe). When "subscribe", the loop forces
	// Rank=0 because a read-only mirror cannot legitimately run a
	// consolidation pass — matches the §14 mode table in the plan.
	// Empty string is treated as "sync" (the EffectiveMode default).
	FederationMode string

	// Endpoint is the OpenAI-compatible base URL probed on every tick.
	// Empty endpoint is treated as unreachable.
	Endpoint string

	// CortexID is this cortex's stable ULID, stamped into every
	// RankEntry so remote peers can attribute the advertisement.
	CortexID string

	// CheckInterval is the probe cadence. Zero defaults to
	// defaultCheckInterval (15 min).
	CheckInterval time.Duration

	// Now injects a clock for tests. Zero defaults to time.Now.
	Now func() time.Time

	// Probe injects a probe function for tests. Zero defaults to
	// ProbeEndpoint with a real HTTP client.
	Probe func(ctx context.Context, endpoint string) bool

	// State persists the advertised RankEntry in federation_state.
	// Required; constructors that receive a nil State will panic at
	// first write rather than silently no-op.
	State *federation.State

	// Log is the optional logger; nil is a safe no-op.
	Log func(format string, args ...any)
}

EligibilityConfig carries the runtime inputs for an EligibilityLoop. Callers build one in cmd_serve from the cortex manifest and the federation state handle.

type EligibilityLoop

type EligibilityLoop struct {
	// contains filtered or unexported fields
}

EligibilityLoop refreshes this peer's consolidation rank on a cadence. One loop per cortex, lifecycle parallel to Agent. Writes the advertised RankEntry into federation_state every cycle.

The loop re-rolls the Rank value on every refresh while the peer is eligible — matches consolidation-plan.md §14 step 5 (eligibility check unconditionally sets rank to a fresh random 1..99). Stability within an election is provided by the quiet-period filter in ElectWinner, not by preserving ranks across ticks; re-rolling every cycle gives leadership rotation across windows for free without a separate "reset on Success event" handler.

func NewEligibilityLoop

func NewEligibilityLoop(cfg EligibilityConfig) *EligibilityLoop

NewEligibilityLoop constructs a loop with defaults filled in. Call Start to begin refreshing; call Stop to drain.

func (*EligibilityLoop) Refresh

func (e *EligibilityLoop) Refresh()

Refresh runs one cycle synchronously. Exported for tests and for callers that want to force a rank update after a config change without waiting for the next tick.

func (*EligibilityLoop) Start

func (e *EligibilityLoop) Start()

Start kicks off the background loop. Call exactly once per loop; subsequent Start calls after Stop require a fresh NewEligibilityLoop.

func (*EligibilityLoop) Stop

func (e *EligibilityLoop) Stop()

Stop signals the loop to exit and blocks until it does. Safe to call even if Start never ran.

type EventEmitter

type EventEmitter interface {
	EmitCoordinationEvent(action event.Action, windowID string, data any) error
}

EventEmitter is the narrow subset of cortex.Cortex that Election needs for emitting coordination events. Defining it here lets tests mock the emission surface without standing up a real Cortex.

type FailData

type FailData struct {
	WindowID string `json:"window_id"`
	CortexID string `json:"cortex_id"`
	Reason   string `json:"reason"`
}

FailData is the payload for ActionConsolidationFail.

type GraduationConfig

type GraduationConfig struct {
	// MinAge is the minimum age a mid-tier trace must reach before it
	// can be considered for graduation. Zero defaults to 14 days.
	MinAge time.Duration

	// MinReadCount is the minimum read_count for graduation. Zero
	// defaults to 3.
	MinReadCount int

	// AllowModified opts OUT of the stability gate. Zero value is
	// false, which matches the plan's "base truths are unmodified
	// since creation" default — so a caller who forgets to set this
	// field still gets the strict behavior. Flip to true in cortexes
	// where edits are routine and shouldn't block graduation.
	//
	// Inverted from the originally-proposed RequireUnmodified to make
	// the zero-value safe: the runtime type is consumed by an
	// internal caller (cmd_serve.go) that translates from the YAML-
	// facing cortex.GraduationConfig, but shipping a struct where
	// the safe default requires an explicit assignment is a
	// maintenance trap.
	AllowModified bool
}

GraduationConfig controls the mid→long promotion heuristic. Every criterion below is an AND-gate: a trace graduates only when it clears every threshold simultaneously. Defaults derive from the consolidation-plan §15 design: 14 days + 3 reads + unmodified + no active downvotes.

type GraduationProvider

type GraduationProvider interface {
	GraduationCandidates(minAge time.Duration) ([]cortex.PromotionCandidate, error)
	Promote(id, newTier string) error
}

GraduationProvider is the narrow subset of Cortex the graduation pass needs. Separate from HeuristicProvider because the candidate query shape is different (mid-tier + age-gated, rather than short-tier + rolling-window).

type HTTPLLMClient

type HTTPLLMClient struct {
	Endpoint   string       // base URL, e.g. "http://localhost:11434/v1"
	APIKey     string       // optional bearer token
	HTTPClient *http.Client // nil means a reasonable default is used
}

HTTPLLMClient posts chat-completion requests to an OpenAI-compatible HTTP endpoint. The default timeout is 5 minutes, which is generous for small local models and 70B-class frontier models on consumer hardware; tighten via HTTPClient.Timeout if needed.

func NewHTTPLLMClient

func NewHTTPLLMClient(endpoint, apiKeyEnv string) (*HTTPLLMClient, error)

NewHTTPLLMClient constructs a client from a cortex.md-style endpoint string and an optional env-var name for the API key. The env-var indirection matches the access.shared_key_file pattern for the MCP server: the secret itself never lives in cortex.md.

func (*HTTPLLMClient) Complete

func (c *HTTPLLMClient) Complete(ctx context.Context, req CompletionRequest) (string, error)

Complete posts one chat-completion request and returns the first choice's content. Non-streaming by design — the pipeline needs the full response to validate and parse before deciding what to do.

type HeuristicProvider

type HeuristicProvider interface {
	PromotionCandidates(tier string, window time.Duration) ([]cortex.PromotionCandidate, error)
	Promote(id, newTier string) error
}

HeuristicProvider is the subset of Cortex the heuristic pass needs. Separate from the narrower scheduler interface in agent.go so the agent can still run with a no-op pass for tests that exercise cadence without needing promotion plumbing.

type LLMClient

type LLMClient interface {
	Complete(ctx context.Context, req CompletionRequest) (string, error)
}

LLMClient is the interface the consolidation pipeline uses to talk to a language model. Kept narrow (one method) so tests can stub it without replaying the OpenAI-compatible HTTP dance for every case.

type LLMCortex

type LLMCortex interface {
	PromotionCandidates(tier string, window time.Duration) ([]cortex.PromotionCandidate, error)
	Promote(id, newTier string) error
	CreateDistilledTrace(spec cortex.DistilledTraceSpec) (string, error)
	TraceFile(id string, archived bool) string
	Get(id string) (*cortex.Row, error)
}

LLMCortex is the Cortex surface the LLM-driven consolidation pipeline needs. Separate from the narrower scheduler interface in agent.go so tests of cadence don't have to stub promotion paths.

type Message

type Message struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

Message mirrors the OpenAI chat-completions message shape. All providers this client targets (Ollama, LMStudio, llama.cpp server, vLLM, OpenAI, Azure OpenAI) accept the same three roles.

type Outcome

type Outcome struct {
	// ShouldRun is true when the caller should proceed with the pass.
	ShouldRun bool

	// Winner is the CortexID of the elected peer, or empty if no one
	// qualified.
	Winner string

	// Reason is a short human-readable explanation for logs.
	Reason string
}

Outcome is the result of one Decide call.

type PassConfig

type PassConfig struct {
	// Window bounds the candidate pool to traces created within the
	// last N. Zero defaults to 24h.
	Window time.Duration

	// PromotionThreshold is the minimum blended score a candidate
	// needs to promote from short to mid. Zero defaults to 5 (tuned
	// so: 5 agent reads qualifies, 1 explicit user vote qualifies,
	// 2 derived-from references qualifies).
	PromotionThreshold int

	// Per-signal weights. Zero uses the defaults:
	//   reads    = 1  (weakest; easy to inflate)
	//   modifies = 2  (stronger; agent actively edited)
	//   lineage  = 3  (stronger still; others reference this)
	//   votes    = 5  (strongest; explicit user/agent intent)
	WeightReads    int
	WeightModifies int
	WeightLineage  int
	WeightVotes    int
}

PassConfig controls the heuristic pass's scoring function and window. Zero values use the defaults documented on each field.
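With the default weights, the blended score and threshold reduce to a one-line function; the qualifying shapes from the PromotionThreshold comment check out:

```go
package main

import "fmt"

// blendedScore sketches the heuristic scoring function using the
// documented default weights. The per-signal counts are assumed to be
// tallied from the event log upstream of this function.
func blendedScore(reads, modifies, lineage, votes int) int {
	return reads*1 + modifies*2 + lineage*3 + votes*5
}

func main() {
	const threshold = 5 // default PromotionThreshold
	fmt.Println(blendedScore(5, 0, 0, 0) >= threshold) // true: 5 agent reads
	fmt.Println(blendedScore(0, 0, 0, 1) >= threshold) // true: 1 explicit vote
	fmt.Println(blendedScore(0, 0, 2, 0) >= threshold) // true: 2 derived-from refs
	fmt.Println(blendedScore(2, 1, 0, 0) >= threshold) // false: 4 < 5
}
```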

type PassFn

type PassFn func(ctx context.Context, trigger string) error

PassFn is the consolidation work itself. trigger identifies which cadence caused this pass (one of "cron", "idle", "threshold") so later-phase implementations can log / telemeter differently. The function must honor ctx for shutdown.

func ChainPasses

func ChainPasses(first, second PassFn) PassFn

ChainPasses composes two PassFns into one that runs them in order. Used by cmd_serve to wire heuristic short→mid and graduate mid→long onto a single scheduler trigger. If the first pass errors, the second still runs — graduation is independent of short-tier promotion and shouldn't be blocked by its failure.

func DistillationPass

func DistillationPass(cx *cortex.Cortex, cfg PipelineConfig, endpoint, apiKeyEnv string, log func(format string, args ...any)) PassFn

DistillationPass returns a PassFn that runs the full LLM distillation pipeline (the same code path as `noema consolidate`) on every scheduled trigger, suitable for composing ahead of the heuristic and graduation passes in the agent's ChainPasses chain.

Failure semantics: the returned PassFn never propagates an error from the LLM pipeline. Endpoint unreachable, client-build failure, or pass error are logged and swallowed so the caller can still chain cheap heuristic + graduation work behind it. Context cancellation does propagate so shutdown still aborts the pass.

Used from cmd_serve.go when ConsolidationConfig.AutoDistillationEnabled is true. The stable CLI-triggered `noema consolidate` path continues to build its own client and call RunLLMPass directly because it wants endpoint errors surfaced to the operator.

func GraduatePass

func GraduatePass(cx GraduationProvider, cfg GraduationConfig, log func(format string, args ...any)) PassFn

GraduatePass returns a PassFn that evaluates every mid-tier trace older than cfg.MinAge against the AND-gate criteria and promotes qualifying ones to long. Runs alongside HeuristicPass on the same trigger cadence — each scheduler fire invokes both passes in sequence, short→mid first then mid→long.

Why a separate pass rather than extending HeuristicPass: the candidate set is disjoint (short vs mid), the thresholds are independent, and the graduation rule is intentionally strict and AND-gated rather than blended. Keeping the two concerns in separate functions means each can evolve without touching the other's tests.

func HeuristicPass

func HeuristicPass(cx HeuristicProvider, cfg PassConfig, log func(format string, args ...any)) PassFn

HeuristicPass returns a PassFn that scores every in-window short-term candidate and 1:1-promotes those whose score meets the threshold. Idempotent across runs because promoted traces leave the short-term pool (tier column moves them to mid) and are not surfaced by subsequent PromotionCandidates calls.

This is the Phase 8 implementation — LLM-free, no clustering, no many-to-one distillation. Phase 9 will add a second pass that distills clusters into new mid-term traces atop this baseline.

func WithElection

func WithElection(inner PassFn, election *Election, log func(format string, args ...any)) PassFn

WithElection wraps a PassFn so only the elected peer runs it. The wrapper implements the happy-path protocol from consolidation-plan.md §14; watchdog recovery for a winner that fails to report is deferred to a later phase.

Flow:

  1. Decide winner. If we didn't win, log + return nil (observers silently skip).
  2. Emit ActionConsolidationClaim with a fresh window ID.
  3. Sleep one QuietPeriod so any parallel-starting claimant has time to surface in federation_state.
  4. Re-decide. If we no longer win, emit Fail with one of the three preemption reasons and return:
     - FailReasonPeerOutranked: a peer with higher rank (or the tiebreak) won the recheck.
     - FailReasonNoWinnerAtRecheck: no peer qualifies anymore (every entry got filtered as too-fresh or expired during the wait).
     - FailReasonContextCanceled: ctx.Done() fired during the sleep.
  5. Invoke inner(ctx, trigger). On error, emit Fail(reason=error message); on success, emit Success.

Context cancellation during the quiet-period sleep is treated as a preemption: the gate emits Fail and returns ctx.Err so the agent's Stop() drains promptly.

On a single-node cortex with no peers advertising rank, Decide returns ShouldRun=true for the local cortex trivially and the wrapper degenerates to "emit Claim, brief sleep, emit Success around the pass" — correct but mildly wasteful. Phase 4 can add a no-peers fast path if the overhead ever matters.

type PassResult

type PassResult struct {
	Trigger    string
	Considered int
	Promoted   int
	Skipped    int
}

PassResult summarises a single heuristic pass for logging / future telemetry. Exported fields because later phases may want to surface this in `noema memory stats` or event payloads.

type PipelineConfig

type PipelineConfig struct {
	Window     time.Duration
	ModelTier  string
	ModelName  string
	MaxRetries int
	DryRun     bool
}

PipelineConfig carries everything an LLM pass needs that isn't the Cortex or the LLM itself. Sourced from cortex.md's ConsolidationConfig plus CLI overrides.

type PipelineResult

type PipelineResult struct {
	CandidatesConsidered int
	ClustersAttempted    int
	DistillationsCreated int
	FallbackPromotions   int
	Rejected             int
	Skipped              int

	// ClusterResults is the per-cluster breakdown. Populated for every
	// cluster the pipeline considered — distilled, rejected, skipped,
	// or fallback-promoted. Used by --emit-json for prompt-tuning
	// evaluation (a frontier model can score each distillation against
	// its sources to turn vibes into numbers).
	ClusterResults []ClusterResult `json:"cluster_results,omitempty"`
}

PipelineResult summarises a single pass so the CLI can log what happened without re-deriving it from events.

func RunLLMPass

func RunLLMPass(ctx context.Context, cx LLMCortex, llm LLMClient, cfg PipelineConfig, log func(format string, args ...any)) (PipelineResult, error)

RunLLMPass reads candidates from the cortex, groups them into clusters bounded by the model-tier profile's max cluster size, runs each cluster through the profile (cohesion + template / single-shot JSON), validates the output, and either records the distilled trace or falls back to heuristic 1:1 promotion.

Context cancellation halts the pass at the next cluster boundary. Individual cluster failures are logged and skipped — one bad cluster should not abort the whole run.

type Profile

type Profile interface {
	Name() string
	MaxClusterSize() int
	Run(ctx context.Context, llm LLMClient, model string, cluster ClusterInput) (Distillation, error)
}

Profile is the strategy interface — one implementation per model-tier bucket. Each profile owns its prompt shape and its response parsing so the pipeline code stays model-agnostic.

func GetProfile

func GetProfile(tier string) Profile

GetProfile returns the profile implementation for a tier name. An unknown tier falls back to "large" — the conservative default — and logs a note so misconfigurations are visible but not fatal.

type SourceTrace

type SourceTrace struct {
	ID    string   `json:"id"`
	Title string   `json:"title"`
	Tags  []string `json:"tags,omitempty"`
	Body  string   `json:"body,omitempty"`
}

SourceTrace mirrors TraceInput but is the JSON-serialization shape. Kept separate so the internal pipeline type can evolve without changing the emit-json schema.

type SuccessData

type SuccessData struct {
	WindowID             string `json:"window_id"`
	CortexID             string `json:"cortex_id"`
	DistillationsCreated int    `json:"distillations_created,omitempty"`
	SourcesPromoted      int    `json:"sources_promoted,omitempty"`
}

SuccessData is the payload for ActionConsolidationSuccess. Distillation and promotion counts are optional in v1; Phase 4 wires them in when the pass function's return signature is enriched.

type TraceInput

type TraceInput struct {
	ID    string
	Title string
	Body  string
	Tags  []string
}

type Watchdog

type Watchdog struct {
	// contains filtered or unexported fields
}

Watchdog scans the local event log for consolidation_claim events with no matching success/fail and emits a closing consolidation_fail event (reason=watchdog_expired) so the next election cycle can move on. One per cortex; lifecycle parallel to Agent + EligibilityLoop.

The watchdog runs on every peer, not just election winners. The whole point is that the winner is presumed dead — observers need to notice and break out. Cross-peer duplicate emissions are possible (two observers detect the same stale claim within seconds of each other); events have unique IDs so replay is idempotent and the only cost is a little log noise. Cross-peer locking would be substantially more complex and isn't worth it for what is already a corner case.

func NewWatchdog

func NewWatchdog(cfg WatchdogConfig) *Watchdog

NewWatchdog constructs a Watchdog with defaults filled in. Call Start to begin the sweep loop; call Stop to drain.

func (*Watchdog) Start

func (w *Watchdog) Start()

Start kicks off the background sweep loop. Call exactly once per instance; subsequent Start calls after Stop require a fresh NewWatchdog.

func (*Watchdog) Stop

func (w *Watchdog) Stop()

Stop signals the loop to exit and blocks until it does. Safe to call even if Start never ran.

func (*Watchdog) Sweep

func (w *Watchdog) Sweep() error

Sweep runs one scan synchronously. Exported for tests and for callers that want to force a check without waiting for the next tick.

Not safe for concurrent invocation — the loop goroutine is the only expected caller in production. The internal mutex serializes any stray test calls against the loop.

type WatchdogConfig

type WatchdogConfig struct {
	// DB is the cortex's SQLite connection. Required; the sweep query
	// joins the events table to itself to find orphaned claims.
	DB *sql.DB

	// Emitter is the target for the closing fail events. In production
	// this is the same *cortex.Cortex that EligibilityLoop and Election
	// share; the narrow interface lets tests substitute a recorder.
	Emitter EventEmitter

	// LocalCortexID is this cortex's stable ULID. Stamped into every
	// emitted fail event so observers can attribute the closing
	// decision to a specific peer for audit purposes.
	LocalCortexID string

	// Timeout is how old a claim must be (now - claim.timestamp) before
	// the watchdog treats it as orphaned. Generous by default — the
	// longest legitimate pass we expect is LLM distillation on a
	// large window, which still completes well inside this budget.
	// Zero defaults to defaultWatchdogTimeout.
	Timeout time.Duration

	// Interval is the sweep cadence. Sweeps are cheap (one indexed
	// query) so a tight cadence is fine; the trade-off is mostly how
	// long an actually-stuck pass blocks the ring before being
	// closed out. Zero defaults to defaultWatchdogInterval.
	Interval time.Duration

	// Now is injected for tests; zero defaults to time.Now.
	Now func() time.Time

	// Log is the optional logger; nil is a safe no-op.
	Log func(format string, args ...any)
}

WatchdogConfig is the runtime input for a Watchdog instance. Built in cmd_serve from the Cortex handle and the federation config.
