memoryindex

package
v1.74.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package memoryindex is the memory consumer of the shared indexjobs framework (#507). It registers a Source/Sink pair under source_kind = "memory" so the reconciler backfills embeddings that the synchronous write path could not produce: a memory saved while the embedder was down (embedding NULL) or left stale by a provider model swap (embedding_model differs from the current model). Backfill runs off the request path; the interactive write-then-recall flow keeps its synchronous embed, so read-your-writes is preserved.

Unlike the api-catalog and tools consumers, memory stores its vectors inline on the memory_records table (one embedding per record), not in a dedicated vector table. So this package's Sink reads and writes the embedding / embedding_model / embedding_text_hash columns of memory_records directly: a record IS its own indexing unit. SourceID is the record id; each unit yields exactly one Item.

Index

Constants

View Source
const SourceKind = "memory"

SourceKind is the indexjobs source_kind this package serves.

Variables

This section is empty.

Functions

This section is empty.

Types

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink implements indexjobs.Sink for the memory kind over the embedding columns of memory_records. currentModel is the provider model the gap query diffs stored rows against, so a model swap re-embeds rows stamped with the previous model.

func NewSink

func NewSink(store *Store, currentModel string) *Sink

NewSink returns a Sink backed by the given store. currentModel is the embedding provider's model identifier (embedding.ModelName); pass "" on a deployment whose provider does not name its model; every row then matches "" and only NULL-embedding rows are treated as gaps.

func (*Sink) Coverage

func (s *Sink) Coverage(ctx context.Context) (indexjobs.Coverage, error)

Coverage reports the memory kind's indexed-vs-expected record totals (records with an embedding vs all active records). ExpectedKnown is true: every active record is expected to converge to one vector.

func (*Sink) FindGaps

func (s *Sink) FindGaps(ctx context.Context) ([]string, error)

FindGaps returns active record ids whose embedding is missing or was produced by a model other than the current one.

func (*Sink) Kind

func (*Sink) Kind() string

Kind reports the memory source kind.

func (*Sink) ListExisting

func (s *Sink) ListExisting(ctx context.Context, key indexjobs.Key) (map[string]indexjobs.Vector, error)

ListExisting returns the record's persisted vector keyed by item id for the worker's dedup pass.

func (*Sink) StampExpected

func (*Sink) StampExpected(context.Context, indexjobs.Key, int) error

StampExpected is a no-op for memory. Gap detection is condition-based (embedding IS NULL OR model mismatch), not count-based, so there is no expected count to record per unit.

func (*Sink) Upsert

func (s *Sink) Upsert(ctx context.Context, key indexjobs.Key, rows []indexjobs.Vector) error

Upsert writes the record's vector. The memory unit holds one item and has no sibling rows, so there is nothing to delete; it delegates to the shared store write.

func (*Sink) UpsertBatch

func (s *Sink) UpsertBatch(ctx context.Context, key indexjobs.Key, rows []indexjobs.Vector) error

UpsertBatch is identical to Upsert for memory (single-item unit, no rows outside the batch to preserve).

type Source

type Source struct {
	// contains filtered or unexported fields
}

Source implements indexjobs.Source for the memory kind. A unit is one memory record (SourceID = record id) and yields exactly one item: the record's content. The worker embeds it and the Sink writes the vector back onto the same row.

func NewSource

func NewSource(store *Store) *Source

NewSource returns a Source backed by the given store.

func (*Source) Kind

func (*Source) Kind() string

Kind reports the memory source kind.

func (*Source) LoadItems

func (s *Source) LoadItems(ctx context.Context, sourceID string) ([]indexjobs.Item, error)

LoadItems returns the record's single embeddable item. A record that was archived or deleted between enqueue and claim yields an empty slice (a clean completion that writes no vector), per the Source contract.

func (*Source) OnSucceeded

func (*Source) OnSucceeded(string)

OnSucceeded is a no-op: recall reads embeddings from memory_records directly on every query, so there is no in-memory cache to refresh after a backfill writes a vector.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store reads and writes memory embedding state on the memory_records table for the indexjobs memory consumer. It is intentionally separate from the memory.Store interface: it touches only the embedding columns (embedding, embedding_model, embedding_text_hash) and is scoped to the backfill path, so it does not widen the request-path store contract.

func NewStore

func NewStore(db *sql.DB) *Store

NewStore returns a Store over the given database.

func (*Store) Coverage

func (s *Store) Coverage(ctx context.Context) (indexed, expected int, err error)

Coverage returns the number of active records with an embedding (indexed) and the total number of active records (expected). The memory kind reports a real indexed/expected ratio because every active record is expected to carry a vector once converged.

func (*Store) FindGaps

func (s *Store) FindGaps(ctx context.Context, currentModel string) ([]string, error)

FindGaps returns the ids of active records whose embedding is missing or was produced by a model other than the current provider's. Missing embeddings cover the embedder-outage case (a memory saved while the provider was down); the model mismatch covers a provider model swap. Both converge off the request path when the reconciler enqueues them.

func (*Store) GetContent

func (s *Store) GetContent(ctx context.Context, id string) (string, error)

GetContent returns the active record's content. A record that is archived or absent yields errArchivedOrMissing so the Source returns an empty item set (a clean "nothing to index" completion).

func (*Store) ListVectors

func (s *Store) ListVectors(ctx context.Context, id string) (map[string]indexjobs.Vector, error)

ListVectors returns the record's persisted embedding keyed by item id (the record id), for the worker's text-hash + model dedup pass. A record with no embedding yields an empty map, so the worker embeds it.

func (*Store) UpsertVectors

func (s *Store) UpsertVectors(ctx context.Context, id string, rows []indexjobs.Vector) error

UpsertVectors writes the embedding back onto the record. The memory unit holds exactly one item (the record itself); a missing or empty row set is a no-op. The id predicate plus the single-row contract make Upsert and UpsertBatch identical for memory (there are no sibling rows to delete), so both delegate here.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL