destination

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Overview

Package destination defines the DestinationWriter interface and canonical write envelope (TranslatedPayload) consumed by all memory backends.

The WAL entry Payload field (json.RawMessage) is deserialized to TranslatedPayload by the queue worker before calling Write. Backends MUST treat payloads as idempotent: re-delivery of the same PayloadID must not produce a duplicate record.

Index

Constants

View Source
const (
	// DefaultQueryLimit is applied when the client sends limit=0 or omits it.
	DefaultQueryLimit = 20
	// MaxQueryLimit is the hard cap regardless of what the client requests.
	MaxQueryLimit = 200
)

Variables

This section is empty.

Functions

func ClampLimit

func ClampLimit(requested int) int

ClampLimit enforces the default and maximum query limits. Reference: Tech Spec Phase 0C Behavioral Contract item 16.

func DecodeCursor

func DecodeCursor(cursor string) (int, error)

DecodeCursor decodes a cursor string back to an integer offset. Returns 0 and no error for an empty cursor (first page).

func EncodeCursor

func EncodeCursor(offset int) string

EncodeCursor encodes an integer offset as a URL-safe base64 cursor string.

func ValidActorType

func ValidActorType(s string) bool

ValidActorType reports whether s is one of the accepted provenance values: "user", "agent", or "system". Empty is NOT valid — callers must resolve defaults before calling this function.

Reference: Tech Spec Section 7.1 — Provenance Semantics.

Types

type ConflictGroup

type ConflictGroup struct {
	Subject           string      `json:"subject"`
	EntityKey         string      `json:"entity_key"` // collection field
	ConflictingValues []string    `json:"conflicting_values"`
	Sources           []string    `json:"sources"`
	Timestamps        []time.Time `json:"timestamps"`
	Count             int         `json:"count"`
}

ConflictGroup represents a set of contradictory memories sharing the same subject and collection (entity_key) but with divergent content. Reference: Tech Spec Section 13.2 — Conflict Inspector.

type ConflictParams

type ConflictParams struct {
	Source    string // filter by source
	Subject   string // filter by subject
	ActorType string // filter by actor_type
	Limit     int
	Offset    int
}

ConflictParams holds filter and pagination options for conflict queries.

type ConflictQuerier

type ConflictQuerier interface {
	// QueryConflicts returns groups of contradictory memories.
	QueryConflicts(params ConflictParams) ([]ConflictGroup, error)
}

ConflictQuerier is an optional interface for destination backends that support conflict detection. Callers must type-assert to check for support. Reference: Tech Spec Section 13.2.

type DestinationWriter

type DestinationWriter interface {
	// Write persists p to the destination. Implementations MUST be idempotent:
	// writing the same PayloadID twice must succeed without producing a
	// duplicate record.
	Write(p TranslatedPayload) error

	// Ping verifies the destination is reachable and healthy. Used by the
	// doctor command and /ready health endpoint.
	Ping() error

	// Exists reports whether a record with the given payloadID has been
	// written to the destination. Used by consistency assertions (Phase R-10).
	Exists(payloadID string) (bool, error)

	// Close releases all resources held by the destination. Safe to call once.
	Close() error
}

DestinationWriter is the interface satisfied by every memory backend (SQLite, PostgreSQL, Supabase, etc.). All implementations MUST be safe for concurrent use by multiple goroutines.

Reference: Tech Spec Section 3.2.

type MemoryCounter

type MemoryCounter interface {
	// MemoryCount returns the total number of memory records in the destination.
	MemoryCount() (int64, error)
}

MemoryCounter is an optional interface for destination backends that support counting the total number of stored memories. Used by the /api/status admin endpoint to populate the memories_total field. Callers must type-assert.

type PostgresDestination

type PostgresDestination struct {
	// contains filtered or unexported fields
}

PostgresDestination writes TranslatedPayload records to a PostgreSQL database with pgvector support for Stage 4 semantic retrieval.

The embedding column stores a pgvector vector. When a payload carries no embedding, the column is NULL and CanSemanticSearch returns false for that record (but the destination as a whole may still support semantic search if other records have embeddings).

All SQL uses parameterized statements. Write is idempotent via INSERT ... ON CONFLICT DO NOTHING.

Reference: Tech Spec Section 3.4 — Stage 4, Phase 5 Behavioral Contract 3.

func OpenPostgres

func OpenPostgres(dsn string, dimensions int, logger *slog.Logger) (*PostgresDestination, error)

OpenPostgres opens a connection to the PostgreSQL database identified by dsn, enables the pgvector extension, and creates the memories schema.

dsn is a libpq-style connection string or a pgx DSN (e.g. "postgres://user:pass@host:5432/db"). dimensions is the embedding vector size (e.g. 1536 for text-embedding-3-small).

func (*PostgresDestination) CanSemanticSearch

func (d *PostgresDestination) CanSemanticSearch() bool

CanSemanticSearch reports whether any row in the memories table has a non-NULL embedding, indicating the pgvector index is usable.

func (*PostgresDestination) Close

func (d *PostgresDestination) Close() error

Close closes the database connection pool.

func (*PostgresDestination) Exists

func (d *PostgresDestination) Exists(payloadID string) (bool, error)

Exists reports whether a record with payloadID exists in the memories table.

func (*PostgresDestination) Ping

func (d *PostgresDestination) Ping() error

Ping verifies the database connection is alive.

func (*PostgresDestination) Query

func (d *PostgresDestination) Query(params QueryParams) (QueryResult, error)

Query returns a page of memories using parameterized structured queries.

func (*PostgresDestination) SemanticSearch

func (d *PostgresDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)

SemanticSearch performs a pgvector cosine similarity search. It issues a parameterized ORDER BY embedding <=> $1 query using the ivfflat index created during schema setup.

Reference: Tech Spec Section 3.4 — Stage 4.

func (*PostgresDestination) Write

Write persists p to the memories table. Idempotent via ON CONFLICT DO NOTHING. If p.Embedding is non-empty it is stored as a pgvector value; otherwise NULL. All values are bound via parameterized placeholders.

type Querier

type Querier interface {
	// Query returns a page of memories matching params.
	Query(params QueryParams) (QueryResult, error)
}

Querier is the read interface satisfied by memory backends. It is separate from DestinationWriter to allow read-only facade implementations and to keep the write-path interface minimal.

Reference: Tech Spec Section 3.3, Section 12.

type QueryParams

type QueryParams struct {
	// Destination is the target destination name.
	Destination string
	// Namespace filters results to a specific namespace.
	Namespace string
	// Subject filters results to a specific subject. Empty means all subjects.
	Subject string
	// Q is a text substring filter applied to the content field. Empty means no filter.
	Q string
	// Limit is the maximum number of records to return. Callers must cap at 200.
	Limit int
	// Cursor is the opaque pagination cursor from a previous QueryResult.NextCursor.
	Cursor string
	// Profile is the retrieval profile (fast, balanced, deep). Used by later phases.
	Profile string
	// ActorType filters results by provenance (user, agent, system). Empty means
	// no filter.
	//
	// Reference: Tech Spec Section 7.1.
	ActorType string
}

QueryParams are the input parameters for a basic structured query. The full 6-stage retrieval cascade (Phase 3+) builds on top of this.

Reference: Tech Spec Section 3.3, Section 3.8.

type QueryResult

type QueryResult struct {
	Records    []TranslatedPayload
	NextCursor string
	HasMore    bool
}

QueryResult holds one page of query results and pagination state.

Reference: Tech Spec Section 3.8.

type SQLiteDestination

type SQLiteDestination struct {
	// contains filtered or unexported fields
}

SQLiteDestination writes TranslatedPayload records to a SQLite database. The database is opened with PRAGMA journal_mode=WAL and PRAGMA busy_timeout=5000 to maximise concurrent read throughput and avoid "database is locked" errors under moderate write load.

All SQL queries use parameterized statements — string concatenation for SQL is never used. Write is idempotent: INSERT OR IGNORE means re-delivering a payload_id is a no-op.

All state is held in struct fields; there are no package-level variables.

func OpenSQLite

func OpenSQLite(path string, logger *slog.Logger) (*SQLiteDestination, error)

OpenSQLite opens (or creates) a SQLite database at path, applies the required PRAGMAs, and creates the memories schema if absent. The parent directory is created with 0700 permissions; the database file is created with 0600 permissions.

Panics if logger is nil. Returns an error if the database cannot be opened or the schema cannot be applied.

func (*SQLiteDestination) CanSemanticSearch

func (d *SQLiteDestination) CanSemanticSearch() bool

CanSemanticSearch reports whether this SQLite destination has any rows with a stored embedding. Returns false when the table is empty or no records have embeddings, signalling Stage 4 to degrade gracefully.

Reference: Tech Spec Section 3.4 — Stage 4.

func (*SQLiteDestination) Close

func (d *SQLiteDestination) Close() error

Close closes the underlying database connection. Safe to call once.

func (*SQLiteDestination) DeletePayload

func (d *SQLiteDestination) DeletePayload(payloadID string) (int64, error)

DeletePayload removes a single record by payload_id. Used by consistency assertion tests to verify that the checker detects missing entries. Returns the number of rows deleted (0 or 1).

func (*SQLiteDestination) Exists

func (d *SQLiteDestination) Exists(payloadID string) (bool, error)

Exists reports whether a record with payloadID exists in the memories table. Used by consistency assertions (Phase R-10).

func (*SQLiteDestination) MemoryCount

func (d *SQLiteDestination) MemoryCount() (int64, error)

MemoryCount returns the total number of memory records in the SQLite destination. Used by the /api/status admin endpoint.

func (*SQLiteDestination) Ping

func (d *SQLiteDestination) Ping() error

Ping verifies the database connection is alive by executing a lightweight query. Used by the doctor command and /ready health endpoint.

func (*SQLiteDestination) Query

func (d *SQLiteDestination) Query(params QueryParams) (QueryResult, error)

Query returns a page of memories matching params using a basic structured query on the memories table. It implements the Querier interface for Phase 0C. The full 6-stage retrieval cascade (Phase 3+) replaces this for production reads.

All SQL is parameterized — no string concatenation. Limit and offset are computed from params.Limit (clamped) and the decoded cursor.

Reference: Tech Spec Section 3.3, Phase 0C Behavioral Contract item 16.

func (*SQLiteDestination) QueryConflicts

func (d *SQLiteDestination) QueryConflicts(params ConflictParams) ([]ConflictGroup, error)

QueryConflicts returns groups of contradictory memories. It groups by subject + collection (entity_key) and returns groups where multiple distinct content values exist.

All SQL is parameterized. NEVER string concatenation. Reference: Tech Spec Section 13.2.

func (*SQLiteDestination) QueryTimeTravel

func (d *SQLiteDestination) QueryTimeTravel(params TimeTravelParams) (QueryResult, error)

QueryTimeTravel returns memories as of the specified timestamp. Results are filtered to timestamp <= as_of and ordered by timestamp DESC.

All SQL is parameterized. NEVER string concatenation. Reference: Tech Spec Section 13.2.

func (*SQLiteDestination) SemanticSearch

func (d *SQLiteDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)

SemanticSearch performs application-level cosine similarity search over all stored embeddings. It fetches candidate rows filtered by namespace and destination, computes cosine similarity against vec in Go, and returns the top-N results by score descending.

This implementation does not use sqlite-vec (which requires CGO). It provides correct cosine-ranked results using the modernc pure-Go driver.

The over-sample factor is applied to the SQL LIMIT to increase recall before in-process reranking: we fetch min(overSample * limit, totalRows) candidates.

Reference: Tech Spec Section 3.4 — Stage 4.

func (*SQLiteDestination) Write

Write persists p to the memories table. The operation is idempotent: INSERT OR IGNORE silently discards a write whose payload_id already exists. All values are bound via parameterized placeholders — no string interpolation. If p.Embedding is non-empty it is serialized as a little-endian float32 BLOB and stored in the embedding column for Stage 4 semantic retrieval.

type ScoredRecord

type ScoredRecord struct {
	Payload TranslatedPayload
	Score   float32
}

ScoredRecord pairs a TranslatedPayload with its cosine similarity score from a semantic search. Used by Stage 4 (Semantic Retrieval).

Reference: Tech Spec Section 3.4 — Stage 4.

type SemanticSearcher

type SemanticSearcher interface {
	// CanSemanticSearch reports whether this destination has vector index support
	// and at least one indexed embedding. Returns false to signal graceful skip.
	CanSemanticSearch() bool

	// SemanticSearch returns up to params.Limit records nearest to vec, ranked
	// by cosine similarity descending. Filters are applied from params
	// (Namespace, Destination). Implementations must be safe for concurrent use.
	SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
}

SemanticSearcher is implemented by destination backends that support vector similarity search. It is an optional extension of DestinationWriter — callers must type-assert to check for support.

CanSemanticSearch must be checked before calling SemanticSearch. Destinations that have no indexed embeddings (e.g. a fresh SQLite DB with no writes) may return false to skip Stage 4 gracefully.

Reference: Tech Spec Section 3.4 — Stage 4.

type SupabaseDestination

type SupabaseDestination struct {
	// contains filtered or unexported fields
}

SupabaseDestination writes and reads TranslatedPayload records via the Supabase REST API. Semantic search uses the match_memories RPC function which must be created in the Supabase database:

CREATE OR REPLACE FUNCTION match_memories(
    query_embedding vector(1536),
    match_count int,
    namespace text DEFAULT NULL,
    destination_id text DEFAULT NULL
) RETURNS TABLE ( ... ) ...

The resolved API key is never logged at any level.

Reference: Tech Spec Section 3.4 — Stage 4, Phase 5 Behavioral Contract 3.

func OpenSupabase

func OpenSupabase(baseURL, resolvedKey string, logger *slog.Logger) (*SupabaseDestination, error)

OpenSupabase creates a SupabaseDestination. baseURL is the Supabase project URL (e.g. "https://xyzproject.supabase.co"). resolvedKey is the pre-resolved service-role or anon key — never re-resolved per request.

INVARIANT: resolvedKey is never logged at any level.

func (*SupabaseDestination) CanSemanticSearch

func (d *SupabaseDestination) CanSemanticSearch() bool

CanSemanticSearch reports whether the match_memories RPC is available. We always return true for Supabase — if the function doesn't exist, the RPC call returns an HTTP error which SemanticSearch will propagate.

func (*SupabaseDestination) Close

func (d *SupabaseDestination) Close() error

Close is a no-op for the Supabase destination (http.Client has no close).

func (*SupabaseDestination) Exists

func (d *SupabaseDestination) Exists(payloadID string) (bool, error)

Exists reports whether a record with payloadID exists.

func (*SupabaseDestination) Ping

func (d *SupabaseDestination) Ping() error

Ping checks connectivity by sending a HEAD request to /rest/v1/memories.

func (*SupabaseDestination) Query

func (d *SupabaseDestination) Query(params QueryParams) (QueryResult, error)

Query returns a page of memories using the Supabase REST API filter syntax. All filter values are passed as URL query parameters — no string interpolation in SQL.

func (*SupabaseDestination) SemanticSearch

func (d *SupabaseDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)

SemanticSearch calls the match_memories PostgreSQL function via the Supabase RPC endpoint. The function must exist in the Supabase database.

Reference: Tech Spec Section 3.4 — Stage 4.

func (*SupabaseDestination) Write

Write inserts p into the Supabase memories table via POST /rest/v1/memories. Uses Prefer: resolution=ignore-duplicates for idempotent behaviour. INVARIANT: resolvedKey is never logged.

type TimeTravelParams

type TimeTravelParams struct {
	AsOf        time.Time // return memories with timestamp <= this value
	Namespace   string
	Subject     string
	Destination string
	Limit       int
	Offset      int
}

TimeTravelParams holds options for time-travel queries.

type TimeTravelQuerier

type TimeTravelQuerier interface {
	// QueryTimeTravel returns memories as of the specified timestamp.
	QueryTimeTravel(params TimeTravelParams) (QueryResult, error)
}

TimeTravelQuerier is an optional interface for destination backends that support time-travel queries. Callers must type-assert to check for support. Reference: Tech Spec Section 13.2.

type TranslatedPayload

type TranslatedPayload struct {
	PayloadID        string            `json:"payload_id"`
	RequestID        string            `json:"request_id"`
	Source           string            `json:"source"`
	Subject          string            `json:"subject"`
	Namespace        string            `json:"namespace"`
	Destination      string            `json:"destination"`
	Collection       string            `json:"collection"`
	Content          string            `json:"content"`
	Model            string            `json:"model"`
	Role             string            `json:"role"`
	Timestamp        time.Time         `json:"timestamp"`
	IdempotencyKey   string            `json:"idempotency_key"`
	SchemaVersion    int               `json:"schema_version"`
	TransformVersion string            `json:"transform_version"`
	ActorType        string            `json:"actor_type"` // user, agent, or system
	ActorID          string            `json:"actor_id"`   // identity of the actor
	Embedding        []float32         `json:"embedding,omitempty"`
	Metadata         map[string]string `json:"metadata,omitempty"`

	// Addendum A3: Retrieval Firewall — sensitivity classification fields.
	// SensitivityLabels are free-form tags (e.g. "pii", "financial").
	// ClassificationTier is a single tier from the configured tier_order
	// (default: "public"). Both default to empty/public when absent.
	// Reference: Tech Spec Addendum Section A3.2.
	SensitivityLabels  []string `json:"sensitivity_labels,omitempty"`
	ClassificationTier string   `json:"classification_tier,omitempty"`
}

TranslatedPayload is the canonical write envelope produced by the field mapping and transform stages of the write path. It is stored in the WAL Payload field as raw JSON and deserialized by the queue worker before handing off to a DestinationWriter.

Reference: Tech Spec Section 7.1.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL