Documentation ¶
Overview ¶
Package destination defines the DestinationWriter interface and canonical write envelope (TranslatedPayload) consumed by all memory backends.
The queue worker deserializes the WAL entry Payload field (json.RawMessage) into a TranslatedPayload before calling Write. Backends MUST make writes idempotent: re-delivery of the same PayloadID must not produce a duplicate record.
Index ¶
- Constants
- func ClampLimit(requested int) int
- func DecodeCursor(cursor string) (int, error)
- func EncodeCursor(offset int) string
- func ValidActorType(s string) bool
- type ConflictGroup
- type ConflictParams
- type ConflictQuerier
- type DestinationWriter
- type MemoryCounter
- type PostgresDestination
- func (d *PostgresDestination) CanSemanticSearch() bool
- func (d *PostgresDestination) Close() error
- func (d *PostgresDestination) Exists(payloadID string) (bool, error)
- func (d *PostgresDestination) Ping() error
- func (d *PostgresDestination) Query(params QueryParams) (QueryResult, error)
- func (d *PostgresDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
- func (d *PostgresDestination) Write(p TranslatedPayload) error
- type Querier
- type QueryParams
- type QueryResult
- type SQLiteDestination
- func (d *SQLiteDestination) CanSemanticSearch() bool
- func (d *SQLiteDestination) Close() error
- func (d *SQLiteDestination) DeletePayload(payloadID string) (int64, error)
- func (d *SQLiteDestination) Exists(payloadID string) (bool, error)
- func (d *SQLiteDestination) MemoryCount() (int64, error)
- func (d *SQLiteDestination) Ping() error
- func (d *SQLiteDestination) Query(params QueryParams) (QueryResult, error)
- func (d *SQLiteDestination) QueryConflicts(params ConflictParams) ([]ConflictGroup, error)
- func (d *SQLiteDestination) QueryTimeTravel(params TimeTravelParams) (QueryResult, error)
- func (d *SQLiteDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
- func (d *SQLiteDestination) Write(p TranslatedPayload) error
- type ScoredRecord
- type SemanticSearcher
- type SupabaseDestination
- func (d *SupabaseDestination) CanSemanticSearch() bool
- func (d *SupabaseDestination) Close() error
- func (d *SupabaseDestination) Exists(payloadID string) (bool, error)
- func (d *SupabaseDestination) Ping() error
- func (d *SupabaseDestination) Query(params QueryParams) (QueryResult, error)
- func (d *SupabaseDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
- func (d *SupabaseDestination) Write(p TranslatedPayload) error
- type TimeTravelParams
- type TimeTravelQuerier
- type TranslatedPayload
Constants ¶
const (
// DefaultQueryLimit is applied when the client sends limit=0 or omits it.
DefaultQueryLimit = 20
// MaxQueryLimit is the hard cap regardless of what the client requests.
MaxQueryLimit = 200
)
Variables ¶
This section is empty.
Functions ¶
func ClampLimit ¶
func ClampLimit(requested int) int
ClampLimit enforces the default and maximum query limits. Reference: Tech Spec Phase 0C Behavioral Contract item 16.
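The clamping rule can be sketched as follows. This is a minimal illustration, not the package's source: clampLimit is a hypothetical re-implementation, and treating negative values like 0 is an assumption the documentation does not confirm.

```go
package main

const (
	DefaultQueryLimit = 20  // applied when the client sends limit=0 or omits it
	MaxQueryLimit     = 200 // hard cap regardless of what the client requests
)

// clampLimit is a hypothetical stand-in for ClampLimit.
// Assumption: negative requests fall back to the default, like 0.
func clampLimit(requested int) int {
	if requested <= 0 {
		return DefaultQueryLimit
	}
	if requested > MaxQueryLimit {
		return MaxQueryLimit
	}
	return requested
}
```

Under these assumptions, a request for 1000 records is served with 200, and an omitted limit is served with 20.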
func DecodeCursor ¶
func DecodeCursor(cursor string) (int, error)
DecodeCursor decodes a cursor string back to an integer offset. Returns 0 and no error for an empty cursor (first page).
func EncodeCursor ¶
func EncodeCursor(offset int) string
EncodeCursor encodes an integer offset as a URL-safe base64 cursor string.
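A cursor round trip might look like the sketch below. The documentation only states that the cursor is URL-safe base64 of an integer offset; encoding the offset as a decimal string, using padded base64.URLEncoding, and rejecting negative offsets are all assumptions, and encodeCursor/decodeCursor are hypothetical names.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
)

// encodeCursor wraps a decimal offset in URL-safe base64 (assumed wire format).
func encodeCursor(offset int) string {
	return base64.URLEncoding.EncodeToString([]byte(strconv.Itoa(offset)))
}

// decodeCursor reverses encodeCursor. An empty cursor means the first page.
func decodeCursor(cursor string) (int, error) {
	if cursor == "" {
		return 0, nil
	}
	raw, err := base64.URLEncoding.DecodeString(cursor)
	if err != nil {
		return 0, fmt.Errorf("decode cursor: %w", err)
	}
	offset, err := strconv.Atoi(string(raw))
	if err != nil {
		return 0, fmt.Errorf("parse cursor offset: %w", err)
	}
	if offset < 0 {
		return 0, fmt.Errorf("negative cursor offset %d", offset)
	}
	return offset, nil
}
```

Keeping the cursor opaque lets a backend change its pagination scheme later without breaking clients that simply echo NextCursor back.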
func ValidActorType ¶
func ValidActorType(s string) bool
ValidActorType reports whether s is one of the accepted provenance values: "user", "agent", or "system". The empty string is NOT valid; callers must resolve defaults before calling this function.
Reference: Tech Spec Section 7.1 — Provenance Semantics.
Types ¶
type ConflictGroup ¶
type ConflictGroup struct {
Subject string `json:"subject"`
EntityKey string `json:"entity_key"` // collection field
ConflictingValues []string `json:"conflicting_values"`
Sources []string `json:"sources"`
Timestamps []time.Time `json:"timestamps"`
Count int `json:"count"`
}
ConflictGroup represents a set of contradictory memories sharing the same subject and collection (entity_key) but with divergent content. Reference: Tech Spec Section 13.2 — Conflict Inspector.
type ConflictParams ¶
type ConflictParams struct {
Source string // filter by source
Subject string // filter by subject
ActorType string // filter by actor_type
Limit int
Offset int
}
ConflictParams holds filter and pagination options for conflict queries.
type ConflictQuerier ¶
type ConflictQuerier interface {
// QueryConflicts returns groups of contradictory memories.
QueryConflicts(params ConflictParams) ([]ConflictGroup, error)
}
ConflictQuerier is an optional interface for destination backends that support conflict detection. Callers must type-assert to check for support. Reference: Tech Spec Section 13.2.
type DestinationWriter ¶
type DestinationWriter interface {
// Write persists p to the destination. Implementations MUST be idempotent:
// writing the same PayloadID twice must succeed without producing a
// duplicate record.
Write(p TranslatedPayload) error
// Ping verifies the destination is reachable and healthy. Used by the
// doctor command and /ready health endpoint.
Ping() error
// Exists reports whether a record with the given payloadID has been
// written to the destination. Used by consistency assertions (Phase R-10).
Exists(payloadID string) (bool, error)
// Close releases all resources held by the destination. Safe to call once.
Close() error
}
DestinationWriter is the interface satisfied by every memory backend (SQLite, PostgreSQL, Supabase, etc.). All implementations MUST be safe for concurrent use by multiple goroutines.
Reference: Tech Spec Section 3.2.
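To make the idempotency and concurrency requirements concrete, here is a sketch of a backend that satisfies the interface. memDestination is a hypothetical in-memory type that does not exist in the package, and TranslatedPayload is trimmed to two fields for brevity.

```go
package main

import "sync"

// TranslatedPayload is trimmed to the fields this sketch needs.
type TranslatedPayload struct {
	PayloadID string
	Content   string
}

// DestinationWriter mirrors the documented interface.
type DestinationWriter interface {
	Write(p TranslatedPayload) error
	Ping() error
	Exists(payloadID string) (bool, error)
	Close() error
}

// memDestination is a hypothetical in-memory backend guarded by a mutex.
type memDestination struct {
	mu      sync.Mutex
	records map[string]TranslatedPayload
}

// Compile-time check that the sketch satisfies the interface.
var _ DestinationWriter = (*memDestination)(nil)

func newMemDestination() *memDestination {
	return &memDestination{records: make(map[string]TranslatedPayload)}
}

// Write stores p once; a repeated PayloadID is a successful no-op.
func (m *memDestination) Write(p TranslatedPayload) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.records[p.PayloadID]; ok {
		return nil // re-delivery: keep the first record, report success
	}
	m.records[p.PayloadID] = p
	return nil
}

func (m *memDestination) Ping() error { return nil }

func (m *memDestination) Exists(payloadID string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.records[payloadID]
	return ok, nil
}

func (m *memDestination) Close() error { return nil }
```

The re-delivered write returns nil rather than an error, which matches the "must succeed" wording of the Write contract.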
type MemoryCounter ¶
type MemoryCounter interface {
// MemoryCount returns the total number of memory records in the destination.
MemoryCount() (int64, error)
}
MemoryCounter is an optional interface for destination backends that support counting the total number of stored memories. Used by the /api/status admin endpoint to populate the memories_total field. Callers must type-assert.
type PostgresDestination ¶
type PostgresDestination struct {
// contains filtered or unexported fields
}
PostgresDestination writes TranslatedPayload records to a PostgreSQL database with pgvector support for Stage 4 semantic retrieval.
The embedding column stores a pgvector vector. When a payload carries no embedding, the column is NULL. CanSemanticSearch is a destination-level check, not a per-record one: it returns true as long as any row has a non-NULL embedding, so a payload without an embedding does not disable semantic search for the destination.
All SQL uses parameterized statements. Write is idempotent via INSERT ... ON CONFLICT DO NOTHING.
Reference: Tech Spec Section 3.4 — Stage 4, Phase 5 Behavioral Contract 3.
func OpenPostgres ¶
OpenPostgres opens a connection to the PostgreSQL database identified by dsn, enables the pgvector extension, and creates the memories schema.
dsn is a libpq-style connection string or a pgx DSN (e.g. "postgres://user:pass@host:5432/db"). dimensions is the embedding vector size (e.g. 1536 for text-embedding-3-small).
func (*PostgresDestination) CanSemanticSearch ¶
func (d *PostgresDestination) CanSemanticSearch() bool
CanSemanticSearch reports whether any row in the memories table has a non-NULL embedding, indicating the pgvector index is usable.
func (*PostgresDestination) Close ¶
func (d *PostgresDestination) Close() error
Close closes the database connection pool.
func (*PostgresDestination) Exists ¶
func (d *PostgresDestination) Exists(payloadID string) (bool, error)
Exists reports whether a record with payloadID exists in the memories table.
func (*PostgresDestination) Ping ¶
func (d *PostgresDestination) Ping() error
Ping verifies the database connection is alive.
func (*PostgresDestination) Query ¶
func (d *PostgresDestination) Query(params QueryParams) (QueryResult, error)
Query returns a page of memories using parameterized structured queries.
func (*PostgresDestination) SemanticSearch ¶
func (d *PostgresDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
SemanticSearch performs a pgvector cosine similarity search. It issues a parameterized ORDER BY embedding <=> $1 query using the ivfflat index created during schema setup.
Reference: Tech Spec Section 3.4 — Stage 4.
func (*PostgresDestination) Write ¶
func (d *PostgresDestination) Write(p TranslatedPayload) error
Write persists p to the memories table. Idempotent via ON CONFLICT DO NOTHING. If p.Embedding is non-empty it is stored as a pgvector value; otherwise NULL. All values are bound via parameterized placeholders.
type Querier ¶
type Querier interface {
// Query returns a page of memories matching params.
Query(params QueryParams) (QueryResult, error)
}
Querier is the read interface satisfied by memory backends. It is separate from DestinationWriter to allow read-only facade implementations and to keep the write-path interface minimal.
Reference: Tech Spec Section 3.3, Section 12.
type QueryParams ¶
type QueryParams struct {
// Destination is the target destination name.
Destination string
// Namespace filters results to a specific namespace.
Namespace string
// Subject filters results to a specific subject. Empty means all subjects.
Subject string
// Q is a text substring filter applied to the content field. Empty means no filter.
Q string
// Limit is the maximum number of records to return. Callers must cap it at MaxQueryLimit (200).
Limit int
// Cursor is the opaque pagination cursor from a previous QueryResult.NextCursor.
Cursor string
// Profile is the retrieval profile (fast, balanced, deep). Used by later phases.
Profile string
// ActorType filters results by provenance (user, agent, system). Empty means
// no filter.
//
// Reference: Tech Spec Section 7.1.
ActorType string
}
QueryParams are the input parameters for a basic structured query. The full 6-stage retrieval cascade (Phase 3+) builds on top of this.
Reference: Tech Spec Section 3.3, Section 3.8.
type QueryResult ¶
type QueryResult struct {
Records []TranslatedPayload
NextCursor string
HasMore bool
}
QueryResult holds one page of query results and pagination state.
Reference: Tech Spec Section 3.8.
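A caller can walk every page by feeding NextCursor back into the next request until HasMore is false. The sketch below assumes simplified local types: Record stands in for TranslatedPayload, sliceQuerier is a hypothetical slice-backed Querier, and its plain decimal cursor is for illustration only (real cursors are opaque).

```go
package main

import "strconv"

// Record stands in for TranslatedPayload in this sketch.
type Record struct{ PayloadID string }

type QueryParams struct {
	Limit  int
	Cursor string
}

type QueryResult struct {
	Records    []Record
	NextCursor string
	HasMore    bool
}

type Querier interface {
	Query(params QueryParams) (QueryResult, error)
}

// sliceQuerier is a hypothetical Querier backed by a slice.
type sliceQuerier struct{ rows []Record }

func (q sliceQuerier) Query(p QueryParams) (QueryResult, error) {
	offset := 0
	if p.Cursor != "" {
		n, err := strconv.Atoi(p.Cursor)
		if err != nil {
			return QueryResult{}, err
		}
		offset = n
	}
	if offset < 0 || offset > len(q.rows) {
		offset = len(q.rows)
	}
	limit := p.Limit
	if limit <= 0 {
		limit = 20 // mirror DefaultQueryLimit so the loop always advances
	}
	end := offset + limit
	if end > len(q.rows) {
		end = len(q.rows)
	}
	res := QueryResult{Records: q.rows[offset:end], HasMore: end < len(q.rows)}
	if res.HasMore {
		res.NextCursor = strconv.Itoa(end)
	}
	return res, nil
}

// fetchAll follows NextCursor until HasMore is false and reports the page count.
func fetchAll(q Querier, limit int) ([]Record, int, error) {
	var all []Record
	pages := 0
	params := QueryParams{Limit: limit}
	for {
		res, err := q.Query(params)
		if err != nil {
			return nil, pages, err
		}
		pages++
		all = append(all, res.Records...)
		if !res.HasMore {
			return all, pages, nil
		}
		params.Cursor = res.NextCursor
	}
}
```

The loop checks HasMore rather than an empty NextCursor, since the documentation exposes both fields and does not say an empty cursor marks the last page.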
type SQLiteDestination ¶
type SQLiteDestination struct {
// contains filtered or unexported fields
}
SQLiteDestination writes TranslatedPayload records to a SQLite database. The database is opened with PRAGMA journal_mode=WAL and PRAGMA busy_timeout=5000 to maximise concurrent read throughput and avoid "database is locked" errors under moderate write load.
All SQL queries use parameterized statements — string concatenation for SQL is never used. Write is idempotent: INSERT OR IGNORE means re-delivering a payload_id is a no-op.
All state is held in struct fields; there are no package-level variables.
func OpenSQLite ¶
func OpenSQLite(path string, logger *slog.Logger) (*SQLiteDestination, error)
OpenSQLite opens (or creates) a SQLite database at path, applies the required PRAGMAs, and creates the memories schema if absent. The parent directory is created with 0700 permissions; the database file is created with 0600 permissions.
Panics if logger is nil. Returns an error if the database cannot be opened or the schema cannot be applied.
func (*SQLiteDestination) CanSemanticSearch ¶
func (d *SQLiteDestination) CanSemanticSearch() bool
CanSemanticSearch reports whether this SQLite destination has any rows with a stored embedding. Returns false when the table is empty or no records have embeddings, signalling Stage 4 to degrade gracefully.
Reference: Tech Spec Section 3.4 — Stage 4.
func (*SQLiteDestination) Close ¶
func (d *SQLiteDestination) Close() error
Close closes the underlying database connection. Safe to call once.
func (*SQLiteDestination) DeletePayload ¶
func (d *SQLiteDestination) DeletePayload(payloadID string) (int64, error)
DeletePayload removes a single record by payload_id. Used by consistency assertion tests to verify that the checker detects missing entries. Returns the number of rows deleted (0 or 1).
func (*SQLiteDestination) Exists ¶
func (d *SQLiteDestination) Exists(payloadID string) (bool, error)
Exists reports whether a record with payloadID exists in the memories table. Used by consistency assertions (Phase R-10).
func (*SQLiteDestination) MemoryCount ¶
func (d *SQLiteDestination) MemoryCount() (int64, error)
MemoryCount returns the total number of memory records in the SQLite destination. Used by the /api/status admin endpoint.
func (*SQLiteDestination) Ping ¶
func (d *SQLiteDestination) Ping() error
Ping verifies the database connection is alive by executing a lightweight query. Used by the doctor command and /ready health endpoint.
func (*SQLiteDestination) Query ¶
func (d *SQLiteDestination) Query(params QueryParams) (QueryResult, error)
Query returns a page of memories matching params using a basic structured query on the memories table. It implements the Querier interface for Phase 0C. The full 6-stage retrieval cascade (Phase 3+) replaces this for production reads.
All SQL is parameterized — no string concatenation. Limit and offset are computed from params.Limit (clamped) and the decoded cursor.
Reference: Tech Spec Section 3.3, Phase 0C Behavioral Contract item 16.
func (*SQLiteDestination) QueryConflicts ¶
func (d *SQLiteDestination) QueryConflicts(params ConflictParams) ([]ConflictGroup, error)
QueryConflicts returns groups of contradictory memories. It groups by subject + collection (entity_key) and returns groups where multiple distinct content values exist.
All SQL is parameterized; string concatenation is never used. Reference: Tech Spec Section 13.2.
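The grouping rule can be illustrated in memory, without SQL. This sketch is an assumption-laden stand-in for the real query: memory and findConflicts are hypothetical names, ConflictGroup is trimmed, and Count is taken to mean the number of distinct content values, which the documentation does not specify.

```go
package main

import "sort"

// memory is a hypothetical row with only the columns the grouping needs.
type memory struct{ Subject, Collection, Content string }

// ConflictGroup is trimmed to the fields this sketch fills in.
type ConflictGroup struct {
	Subject           string
	EntityKey         string // collection field
	ConflictingValues []string
	Count             int // assumption: number of distinct content values
}

// findConflicts groups rows by subject + collection and keeps only groups
// that contain more than one distinct content value.
func findConflicts(rows []memory) []ConflictGroup {
	type key struct{ subject, collection string }
	seen := make(map[key]map[string]bool)
	values := make(map[key][]string)
	for _, r := range rows {
		k := key{r.Subject, r.Collection}
		if seen[k] == nil {
			seen[k] = make(map[string]bool)
		}
		if !seen[k][r.Content] {
			seen[k][r.Content] = true
			values[k] = append(values[k], r.Content)
		}
	}
	var groups []ConflictGroup
	for k, vals := range values {
		if len(vals) < 2 {
			continue // a single distinct value is not a conflict
		}
		groups = append(groups, ConflictGroup{
			Subject:           k.subject,
			EntityKey:         k.collection,
			ConflictingValues: vals,
			Count:             len(vals),
		})
	}
	// Sort for deterministic output; map iteration order is random.
	sort.Slice(groups, func(i, j int) bool {
		if groups[i].Subject != groups[j].Subject {
			return groups[i].Subject < groups[j].Subject
		}
		return groups[i].EntityKey < groups[j].EntityKey
	})
	return groups
}
```

Two rows with identical content under the same subject and collection are duplicates rather than contradictions, so they do not form a group on their own.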
func (*SQLiteDestination) QueryTimeTravel ¶
func (d *SQLiteDestination) QueryTimeTravel(params TimeTravelParams) (QueryResult, error)
QueryTimeTravel returns memories as of the specified timestamp. Results are filtered to timestamp <= as_of and ordered by timestamp DESC.
All SQL is parameterized; string concatenation is never used. Reference: Tech Spec Section 13.2.
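The "as of" semantics amount to a filter plus a descending sort. The following in-memory sketch shows that rule under stated assumptions: memory, asOf, and unix are hypothetical helpers, and treating a non-positive limit as "no limit" is a simplification that the real method may not share.

```go
package main

import (
	"sort"
	"time"
)

// memory is a hypothetical row with only the fields the filter needs.
type memory struct {
	PayloadID string
	Timestamp time.Time
}

// asOf keeps rows with Timestamp <= cutoff, newest first, then applies
// offset and limit. Assumption: limit <= 0 means "no limit" in this sketch.
func asOf(all []memory, cutoff time.Time, limit, offset int) []memory {
	var visible []memory
	for _, m := range all {
		if !m.Timestamp.After(cutoff) { // timestamp <= as_of
			visible = append(visible, m)
		}
	}
	sort.SliceStable(visible, func(i, j int) bool {
		return visible[i].Timestamp.After(visible[j].Timestamp) // DESC
	})
	if offset < 0 {
		offset = 0
	}
	if offset > len(visible) {
		offset = len(visible)
	}
	visible = visible[offset:]
	if limit > 0 && limit < len(visible) {
		visible = visible[:limit]
	}
	return visible
}

// unix builds a UTC timestamp from seconds, to keep examples short.
func unix(sec int64) time.Time { return time.Unix(sec, 0).UTC() }
```

A record written exactly at the cutoff is included, because the documented comparison is inclusive.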
func (*SQLiteDestination) SemanticSearch ¶
func (d *SQLiteDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
SemanticSearch performs application-level cosine similarity search over all stored embeddings. It fetches candidate rows filtered by namespace and destination, computes cosine similarity against vec in Go, and returns the top-N results by score descending.
This implementation does not use sqlite-vec (which requires CGO). It provides correct cosine-ranked results using the modernc pure-Go driver.
The over-sample factor is applied to the SQL LIMIT to increase recall before in-process reranking: we fetch min(overSample * limit, totalRows) candidates.
Reference: Tech Spec Section 3.4 — Stage 4.
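The in-process reranking step described above can be sketched as a cosine score followed by a top-N cut. This is an illustration rather than the package's code: scored, cosine, and topN are hypothetical names, candidates are passed in directly instead of being fetched with an over-sampled SQL LIMIT, and returning 0 for mismatched or zero-norm vectors is an assumption.

```go
package main

import (
	"math"
	"sort"
)

// scored pairs a candidate's position with its similarity score.
type scored struct {
	Index int
	Score float32
}

// cosine returns the cosine similarity of a and b.
// Assumption: mismatched lengths and zero-norm vectors score 0.
func cosine(a, b []float32) float32 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	var dot, normA, normB float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		normA += float64(a[i]) * float64(a[i])
		normB += float64(b[i]) * float64(b[i])
	}
	if normA == 0 || normB == 0 {
		return 0
	}
	return float32(dot / (math.Sqrt(normA) * math.Sqrt(normB)))
}

// topN scores every candidate against query and returns the n best,
// ordered by score descending.
func topN(query []float32, candidates [][]float32, n int) []scored {
	out := make([]scored, 0, len(candidates))
	for i, c := range candidates {
		out = append(out, scored{Index: i, Score: cosine(query, c)})
	}
	sort.SliceStable(out, func(i, j int) bool { return out[i].Score > out[j].Score })
	if n < 0 {
		n = 0
	}
	if n < len(out) {
		out = out[:n]
	}
	return out
}
```

Accumulating in float64 keeps rounding error small for long vectors while the stored embeddings and returned scores stay float32.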
func (*SQLiteDestination) Write ¶
func (d *SQLiteDestination) Write(p TranslatedPayload) error
Write persists p to the memories table. The operation is idempotent: INSERT OR IGNORE silently discards a write whose payload_id already exists. All values are bound via parameterized placeholders — no string interpolation. If p.Embedding is non-empty it is serialized as a little-endian float32 BLOB and stored in the embedding column for Stage 4 semantic retrieval.
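The little-endian float32 BLOB layout can be sketched with the standard library. encodeEmbedding and decodeEmbedding are hypothetical helper names; the documentation specifies only the byte order and element type, so the absence of any header or length prefix is an assumption.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeEmbedding packs vec as consecutive little-endian float32 values.
// Assumption: the BLOB has no header or length prefix.
func encodeEmbedding(vec []float32) []byte {
	blob := make([]byte, 4*len(vec))
	for i, v := range vec {
		binary.LittleEndian.PutUint32(blob[4*i:], math.Float32bits(v))
	}
	return blob
}

// decodeEmbedding reverses encodeEmbedding.
func decodeEmbedding(blob []byte) ([]float32, error) {
	if len(blob)%4 != 0 {
		return nil, fmt.Errorf("embedding blob length %d is not a multiple of 4", len(blob))
	}
	vec := make([]float32, len(blob)/4)
	for i := range vec {
		vec[i] = math.Float32frombits(binary.LittleEndian.Uint32(blob[4*i:]))
	}
	return vec, nil
}
```

With this layout a 1536-dimension embedding occupies 6144 bytes per row.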
type ScoredRecord ¶
type ScoredRecord struct {
Payload TranslatedPayload
Score float32
}
ScoredRecord pairs a TranslatedPayload with its cosine similarity score from a semantic search. Used by Stage 4 (Semantic Retrieval).
Reference: Tech Spec Section 3.4 — Stage 4.
type SemanticSearcher ¶
type SemanticSearcher interface {
// CanSemanticSearch reports whether this destination has vector index support
// and at least one indexed embedding. Returns false to signal graceful skip.
CanSemanticSearch() bool
// SemanticSearch returns up to params.Limit records nearest to vec, ranked
// by cosine similarity descending. Filters are applied from params
// (Namespace, Destination). Implementations must be safe for concurrent use.
SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
}
SemanticSearcher is implemented by destination backends that support vector similarity search. It is an optional extension of DestinationWriter — callers must type-assert to check for support.
CanSemanticSearch must be checked before calling SemanticSearch. Destinations that have no indexed embeddings (e.g. a fresh SQLite DB with no writes) may return false to skip Stage 4 gracefully.
Reference: Tech Spec Section 3.4 — Stage 4.
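The two-step check (type-assert, then CanSemanticSearch) might be wired up as in the sketch below. runStage4, runDemo, errStageSkipped, and fakeSearcher are hypothetical names, and the local QueryParams and ScoredRecord types are trimmed versions of the documented ones.

```go
package main

import (
	"context"
	"errors"
)

type QueryParams struct {
	Namespace   string
	Destination string
	Limit       int
}

// ScoredRecord is trimmed: the real type carries a full TranslatedPayload.
type ScoredRecord struct {
	PayloadID string
	Score     float32
}

type SemanticSearcher interface {
	CanSemanticSearch() bool
	SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
}

// errStageSkipped is a hypothetical sentinel for "degrade gracefully".
var errStageSkipped = errors.New("semantic search unavailable; stage skipped")

// runStage4 searches only when dest implements SemanticSearcher and
// reports that it has indexed embeddings.
func runStage4(ctx context.Context, dest any, vec []float32, params QueryParams) ([]ScoredRecord, error) {
	searcher, ok := dest.(SemanticSearcher)
	if !ok || !searcher.CanSemanticSearch() {
		return nil, errStageSkipped
	}
	return searcher.SemanticSearch(ctx, vec, params)
}

// fakeSearcher is a stub backend used only to exercise runStage4.
type fakeSearcher struct{ ready bool }

func (f fakeSearcher) CanSemanticSearch() bool { return f.ready }

func (f fakeSearcher) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error) {
	return []ScoredRecord{{PayloadID: "p-1", Score: 1}}, nil
}

// runDemo shows a typical call with a background context.
func runDemo(dest any) ([]ScoredRecord, error) {
	return runStage4(context.Background(), dest, []float32{1, 0}, QueryParams{Limit: 1})
}
```

Returning a sentinel error lets the caller distinguish a skipped stage from a failed search; a boolean return would serve equally well.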
type SupabaseDestination ¶
type SupabaseDestination struct {
// contains filtered or unexported fields
}
SupabaseDestination writes and reads TranslatedPayload records via the Supabase REST API. Semantic search uses the match_memories RPC function which must be created in the Supabase database:
CREATE OR REPLACE FUNCTION match_memories(
query_embedding vector(1536),
match_count int,
namespace text DEFAULT NULL,
destination_id text DEFAULT NULL
) RETURNS TABLE ( ... ) ...
The resolved API key is never logged at any level.
Reference: Tech Spec Section 3.4 — Stage 4, Phase 5 Behavioral Contract 3.
func OpenSupabase ¶
func OpenSupabase(baseURL, resolvedKey string, logger *slog.Logger) (*SupabaseDestination, error)
OpenSupabase creates a SupabaseDestination. baseURL is the Supabase project URL (e.g. "https://xyzproject.supabase.co"). resolvedKey is the pre-resolved service-role or anon key — never re-resolved per request.
INVARIANT: resolvedKey is never logged at any level.
func (*SupabaseDestination) CanSemanticSearch ¶
func (d *SupabaseDestination) CanSemanticSearch() bool
CanSemanticSearch always returns true for Supabase. It does not probe for the match_memories RPC: if the function does not exist, the RPC call returns an HTTP error, which SemanticSearch propagates.
func (*SupabaseDestination) Close ¶
func (d *SupabaseDestination) Close() error
Close is a no-op for the Supabase destination (http.Client has no Close method).
func (*SupabaseDestination) Exists ¶
func (d *SupabaseDestination) Exists(payloadID string) (bool, error)
Exists reports whether a record with payloadID exists.
func (*SupabaseDestination) Ping ¶
func (d *SupabaseDestination) Ping() error
Ping checks connectivity by sending a HEAD request to /rest/v1/memories.
func (*SupabaseDestination) Query ¶
func (d *SupabaseDestination) Query(params QueryParams) (QueryResult, error)
Query returns a page of memories using the Supabase REST API filter syntax. All filter values are passed as URL query parameters — no string interpolation in SQL.
func (*SupabaseDestination) SemanticSearch ¶
func (d *SupabaseDestination) SemanticSearch(ctx context.Context, vec []float32, params QueryParams) ([]ScoredRecord, error)
SemanticSearch calls the match_memories PostgreSQL function via the Supabase RPC endpoint. The function must exist in the Supabase database.
Reference: Tech Spec Section 3.4 — Stage 4.
func (*SupabaseDestination) Write ¶
func (d *SupabaseDestination) Write(p TranslatedPayload) error
Write inserts p into the Supabase memories table via POST /rest/v1/memories. Uses Prefer: resolution=ignore-duplicates for idempotent behaviour. INVARIANT: resolvedKey is never logged.
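The request that Write sends might be assembled roughly as follows. newWriteRequest is a hypothetical helper; the path and Prefer header come from the description above, while sending the key in both the apikey and Authorization headers follows the usual Supabase REST convention and is an assumption about this package.

```go
package main

import (
	"bytes"
	"net/http"
	"strings"
)

// newWriteRequest builds the POST for one JSON-encoded payload.
// The key is placed in headers only and must never be logged.
func newWriteRequest(baseURL, resolvedKey string, body []byte) (*http.Request, error) {
	url := strings.TrimRight(baseURL, "/") + "/rest/v1/memories"
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	// Ask the server to skip rows that collide with an existing unique key,
	// which is what makes a re-delivered payload a no-op.
	req.Header.Set("Prefer", "resolution=ignore-duplicates")
	// Assumption: standard Supabase REST authentication headers.
	req.Header.Set("apikey", resolvedKey)
	req.Header.Set("Authorization", "Bearer "+resolvedKey)
	return req, nil
}
```

Idempotency here depends on the memories table having a unique constraint that covers the payload identifier; without one, the Prefer header has nothing to deduplicate against.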
type TimeTravelParams ¶
type TimeTravelParams struct {
AsOf time.Time // return memories with timestamp <= this value
Namespace string
Subject string
Destination string
Limit int
Offset int
}
TimeTravelParams holds options for time-travel queries.
type TimeTravelQuerier ¶
type TimeTravelQuerier interface {
// QueryTimeTravel returns memories as of the specified timestamp.
QueryTimeTravel(params TimeTravelParams) (QueryResult, error)
}
TimeTravelQuerier is an optional interface for destination backends that support time-travel queries. Callers must type-assert to check for support. Reference: Tech Spec Section 13.2.
type TranslatedPayload ¶
type TranslatedPayload struct {
PayloadID string `json:"payload_id"`
RequestID string `json:"request_id"`
Source string `json:"source"`
Subject string `json:"subject"`
Namespace string `json:"namespace"`
Destination string `json:"destination"`
Collection string `json:"collection"`
Content string `json:"content"`
Model string `json:"model"`
Role string `json:"role"`
Timestamp time.Time `json:"timestamp"`
IdempotencyKey string `json:"idempotency_key"`
SchemaVersion int `json:"schema_version"`
TransformVersion string `json:"transform_version"`
ActorType string `json:"actor_type"` // user, agent, or system
ActorID string `json:"actor_id"` // identity of the actor
Embedding []float32 `json:"embedding,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
// Addendum A3: Retrieval Firewall — sensitivity classification fields.
// SensitivityLabels are free-form tags (e.g. "pii", "financial").
// ClassificationTier is a single tier from the configured tier_order
// (default: "public"). Both default to empty/public when absent.
// Reference: Tech Spec Addendum Section A3.2.
SensitivityLabels []string `json:"sensitivity_labels,omitempty"`
ClassificationTier string `json:"classification_tier,omitempty"`
}
TranslatedPayload is the canonical write envelope produced by the field mapping and transform stages of the write path. It is stored in the WAL Payload field as raw JSON and deserialized by the queue worker before handing off to a DestinationWriter.
Reference: Tech Spec Section 7.1.
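The hand-off from WAL entry to DestinationWriter starts with a JSON decode, which might look like the sketch below. decodePayload is a hypothetical helper, the struct is trimmed to a few of the documented fields, and rejecting an empty payload_id is an assumption rather than documented worker behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TranslatedPayload is trimmed; JSON tags match the documented struct.
type TranslatedPayload struct {
	PayloadID string    `json:"payload_id"`
	Subject   string    `json:"subject"`
	Content   string    `json:"content"`
	Timestamp time.Time `json:"timestamp"`
	ActorType string    `json:"actor_type"` // user, agent, or system
	Embedding []float32 `json:"embedding,omitempty"`
}

// decodePayload turns the raw WAL Payload bytes into a TranslatedPayload.
// Assumption: a payload without payload_id is rejected, since idempotent
// writes depend on it.
func decodePayload(raw json.RawMessage) (TranslatedPayload, error) {
	var p TranslatedPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return TranslatedPayload{}, fmt.Errorf("decode payload: %w", err)
	}
	if p.PayloadID == "" {
		return TranslatedPayload{}, fmt.Errorf("decode payload: missing payload_id")
	}
	return p, nil
}
```

Because Embedding is tagged omitempty, a payload written without an embedding decodes with a nil slice, which the backends above store as NULL or skip.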