intelligence

package
v0.4.1
Published: Jun 4, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package intelligence will hold the auto-embedder infrastructure (S11 spike, docs/internals/design/S11_AUTO_EMBEDDER_REDESIGN.md). This file introduces the package with a single artifact: the bounded async worker pool (R2.2). Subsequent PRs add:

  • R2.3: Embedder interface + ErrNoIndexForTenant (embedder.go).
  • R2.4: LSAEmbedder adapter wrapping pkg/search.TenantLSAIndexes (lsa_embedder.go).
  • R2.5: AutoEmbedObserver + server_init.go wiring.

The pool is intentionally generic (operates on a Task interface) rather than embedding-specific (the spike's prescribed chan embedTask). The deviation is justified: R2.3's Embedder interface and R2.5's EmbedTask struct have not shipped, and a generic pool is more useful for the open-core extension model, because enterprise plugins can reuse it for non-embedding async work without redesigning it. R2.5 will define a concrete EmbedTask type that implements Task.

Constants

const (
	DefaultWorkers         = 4
	DefaultQueueDepth      = 256
	DefaultShutdownTimeout = 5 * time.Second
)

Package defaults; chosen per S11 spike §7.5.

Variables

This section is empty.

Functions

This section is empty.

Types

type AutoEmbedObserver

type AutoEmbedObserver struct {
	// contains filtered or unexported fields
}

AutoEmbedObserver is the bridge between pkg/storage's NodeObserver notifications (R2.1) and the embedder pipeline. On a node-creation notification, it checks each registered EmbeddingPolicy; for every match it submits an autoEmbedTask to the worker Pool (R2.2) which computes an embedding via the configured Embedder (R2.3/R2.4) and writes it back to the node as a vector-typed property.

Lifecycle:

  • OnNodeCreated: dispatches one task per matching policy.
  • OnNodeUpdated: no-op in R2.5a. Activating update-driven embedding requires a re-entry guard (spike §7.2), because writing the embedding back would itself fire OnNodeUpdated and loop. The guard needs either ctx-passing storage methods (a separate track) or a sentinel property key (which leaks internal state); both are deliberately deferred. Until then, users who mutate a node's source text must delete + recreate the node (or manually re-embed via /v1/embeddings) to refresh the vector.
  • OnNodeDeleted: no-op. The node's vector index entries are removed in pkg/storage.RemoveNodeFromVectorIndexes (R1.2's tenant-aware path), which runs as part of DeleteNode before the observer dispatch.

Wiring:

AutoEmbedObserver is registered on a *storage.GraphStorage at startup via gs.AddObserver. R2.5b adds the env-driven wiring block in pkg/api/server_init.go that constructs the observer and registers it when GRAPHDB_AUTO_EMBED_* env vars are configured. Until R2.5b lands, callers wire AutoEmbedObserver manually in their own server bootstrap.

Pool ownership:

The Pool is a constructor argument, not constructed internally. Pool lifecycle is the caller's responsibility — multiple observers (or future intelligence consumers) may share a single pool, and Shutdown belongs at the same layer that owns server-lifecycle cleanup.

func NewAutoEmbedObserver

func NewAutoEmbedObserver(writer nodeWriter, embedder Embedder, pool *Pool, policies []EmbeddingPolicy) (*AutoEmbedObserver, error)

NewAutoEmbedObserver constructs an AutoEmbedObserver with the given dependencies and policies.

Validation:

  • writer, embedder, pool must be non-nil.
  • policies may be empty (the observer becomes a no-op).
  • Each policy must have non-empty Label, SourceProperty, TargetProperty.
  • SourceProperty must differ from TargetProperty within a single policy (otherwise the embedding would overwrite its own input).

Returns an error on any validation failure; this surfaces configuration bugs at startup rather than silently at runtime.
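The per-policy rules above can be sketched as a small check. This is an illustration only: validatePolicies is a hypothetical helper, not a function of this package, and EmbeddingPolicy is replicated locally so the snippet compiles on its own. The nil checks on writer, embedder and pool are omitted.

```go
package main

import "fmt"

// EmbeddingPolicy is a local replica of the struct documented on this page.
type EmbeddingPolicy struct {
	Label          string
	SourceProperty string
	TargetProperty string
}

// validatePolicies is a hypothetical helper (an assumption, not the package's
// code). It applies the documented per-policy rules and returns the first
// violation it finds. An empty or nil slice is valid.
func validatePolicies(policies []EmbeddingPolicy) error {
	for i, p := range policies {
		if p.Label == "" || p.SourceProperty == "" || p.TargetProperty == "" {
			return fmt.Errorf("policy %d: Label, SourceProperty and TargetProperty are required", i)
		}
		if p.SourceProperty == p.TargetProperty {
			return fmt.Errorf("policy %d: SourceProperty and TargetProperty must differ", i)
		}
	}
	return nil
}

func main() {
	good := []EmbeddingPolicy{{Label: "Document", SourceProperty: "body", TargetProperty: "body_vec"}}
	clash := []EmbeddingPolicy{{Label: "Document", SourceProperty: "body", TargetProperty: "body"}}

	fmt.Println(validatePolicies(nil))   // no policies: nothing to reject
	fmt.Println(validatePolicies(good))  // well-formed policy
	fmt.Println(validatePolicies(clash)) // source and target collide
}
```

Failing at construction time keeps a misconfigured policy from turning into silently skipped nodes later.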

func (*AutoEmbedObserver) OnNodeCreated

func (o *AutoEmbedObserver) OnNodeCreated(ctx context.Context, node *storage.Node)

OnNodeCreated dispatches one autoEmbedTask per matching policy. The node is a clone snapshot supplied by pkg/storage's notify path (R2.1 guarantees this), safe to retain inside the task closure.

func (*AutoEmbedObserver) OnNodeDeleted

func (o *AutoEmbedObserver) OnNodeDeleted(_ context.Context, _ uint64, _ string)

OnNodeDeleted is a no-op: vector index entries are removed by pkg/storage.RemoveNodeFromVectorIndexes (R1.2) before the observer dispatch runs. The observer has nothing to clean up.

func (*AutoEmbedObserver) OnNodeUpdated

func (o *AutoEmbedObserver) OnNodeUpdated(_ context.Context, _ *storage.Node, _ *storage.Node)

OnNodeUpdated is intentionally a no-op in R2.5a.

Activating update-driven re-embedding requires a re-entry guard (S11 spike §7.2): the observer's own writeback would fire OnNodeUpdated and loop. Each of the two guard options has a cost. The ctx-value sentinel needs ctx-passing storage methods, which don't exist yet (a separate track); the internal property sentinel leaks internal state. Until that's resolved, users who mutate a node's source text must delete+recreate (or call /v1/embeddings manually) to refresh the vector.

TODO(R2.x): when update-driven embedding activates, add the re-entry guard before this method does anything observable.

type Embedder

type Embedder interface {
	Embed(ctx context.Context, tenantID string, text string) ([]float32, error)
}

Embedder computes a dense vector embedding for a text input, scoped to a specific tenant.

Tenant scoping is the caller's responsibility — the embedder may use the tenantID to select a per-tenant model, corpus, or configuration. The canonical implementation in this package (R2.4: LSAEmbedder) routes by tenantID to a per-tenant LSA index registered via pkg/search; enterprise plugins may implement different backends (ONNX-bundled, hosted-API) that satisfy the same interface (Decision 3, tier-based: open-core resolution in NEXT_STEPS_2026-05-14.md).

Contract:

  • Embed MUST return a non-nil error if it cannot produce a real embedding. It MUST NEVER return a mock or placeholder vector alongside a nil error — silent fakery is the failure mode this interface exists to structurally prevent. See S11 spike §1.1 for the archive's `mockEmbedding` pattern this design rejects.

  • The returned slice length is consistent with the embedding space the caller has configured (e.g., the HNSW index dimensions). A dimension mismatch is a configuration error and should surface as a typed error from Embed, not be papered over.

  • Implementations must be safe for concurrent calls — the worker pool in worker.go dispatches multiple Tasks in parallel, each of which may call Embed concurrently.

  • Implementations should respect ctx.Done() for cancellation; the worker pool propagates Shutdown cancellation through Task.Execute's ctx, and embedding backends with expensive operations (network calls, large matrix multiplies) should bail when ctx is cancelled.
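One way to honor the dimension and cancellation points of the contract is a thin decorator around any Embedder. The sketch below is an assumption-laden illustration: checkedEmbedder, DimensionMismatchError and stubEmbedder are hypothetical names that do not exist in this package, and the Embedder interface is replicated locally. The stub returns a fixed vector purely to drive the demo; it is exactly the kind of placeholder a real implementation must not ship.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Embedder is a local replica of the interface documented on this page.
type Embedder interface {
	Embed(ctx context.Context, tenantID string, text string) ([]float32, error)
}

// DimensionMismatchError is a hypothetical typed error for this sketch.
type DimensionMismatchError struct{ Want, Got int }

func (e DimensionMismatchError) Error() string {
	return fmt.Sprintf("embedding dimension mismatch: want %d, got %d", e.Want, e.Got)
}

// checkedEmbedder is a hypothetical decorator. It holds no mutable state, so
// it is safe for concurrent calls as long as the wrapped Embedder is.
type checkedEmbedder struct {
	inner Embedder
	dims  int
}

func (c checkedEmbedder) Embed(ctx context.Context, tenantID string, text string) ([]float32, error) {
	// Bail out before doing any work if the caller already cancelled.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	vec, err := c.inner.Embed(ctx, tenantID, text)
	if err != nil {
		return nil, err // never substitute a placeholder vector
	}
	if len(vec) != c.dims {
		return nil, DimensionMismatchError{Want: c.dims, Got: len(vec)}
	}
	return vec, nil
}

// stubEmbedder exists only to drive the demo below.
type stubEmbedder struct{ vec []float32 }

func (s stubEmbedder) Embed(context.Context, string, string) ([]float32, error) {
	return s.vec, nil
}

// demoMismatch wraps a 2-dimensional stub with a 3-dimensional expectation.
func demoMismatch() (isMismatch bool, want, got int) {
	e := checkedEmbedder{inner: stubEmbedder{vec: []float32{0.6, 0.8}}, dims: 3}
	_, err := e.Embed(context.Background(), "tenant-a", "hello")
	var m DimensionMismatchError
	isMismatch = errors.As(err, &m)
	return isMismatch, m.Want, m.Got
}

// demoCancelled calls Embed with an already-cancelled context.
func demoCancelled() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	e := checkedEmbedder{inner: stubEmbedder{vec: []float32{0.6, 0.8}}, dims: 2}
	_, err := e.Embed(ctx, "tenant-a", "hello")
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(demoMismatch())
	fmt.Println(demoCancelled())
}
```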

type EmbeddingPolicy

type EmbeddingPolicy struct {
	// Label is the node label this policy applies to (exact match). A
	// node with multiple labels matches if any label equals Label.
	Label string

	// SourceProperty is the property key containing the text to embed.
	// Must be present on the node AND be a string-typed Value; nodes
	// without it or with non-string values are silently skipped.
	SourceProperty string

	// TargetProperty is the property key the resulting embedding is
	// written to as a VectorValue. If the node already has this property
	// set at the time of OnNodeCreated, the observer skips the writeback
	// to preserve the user-provided value.
	TargetProperty string
}

EmbeddingPolicy describes one auto-embed rule: when a node with Label is created and has SourceProperty set as a string, compute an embedding of that text via the observer's Embedder and write it to TargetProperty as a VectorValue.

All three fields are required; constructor validation rejects empty values to surface configuration bugs at startup rather than at runtime.

Multiple policies may be registered. A single node creation triggers one task per matching policy, dispatched in registration order.
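The matching rules described by the field comments can be sketched as follows. Everything here beyond EmbeddingPolicy is an assumption: node is a hypothetical stand-in for *storage.Node (whose real fields are not shown on this page), and sourceText and textsToEmbed are illustrative helpers. The sketch also simplifies the "target already set" rule by skipping the whole task rather than only the writeback.

```go
package main

import "fmt"

// EmbeddingPolicy is a local replica of the struct documented on this page.
type EmbeddingPolicy struct {
	Label          string
	SourceProperty string
	TargetProperty string
}

// node is a hypothetical stand-in for *storage.Node.
type node struct {
	Labels     []string
	Properties map[string]any
}

// sourceText reports the text to embed for policy p on node n, or false if
// the policy does not apply.
func sourceText(p EmbeddingPolicy, n node) (string, bool) {
	matched := false
	for _, l := range n.Labels {
		if l == p.Label { // exact match on any of the node's labels
			matched = true
			break
		}
	}
	if !matched {
		return "", false
	}
	if _, exists := n.Properties[p.TargetProperty]; exists {
		return "", false // preserve a user-provided value
	}
	text, ok := n.Properties[p.SourceProperty].(string)
	return text, ok // missing or non-string sources are skipped
}

// textsToEmbed walks the policies in registration order.
func textsToEmbed(policies []EmbeddingPolicy, n node) []string {
	var out []string
	for _, p := range policies {
		if text, ok := sourceText(p, n); ok {
			out = append(out, text)
		}
	}
	return out
}

func demo() []string {
	n := node{
		Labels:     []string{"Document", "Draft"},
		Properties: map[string]any{"title": "Graphs", "body": "Nodes and edges", "views": 42},
	}
	policies := []EmbeddingPolicy{
		{Label: "Document", SourceProperty: "title", TargetProperty: "title_vec"},
		{Label: "Draft", SourceProperty: "body", TargetProperty: "body_vec"},
		{Label: "Document", SourceProperty: "views", TargetProperty: "views_vec"}, // non-string source
		{Label: "Archived", SourceProperty: "title", TargetProperty: "old_vec"},   // label not on node
	}
	return textsToEmbed(policies, n)
}

func main() {
	fmt.Println(demo())
}
```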

type ErrNoIndexForTenant

type ErrNoIndexForTenant struct {
	TenantID string
}

ErrNoIndexForTenant is returned by Embedder implementations when the target tenant has no configured backend state to embed against (e.g., no LSA index built, no per-tenant model loaded).

This is NOT a transient error — retrying the same Embed call without administrative action will return the same result. Callers (typically the AutoEmbedObserver in R2.5) should log + meter + drop the task rather than retry.

The error message is intentionally generic to fit any Embedder backend. Implementations that want backend-specific guidance (e.g., LSAEmbedder pointing the operator at `POST /hybrid-search/lsa-index`) should log that separately or wrap this error with %w + their own context. The errors.Is chain still resolves to ErrNoIndexForTenant for callers that branch on this condition.
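A minimal sketch of the wrap-and-branch pattern described above, using a local replica of ErrNoIndexForTenant. The Error message text, embedForTenant and shouldDrop are assumptions made for illustration. The sketch branches with errors.As, which matches on the type and also recovers the TenantID; with the plain struct as replicated here, errors.Is would only match a target carrying the same TenantID.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoIndexForTenant is a local replica of the type documented on this page.
// The message format below is an assumption.
type ErrNoIndexForTenant struct {
	TenantID string
}

func (e ErrNoIndexForTenant) Error() string {
	return fmt.Sprintf("no index configured for tenant %q", e.TenantID)
}

// embedForTenant is a hypothetical backend call that adds operator guidance
// with %w so the typed error stays reachable through the chain.
func embedForTenant(tenantID string) error {
	return fmt.Errorf("lsa: build an index via POST /hybrid-search/lsa-index: %w",
		ErrNoIndexForTenant{TenantID: tenantID})
}

// shouldDrop reports whether err is the non-transient "no index" condition,
// in which case the caller should log, meter and drop rather than retry.
func shouldDrop(err error) (tenantID string, drop bool) {
	var noIndex ErrNoIndexForTenant
	if errors.As(err, &noIndex) {
		return noIndex.TenantID, true
	}
	return "", false
}

func main() {
	tenant, drop := shouldDrop(embedForTenant("acme"))
	fmt.Println(tenant, drop)

	_, drop = shouldDrop(errors.New("connection reset"))
	fmt.Println(drop)
}
```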

func (ErrNoIndexForTenant) Error

func (e ErrNoIndexForTenant) Error() string

Error implements the error interface.

type LSAEmbedder

type LSAEmbedder struct {
	// contains filtered or unexported fields
}

LSAEmbedder implements Embedder by routing each Embed call to the caller's tenant's LSA index via search.TenantLSAIndexes. This is the canonical first-party Embedder implementation (R2.4 / S11 spike §6).

LSA is deterministic given a fixed corpus + seed (see pkg/search/lsa.go's "Seed=42 is a feature" framing), which makes LSAEmbedder a good baseline for embedding-space consistency between the read path (existing /v1/embeddings and /hybrid-search handlers) and the new write-path auto-embedder. Enterprise plugins providing zero-config or larger-scale embeddings (ONNX-bundled, hosted-API) implement the same Embedder interface and replace LSAEmbedder at startup time via the same AddObserver / wire-up surface R2.5 will introduce.

Operational note: LSAEmbedder.Embed returns ErrNoIndexForTenant when the tenant has not had an LSA index built yet. This is NOT a transient error — retrying without administrative action (POST /hybrid-search/lsa-index) returns the same result. The R2.5 AutoEmbedObserver will log + meter + drop the embedding task in that case rather than retrying.

func NewLSAEmbedder

func NewLSAEmbedder(indexes *search.TenantLSAIndexes) *LSAEmbedder

NewLSAEmbedder constructs an Embedder backed by the given per-tenant LSA registry. The registry MUST be non-nil; pass an empty search.NewTenantLSAIndexes() if no tenants have indexes yet (Embed will return ErrNoIndexForTenant for every call until tenants are registered via TenantLSAIndexes.Set).

func (*LSAEmbedder) Embed

func (e *LSAEmbedder) Embed(ctx context.Context, tenantID string, text string) ([]float32, error)

Embed projects text into tenantID's LSA latent space.

Returns ErrNoIndexForTenant when no LSA index has been built for tenantID. Returns an error (from search.LSAIndex.FoldQuery) when the query is out-of-vocabulary for the tenant's index or projects to the zero vector. Honors ctx.Err() — if the caller cancels before Embed starts the projection, returns the ctx error without touching the index.

On success, the returned []float32 has length equal to the tenant's LSAIndex.Dimensions() (200 in production via DefaultLSAConfig, smaller in tests). It is L2-normalized — FoldQuery guarantees this for any successful return.
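L2-normalized means the vector's Euclidean length is 1. A caller or test that wants to assert this property could use a check along the following lines; l2Norm and isUnitLength are hypothetical helpers, and the tolerance is an arbitrary choice to absorb float32 rounding.

```go
package main

import (
	"fmt"
	"math"
)

// l2Norm returns the Euclidean length of v, accumulating in float64.
func l2Norm(v []float32) float64 {
	var sum float64
	for _, x := range v {
		sum += float64(x) * float64(x)
	}
	return math.Sqrt(sum)
}

// isUnitLength reports whether v has length 1 within tol.
func isUnitLength(v []float32, tol float64) bool {
	return math.Abs(l2Norm(v)-1) <= tol
}

func main() {
	fmt.Println(isUnitLength([]float32{0.6, 0.8}, 1e-6)) // 0.36 + 0.64 = 1
	fmt.Println(isUnitLength([]float32{1, 1}, 1e-6))     // length is sqrt(2)
}
```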

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a bounded async worker pool with drop-on-full backpressure, a configurable shutdown drain timeout, and an optional synchronous mode for tests. See S11 spike §7.5 for the design rationale.

The pool is generic: it operates on a Task interface, not an embedding-specific task type. R2.5's AutoEmbedObserver will define EmbedTask implementing Task.

func NewPool

func NewPool(cfg PoolConfig) *Pool

NewPool constructs a Pool with cfg's settings. Zero-value fields receive package defaults. The returned pool is ready for Submit immediately.

func (*Pool) Dropped

func (p *Pool) Dropped() uint64

Dropped returns the total number of tasks dropped due to a full queue or post-shutdown submits. Useful for monitoring backpressure.

func (*Pool) InFlight

func (p *Pool) InFlight() int64

InFlight returns the number of tasks currently inside Execute. Primarily for tests and observability; production callers usually don't need this.

func (*Pool) Shutdown

func (p *Pool) Shutdown(ctx context.Context) int

Shutdown closes the pool's task queue, signals in-flight tasks via the pool's internal context, and waits up to ShutdownTimeout for workers to drain. Returns 0 if all in-flight tasks completed within the timeout; otherwise returns the count of tasks still inside Execute when the timeout fired.

After Shutdown returns, Submit always returns false.

Safe to call multiple times; only the first call has effect. Subsequent calls return 0 immediately.

The ctx parameter is honored — if it is canceled before ShutdownTimeout elapses, Shutdown returns early with the current in-flight count.
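The drain behavior described above (wait for workers, but give up on a timeout or on ctx cancellation and report what is still running) is commonly built from a WaitGroup, a done channel and a select. The sketch below shows that general pattern under stated assumptions; drain and its parameters are hypothetical and are not the Pool's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// drain waits for wg until timeout elapses or ctx is cancelled. It returns 0
// on a clean drain, otherwise the in-flight count at the moment it gave up.
// The waiter goroutine lingers until the remaining tasks finish.
func drain(ctx context.Context, wg *sync.WaitGroup, inFlight *atomic.Int64, timeout time.Duration) int {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return 0
	case <-timer.C:
	case <-ctx.Done():
	}
	return int(inFlight.Load())
}

// demoStuck runs one task that ignores cancellation until released.
func demoStuck() int {
	var wg sync.WaitGroup
	var inFlight atomic.Int64
	release := make(chan struct{})

	wg.Add(1)
	inFlight.Add(1)
	go func() {
		defer wg.Done()
		defer inFlight.Add(-1)
		<-release
	}()

	left := drain(context.Background(), &wg, &inFlight, 20*time.Millisecond)
	close(release) // let the stuck task finish after the measurement
	return left
}

// demoIdle drains a pool with nothing running.
func demoIdle() int {
	var wg sync.WaitGroup
	var inFlight atomic.Int64
	return drain(context.Background(), &wg, &inFlight, time.Second)
}

func main() {
	fmt.Println(demoStuck(), demoIdle())
}
```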

func (*Pool) Submit

func (p *Pool) Submit(ctx context.Context, task Task) bool

Submit enqueues task for async execution. Returns true on successful enqueue, false when the task is dropped — either because the queue is full or because the pool is closed.

Submit never blocks the caller. Backpressure manifests as drops, not stalls, per S11 spike §7.5 ("Dropping on back-pressure is preferable to blocking CreateNode").

In synchronous mode, Submit runs task.Execute inline on the caller's goroutine and returns true unless the pool is closed. The caller's context is passed through to Execute.
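Drop-on-full submission is typically a select with a default branch over a buffered channel. The following is a sketch of that idiom, not the Pool's implementation; boundedQueue and trySubmit are hypothetical names, and no workers are started so the queue fills deterministically.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// boundedQueue is a hypothetical stand-in for the pool's task queue.
type boundedQueue struct {
	tasks   chan func()
	dropped atomic.Uint64
}

func newBoundedQueue(depth int) *boundedQueue {
	return &boundedQueue{tasks: make(chan func(), depth)}
}

// trySubmit never blocks: if the buffer is full the task is dropped and
// counted instead of stalling the caller.
func (q *boundedQueue) trySubmit(task func()) bool {
	select {
	case q.tasks <- task:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

// demoDrops submits three tasks to a queue of depth two with no consumers.
func demoDrops() (accepted int, dropped uint64) {
	q := newBoundedQueue(2)
	for i := 0; i < 3; i++ {
		if q.trySubmit(func() {}) {
			accepted++
		}
	}
	return accepted, q.dropped.Load()
}

func main() {
	accepted, dropped := demoDrops()
	fmt.Println(accepted, dropped) // prints "2 1"
}
```

The trade-off is explicit: a caller such as CreateNode is never delayed, and lost work is visible through the drop counter rather than through latency.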

type PoolConfig

type PoolConfig struct {
	// Workers is the number of concurrent worker goroutines.
	// Zero or negative selects DefaultWorkers.
	Workers int

	// QueueDepth is the bounded queue capacity. Submit() drops tasks when
	// the queue is full. Zero or negative selects DefaultQueueDepth.
	QueueDepth int

	// ShutdownTimeout caps how long Shutdown blocks waiting for in-flight
	// tasks to drain. Zero or negative selects DefaultShutdownTimeout.
	ShutdownTimeout time.Duration

	// Synchronous, when true, makes Submit execute the task inline on the
	// caller's goroutine — no queue, no workers. Used for deterministic
	// tests where the caller wants to observe the post-execution state
	// immediately. Production callers must leave this false.
	Synchronous bool
}

PoolConfig configures a Pool. Zero values are replaced with package defaults; see DefaultWorkers / DefaultQueueDepth / DefaultShutdownTimeout.

type Task

type Task interface {
	Execute(ctx context.Context)
}

Task is a unit of async work executed inside a Pool worker goroutine.

The pool passes a context derived from the pool's internal lifecycle context — it is canceled when the pool is shut down. Tasks that ignore the context keep running to completion (and count toward "unfinished" at shutdown); tasks that respect ctx.Done() can bail early on shutdown.

Execute must not panic. The pool recovers panics so a misbehaving Task does not crash workers, but recovery is a safety net, not the contract.
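Both halves of this contract can be sketched briefly: a task that checks ctx between units of work, and a worker-side wrapper that recovers a panic. The names chunkedTask, panickyTask and safeRun are hypothetical, and Task is replicated locally; the real pool's recovery logic is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a local replica of the interface documented on this page.
type Task interface {
	Execute(ctx context.Context)
}

// chunkedTask does its work in steps and checks for cancellation between them.
type chunkedTask struct {
	steps     int
	completed *int
}

func (t chunkedTask) Execute(ctx context.Context) {
	for i := 0; i < t.steps; i++ {
		select {
		case <-ctx.Done():
			return // bail early on shutdown
		default:
		}
		*t.completed++
	}
}

// panickyTask violates the contract on purpose.
type panickyTask struct{}

func (panickyTask) Execute(context.Context) { panic("bug in task") }

// safeRun is a hypothetical worker-side wrapper: it recovers a panic from
// Execute so the calling goroutine survives, and reports whether one occurred.
func safeRun(ctx context.Context, t Task) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	t.Execute(ctx)
	return false
}

// stepsCompleted runs a five-step task, optionally cancelling first.
func stepsCompleted(cancelFirst bool) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	n := 0
	safeRun(ctx, chunkedTask{steps: 5, completed: &n})
	return n
}

// panicRecovered runs the misbehaving task through safeRun.
func panicRecovered() bool {
	return safeRun(context.Background(), panickyTask{})
}

func main() {
	fmt.Println(stepsCompleted(false), stepsCompleted(true), panicRecovered())
}
```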

type TaskFunc

type TaskFunc func(ctx context.Context)

TaskFunc is an adapter for using a bare function as a Task — analogous to http.HandlerFunc. Useful for tests and simple async work that doesn't warrant a dedicated struct.

func (TaskFunc) Execute

func (f TaskFunc) Execute(ctx context.Context)

Execute implements Task by calling f(ctx).
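The adapter works the same way as http.HandlerFunc: a function type gains a method that calls itself, so a closure can be used wherever a Task is expected. The replica below mirrors the two declarations on this page; runOnce is a hypothetical demo helper.

```go
package main

import (
	"context"
	"fmt"
)

// Task and TaskFunc are local replicas of the declarations on this page.
type Task interface {
	Execute(ctx context.Context)
}

type TaskFunc func(ctx context.Context)

// Execute implements Task by calling f(ctx).
func (f TaskFunc) Execute(ctx context.Context) { f(ctx) }

// runOnce wraps a closure in TaskFunc and invokes it through the interface.
func runOnce() int {
	calls := 0
	var t Task = TaskFunc(func(context.Context) { calls++ })
	t.Execute(context.Background())
	return calls
}

func main() {
	fmt.Println(runOnce()) // prints 1
}
```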
