Documentation ¶
Overview ¶
Package consolidation schedules and executes memory-tier consolidation passes inside `noema serve`. See docs/plans/consolidation-plan.md §4 in the Noema-design repo for the design. Phase 7 ships the infrastructure (agent lifecycle + cadence composition). The pass function itself is injected so later phases can populate it with candidate selection (Phase 8) and LLM distillation (Phase 9) without touching this package.
Index ¶
- Constants
- func ElectWinner(entries []federation.RankEntry, minAge time.Duration, now time.Time) string
- func GenerateRank() (int, error)
- func ProbeEndpoint(ctx context.Context, endpoint string) bool
- type Agent
- type ClaimData
- type ClusterInput
- type ClusterResult
- type CompletionRequest
- type Config
- type Cortex
- type Distillation
- type Election
- func (e *Election) Claim(windowID string) error
- func (e *Election) CortexID() string
- func (e *Election) Decide() Outcome
- func (e *Election) Fail(windowID, reason string) error
- func (e *Election) QuietPeriod() time.Duration
- func (e *Election) Success(windowID string, distillations, sourcesPromoted int) error
- type ElectionConfig
- type EligibilityConfig
- type EligibilityLoop
- type EventEmitter
- type FailData
- type GraduationConfig
- type GraduationProvider
- type HTTPLLMClient
- type HeuristicProvider
- type LLMClient
- type LLMCortex
- type Message
- type Outcome
- type PassConfig
- type PassFn
- func ChainPasses(first, second PassFn) PassFn
- func DistillationPass(cx *cortex.Cortex, cfg PipelineConfig, endpoint, apiKeyEnv string, ...) PassFn
- func GraduatePass(cx GraduationProvider, cfg GraduationConfig, ...) PassFn
- func HeuristicPass(cx HeuristicProvider, cfg PassConfig, log func(format string, args ...any)) PassFn
- func WithElection(inner PassFn, election *Election, log func(format string, args ...any)) PassFn
- type PassResult
- type PipelineConfig
- type PipelineResult
- type Profile
- type SourceTrace
- type SuccessData
- type TraceInput
- type Watchdog
- type WatchdogConfig
Constants ¶
const (
	FailReasonEndpointDown      = "endpoint_down"
	FailReasonLLMError          = "llm_error"
	FailReasonValidationFailed  = "validation_failed"
	FailReasonPeerOutranked     = "peer_outranked"
	FailReasonNoWinnerAtRecheck = "no_winner_at_recheck"
	FailReasonContextCanceled   = "context_canceled"
	FailReasonWatchdogExpired   = "watchdog_expired"
)
Fail-reason enum, matching the plan's normalized reason strings. Kept as string constants so callers can compose new reasons without a package change.
The three "preempted" reasons (PeerOutranked, NoWinnerAtRecheck, ContextCanceled) replace the older catch-all "aborted_by_peer_conflict" reason that was emitted by all three quiet-period exit paths. Telling them apart in the event log lets operators distinguish "ai-3 outranked us during the wait" from "everyone's rank entry expired" from "agent.Stop() interrupted us" — same outcome (no pass), wildly different operational meaning.
const (
	RankIneligible = 0
	RankMin        = 1
	RankMax        = 99
)
Consolidation-eligibility rank constants. Zero means this peer is not participating in the current window; a non-zero value is the peer's bid for leadership. The 1..99 range is deliberately small enough to read at a glance in status output — collisions are expected (birthday-paradox ~10% at 5 peers, ~37% at 10) and resolved by the cortex-ID tiebreak in ElectWinner. See consolidation-plan.md §14.
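The quoted collision odds follow from the standard birthday computation over 99 equally likely ranks. A quick illustrative check (this helper is not part of the package):

```go
package main

import "fmt"

// collisionProb returns the probability that at least two of n peers
// draw the same rank from the 99-value range [RankMin, RankMax].
func collisionProb(n int) float64 {
	p := 1.0
	for i := 0; i < n; i++ {
		p *= float64(99-i) / 99
	}
	return 1 - p
}

func main() {
	fmt.Printf("5 peers:  %.1f%%\n", 100*collisionProb(5))  // ~9.7%
	fmt.Printf("10 peers: %.1f%%\n", 100*collisionProb(10)) // ~37.5%
}
```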
Variables ¶
This section is empty.
Functions ¶
func ElectWinner ¶
func ElectWinner(entries []federation.RankEntry, minAge time.Duration, now time.Time) string
ElectWinner returns the CortexID of the elected peer, or the empty string if no entry qualifies. Filters applied in order:
- Rank <= RankIneligible (peer is not participating this window).
- ObservedAt is empty or unparseable (defensive: don't elect a peer we don't have a timestamp for).
- ObservedAt is fresher than minAge (quiet-period guard; see §14 of the plan).
Survivors are sorted by rank descending, then CortexID descending. Lex-max CortexID breaks rank ties deterministically across peers.
func GenerateRank ¶
func GenerateRank() (int, error)
GenerateRank returns a fresh random rank in [RankMin, RankMax]. Uses crypto/rand so two peers with a shared math/rand seed (unlikely in production but plausible in fuzz or replay testing) don't generate identical bids and defeat the tiebreak.
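A minimal sketch of a crypto/rand draw over [RankMin, RankMax]; the package's internal implementation may differ in detail:

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// generateRank sketches the documented behavior: a crypto/rand draw
// uniform over [1, 99].
func generateRank() (int, error) {
	n, err := rand.Int(rand.Reader, big.NewInt(99)) // uniform in [0, 99)
	if err != nil {
		return 0, err
	}
	return int(n.Int64()) + 1, nil // shift to [1, 99]
}

func main() {
	r, err := generateRank()
	fmt.Println(r >= 1 && r <= 99, err == nil) // true true
}
```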
func ProbeEndpoint ¶
func ProbeEndpoint(ctx context.Context, endpoint string) bool
ProbeEndpoint returns true when the given OpenAI-compatible endpoint is reachable and responding on /models. It issues an unauthenticated GET {endpoint}/models and treats any 2xx as alive.
Auth headers are deliberately not attached. The check is a liveness signal for the coordination layer ("is this peer's LLM reachable at all"), not a full auth handshake. The actual consolidation pass performs its own authenticated request when the time comes; an endpoint that's up but rejecting our credentials will surface that at pass time, not here.
A nil context is treated as context.Background(). The returned bool captures "probe succeeded"; an underlying network error is not propagated because callers only need a yes/no signal to gate rank eligibility.
Types ¶
type Agent ¶
type Agent struct {
// contains filtered or unexported fields
}
Agent runs cadence evaluation and dispatches passes to the PassFn. One Agent per cortex. Safe for concurrent Start/Stop, not safe to share across cortexes.
func New ¶
New constructs an agent. Call Start to begin the loop. Passing a nil PassFn is allowed (the agent logs and returns without doing work); early phases use that to exercise cadence code in isolation.
type ClusterInput ¶
type ClusterInput struct {
Traces []TraceInput
}
ClusterInput is what a profile receives for each candidate group: the short-term traces the pipeline asks the model to consider distilling into a single mid-term memory. The title+body pair gives enough context for cohesion and distillation decisions; tags ride along so the distillation inherits a reasonable starting set.
type ClusterResult ¶
type ClusterResult struct {
IDs []string `json:"ids"`
Bucket string `json:"bucket"` // groupKey() output, e.g. "note|2026-04-13"
Profile string `json:"profile"`
Outcome string `json:"outcome"` // distilled | rejected | skipped | fallback | error
Title string `json:"title,omitempty"`
Tags []string `json:"tags,omitempty"`
Body string `json:"body,omitempty"`
Confidence float64 `json:"confidence,omitempty"`
Reason string `json:"reason,omitempty"`
Sources []SourceTrace `json:"sources,omitempty"` // only populated on --emit-json
}
ClusterResult is the per-cluster record for one pass. Marshals to JSON for --emit-json consumption.
type CompletionRequest ¶
type CompletionRequest struct {
Model string
Messages []Message
Temperature float64
MaxTokens int
DisableThinking bool
}
CompletionRequest is the provider-neutral shape the pipeline constructs. The HTTP client serializes it to the OpenAI chat-completions body; model-tier profiles vary the Temperature and MaxTokens but the envelope stays the same.
DisableThinking asks reasoning-capable local models (Qwen3, etc.) to skip their <think> block and answer directly. On llama.cpp's OpenAI-compatible server this sets chat_template_kwargs.enable_thinking=false; providers that don't recognize the field silently ignore it. Cohesion / template / confidence steps use this because thinking consumes the MaxTokens budget before any answer reaches the wire.
type Config ¶
type Config struct {
// Cron is the nightly trigger time in "HH:MM" (local clock). Empty
// disables the cron trigger.
Cron string
// IdleMinutes fires a pass when no mutation has hit the event log
// for this many minutes. Zero disables.
IdleMinutes int
// ThresholdShort fires a pass when the short-term tier count
// exceeds this. Zero disables.
ThresholdShort int
// PollInterval controls how often the cadence state is re-evaluated.
// Zero defaults to 60s — tight enough that cron fires within a
// minute of the configured time, loose enough that the agent is
// effectively free of background cost.
PollInterval time.Duration
// CronRetryWindow is how long to wait after a cron trigger fires
// before checking whether it actually resulted in a consolidation
// pass running (locally or on a peer). If no success event has
// landed within this window, the trigger re-fires up to
// CronMaxRetries times. Zero disables retries entirely — useful for
// single-node cortexes whose passes don't emit success events, and
// for tests that drive evaluateAndMaybeRun directly. Issue #56:
// without this, a staggered-restart election can have every peer
// defer to another, nobody runs, and the cron opportunity is burned
// until tomorrow.
CronRetryWindow time.Duration
// CronMaxRetries bounds how many times a single cron fire will
// re-attempt before the agent gives up and marks the day as failed.
// Zero treated as "no retries", same as CronRetryWindow=0. Defaults
// applied in cmd_serve, not here, so tests can probe the zero case.
CronMaxRetries int
}
Config controls the agent's scheduling. All three triggers are composable; the agent fires on whichever activates first.
type Cortex ¶
type Cortex interface {
LastMutationTime() (time.Time, error)
ShortTierCount() (int, error)
// HasConsolidationSuccessAfter is queried by the cron retry path
// to detect whether a fired trigger actually resulted in a pass
// running (locally or on any peer that replayed a success event
// back to us via federation). See checkCronRetry for the protocol.
HasConsolidationSuccessAfter(cutoff time.Time) (bool, error)
}
Cortex is the subset of the cortex.Cortex surface the agent needs. Keeping it narrow lets tests fake idle time and tier counts without constructing a real cortex.
type Distillation ¶
type Distillation struct {
Cohesive bool
Title string
Body string
Tags []string
Confidence float64
}
Distillation is the result of a successful pass. Confidence is in [0,1]; when the profile doesn't produce a confidence signal (small profile's template step) the value is zero and the caller treats it as "unknown, not zero-confidence".
type Election ¶
type Election struct {
// contains filtered or unexported fields
}
Election evaluates rank advertisements into a skip/run decision and emits the coordination events that drive federation-wide consensus. One Election per Agent; safe for concurrent use because all state lives in federation_state rather than in the struct.
func NewElection ¶
func NewElection(cfg ElectionConfig) *Election
NewElection constructs an evaluator with defaults filled in.
func (*Election) Claim ¶
Claim records this peer's intent to run the pass for windowID. Must be emitted before pass execution starts so observers can watchdog off the event timestamp in future phases.
func (*Election) CortexID ¶
CortexID returns the configured local cortex ID. Exposed so callers holding only an *Election can label log lines without plumbing the ID separately.
func (*Election) Decide ¶
Decide evaluates the current rank view and returns whether this peer should proceed with a consolidation pass. No side effects — Claim / Success / Fail are emitted by the caller after observing the outcome, so tests can exercise the decision logic in isolation.
func (*Election) Fail ¶
Fail records an aborted or errored pass. Emitted by the winner when the inner pass returns an error, by the gate when a preemption is detected during the quiet-period recheck, and by future watchdog code when a winner fails to report within its expected duration.
func (*Election) QuietPeriod ¶
QuietPeriod returns the configured quiet-period duration. Exposed for the pass-gate wrapper to sleep between Claim and the recheck.
type ElectionConfig ¶
type ElectionConfig struct {
// CortexID is this cortex's stable ULID. Decide compares it against
// the winning CortexID to classify self-won vs. other-won vs. none.
CortexID string
// PeerNames lists federated peers whose ranks should be gathered
// from federation_state when Decide is called. Pass an empty slice
// for single-node cortexes; Decide degenerates to "we're the only
// eligible peer" automatically.
PeerNames []string
// QuietPeriod is the minimum age (now - ObservedAt) for a rank
// entry to count toward the election. Matches 2 × federation.
// interval per the plan. Callers in tests may pass zero to disable.
QuietPeriod time.Duration
// Now is injected for tests; zero defaults to time.Now.
Now func() time.Time
// State provides read access to rank entries. Required.
State *federation.State
// Emitter is the target for Claim / Success / Fail events.
// Required.
Emitter EventEmitter
// Log is the optional logger; nil is a safe no-op.
Log func(format string, args ...any)
}
ElectionConfig is the runtime input for an Election evaluator.
type EligibilityConfig ¶
type EligibilityConfig struct {
// Enabled reflects cortex.md consolidation.enabled. When false the
// loop advertises Rank=0 unconditionally so peers see this cortex as
// opted out.
Enabled bool
// LLMEnabled reflects cortex.md consolidation.llm_enabled. When false
// the loop also advertises Rank=0 — a cortex without an LLM cannot
// run distillation passes and so cannot legitimately win a round.
LLMEnabled bool
// TriggersConfigured is true when the cortex has at least one
// scheduling trigger (cron / idle_minutes / threshold_short) set.
// startConsolidator returns nil without starting the agent when no
// trigger is configured; if this loop kept advertising a non-zero
// rank in that state, the peer would become a "phantom winner" that
// other peers defer to forever — the ring would stall because the
// elected leader never claims a window. Forcing Rank=0 here is the
// signal to the ring that this peer cannot legitimately run a pass.
TriggersConfigured bool
// FederationMode is the effective federation mode of this cortex
// (sync / publish / subscribe). When "subscribe", the loop forces
// Rank=0 because a read-only mirror cannot legitimately run a
// consolidation pass — matches the §14 mode table in the plan.
// Empty string is treated as "sync" (the EffectiveMode default).
FederationMode string
// Endpoint is the OpenAI-compatible base URL probed on every tick.
// Empty endpoint is treated as unreachable.
Endpoint string
// CortexID is this cortex's stable ULID, stamped into every
// RankEntry so remote peers can attribute the advertisement.
CortexID string
// CheckInterval is the probe cadence. Zero defaults to
// defaultCheckInterval (15 min).
CheckInterval time.Duration
// Now injects a clock for tests. Zero defaults to time.Now.
Now func() time.Time
// Probe injects a probe function for tests. Zero defaults to
// ProbeEndpoint with a real HTTP client.
Probe func(ctx context.Context, endpoint string) bool
// State persists the advertised RankEntry in federation_state.
// Required; constructors that receive a nil State will panic at
// first write rather than silently no-op.
State *federation.State
// Log is the optional logger; nil is a safe no-op.
Log func(format string, args ...any)
}
EligibilityConfig carries the runtime inputs for an EligibilityLoop. Callers build one in cmd_serve from the cortex manifest and the federation state handle.
type EligibilityLoop ¶
type EligibilityLoop struct {
// contains filtered or unexported fields
}
EligibilityLoop refreshes this peer's consolidation rank on a cadence. One loop per cortex, lifecycle parallel to Agent. Writes the advertised RankEntry into federation_state every cycle.
The loop re-rolls the Rank value on every refresh while the peer is eligible — matches consolidation-plan.md §14 step 5 (eligibility check unconditionally sets rank to a fresh random 1..99). Stability within an election is provided by the quiet-period filter in ElectWinner, not by preserving ranks across ticks; re-rolling every cycle gives leadership rotation across windows for free without a separate "reset on Success event" handler.
func NewEligibilityLoop ¶
func NewEligibilityLoop(cfg EligibilityConfig) *EligibilityLoop
NewEligibilityLoop constructs a loop with defaults filled in. Call Start to begin refreshing; call Stop to drain.
func (*EligibilityLoop) Refresh ¶
func (e *EligibilityLoop) Refresh()
Refresh runs one cycle synchronously. Exported for tests and for callers that want to force a rank update after a config change without waiting for the next tick.
func (*EligibilityLoop) Start ¶
func (e *EligibilityLoop) Start()
Start kicks off the background loop. Call exactly once per loop; subsequent Start calls after Stop require a fresh NewEligibilityLoop.
func (*EligibilityLoop) Stop ¶
func (e *EligibilityLoop) Stop()
Stop signals the loop to exit and blocks until it does. Safe to call even if Start never ran.
type EventEmitter ¶
type EventEmitter interface {
EmitCoordinationEvent(action event.Action, windowID string, data any) error
}
EventEmitter is the narrow subset of cortex.Cortex that Election needs for emitting coordination events. Defining it here lets tests mock the emission surface without standing up a real Cortex.
type FailData ¶
type FailData struct {
WindowID string `json:"window_id"`
CortexID string `json:"cortex_id"`
Reason string `json:"reason"`
}
FailData is the payload for ActionConsolidationFail.
type GraduationConfig ¶
type GraduationConfig struct {
// MinAge is the minimum age a mid-tier trace must reach before it
// can be considered for graduation. Zero defaults to 14 days.
MinAge time.Duration
// MinReadCount is the minimum read_count for graduation. Zero
// defaults to 3.
MinReadCount int
// AllowModified opts OUT of the stability gate. Zero value is
// false, which matches the plan's "base truths are unmodified
// since creation" default — so a caller who forgets to set this
// field still gets the strict behavior. Flip to true in cortexes
// where edits are routine and shouldn't block graduation.
//
// Inverted from the originally-proposed RequireUnmodified to make
// the zero-value safe: the runtime type is consumed by an
// internal caller (cmd_serve.go) that translates from the YAML-
// facing cortex.GraduationConfig, but shipping a struct where
// the safe default requires an explicit assignment is a
// maintenance trap.
AllowModified bool
}
GraduationConfig controls the mid→long promotion heuristic. Each criterion is an AND-gate: a trace graduates only when it clears every threshold simultaneously. Defaults derive from the consolidation-plan §15 design: 14 days + 3 reads + unmodified + no active downvotes.
type GraduationProvider ¶
type GraduationProvider interface {
GraduationCandidates(minAge time.Duration) ([]cortex.PromotionCandidate, error)
Promote(id, newTier string) error
}
GraduationProvider is the narrow subset of Cortex the graduation pass needs. Separate from HeuristicProvider because the candidate query shape is different (mid-tier + age-gated, rather than short-tier + rolling-window).
type HTTPLLMClient ¶
type HTTPLLMClient struct {
Endpoint string // base URL, e.g. "http://localhost:11434/v1"
APIKey string // optional bearer token
HTTPClient *http.Client // nil means a reasonable default is used
}
HTTPLLMClient posts chat-completion requests to an OpenAI-compatible HTTP endpoint. The default timeout is 5 minutes, which is generous for small local models and the 70B-class frontier on consumer hardware; tighten via HTTPClient.Timeout if needed.
func NewHTTPLLMClient ¶
func NewHTTPLLMClient(endpoint, apiKeyEnv string) (*HTTPLLMClient, error)
NewHTTPLLMClient constructs a client from a cortex.md-style endpoint string and an optional env-var name for the API key. The env-var indirection matches the access.shared_key_file pattern for the MCP server: the secret itself never lives in cortex.md.
func (*HTTPLLMClient) Complete ¶
func (c *HTTPLLMClient) Complete(ctx context.Context, req CompletionRequest) (string, error)
Complete posts one chat-completion request and returns the first choice's content. Non-streaming by design — the pipeline needs the full response to validate and parse before deciding what to do.
type HeuristicProvider ¶
type HeuristicProvider interface {
PromotionCandidates(tier string, window time.Duration) ([]cortex.PromotionCandidate, error)
Promote(id, newTier string) error
}
HeuristicProvider is the subset of Cortex the heuristic pass needs. Separate from the narrower scheduler interface in agent.go so the agent can still run with a no-op pass for tests that exercise cadence without needing promotion plumbing.
type LLMClient ¶
type LLMClient interface {
Complete(ctx context.Context, req CompletionRequest) (string, error)
}
LLMClient is the interface the consolidation pipeline uses to talk to a language model. Kept narrow (one method) so tests can stub it without replaying the OpenAI-compatible HTTP dance for every case.
type LLMCortex ¶
type LLMCortex interface {
PromotionCandidates(tier string, window time.Duration) ([]cortex.PromotionCandidate, error)
Promote(id, newTier string) error
CreateDistilledTrace(spec cortex.DistilledTraceSpec) (string, error)
TraceFile(id string, archived bool) string
Get(id string) (*cortex.Row, error)
}
LLMCortex is the Cortex surface the LLM-driven consolidation pipeline needs. Separate from the narrower scheduler interface in agent.go so tests of cadence don't have to stub promotion paths.
type Message ¶
Message mirrors the OpenAI chat-completions message shape. All providers this client targets (Ollama, LMStudio, llama.cpp server, vLLM, OpenAI, Azure OpenAI) accept the same three roles.
type Outcome ¶
type Outcome struct {
// ShouldRun is true when the caller should proceed with the pass.
ShouldRun bool
// Winner is the CortexID of the elected peer, or empty if no one
// qualified.
Winner string
// Reason is a short human-readable explanation for logs.
Reason string
}
Outcome is the result of one Decide call.
type PassConfig ¶
type PassConfig struct {
// Window bounds the candidate pool to traces created within the
// last N. Zero defaults to 24h.
Window time.Duration
// PromotionThreshold is the minimum blended score a candidate
// needs to promote from short to mid. Zero defaults to 5 (tuned
// so: 5 agent reads qualifies, 1 explicit user vote qualifies,
// 2 derived-from references qualifies).
PromotionThreshold int
// Per-signal weights. Zero uses the defaults:
// reads = 1 (weakest; easy to inflate)
// modifies = 2 (stronger; agent actively edited)
// lineage = 3 (stronger still; others reference this)
// votes = 5 (strongest; explicit user/agent intent)
WeightReads int
WeightModifies int
WeightLineage int
WeightVotes int
}
PassConfig controls the heuristic pass's scoring function and window. Zero values use the defaults documented on each field.
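With the documented default weights and threshold, the qualifying examples from the field comments check out arithmetically. A sketch (the real scoring code is internal to HeuristicPass):

```go
package main

import "fmt"

// blendedScore applies the documented default weights:
// reads=1, modifies=2, lineage=3, votes=5.
func blendedScore(reads, modifies, lineage, votes int) int {
	return reads*1 + modifies*2 + lineage*3 + votes*5
}

func main() {
	const threshold = 5 // documented default PromotionThreshold
	fmt.Println(blendedScore(5, 0, 0, 0) >= threshold) // true: 5 agent reads qualify
	fmt.Println(blendedScore(0, 0, 0, 1) >= threshold) // true: 1 explicit vote qualifies
	fmt.Println(blendedScore(0, 0, 2, 0) >= threshold) // true: 2 derived-from references qualify
	fmt.Println(blendedScore(2, 1, 0, 0) >= threshold) // false: 4 < 5, stays in short tier
}
```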
type PassFn ¶
PassFn is the consolidation work itself. trigger identifies which cadence caused this pass (one of "cron", "idle", "threshold") so later-phase implementations can log / telemeter differently. The function must honor ctx for shutdown.
func ChainPasses ¶
func ChainPasses(first, second PassFn) PassFn
ChainPasses composes two PassFns into one that runs them in order. Used by cmd_serve to wire heuristic short→mid and graduate mid→long onto a single scheduler trigger. If the first pass errors, the second still runs — graduation is independent of short-tier promotion and shouldn't be blocked by its failure.
func DistillationPass ¶
func DistillationPass(cx *cortex.Cortex, cfg PipelineConfig, endpoint, apiKeyEnv string, log func(format string, args ...any)) PassFn
DistillationPass returns a PassFn that runs the full LLM distillation pipeline (the same code path as `noema consolidate`) on every scheduled trigger, suitable for composing into the agent's chained pass via ChainPasses(distill, heuristic) → graduation.
Failure semantics: the returned PassFn never propagates an error from the LLM pipeline. Endpoint unreachable, client-build failure, or pass error are logged and swallowed so the caller can still chain cheap heuristic + graduation work behind it. Context cancellation does propagate so shutdown still aborts the pass.
Used from cmd_serve.go when ConsolidationConfig.AutoDistillationEnabled is true. The stable CLI-triggered `noema consolidate` path continues to build its own client and call RunLLMPass directly because it wants endpoint errors surfaced to the operator.
func GraduatePass ¶
func GraduatePass(cx GraduationProvider, cfg GraduationConfig, log func(format string, args ...any)) PassFn
GraduatePass returns a PassFn that evaluates every mid-tier trace older than cfg.MinAge against the AND-gate criteria and promotes qualifying ones to long. Runs alongside HeuristicPass on the same trigger cadence — each scheduler fire invokes both passes in sequence, short→mid first then mid→long.
Why a separate pass rather than extending HeuristicPass: the candidate set is disjoint (short vs mid), the thresholds are independent, and the graduation rule is intentionally strict and AND-gated rather than blended. Keeping the two concerns in separate functions means each can evolve without touching the other's tests.
func HeuristicPass ¶
func HeuristicPass(cx HeuristicProvider, cfg PassConfig, log func(format string, args ...any)) PassFn
HeuristicPass returns a PassFn that scores every in-window short-term candidate and 1:1-promotes those whose score meets the threshold. Idempotent across runs because promoted traces leave the short-term pool (tier column moves them to mid) and are not surfaced by subsequent PromotionCandidates calls.
This is the Phase 8 implementation — LLM-free, no clustering, no many-to-one distillation. Phase 9 will add a second pass that distills clusters into new mid-term traces atop this baseline.
func WithElection ¶
func WithElection(inner PassFn, election *Election, log func(format string, args ...any)) PassFn
WithElection wraps a PassFn so only the elected peer runs it. The wrapper implements the happy-path protocol from consolidation-plan.md §14; watchdog recovery for a winner that fails to report is deferred to a later phase.
Flow:
- Decide winner. If we didn't win, log + return nil (observers silently skip).
- Emit ActionConsolidationClaim with a fresh window ID.
- Sleep one QuietPeriod so any parallel-starting claimant has time to surface in federation_state.
- Re-decide. If we no longer win, emit Fail with one of the three preemption reasons and return:
  - FailReasonPeerOutranked: a peer with higher rank (or the tiebreak) won the recheck.
  - FailReasonNoWinnerAtRecheck: no peer qualifies anymore (every entry got filtered as too-fresh or expired during the wait).
  - FailReasonContextCanceled: ctx.Done() fired during the sleep.
- Invoke inner(ctx, trigger). On error, emit Fail(reason=error message); on success, emit Success.
Context cancellation during the quiet-period sleep is treated as a preemption: the gate emits Fail and returns ctx.Err so the agent's Stop() drains promptly.
On a single-node cortex with no peers advertising rank, Decide returns ShouldRun=true for the local cortex trivially and the wrapper degenerates to "emit Claim, brief sleep, emit Success around the pass" — correct but mildly wasteful. Phase 4 can add a no-peers fast path if the overhead ever matters.
type PassResult ¶
PassResult summarises a single heuristic pass for logging / future telemetry. Exported fields because later phases may want to surface this in `noema memory stats` or event payloads.
type PipelineConfig ¶
type PipelineConfig struct {
Window time.Duration
ModelTier string
ModelName string
MaxRetries int
DryRun bool
}
PipelineConfig carries everything an LLM pass needs that isn't the Cortex or the LLM itself. Sourced from cortex.md's ConsolidationConfig plus CLI overrides.
type PipelineResult ¶
type PipelineResult struct {
CandidatesConsidered int
ClustersAttempted int
DistillationsCreated int
FallbackPromotions int
Rejected int
Skipped int
// ClusterResults is the per-cluster breakdown. Populated for every
// cluster the pipeline considered — distilled, rejected, skipped,
// or fallback-promoted. Used by --emit-json for prompt-tuning
// evaluation (a frontier model can score each distillation against
// its sources to turn vibes into numbers).
ClusterResults []ClusterResult `json:"cluster_results,omitempty"`
}
PipelineResult summarises a single pass so the CLI can log what happened without re-deriving it from events.
func RunLLMPass ¶
func RunLLMPass(ctx context.Context, cx LLMCortex, llm LLMClient, cfg PipelineConfig, log func(format string, args ...any)) (PipelineResult, error)
RunLLMPass reads candidates from the cortex, groups them into clusters bounded by the model-tier profile's max cluster size, runs each cluster through the profile (cohesion + template / single-shot JSON), validates the output, and either records the distilled trace or falls back to heuristic 1:1 promotion.
Context cancellation halts the pass at the next cluster boundary. Individual cluster failures are logged and skipped — one bad cluster should not abort the whole run.
type Profile ¶
type Profile interface {
Name() string
MaxClusterSize() int
Run(ctx context.Context, llm LLMClient, model string, cluster ClusterInput) (Distillation, error)
}
Profile is the strategy interface — one implementation per model-tier bucket. Each profile owns its prompt shape and its response parsing so the pipeline code stays model-agnostic.
func GetProfile ¶
GetProfile returns the profile implementation for a tier name. An unknown tier falls back to "large" — the conservative default — and logs a note so misconfigurations are visible but not fatal.
type SourceTrace ¶
type SourceTrace struct {
ID string `json:"id"`
Title string `json:"title"`
Tags []string `json:"tags,omitempty"`
Body string `json:"body,omitempty"`
}
SourceTrace mirrors TraceInput but is the JSON-serialization shape. Kept separate so the internal pipeline type can evolve without changing the emit-json schema.
type SuccessData ¶
type SuccessData struct {
WindowID string `json:"window_id"`
CortexID string `json:"cortex_id"`
DistillationsCreated int `json:"distillations_created,omitempty"`
SourcesPromoted int `json:"sources_promoted,omitempty"`
}
SuccessData is the payload for ActionConsolidationSuccess. Distillation and promotion counts are optional in v1; Phase 4 wires them in when the pass function's return signature is enriched.
type Watchdog ¶
type Watchdog struct {
// contains filtered or unexported fields
}
Watchdog scans the local event log for consolidation_claim events with no matching success/fail and emits a closing consolidation_fail event (reason=watchdog_expired) so the next election cycle can move on. One per cortex; lifecycle parallel to Agent + EligibilityLoop.
The watchdog runs on every peer, not just election winners. The whole point is that the winner is presumed dead — observers need to notice and break out. Cross-peer duplicate emissions are possible (two observers detect the same stale claim within seconds of each other); events have unique IDs so replay is idempotent and the only cost is a little log noise. Cross-peer locking would be substantially more complex and isn't worth it for what is already a corner case.
func NewWatchdog ¶
func NewWatchdog(cfg WatchdogConfig) *Watchdog
NewWatchdog constructs a Watchdog with defaults filled in. Call Start to begin the sweep loop; call Stop to drain.
func (*Watchdog) Start ¶
func (w *Watchdog) Start()
Start kicks off the background sweep loop. Call exactly once per instance; subsequent Start calls after Stop require a fresh NewWatchdog.
func (*Watchdog) Stop ¶
func (w *Watchdog) Stop()
Stop signals the loop to exit and blocks until it does. Safe to call even if Start never ran.
func (*Watchdog) Sweep ¶
Sweep runs one scan synchronously. Exported for tests and for callers that want to force a check without waiting for the next tick.
Not safe for concurrent invocation — the loop goroutine is the only expected caller in production. The internal mutex serializes any stray test calls against the loop.
type WatchdogConfig ¶
type WatchdogConfig struct {
// DB is the cortex's SQLite connection. Required; the sweep query
// joins the events table to itself to find orphaned claims.
DB *sql.DB
// Emitter is the target for the closing fail events. In production
// this is the same *cortex.Cortex that EligibilityLoop and Election
// share; the narrow interface lets tests substitute a recorder.
Emitter EventEmitter
// LocalCortexID is this cortex's stable ULID. Stamped into every
// emitted fail event so observers can attribute the closing
// decision to a specific peer for audit purposes.
LocalCortexID string
// Timeout is how old a claim must be (now - claim.timestamp) before
// the watchdog treats it as orphaned. Generous by default — the
// longest legitimate pass we expect is LLM distillation on a
// large window, which still completes well inside this budget.
// Zero defaults to defaultWatchdogTimeout.
Timeout time.Duration
// Interval is the sweep cadence. Sweeps are cheap (one indexed
// query) so a tight cadence is fine; the trade-off is mostly how
// long an actually-stuck pass blocks the ring before being
// closed out. Zero defaults to defaultWatchdogInterval.
Interval time.Duration
// Now is injected for tests; zero defaults to time.Now.
Now func() time.Time
// Log is the optional logger; nil is a safe no-op.
Log func(format string, args ...any)
}
WatchdogConfig is the runtime input for a Watchdog instance. Built in cmd_serve from the Cortex handle and the federation config.