agent

package
v0.0.2
Published: Jun 21, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

core/agent/altitude.go

core/agent/cypherguard.go

core/agent/digesttools.go

core/agent/graphexpand.go

core/agent/hydrate.go

core/agent/index.go

core/agent/recall.go

core/agent/toolkit.go

core/agent/tools.go

core/agent/watch.go

core/agent/write.go

Constants

const (
	LevelEntity = 0 // L0: one per distillable source row
	LevelScope  = 1 // L1: scope/cluster backbone
	LevelTenant = 2 // L2: single tenant root
)

Digest node levels.

const (
	KindEntityNode  = "entity"
	KindScopeNode   = "scope"
	KindClusterNode = "cluster"
	KindTenantNode  = "tenant"
)

Digest node kinds.

const DigestEntity = "digest_node"

DigestEntity is the registry entity name of the distillation tree node.

Variables

var ErrSummaryBlocked = errors.New("agent: summary blocked by guard")

ErrSummaryBlocked is the internal sentinel for a guard veto. DistillL0 does NOT return it — a blocked summary is fail-closed (changed=false, nothing stored) — but the rollup paths reuse it to distinguish a block from an error.

var ErrUnindexablePayload = errors.New("agent: unindexable event payload")

ErrUnindexablePayload is returned by IndexEvent when the event payload cannot be unmarshalled into a map. Such events are structurally poison and will never succeed on retry; callers should ack-skip them rather than leaving them in the pending-entry list (PEL).

Functions

func ClusterID

func ClusterID(prefix uint64, p int) string

ClusterID derives the stable id of a cluster (L1) digest node from a SemHash prefix. The prefix is the cluster identity — stable across membership drift.

func ClusterPrefix

func ClusterPrefix(h uint64, p int) uint64

ClusterPrefix keeps the top p bits of h (the bucket key); the rest are zeroed.
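
As a minimal sketch of the masking described above: clusterPrefix below is a hypothetical re-implementation, not the package source, and its handling of p <= 0 and p >= 64 is an assumption.

```go
package main

import "fmt"

// clusterPrefix keeps the top p bits of h and zeroes the rest.
// Edge cases (p <= 0, p >= 64) are assumptions made for this sketch.
func clusterPrefix(h uint64, p int) uint64 {
	if p <= 0 {
		return 0
	}
	if p >= 64 {
		return h
	}
	mask := ^uint64(0) << (64 - uint(p)) // p ones followed by 64-p zeros
	return h & mask
}

func main() {
	h := uint64(0xABCDEF0123456789)
	fmt.Printf("%016x\n", clusterPrefix(h, 8))  // ab00000000000000
	fmt.Printf("%016x\n", clusterPrefix(h, 16)) // abcd000000000000
}
```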

func FormatSemHash

func FormatSemHash(h uint64) string

FormatSemHash renders a SemHash as 16 lowercase hex chars (JSON/precision-safe).

func HammingClose

func HammingClose(a, b uint64, maxBits int) bool

HammingClose reports whether a and b differ in at most maxBits bits.

func HammingDistance

func HammingDistance(a, b uint64) int

HammingDistance counts differing bits.
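
Both Hamming helpers can be expressed with a XOR and a population count. The sketch below uses hypothetical lowercase names and the standard math/bits package; it illustrates the documented behaviour and is not the package's own code.

```go
package main

import (
	"fmt"
	"math/bits"
)

// hammingDistance counts the bit positions where a and b differ.
func hammingDistance(a, b uint64) int {
	return bits.OnesCount64(a ^ b)
}

// hammingClose reports whether a and b differ in at most maxBits bits.
func hammingClose(a, b uint64, maxBits int) bool {
	return hammingDistance(a, b) <= maxBits
}

func main() {
	a, b := uint64(0b1011), uint64(0b0001)
	fmt.Println(hammingDistance(a, b)) // 2
	fmt.Println(hammingClose(a, b, 2)) // true
	fmt.Println(hammingClose(a, b, 1)) // false
}
```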

func L0ContentHash

func L0ContentHash(recipeVersion, sourceFieldHash string) string

L0ContentHash is the Merkle hash of a leaf: h(recipeVersion ‖ sourceFieldHash). It is structural (over the source, not the non-deterministic summary text) and gates whether the Summarizer is called at all.
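
To illustrate how a structural leaf hash can gate the Summarizer, here is a minimal sketch. The hash function (SHA-256 with a zero-byte separator), the leafStore type, and the distill helper are all assumptions for illustration; the package does not document its concrete hash.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashHex hashes its parts with SHA-256 (an assumed choice), writing a
// zero byte after each part so part boundaries stay unambiguous.
func hashHex(parts ...string) string {
	h := sha256.New()
	for _, p := range parts {
		h.Write([]byte(p))
		h.Write([]byte{0})
	}
	return hex.EncodeToString(h.Sum(nil))
}

// leafStore is a hypothetical in-memory map of node id to last ContentHash.
type leafStore map[string]string

// distill calls summarize only when the leaf hash changed; it returns
// changed=true exactly when summarize was called.
func (s leafStore) distill(id, recipeVersion, sourceText string, summarize func(string) string) bool {
	contentHash := hashHex(recipeVersion, hashHex(sourceText))
	if s[id] == contentHash {
		return false // Merkle short-circuit: source and recipe unchanged
	}
	_ = summarize(sourceText)
	s[id] = contentHash
	return true
}

func main() {
	calls := 0
	sum := func(text string) string { calls++; return "summary of " + text }
	store := leafStore{}
	fmt.Println(store.distill("n1", "v1", "hello", sum)) // true
	fmt.Println(store.distill("n1", "v1", "hello", sum)) // false
	fmt.Println(store.distill("n1", "v2", "hello", sum)) // true
	fmt.Println(calls)                                   // 2
}
```

Because the hash covers the source rather than the generated text, a non-deterministic model never causes spurious rebuilds in this sketch.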

func L0ID

func L0ID(sourceKind, sourceID string) string

L0ID derives the stable id of an entity (L0) digest node.

func NewSemPlanes

func NewSemPlanes(dims int, seed int64) [64][]float32

NewSemPlanes builds 64 fixed random hyperplanes of the given dimensionality, deterministically from seed. Persisting planes is equivalent to fixing the seed, so callers get stable SemHashes across processes by passing a constant.

func NoiseFloorMet

func NoiseFloorMet(members, floor int) bool

NoiseFloorMet reports whether a SemHash bucket has enough members to become a cluster node (singletons get no digest).

func ParseSemHash

func ParseSemHash(s string) (uint64, error)

ParseSemHash parses the 16-hex form produced by FormatSemHash.
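
The 16-hex form round-trips with the standard library, as sketched below. The function names are hypothetical and the strict length check is an assumption; the string form matters because uint64 values above 2^53 lose precision when decoded as JSON numbers in many clients.

```go
package main

import (
	"fmt"
	"strconv"
)

// formatSemHash renders h as 16 lowercase, zero-padded hex characters.
func formatSemHash(h uint64) string {
	return fmt.Sprintf("%016x", h)
}

// parseSemHash parses the 16-hex form. Rejecting other lengths is an
// assumption of this sketch.
func parseSemHash(s string) (uint64, error) {
	if len(s) != 16 {
		return 0, fmt.Errorf("semhash: want 16 hex chars, got %d", len(s))
	}
	return strconv.ParseUint(s, 16, 64)
}

func main() {
	h := uint64(1)<<63 | 0x2a
	s := formatSemHash(h)
	fmt.Println(s) // 800000000000002a
	back, err := parseSemHash(s)
	fmt.Println(back == h, err) // true <nil>
}
```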

func RollupContentHash

func RollupContentHash(recipeVersion string, childHashes []string) string

RollupContentHash is the Merkle hash of an internal node: h(recipeVersion ‖ sorted child ContentHashes). Sorting makes it independent of child order; any child change propagates up.
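
The order independence and upward propagation can be sketched as follows. SHA-256 and the zero-byte separator are assumptions chosen for illustration, and rollupHash is a hypothetical name.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// rollupHash hashes recipeVersion followed by the sorted child hashes.
func rollupHash(recipeVersion string, childHashes []string) string {
	sorted := append([]string(nil), childHashes...) // copy: leave caller's slice alone
	sort.Strings(sorted)
	h := sha256.New()
	h.Write([]byte(recipeVersion))
	for _, c := range sorted {
		h.Write([]byte{0}) // separator so "ab"+"c" differs from "a"+"bc"
		h.Write([]byte(c))
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := rollupHash("v1", []string{"h2", "h1", "h3"})
	b := rollupHash("v1", []string{"h3", "h2", "h1"})
	c := rollupHash("v1", []string{"h3", "h2", "hX"})
	fmt.Println(a == b) // true: child order does not matter
	fmt.Println(a == c) // false: a changed child changes the parent
}
```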

func ScopeID

func ScopeID(scopeName, scopeID string) string

ScopeID derives the stable id of a scope (L1) digest node.

func SemHash

func SemHash(embedding []float32, planes [64][]float32) uint64

SemHash is a 64-bit SimHash (LSH): bit i = sign(embedding · planeᵢ). Cosine-close embeddings produce Hamming-close hashes. A zero/empty embedding hashes to 0. Planes shorter than the embedding compare over the shared prefix.
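
A minimal SimHash sketch, under stated assumptions: newPlanes draws Gaussian planes from math/rand seeded with a constant (the real NewSemPlanes may use a different distribution or generator), and a bit is set only when the dot product is strictly positive, which makes a zero embedding hash to 0.

```go
package main

import (
	"fmt"
	"math/bits"
	"math/rand"
)

// newPlanes builds 64 random hyperplanes deterministically from seed.
func newPlanes(dims int, seed int64) [64][]float32 {
	rng := rand.New(rand.NewSource(seed))
	var planes [64][]float32
	for i := range planes {
		planes[i] = make([]float32, dims)
		for j := range planes[i] {
			planes[i][j] = float32(rng.NormFloat64())
		}
	}
	return planes
}

// semHash sets bit i when embedding · planes[i] is positive. A plane
// shorter than the embedding is compared over the shared prefix.
func semHash(embedding []float32, planes [64][]float32) uint64 {
	var h uint64
	for i, plane := range planes {
		n := len(embedding)
		if len(plane) < n {
			n = len(plane)
		}
		var dot float32
		for j := 0; j < n; j++ {
			dot += embedding[j] * plane[j]
		}
		if dot > 0 {
			h |= 1 << uint(i)
		}
	}
	return h
}

func main() {
	planes := newPlanes(4, 42)
	a := []float32{0.9, 0.1, 0.3, 0.5}
	b := []float32{0.8, 0.2, 0.3, 0.5}     // close to a
	c := []float32{-0.9, -0.1, -0.3, -0.5} // opposite of a
	fmt.Println(bits.OnesCount64(semHash(a, planes) ^ semHash(b, planes)))
	fmt.Println(bits.OnesCount64(semHash(a, planes) ^ semHash(c, planes)))
	fmt.Println(semHash(make([]float32, 4), planes)) // 0
}
```

Only the sign of each projection is kept, so scaling an embedding by a positive constant leaves its hash unchanged.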

func SourceFieldHash

func SourceFieldHash(text string) string

SourceFieldHash hashes an L0 node's source text (the concatenated fields).

func TenantRootID

func TenantRootID() string

TenantRootID is the stable id of the tenant (L2) root digest node.

Types

type Altitude

type Altitude int

Altitude controls which layer of the distillation tree is surfaced during recall. Higher altitudes surface coarser (more-summarised) nodes.

const (
	// AltAuto lets the budget decide: descend to entities when affordable,
	// climb to the tenant digest when the budget is tight.
	AltAuto Altitude = iota
	// AltEntity surfaces raw source entities only (no digest nodes).
	AltEntity
	// AltScope surfaces scope/cluster digest nodes.
	AltScope
	// AltTenant surfaces the single tenant-root digest node.
	AltTenant
)

type BackfillReport

type BackfillReport struct {
	Entities int // number of distillable entities enumerated
	Rows     int // total source rows seen across all entities
	Built    int // L0 nodes that were (re)summarized and persisted (changed=true)
}

BackfillReport summarises one Distill pass over the tenant in ctx.

type ChildDigest

type ChildDigest struct {
	ID      string `json:"id"`
	Kind    string `json:"kind"`
	Summary string `json:"summary"`
}

ChildDigest is a child summary fed into a rollup summarization.

type Config

type Config struct {
	K              int                // candidates per channel (default 24)
	Hops           int                // graph expansion depth (default 1)
	VectorDims     int                // expected embedding dims (default 768)
	ChannelWeights map[string]float64 // per-channel RRF weight (default 1.0 each)
	Tokenizer      func([]byte) int   // token estimator (default bytes/4)
	Strict         bool               // fail on any channel error (default false: lenient)
	GraphSeeds     int                // top seeds (vector+search) to expand in the graph channel (default 8)
	GraphReverse   bool               // expand incoming (reverse) edges too; default false
	Write          WritePolicy        // agent write allowlist; empty = no writes
	Altitude       Altitude           // distillation layer to surface (default AltAuto: budget decides)
	CAS            blob.CAS           // content-addressed store for digest summaries (optional; nil = no CAS-backed ops)
}

Config tunes the toolkit. Zero values get sensible defaults via withDefaults.
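
The default-filling pattern can be sketched on a reduced struct. The default values are the ones stated in the field comments above; the config type, its reduced field set, and the body of withDefaults are assumptions, since withDefaults is unexported.

```go
package main

import "fmt"

// config mirrors a few Config fields for illustration only.
type config struct {
	K, Hops, VectorDims, GraphSeeds int
	Tokenizer                       func([]byte) int
}

// withDefaults fills zero-valued fields with the documented defaults.
func (c config) withDefaults() config {
	if c.K == 0 {
		c.K = 24
	}
	if c.Hops == 0 {
		c.Hops = 1
	}
	if c.VectorDims == 0 {
		c.VectorDims = 768
	}
	if c.GraphSeeds == 0 {
		c.GraphSeeds = 8
	}
	if c.Tokenizer == nil {
		c.Tokenizer = func(b []byte) int { return len(b) / 4 } // bytes/4 estimate
	}
	return c
}

func main() {
	c := config{Hops: 2}.withDefaults()
	fmt.Println(c.K, c.Hops, c.VectorDims, c.GraphSeeds) // 24 2 768 8
	fmt.Println(c.Tokenizer([]byte("twelve bytes")))      // 3
}
```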

type ContextItem

type ContextItem struct {
	Entity string          `json:"entity"`
	ID     string          `json:"id"`
	Row    json.RawMessage `json:"row"`
	Score  float64         `json:"score"`
	Source []string        `json:"source"`
	Tokens int             `json:"tokens"`
}

ContextItem is one hydrated row in a recall result.

type ContextPack

type ContextPack struct {
	Items    []ContextItem `json:"items"`
	Omitted  int           `json:"omitted"`
	Tokens   int           `json:"tokens"`
	Warnings []string      `json:"warnings,omitempty"`
}

ContextPack is the token-budgeted recall result.
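
One way to read the Items, Omitted, and Tokens fields is a greedy pack over already-ranked items. The greedy skip-and-continue strategy below is an assumption for illustration; the package does not document its packing algorithm, and item and pack are hypothetical reduced types.

```go
package main

import "fmt"

// item and pack are reduced stand-ins for ContextItem and ContextPack.
type item struct {
	ID     string
	Score  float64
	Tokens int
}

type pack struct {
	Items   []item
	Omitted int
	Tokens  int
}

// packBudget walks items in rank order and keeps each one that still fits
// the budget; items that do not fit are counted in Omitted.
func packBudget(ranked []item, budget int) pack {
	var p pack
	for _, it := range ranked {
		if p.Tokens+it.Tokens > budget {
			p.Omitted++
			continue
		}
		p.Items = append(p.Items, it)
		p.Tokens += it.Tokens
	}
	return p
}

func main() {
	ranked := []item{{"a", 0.9, 40}, {"b", 0.8, 50}, {"c", 0.7, 20}}
	p := packBudget(ranked, 70)
	fmt.Println(len(p.Items), p.Tokens, p.Omitted) // 2 60 1
}
```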

type DigestChild

type DigestChild struct {
	ID          string `json:"id"`
	Kind        string `json:"kind"`
	Summary     string `json:"summary"`
	ContentHash string `json:"contentHash"`
	SemHash     string `json:"semHash"`
}

DigestChild is one child entry in a DigestView: the child's id, kind, and Merkle hashes. Summary is populated only when the Toolkit was configured with a CAS (otherwise empty).

type DigestView

type DigestView struct {
	Node     MapLine       `json:"node"`
	Summary  string        `json:"summary"`
	Children []DigestChild `json:"children"`
}

DigestView is the drill-down result returned by Toolkit.Digest: the node's own MapLine, its summary text (from CAS), and its immediate children.

type DistillConfig

type DistillConfig struct {
	RecipeVersion string // salts ContentHash; bump to invalidate the whole tree
	VectorDims    int
	SemSeed       int64
	ClusterBits   int // top-p SemHash bits used as the cluster bucket key
	NoiseFloor    int // min members for a bucket to become a cluster node
	FailOpenGuard bool
	Budget        int // default L0 summary token budget
}

DistillConfig tunes the Distiller. Zero values get defaults via withDefaults.

type DistillObserver

type DistillObserver interface {
	Summarized()     // a Summarize call was made (L0 or internal node)
	ShortCircuited() // a Merkle hash matched — the node was not re-summarized
	NodeBuilt()      // a node summary was persisted (CAS + vector + row)
	GuardBlocked()   // a guard vetoed an ingest or emit (fail-closed drop)
}

DistillObserver receives distillation events for metrics/audit. The observer itself is optional: when none is attached (nil), no callbacks fire. The interface is kept in core (not the worker) so the Prometheus dependency stays out of core/agent: the worker passes a small adapter that increments counters.

type Distiller

type Distiller struct {
	// contains filtered or unexported fields
}

Distiller builds and maintains a tenant's context-distillation Merkle tree. L0 leaves are derived from distillable source rows; internal nodes (scope, cluster, tenant) are produced by rollup. A ContentHash Merkle short-circuit keeps re-distillation cheap, and a two-stage Guard (ingest + emit) fences PII out of both the model and the content-addressed store.

func NewDistiller

func NewDistiller(fab query.Fabric, reg *registry.Registry, emb Embedder, sum Summarizer, guard Guard, cas blob.CAS, cfg DistillConfig) (*Distiller, error)

NewDistiller builds a Distiller. emb, sum, cas are required; guard is optional (nil = identity). The embedder's dimensionality must match the configured VectorDims (after defaults).

func (*Distiller) DeleteL0

func (d *Distiller) DeleteL0(ctx context.Context, entity, id string) (bool, error)

DeleteL0 removes an entity's L0 digest node: it unlinks the node from each of its parents' ChildIDs, deletes the digest row through the command plane, and removes its vector. Returns (false, nil) when the node is already gone (idempotent). The next Rollup re-rolls the affected parents and collapses any scope/cluster node that drops below the noise floor (see Rollup's cleanup pass).

func (*Distiller) Distill

func (d *Distiller) Distill(ctx context.Context) (BackfillReport, error)

Distill performs a per-tenant backfill/rebuild of the context-distillation Merkle tree. For every registered entity that has a DistillSpec it pages through all source rows (Reindex-style) and calls DistillL0 per row, then calls Rollup once to rebuild the L1/L2 backbone.

Distill is tenant-scoped: it operates on the tenant present in ctx (RLS). Call once per tenant for a full cluster-wide rebuild.

func (*Distiller) DistillEvent

func (d *Distiller) DistillEvent(ctx context.Context, env event.Envelope) (bool, error)

DistillEvent distills a create/update event whose aggregate is distillable. It mirrors the embed worker's IndexEvent: the L0 source values come from the event payload (no store reload), so callers need no relational seeding.

A ".deleted" event routes to DeleteL0 and returns its removed/err. For any other event, a non-distillable aggregate or an empty payload is a no-op that returns (false, nil); otherwise the payload is unmarshalled into a column-keyed map and passed to DistillL0, whose changed/err is returned.

func (*Distiller) DistillL0

func (d *Distiller) DistillL0(ctx context.Context, entity, id string, vals map[string]any) (bool, error)

DistillL0 distills one source row into its L0 digest node. Returns changed=true when the node was (re)summarized and persisted; false on a Merkle short-circuit (unchanged source), on a guard block (fail-closed: nothing is stored), for non-distillable entities, or for empty source text.

func (*Distiller) DistillSpecFor

func (d *Distiller) DistillSpecFor(entity string) *registry.DistillSpec

DistillSpecFor returns the DistillSpec for an entity, or nil when the entity is unknown or not opted into distillation. Exported for the worker's distillable-aggregate predicate.

func (*Distiller) Rollup

func (d *Distiller) Rollup(ctx context.Context) (RollupReport, error)

Rollup recomputes the tenant's L1 (scope + cluster) backbone and L2 (tenant) root from the current L0 leaves, with a ContentHash Merkle short-circuit at every internal node: a node whose sorted child ContentHashes are unchanged is not re-summarized. Tenant scope comes from ctx (RLS).

Cluster assignment is part of this pass: each L0 is bucketed by its SemHash prefix; buckets meeting the noise floor become cluster nodes and are linked as parents of their members. Below-floor (singleton) buckets get no cluster node.
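
The bucketing step can be sketched as below. assignClusters and clusterPrefix are hypothetical helpers, and treating "meeting the noise floor" as len(members) >= floor is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// clusterPrefix keeps the top p bits of h (valid here for 0 < p < 64).
func clusterPrefix(h uint64, p int) uint64 {
	return h &^ (^uint64(0) >> uint(p))
}

// assignClusters buckets node ids by SemHash prefix and drops buckets
// below the noise floor, so singletons get no cluster node.
func assignClusters(semHashes map[string]uint64, p, floor int) map[uint64][]string {
	buckets := map[uint64][]string{}
	for id, h := range semHashes {
		key := clusterPrefix(h, p)
		buckets[key] = append(buckets[key], id)
	}
	for key, members := range buckets {
		if len(members) < floor {
			delete(buckets, key)
			continue
		}
		sort.Strings(members) // stable member order for display
	}
	return buckets
}

func main() {
	hashes := map[string]uint64{
		"n1": 0xAB00000000000001,
		"n2": 0xAB000000000000FF,
		"n3": 0x1200000000000000,
	}
	for key, members := range assignClusters(hashes, 8, 2) {
		fmt.Printf("%016x %v\n", key, members) // ab00000000000000 [n1 n2]
	}
}
```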

func (*Distiller) SetObserver

func (d *Distiller) SetObserver(o DistillObserver)

SetObserver attaches an optional DistillObserver for metrics/audit. Pass nil to detach. Not safe to call concurrently with distillation.

type Embedder

type Embedder interface {
	Embed(ctx context.Context, texts []string) ([][]float32, error)
	Dims() int
}

Embedder turns text into vectors. The host supplies the implementation (Anthropic, OpenAI, a local model); fabriq stays model-agnostic. Embed returns one vector per input string, in order. Dims reports the embedding dimensionality, validated against the vector port at NewToolkit.

type Guard

type Guard interface {
	Guard(ctx context.Context, in GuardInput) (GuardResult, error)
}

Guard is the host-supplied, optional PII/guardrail seam (nil = identity).

type GuardInput

type GuardInput struct {
	Stage    GuardStage `json:"stage"`
	TenantID string     `json:"tenantId"`
	Scope    ScopeRef   `json:"scope"`
	Level    int        `json:"level"`
	Text     string     `json:"text"`
}

GuardInput is one guard call.

type GuardResult

type GuardResult struct {
	Text    string `json:"text"`
	Blocked bool   `json:"blocked"`
	Reason  string `json:"reason"`
}

GuardResult is a guard verdict: possibly-redacted text, a block flag, and an audit reason.

type GuardStage

type GuardStage int

GuardStage marks which stage of distillation a guard call protects.

const (
	// GuardIngest redacts raw content BEFORE the Summarizer sees it.
	GuardIngest GuardStage = iota
	// GuardEmit checks a generated summary BEFORE it is hashed + written to CAS.
	GuardEmit
)

type Indexer

type Indexer struct {
	// contains filtered or unexported fields
}

Indexer embeds entity rows and upserts their vectors. It is the write-side counterpart to recall. Construct it once and call IndexEvent from the host's event consumer (see Reindex for backfill).

func NewIndexer

func NewIndexer(fab query.Fabric, reg *registry.Registry, emb Embedder) (*Indexer, error)

NewIndexer builds an Indexer. The Embedder is required (indexing without a model is meaningless).

func (*Indexer) IndexEvent

func (ix *Indexer) IndexEvent(ctx context.Context, env event.Envelope) error

IndexEvent indexes a create/update event whose aggregate is embeddable. On a ".deleted" event, it unindexes the aggregate row (calls Unindex). Non-delete events with an empty payload are skipped.

func (*Indexer) IndexRow

func (ix *Indexer) IndexRow(ctx context.Context, entity, id string, vals map[string]any) error

IndexRow embeds the row's text (per its EmbedSpec) and upserts the vector. No-op for non-embeddable entities or empty text.

func (*Indexer) Reindex

func (ix *Indexer) Reindex(ctx context.Context, entity string) (int, error)

Reindex re-embeds every row of an embeddable entity (backfill). Returns the number of rows indexed. No-op (0) for non-embeddable entities.

Reindex is tenant-scoped: it backfills only the tenant present in ctx. Call once per tenant for a full cluster-wide backfill.

Each page of rows is submitted to the Embedder as a single batch call, reducing round-trips from O(N) to O(N/page). Rows with an empty id or an empty computed embed-text are skipped and not counted.

func (*Indexer) Unindex

func (ix *Indexer) Unindex(ctx context.Context, entity, id string) error

Unindex removes an entity row's embedding. No-op for non-embeddable entities or empty id.

type MapLine

type MapLine struct {
	ID          string `json:"id"`
	Level       int    `json:"level"`
	Kind        string `json:"kind"`
	Scope       string `json:"scope,omitempty"`
	ContentHash string `json:"contentHash"`
	SemHash     string `json:"semHash"`
	Unchanged   bool   `json:"unchanged,omitempty"`
	Summary     string `json:"summary,omitempty"`
}

MapLine is one line of the bird's-eye digest-tree outline returned by Map.

type MapRequest

type MapRequest struct {
	// Scope, when non-empty, restricts the outline to nodes whose scope_id matches
	// plus the tenant root. Empty means include all nodes.
	Scope string `json:"scope,omitempty"`
	// KnownHashes is the caller's local snapshot: map[nodeID]contentHash.
	// Nodes whose ContentHash matches the known value are marked Unchanged=true
	// (Merkle-diff re-grounding — transfer only the diff).
	KnownHashes map[string]string `json:"knownHashes,omitempty"`
}

MapRequest holds the parameters for Toolkit.Map.
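
The KnownHashes comparison amounts to a per-node hash check, sketched below with a reduced mapLine type. markUnchanged is a hypothetical helper that mirrors the documented marking rule, not the package's code.

```go
package main

import "fmt"

// mapLine is a reduced stand-in for MapLine.
type mapLine struct {
	ID          string
	ContentHash string
	Unchanged   bool
}

// markUnchanged flags every line whose ContentHash equals the caller's
// known value for that node id. The input slice is left untouched.
func markUnchanged(lines []mapLine, known map[string]string) []mapLine {
	out := make([]mapLine, len(lines))
	copy(out, lines)
	for i := range out {
		if h, ok := known[out[i].ID]; ok && h == out[i].ContentHash {
			out[i].Unchanged = true
		}
	}
	return out
}

func main() {
	server := []mapLine{
		{ID: "tenant", ContentHash: "h9"},
		{ID: "scope:a", ContentHash: "h1"},
		{ID: "scope:b", ContentHash: "h2"},
	}
	known := map[string]string{"tenant": "h8", "scope:a": "h1"}
	for _, l := range markUnchanged(server, known) {
		fmt.Println(l.ID, l.Unchanged)
	}
	// tenant false
	// scope:a true
	// scope:b false
}
```

A caller would then fetch details only for the lines that are not marked Unchanged.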

type RecallRequest

type RecallRequest struct {
	Query    string      `json:"query"`
	Budget   int         `json:"budget"`
	Entities []string    `json:"entities"`
	K        int         `json:"k"`
	Hops     int         `json:"hops"`
	Filters  query.Where `json:"filters,omitempty"`
	// Altitude selects which layer of the distillation tree surfaces. AltAuto
	// (the zero value) defers to Config.Altitude, which itself defaults to
	// AltAuto: the budget then decides between entities and the tenant digest.
	Altitude Altitude `json:"altitude,omitempty"`
}

RecallRequest is the input to the auto-context recall pipeline.

type RememberRequest

type RememberRequest struct {
	Entity          string          `json:"entity"`
	Op              string          `json:"op"` // create|update|upsert|delete
	AggID           string          `json:"aggId,omitempty"`
	Payload         json.RawMessage `json:"payload,omitempty"`
	ExpectedVersion *int64          `json:"expectedVersion,omitempty"`
}

RememberRequest is the input to a guarded write.

type ResolveMatch

type ResolveMatch struct {
	Node        MapLine `json:"node"`
	HammingBits int     `json:"hammingBits"`
}

ResolveMatch is one entry in the Near list returned by Toolkit.Resolve.

type ResolveResult

type ResolveResult struct {
	Exact *MapLine       `json:"exact,omitempty"`
	Near  []ResolveMatch `json:"near,omitempty"`
}

ResolveResult is the value returned by Toolkit.Resolve. Exact is set when a node's ContentHash equals the queried hash. Near holds nodes whose SemHash is within resolveHammingThreshold bits of the queried SemHash (sorted ascending by HammingBits, tiebroken by ID).
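
The Near ordering can be sketched with a filter and a two-key sort. The threshold is passed as a parameter here because the value of resolveHammingThreshold is not documented; node, match, and near are hypothetical names.

```go
package main

import (
	"fmt"
	"math/bits"
	"sort"
)

type node struct {
	ID      string
	SemHash uint64
}

type match struct {
	ID          string
	HammingBits int
}

// near returns nodes within threshold bits of query, sorted ascending by
// HammingBits with ties broken by ID.
func near(nodes []node, query uint64, threshold int) []match {
	var out []match
	for _, n := range nodes {
		d := bits.OnesCount64(n.SemHash ^ query)
		if d <= threshold {
			out = append(out, match{ID: n.ID, HammingBits: d})
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].HammingBits != out[j].HammingBits {
			return out[i].HammingBits < out[j].HammingBits
		}
		return out[i].ID < out[j].ID
	})
	return out
}

func main() {
	nodes := []node{{"b", 0b0001}, {"a", 0b0010}, {"c", 0b0000}, {"d", 0xFF}}
	fmt.Println(near(nodes, 0, 2)) // [{c 0} {a 1} {b 1}]
}
```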

type RollupReport

type RollupReport struct {
	ScopeNodes    int  // scope (L1) nodes (re)summarized this pass
	ClusterNodes  int  // cluster (L1) nodes (re)summarized this pass
	TenantRolled  bool // the tenant (L2) root was (re)summarized this pass
	ShortCircuits int  // internal nodes that matched their Merkle hash (no model call)
}

RollupReport summarizes one Rollup pass over the tenant in ctx.

type ScopeRef

type ScopeRef struct {
	Name string `json:"name"`
	ID   string `json:"id"`
}

ScopeRef names a scope a digest summarizes (empty for tenant/cluster nodes).

type Summarizer

type Summarizer interface {
	Summarize(ctx context.Context, in SummaryInput) (string, error)
}

Summarizer is the host-supplied summarization seam. fabriq stays model-agnostic.

type SummaryInput

type SummaryInput struct {
	Level    int           `json:"level"`
	Kind     string        `json:"kind"`
	Scope    ScopeRef      `json:"scope"`
	Children []ChildDigest `json:"children"`
	Raw      []byte        `json:"raw"`
	Budget   int           `json:"budget"`
}

SummaryInput is the host-model input for one summarization. For L0, Raw holds the source text; for L1/L2, Children holds the child summaries.

type Tool

type Tool struct {
	Name        string
	Description string
	InputSchema json.RawMessage
	Handler     func(ctx context.Context, args json.RawMessage) (any, error)
}

Tool is a transport-neutral agent tool descriptor. The MCP adapter maps each Tool to an MCP tool 1:1; Go agents can dispatch through Tools() or call the typed methods (e.g. Toolkit.Recall) directly.

type Toolkit

type Toolkit struct {
	// contains filtered or unexported fields
}

Toolkit is the transport-agnostic agent surface over the fabriq facade.

func NewToolkit

func NewToolkit(fab query.Fabric, reg *registry.Registry, emb Embedder, cfg Config) (*Toolkit, error)

NewToolkit builds a Toolkit. emb may be nil (semantic recall is then skipped).

func (*Toolkit) Digest

func (t *Toolkit) Digest(ctx context.Context, nodeID string) (DigestView, error)

Digest drills into one context-distillation node: it returns the node's MapLine, its summary text (retrieved from CAS by SummaryHash), and its immediate children with their own hashes and summaries.

When the Toolkit was created without a CAS (Config.CAS == nil), Summary and child Summary fields are always empty — this is graceful degradation, not an error.

Returns an error if nodeID is not found in the digest tree, or on any storage error.

func (*Toolkit) Map

func (t *Toolkit) Map(ctx context.Context, req MapRequest) ([]MapLine, error)

Map returns a compact outline of the tenant's context-distillation Merkle tree. Each line carries its ContentHash + SemHash. When req.KnownHashes is provided, nodes whose ContentHash matches the caller's known value are marked Unchanged=true (Merkle-diff re-grounding).

If req.Scope is non-empty, only the tenant root and the nodes whose scope_id equals req.Scope are included. If the DigestEntity is not registered, (nil, nil) is returned.

func (*Toolkit) Recall

func (t *Toolkit) Recall(ctx context.Context, req RecallRequest) (ContextPack, error)

Recall runs the auto-context pipeline: build per-channel ranked candidates (vector, search; graph added in Task 2) → RRF fuse → hydrate authoritative rows → token-budget pack.
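
The fuse step can be illustrated with weighted reciprocal rank fusion, where each channel contributes weight / (k + rank) for every id it ranks. The constant k = 60 is the value commonly used for RRF and is an assumption here, as are the rrfFuse name and the tie-break by id.

```go
package main

import (
	"fmt"
	"sort"
)

// rrfFuse merges per-channel rankings (best first) into one ranking.
// A channel missing from weights gets weight 1.0.
func rrfFuse(channels map[string][]string, weights map[string]float64, k float64) []string {
	names := make([]string, 0, len(channels))
	for name := range channels {
		names = append(names, name)
	}
	sort.Strings(names) // fixed summation order keeps scores reproducible

	scores := map[string]float64{}
	for _, name := range names {
		w, ok := weights[name]
		if !ok {
			w = 1.0
		}
		for i, id := range channels[name] {
			scores[id] += w / (k + float64(i+1)) // ranks are 1-based
		}
	}

	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		if scores[ids[i]] != scores[ids[j]] {
			return scores[ids[i]] > scores[ids[j]]
		}
		return ids[i] < ids[j]
	})
	return ids
}

func main() {
	fused := rrfFuse(map[string][]string{
		"vector": {"a", "b"},
		"search": {"b", "c"},
	}, nil, 60)
	fmt.Println(fused) // [b a c]
}
```

An id that appears in several channels accumulates score from each, which is why "b" outranks "a" above even though "a" tops the vector channel.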

func (*Toolkit) Remember

func (t *Toolkit) Remember(ctx context.Context, req RememberRequest) (command.Result, error)

Remember performs a guarded write through the command plane. Power is WritePolicy ∩ tenant scope (inherited from ctx) ∩ lifecycle-hook rules (inherited from Exec). Errors are typed WriteErrors.

func (*Toolkit) Resolve

func (t *Toolkit) Resolve(ctx context.Context, hash string) (ResolveResult, error)

Resolve performs a quick reference lookup without re-embedding:

  • Exact: if any node's ContentHash equals hash, Exact is set to that node.
  • Near: if hash parses as a 16-hex SemHash (via ParseSemHash), every node whose SemHash is within resolveHammingThreshold Hamming bits is included in Near, sorted ascending by HammingBits and then by ID.

Both lookups are attempted independently. A node may appear in both Exact and Near if its ContentHash happens to be all-hex and its SemHash is also close. Returns (zero, nil) when DigestEntity is not registered.

func (*Toolkit) Tools

func (t *Toolkit) Tools() []Tool

Tools returns the agent-facing tool surface. Phase 1b exposes recall plus the four read primitives: vector_similar, search, graph_traverse, get. Phase 3 adds the guarded write tool: remember. Phase 4 adds the context-distillation tools: map, digest, resolve.

func (*Toolkit) Watch

func (t *Toolkit) Watch(ctx context.Context, scope query.SubscribeScope) (<-chan query.Delta, error)

Watch subscribes to the conflated delta stream for a scope, so an agent can react to changes as they happen. It is a thin pass-through to the fabric's Subscribe (tenant-scoped from ctx). MCP streaming of watch is Phase 4; this is the in-process Go path.

type WriteError

type WriteError struct {
	Code string
	Msg  string
	Err  error
}

WriteError is the typed error a guarded write returns. Code is a stable, machine-readable reason; Err wraps the underlying cause (if any).

func (*WriteError) Error

func (e *WriteError) Error() string

func (*WriteError) Unwrap

func (e *WriteError) Unwrap() error

type WritePolicy

type WritePolicy struct {
	Allow map[string][]command.Op
}

WritePolicy is the opt-in allowlist for agent writes. An entity/op absent from Allow is denied (deny-by-default).
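
Deny-by-default falls out naturally from a map lookup, as this sketch shows. The op type stands in for command.Op, whose definition is not shown here, and the allows method is a hypothetical helper.

```go
package main

import "fmt"

// op is a local stand-in for command.Op.
type op string

type writePolicy struct {
	Allow map[string][]op
}

// allows reports whether entity/o is explicitly listed. A missing entity,
// a missing op, or a zero-value policy all deny.
func (p writePolicy) allows(entity string, o op) bool {
	for _, allowed := range p.Allow[entity] {
		if allowed == o {
			return true
		}
	}
	return false
}

func main() {
	p := writePolicy{Allow: map[string][]op{"note": {"create", "update"}}}
	fmt.Println(p.allows("note", "create"))             // true
	fmt.Println(p.allows("note", "delete"))             // false
	fmt.Println(p.allows("invoice", "create"))          // false
	fmt.Println(writePolicy{}.allows("note", "create")) // false
}
```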
