Documentation ¶
Overview ¶
Package embedding provides vector embedding generation and caching for semantic search in the knowledge graph.
The embedding package generates dense vector representations of text content, enabling semantic similarity search across entities. It supports multiple embedding strategies: neural embeddings via HTTP APIs (TEI, OpenAI, LocalAI) and pure-Go lexical embeddings using BM25 as a fallback.
Embeddings are cached using content-addressed storage (SHA-256 hashes) to enable deduplication across entities with identical content. An async worker monitors pending embedding requests and processes them in the background.
Architecture ¶
┌─────────────────────────────────────────┐
│ Embedder │
│ (HTTPEmbedder or BM25Embedder) │
└─────────────────────────────────────────┘
↓
┌────────────────────────────────────────────────────────────────┐
│ Worker │
│ Watches EMBEDDING_INDEX KV for status="pending" records │
├──────────────────────────────┬─────────────────────────────────┤
│ Check dedup cache │ Generate new embedding │
│ (content hash lookup) │ (call embedder) │
└──────────────────────────────┴─────────────────────────────────┘
↓
┌────────────────────────────────────────────────────────────────┐
│ NATS KV Storage │
├───────────────────────────────┬────────────────────────────────┤
│ EMBEDDING_INDEX │ EMBEDDING_DEDUP │
│ entityID → Record │ contentHash → DedupRecord │
│ (vector, status, metadata) │ (vector, entity IDs) │
└───────────────────────────────┴────────────────────────────────┘
Usage ¶
Configure and use the HTTP embedder with an external service:
embedder, err := embedding.NewHTTPEmbedder(embedding.HTTPConfig{
    BaseURL: "http://tei:8082",
    Model:   "all-MiniLM-L6-v2",
    Cache:   embedding.NewNATSCache(cacheBucket),
})
if err != nil {
    // handle configuration error
}

// Generate embeddings for a batch of texts
vectors, err := embedder.Generate(ctx, []string{
    "autonomous drone navigation system",
    "ground control station for UAV fleet",
})
Use the BM25 embedder as a fallback when neural services are unavailable:
embedder := embedding.NewBM25Embedder(embedding.BM25Config{
    Dimensions: 384,
    K1:         1.5,  // Term frequency saturation
    B:          0.75, // Length normalization
})
Start the async worker to process pending embeddings:
worker := embedding.NewWorker(storage, embedder, indexBucket, logger).
    WithWorkers(5).
    WithContentStore(objectStore). // For ContentStorable entities
    WithOnGenerated(func(entityID string, vector []float32) {
        // Update the vector index cache
    })
worker.Start(ctx)
defer worker.Stop()
Embedders ¶
HTTPEmbedder:
Calls OpenAI-compatible embedding APIs. Compatible with:
- Hugging Face TEI (Text Embeddings Inference) - recommended for local inference
- OpenAI cloud API
- LocalAI, Ollama, vLLM, and other compatible services
Supports content-addressed caching to avoid redundant API calls.
BM25Embedder:
Pure Go lexical embeddings using BM25 (Best Matching 25) algorithm:
- No external dependencies - works offline
- Feature hashing to fixed dimensions
- Stopword removal and simple stemming
- L2 normalization for cosine similarity
Provides reasonable keyword matching but lacks semantic understanding. Use it as a fallback when neural services are unavailable.
Storage ¶
The package uses two NATS KV buckets:
EMBEDDING_INDEX: Primary storage for embedding records
- Key: entity ID
- Value: Record with vector, status, metadata
- Statuses: pending, generated, failed
EMBEDDING_DEDUP: Content-addressed deduplication
- Key: SHA-256 content hash
- Value: DedupRecord with vector and entity ID list
- Enables sharing vectors across entities with identical content
ContentStorable Support ¶
For entities with large text content stored in ObjectStore, the worker can fetch content dynamically using StorageRef:
storage.SavePendingWithStorageRef(ctx, entityID, contentHash,
    &embedding.StorageRef{
        StorageInstance: "main",
        Key:             "content/papers/doc123",
    },
    map[string]string{
        message.ContentRoleBody:     "full_text",
        message.ContentRoleAbstract: "abstract",
        message.ContentRoleTitle:    "title",
    },
)
Vector Operations ¶
The package provides common vector operations:
// Cosine similarity for semantic search
similarity := embedding.CosineSimilarity(vectorA, vectorB)
// Returns -1 to 1, where 1 = identical, 0 = orthogonal
Configuration ¶
HTTP embedder configuration:
BaseURL: "http://localhost:8082"  # TEI endpoint
Model:   "all-MiniLM-L6-v2"      # 384 dimensions, fast
APIKey:  ""                      # Optional for local services
Timeout: 30s                     # HTTP timeout
BM25 embedder configuration:
Dimensions: 384   # Match neural models for compatibility
K1:         1.5   # Term frequency saturation (1.2-2.0)
B:          0.75  # Length normalization (0.0-1.0)
Worker configuration:
Workers: 5 # Concurrent worker goroutines
Thread Safety ¶
HTTPEmbedder, BM25Embedder, Storage, and Worker are safe for concurrent use. The Worker uses goroutines to process pending embeddings in parallel.
Metrics ¶
The worker accepts a WorkerMetrics interface for observability:
- IncDedupHits(): Embedding reused from dedup cache
- IncFailed(): Embedding generation failed
- SetPending(): Current pending embedding count
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph/indexmanager: Uses embeddings for similarity search
- github.com/c360studio/semstreams/graph/inference: Semantic gap detection using similarities
- github.com/c360studio/semstreams/storage/objectstore: Content storage for large texts
Index ¶
- Constants
- func ContentHash(text string) string
- func CosineSimilarity(a, b []float32) float64
- type BM25Config
- type BM25Embedder
- type Cache
- type DedupRecord
- type Embedder
- type GeneratedCallback
- type HTTPConfig
- type HTTPEmbedder
- type NATSCache
- type Record
- type ScoredEntity
- type Status
- type Storage
- func (s *Storage) DeleteEmbedding(ctx context.Context, entityID string) error
- func (s *Storage) FindSimilarFromCache(excludeID string, queryVector []float32, limit int) ([]ScoredEntity, bool)
- func (s *Storage) GetByContentHash(ctx context.Context, contentHash string) (*DedupRecord, error)
- func (s *Storage) GetEmbedding(ctx context.Context, entityID string) (*Record, error)
- func (s *Storage) ListGeneratedEntityIDs(ctx context.Context) ([]string, error)
- func (s *Storage) SaveDedup(ctx context.Context, contentHash string, vector []float32, entityID string) error
- func (s *Storage) SaveFailed(ctx context.Context, entityID, errorMsg string) error
- func (s *Storage) SaveGenerated(ctx context.Context, entityID string, vector []float32, model string, ...) error
- func (s *Storage) SavePending(ctx context.Context, entityID, contentHash, sourceText string) error
- func (s *Storage) SavePendingWithStorageRef(ctx context.Context, entityID, contentHash string, storageRef *StorageRef, ...) error
- func (s *Storage) StartVectorCache(ctx context.Context) error
- type StorageRef
- type Worker
- func (w *Worker) Start(ctx context.Context) error
- func (w *Worker) Stop() error
- func (w *Worker) WithContentStore(store *objectstore.Store) *Worker
- func (w *Worker) WithMetrics(m WorkerMetrics) *Worker
- func (w *Worker) WithOnGenerated(cb GeneratedCallback) *Worker
- func (w *Worker) WithWorkers(n int) *Worker
- type WorkerMetrics
Constants ¶
const (
    // EmbeddingIndexBucket stores entity embeddings with metadata
    EmbeddingIndexBucket = "EMBEDDING_INDEX"

    // EmbeddingDedupBucket stores content-addressed embeddings for deduplication
    EmbeddingDedupBucket = "EMBEDDING_DEDUP"
)
Variables ¶
This section is empty.
Functions ¶
func ContentHash ¶
ContentHash generates a SHA-256 hash of text content for use as a cache key.
This function provides consistent hashing across the codebase for content-addressed storage.
func CosineSimilarity ¶
CosineSimilarity computes the cosine similarity between two vectors.
Returns a value between -1 and 1, where:
- 1 means vectors are identical
- 0 means vectors are orthogonal (unrelated)
- -1 means vectors are opposite
Formula: cos(θ) = (A · B) / (||A|| × ||B||)
Types ¶
type BM25Config ¶
type BM25Config struct {
// Dimensions is the output embedding dimension (default: 384 for compatibility)
Dimensions int
// K1 controls term frequency saturation (default: 1.5)
// Higher values give more weight to term frequency
K1 float64
// B controls length normalization (default: 0.75)
// B=1.0 means full normalization, B=0.0 means no normalization
B float64
}
BM25Config configures the BM25 embedder.
type BM25Embedder ¶
type BM25Embedder struct {
// contains filtered or unexported fields
}
BM25Embedder implements pure-Go lexical embeddings using the BM25 algorithm.
This embedder provides a fallback when neural embedding services are unavailable. It uses BM25 (Best Matching 25) scoring - a term-frequency based ranking function widely used in information retrieval.
The embedder generates fixed-dimension vectors by:
- Tokenizing text (lowercase, split on non-alphanumeric)
- Computing term frequencies
- Hashing terms to fixed dimensions (feature hashing)
- Applying BM25 weighting (TF with IDF and length normalization)
- L2 normalizing for cosine similarity compatibility
Parameters:
- k1: Controls term frequency saturation (default 1.5)
- b: Controls document length normalization (default 0.75)
This is a lexical approach - it won't understand semantic similarity like neural models, but provides reasonable results for exact term matches and common phrases.
func NewBM25Embedder ¶
func NewBM25Embedder(cfg BM25Config) *BM25Embedder
NewBM25Embedder creates a new BM25-based embedder.
func (*BM25Embedder) Close ¶
func (b *BM25Embedder) Close() error
Close releases resources (no-op for BM25).
func (*BM25Embedder) Dimensions ¶
func (b *BM25Embedder) Dimensions() int
Dimensions returns the dimensionality of embeddings.
func (*BM25Embedder) Generate ¶
Generate creates BM25-based embeddings for the given texts.
This updates internal document statistics incrementally, so the embedder "learns" vocabulary and IDF scores from all texts it processes.
func (*BM25Embedder) Model ¶
func (b *BM25Embedder) Model() string
Model returns the model identifier.
type Cache ¶
type Cache interface {
// Get retrieves a cached embedding for the given content hash.
//
// Returns an error if the embedding is not found in the cache.
Get(ctx context.Context, contentHash string) ([]float32, error)
// Put stores an embedding in the cache with the given content hash.
//
// The cache should be content-addressed using a cryptographic hash
// (e.g., SHA-256) of the text content.
Put(ctx context.Context, contentHash string, embedding []float32) error
}
Cache provides content-addressed caching for embeddings.
Implementations should use a hash of the text content as the key to enable deduplication and fast lookups.
type DedupRecord ¶
type DedupRecord struct {
Vector []float32 `json:"vector"`
EntityIDs []string `json:"entity_ids"` // Entities sharing this content
FirstGenerated time.Time `json:"first_generated"`
}
DedupRecord stores content-addressed embeddings for deduplication
type Embedder ¶
type Embedder interface {
// Generate creates embeddings for the given texts.
//
// This is the primary method - batch operations are natural for all providers.
// For single text, pass a slice with one element.
// Returns a slice of float32 slices, where each inner slice is an embedding vector.
Generate(ctx context.Context, texts []string) ([][]float32, error)
// Dimensions returns the dimensionality of embeddings produced by this embedder.
//
// For example, all-MiniLM-L6-v2 produces 384-dimensional vectors.
Dimensions() int
// Model returns the model identifier used by this embedder.
//
// This is useful for debugging and logging which model is being used.
Model() string
// Close releases any resources held by the embedder.
//
// Must be called when the embedder is no longer needed. For HTTP providers
// this is typically a no-op, but for local ONNX models this releases GPU/CPU resources.
Close() error
}
Embedder generates vector embeddings for text.
Implementations can use different providers (HTTP APIs, BM25, etc.) while maintaining a consistent interface. All providers support batch operations natively, following OpenAI API patterns.
type GeneratedCallback ¶
GeneratedCallback is called when an embedding is successfully generated. The callback receives the entity ID and the generated embedding vector.
type HTTPConfig ¶
type HTTPConfig struct {
// BaseURL is the base URL of the embedding service.
// Examples:
// - "http://localhost:8082" (TEI - Hugging Face Text Embeddings Inference)
// - "http://tei:8082" (TEI container by name)
// - "http://localhost:8080" (LocalAI)
// - "https://api.openai.com/v1" (OpenAI cloud)
BaseURL string
// Model is the embedding model to use.
// Examples:
// - "all-MiniLM-L6-v2" (TEI default - 384 dims, fast)
// - "all-mpnet-base-v2" (TEI - 768 dims, higher quality)
// - "text-embedding-ada-002" (OpenAI)
Model string
// APIKey for authentication (optional for local services).
// Required for OpenAI, optional for TEI/LocalAI.
APIKey string
// Timeout for HTTP requests (default: 30s).
Timeout time.Duration
// Cache for embedding results (optional but recommended).
Cache Cache
// Logger for error logging (optional, defaults to slog.Default()).
Logger *slog.Logger
}
HTTPConfig configures the HTTP embedder.
type HTTPEmbedder ¶
type HTTPEmbedder struct {
// contains filtered or unexported fields
}
HTTPEmbedder calls an external OpenAI-compatible embedding service via HTTP.
This implementation works with:
- Hugging Face TEI (Text Embeddings Inference) - recommended, containerized
- LocalAI (self-hosted)
- OpenAI (cloud)
- Any OpenAI-compatible embedding API
Uses the standard OpenAI SDK for consistency and compatibility. See Dockerfile.tei and docker-compose.services.yml for ready-to-use TEI setup.
func NewHTTPEmbedder ¶
func NewHTTPEmbedder(cfg HTTPConfig) (*HTTPEmbedder, error)
NewHTTPEmbedder creates a new HTTP-based embedder.
func (*HTTPEmbedder) Close ¶
func (h *HTTPEmbedder) Close() error
Close releases resources (no-op for HTTP client).
func (*HTTPEmbedder) Dimensions ¶
func (h *HTTPEmbedder) Dimensions() int
Dimensions returns the dimensionality of embeddings produced.
func (*HTTPEmbedder) Generate ¶
Generate creates embeddings by calling the external HTTP service.
This method checks the cache first (if configured), then calls the embedding API for any cache misses.
func (*HTTPEmbedder) Model ¶
func (h *HTTPEmbedder) Model() string
Model returns the model identifier.
type NATSCache ¶
type NATSCache struct {
// contains filtered or unexported fields
}
NATSCache implements Cache using NATS KV for storage.
Embeddings are stored with content-addressed keys (SHA-256 hash of text) to enable deduplication and fast lookups.
func NewNATSCache ¶
NewNATSCache creates a new NATS KV-backed embedding cache.
type Record ¶
type Record struct {
EntityID string `json:"entity_id"`
Vector []float32 `json:"vector,omitempty"`
ContentHash string `json:"content_hash"`
SourceText string `json:"source_text,omitempty"` // Stored for pending records (legacy)
Model string `json:"model,omitempty"`
Dimensions int `json:"dimensions,omitempty"`
GeneratedAt time.Time `json:"generated_at,omitempty"`
Status Status `json:"status"`
ErrorMsg string `json:"error_msg,omitempty"` // If status=failed
// ContentStorable support (Feature 008)
// When StorageRef is set, Worker fetches content from ObjectStore
// and uses ContentFields to extract text for embedding.
StorageRef *StorageRef `json:"storage_ref,omitempty"`
ContentFields map[string]string `json:"content_fields,omitempty"` // Role → field name
}
Record represents a stored embedding with metadata
type ScoredEntity ¶
ScoredEntity pairs an entity ID with its cosine similarity score. Returned by FindSimilarFromCache for zero-KV similarity queries.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage handles persistence of embeddings to NATS KV buckets. It also maintains an in-memory vector cache, kept current via a KV watcher on the index bucket, to serve similarity queries without any network round-trips.
func NewStorage ¶
NewStorage creates a new embedding storage instance
func (*Storage) DeleteEmbedding ¶
DeleteEmbedding removes an embedding record
func (*Storage) FindSimilarFromCache ¶
func (s *Storage) FindSimilarFromCache(excludeID string, queryVector []float32, limit int) ([]ScoredEntity, bool)
FindSimilarFromCache scans the in-memory vector cache for entities whose cosine similarity to queryVector is highest, excluding the entity identified by excludeID (pass "" to skip exclusion).
The second return value reports whether the cache was ready (warm) at the time of the call. Callers must fall back to KV when it is false.
func (*Storage) GetByContentHash ¶
GetByContentHash retrieves an embedding by content hash (for deduplication)
func (*Storage) GetEmbedding ¶
GetEmbedding retrieves an embedding by entity ID
func (*Storage) ListGeneratedEntityIDs ¶
ListGeneratedEntityIDs returns all entity IDs that have embeddings in storage. This is used for pre-warming the vector cache on startup.
func (*Storage) SaveDedup ¶
func (s *Storage) SaveDedup(ctx context.Context, contentHash string, vector []float32, entityID string) error
SaveDedup saves a content-addressed embedding for deduplication
func (*Storage) SaveFailed ¶
SaveFailed marks an embedding as failed with an error message.
func (*Storage) SaveGenerated ¶
func (s *Storage) SaveGenerated(ctx context.Context, entityID string, vector []float32, model string, dimensions int) error
SaveGenerated saves a generated embedding with metadata
func (*Storage) SavePending ¶
SavePending saves a pending embedding request with source text (legacy mode).
func (*Storage) SavePendingWithStorageRef ¶
func (s *Storage) SavePendingWithStorageRef(
    ctx context.Context,
    entityID, contentHash string,
    storageRef *StorageRef,
    contentFields map[string]string,
) error
SavePendingWithStorageRef saves a pending embedding request with storage reference. This enables the ContentStorable pattern where text is fetched from ObjectStore. The contentHash is still used for deduplication if provided.
func (*Storage) StartVectorCache ¶
StartVectorCache launches a goroutine that keeps the in-memory vector cache synchronised with the EMBEDDING_INDEX KV bucket via WatchAll.
The goroutine runs until ctx is cancelled. StartVectorCache should be called at most once; a second call is a no-op. cacheReady is closed after the initial snapshot has been applied (nil delimiter received), so FindSimilarFromCache will not return results until the cache is warm.
type StorageRef ¶
StorageRef is a simplified reference for embedding storage. Mirrors message.StorageReference structure.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes pending embedding requests asynchronously
func NewWorker ¶
func NewWorker(
    storage *Storage,
    embedder Embedder,
    indexBucket jetstream.KeyValue,
    logger *slog.Logger,
) *Worker
NewWorker creates a new async embedding worker
func (*Worker) WithContentStore ¶
func (w *Worker) WithContentStore(store *objectstore.Store) *Worker
WithContentStore sets the ObjectStore for ContentStorable support. When set, the worker can fetch content from ObjectStore for records that have StorageRef instead of SourceText.
func (*Worker) WithMetrics ¶
func (w *Worker) WithMetrics(m WorkerMetrics) *Worker
WithMetrics sets the metrics reporter for observability.
func (*Worker) WithOnGenerated ¶
func (w *Worker) WithOnGenerated(cb GeneratedCallback) *Worker
WithOnGenerated sets a callback that is invoked when an embedding is generated. Use this to populate caches or trigger downstream processing.
func (*Worker) WithWorkers ¶
WithWorkers sets the number of concurrent workers
type WorkerMetrics ¶
type WorkerMetrics interface {
// IncDedupHits increments the deduplication hits counter
IncDedupHits()
// IncFailed increments the failed embeddings counter
IncFailed()
// SetPending sets the current pending embeddings gauge
SetPending(count float64)
}
WorkerMetrics provides metrics callbacks for embedding worker operations. This allows the worker to report metrics without direct dependency on prometheus.