graphembedding

package
v1.0.0-alpha.61
Published: Mar 19, 2026 License: MIT Imports: 22 Imported by: 0

README

graph-embedding

Vector embedding generation component for the graph subsystem.

Overview

The graph-embedding component watches the ENTITY_STATES KV bucket and generates vector embeddings for entities. These embeddings enable semantic similarity search and are used by the clustering component for community detection.

Architecture

                    ┌──────────────────┐
ENTITY_STATES ─────►│                  │
   (KV watch)       │  graph-embedding ├──► EMBEDDINGS_CACHE (KV)
                    │                  │
                    └────────┬─────────┘
                             │
                             ▼
                    ┌──────────────────┐
                    │  Embedding API   │
                    │  (HTTP/BM25)     │
                    └──────────────────┘

Features

  • Multiple Embedder Types: HTTP API (OpenAI-compatible) or BM25 sparse vectors
  • Batch Processing: Efficient bulk embedding generation
  • Configurable Text Extraction: Extract text from multiple entity fields
  • Caching: Embeddings cached with configurable TTL
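
The batch processing feature can be pictured as splitting pending entity texts into groups of at most batch_size before each embedding call. The following is a minimal sketch under that assumption; the chunk helper is hypothetical and is not part of this package.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// A non-positive size yields a single batch holding every item.
func chunk(items []string, size int) [][]string {
	if len(items) == 0 {
		return nil
	}
	if size <= 0 {
		return [][]string{items}
	}
	batches := make([][]string, 0, (len(items)+size-1)/size)
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	texts := make([]string, 120)
	for i := range texts {
		texts[i] = fmt.Sprintf("entity text %d", i)
	}
	for i, batch := range chunk(texts, 50) {
		fmt.Printf("batch %d: %d texts\n", i, len(batch))
	}
}
```

With the documented default batch size of 50, 120 pending texts would be sent as three requests instead of 120.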

Configuration

{
  "type": "processor",
  "name": "graph-embedding",
  "enabled": true,
  "config": {
    "ports": {
      "inputs": [
        {
          "name": "entity_watch",
          "subject": "ENTITY_STATES",
          "type": "kv-watch"
        }
      ],
      "outputs": [
        {
          "name": "embeddings",
          "subject": "EMBEDDINGS_CACHE",
          "type": "kv"
        }
      ]
    },
    "embedder_type": "http",
    "batch_size": 50,
    "cache_ttl": "1h"
  }
}

Configuration Options

Option         Type      Default   Description
ports          object    required  Port configuration for inputs and outputs
embedder_type  string    "bm25"    Embedder type: "http" or "bm25". HTTP requires model registry with embedding capability
batch_size     int       50        Batch size for embedding requests
cache_ttl      duration  "1h"      Cache TTL for embeddings
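
To illustrate how these options and their defaults could be handled, here is a hedged sketch that decodes the inner config object with the standard library. The sketchConfig type and parseConfig function are hypothetical stand-ins, not the package's Config type or its Validate method, and the validation rules are assumptions drawn from the table above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// sketchConfig mirrors only the scalar options documented in the table.
type sketchConfig struct {
	EmbedderType string `json:"embedder_type"`
	BatchSize    int    `json:"batch_size"`
	CacheTTL     string `json:"cache_ttl"`
}

// parseConfig starts from the documented defaults, overlays the JSON,
// and returns the config together with the parsed cache TTL.
func parseConfig(raw []byte) (sketchConfig, time.Duration, error) {
	cfg := sketchConfig{EmbedderType: "bm25", BatchSize: 50, CacheTTL: "1h"}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, 0, fmt.Errorf("decode config: %w", err)
	}
	if cfg.EmbedderType != "http" && cfg.EmbedderType != "bm25" {
		return cfg, 0, fmt.Errorf("unknown embedder_type %q", cfg.EmbedderType)
	}
	if cfg.BatchSize <= 0 {
		return cfg, 0, fmt.Errorf("batch_size must be positive, got %d", cfg.BatchSize)
	}
	ttl, err := time.ParseDuration(cfg.CacheTTL)
	if err != nil {
		return cfg, 0, fmt.Errorf("parse cache_ttl: %w", err)
	}
	return cfg, ttl, nil
}

func main() {
	cfg, ttl, err := parseConfig([]byte(`{"embedder_type": "http", "cache_ttl": "15m"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.EmbedderType, cfg.BatchSize, ttl)
}
```

Fields missing from the JSON keep their default values because json.Unmarshal only overwrites fields that are present in the input.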

Ports

Inputs

Name          Type      Subject        Description
entity_watch  kv-watch  ENTITY_STATES  Watch entity state changes

Outputs

Name        Type  Subject           Description
embeddings  kv    EMBEDDINGS_CACHE  Entity embeddings storage

Embedder Types

HTTP Embedder

Uses an OpenAI-compatible embedding API:

{
  "embedder_type": "http"
}

The HTTP embedder URL, model, and API key are resolved from the model registry's embedding capability.

Compatible with:

  • OpenAI Embeddings API
  • Local embedding servers (llama.cpp, text-embeddings-inference)
  • Any OpenAI-compatible embedding service
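
For readers unfamiliar with the wire format, the sketch below shows what a request to an OpenAI-compatible embeddings endpoint generally looks like: a POST with model and input fields, answered by a data array of indexed embeddings. The embed function, the "/v1/embeddings" path suffix, and the stand-in test server are illustrative assumptions; the package's actual HTTP client and its model registry lookup are not shown in this documentation.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Request and response shapes of an OpenAI-compatible embeddings endpoint.
type embedRequest struct {
	Model string   `json:"model"`
	Input []string `json:"input"`
}

type embedItem struct {
	Index     int       `json:"index"`
	Embedding []float64 `json:"embedding"`
}

type embedResponse struct {
	Data []embedItem `json:"data"`
}

// embed posts texts to baseURL+"/v1/embeddings" and returns one vector per
// input text, placed according to the index field of each response item.
func embed(ctx context.Context, client *http.Client, baseURL, apiKey, model string, texts []string) ([][]float64, error) {
	body, err := json.Marshal(embedRequest{Model: model, Input: texts})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/v1/embeddings", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	if apiKey != "" {
		req.Header.Set("Authorization", "Bearer "+apiKey)
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("embedding API returned %s", resp.Status)
	}
	var out embedResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	vectors := make([][]float64, len(texts))
	for _, item := range out.Data {
		if item.Index < 0 || item.Index >= len(vectors) {
			return nil, fmt.Errorf("response index %d out of range", item.Index)
		}
		vectors[item.Index] = item.Embedding
	}
	return vectors, nil
}

// runDemo starts a local stand-in server that returns [len(text), 1] for each
// input, then calls embed against it. No real embedding service is contacted.
func runDemo() ([][]float64, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var req embedRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var resp embedResponse
		for i, text := range req.Input {
			resp.Data = append(resp.Data, embedItem{Index: i, Embedding: []float64{float64(len(text)), 1}})
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(resp)
	}))
	defer srv.Close()
	return embed(context.Background(), srv.Client(), srv.URL, "test-key", "demo-model", []string{"sensor", "warehouse"})
}

func main() {
	vectors, err := runDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(vectors)
}
```

Some servers expect the base URL to already include the version prefix, so the path handling here may need adjusting for a given deployment.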

BM25 Embedder

Uses BM25 sparse vectors for lightweight deployments:

{
  "embedder_type": "bm25"
}

No external service required. Suitable for:

  • Development environments
  • Offline deployments
  • Resource-constrained environments
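
As background on what a BM25 sparse vector is, the sketch below weights each term of a document with the common Okapi BM25 formula and stores only the non-zero entries in a map. It is a generic illustration: the tokenizer (lowercase, whitespace split), the parameter values k1 = 1.2 and b = 0.75, and the bm25Vectors helper are assumptions and do not describe this package's embedder.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// Commonly used BM25 parameters; chosen here for illustration only.
const (
	bm25K1 = 1.2
	bm25B  = 0.75
)

// bm25Vectors returns one sparse vector (term -> weight) per document.
// Terms that do not occur in a document are simply absent from its map.
func bm25Vectors(docs []string) []map[string]float64 {
	if len(docs) == 0 {
		return nil
	}
	tokenized := make([][]string, len(docs))
	docFreq := map[string]int{} // number of documents containing each term
	totalLen := 0
	for i, doc := range docs {
		tokens := strings.Fields(strings.ToLower(doc))
		tokenized[i] = tokens
		totalLen += len(tokens)
		seen := map[string]bool{}
		for _, tok := range tokens {
			if !seen[tok] {
				seen[tok] = true
				docFreq[tok]++
			}
		}
	}
	n := float64(len(docs))
	avgLen := float64(totalLen) / n

	vectors := make([]map[string]float64, len(docs))
	for i, tokens := range tokenized {
		termFreq := map[string]float64{}
		for _, tok := range tokens {
			termFreq[tok]++
		}
		vec := make(map[string]float64, len(termFreq))
		for term, tf := range termFreq {
			df := float64(docFreq[term])
			idf := math.Log(1 + (n-df+0.5)/(df+0.5))
			lengthNorm := 1 - bm25B + bm25B*float64(len(tokens))/avgLen
			vec[term] = idf * tf * (bm25K1 + 1) / (tf + bm25K1*lengthNorm)
		}
		vectors[i] = vec
	}
	return vectors
}

func main() {
	docs := []string{
		"temperature sensor warehouse",
		"humidity sensor warehouse",
		"forklift",
	}
	for i, vec := range bm25Vectors(docs) {
		fmt.Printf("doc %d: %v\n", i, vec)
	}
}
```

Because the weights depend only on term counts within the corpus, no external service or model download is involved, which matches the lightweight use cases listed above.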

Embedding Storage

Embeddings are stored in EMBEDDINGS_CACHE with entity ID as key:

{
  "entity_id": "c360.logistics.warehouse.sensor.temperature.temp-001",
  "vector": [0.123, -0.456, 0.789, ...],
  "model": "BAAI/bge-small-en-v1.5",
  "created_at": "2024-01-15T10:30:00Z",
  "text_hash": "abc123..."
}
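
The text_hash field suggests a way to skip entities whose text has not changed since the last embedding. The sketch below shows one way such a check could work. The embeddingRecord type copies the field names from the example above, but the use of SHA-256, the hex encoding, and the needsEmbedding helper are assumptions for illustration, not the package's documented behavior.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// embeddingRecord mirrors the JSON fields of a stored embedding.
type embeddingRecord struct {
	EntityID  string    `json:"entity_id"`
	Vector    []float64 `json:"vector"`
	Model     string    `json:"model"`
	CreatedAt string    `json:"created_at"`
	TextHash  string    `json:"text_hash"`
}

// hashText returns a hex-encoded SHA-256 digest of text (assumed algorithm).
func hashText(text string) string {
	sum := sha256.Sum256([]byte(text))
	return hex.EncodeToString(sum[:])
}

// needsEmbedding reports whether text must be embedded again: there is no
// cached record, the text changed, or a different model produced the vector.
func needsEmbedding(cached *embeddingRecord, text, model string) bool {
	if cached == nil {
		return true
	}
	return cached.TextHash != hashText(text) || cached.Model != model
}

func main() {
	cached := &embeddingRecord{
		EntityID: "c360.logistics.warehouse.sensor.temperature.temp-001",
		Model:    "BAAI/bge-small-en-v1.5",
		TextHash: hashText("Temperature sensor in cold storage"),
	}
	fmt.Println(needsEmbedding(cached, "Temperature sensor in cold storage", "BAAI/bge-small-en-v1.5"))
	fmt.Println(needsEmbedding(cached, "Temperature sensor in loading bay", "BAAI/bge-small-en-v1.5"))
}
```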

Dependencies

Upstream

  • graph-ingest - produces ENTITY_STATES that this component watches

Downstream

  • graph-clustering - reads embeddings for semantic similarity
  • graph-gateway - reads embeddings for semantic search

External

  • Embedding API service (if using HTTP embedder)

Metrics

Metric                               Type       Description
graph_embedding_generated_total      counter    Total embeddings generated
graph_embedding_cache_hits_total     counter    Cache hits (unchanged entities)
graph_embedding_api_latency_seconds  histogram  Embedding API latency
graph_embedding_errors_total         counter    Total embedding errors

Health

The component reports healthy when:

  • KV watch subscription is active
  • Embedding API is reachable (if using HTTP embedder)
  • Error rate is below threshold
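
The three conditions can be combined into a single predicate. The sketch below is only an illustration of that logic: the healthInputs type, the healthy function, and the way the error rate is computed (errors divided by processed entities) are assumptions, and the actual threshold is not stated in this documentation.

```go
package main

import "fmt"

// healthInputs gathers the signals named in the list above.
type healthInputs struct {
	WatchActive  bool // KV watch subscription is active
	UsesHTTP     bool // embedder_type is "http"
	APIReachable bool // last probe of the embedding API succeeded
	Processed    int  // entities processed in the observation window
	Errors       int  // failures in the same window
}

// healthy applies the three documented conditions. The API check is
// skipped for non-HTTP embedders, which need no external service.
func healthy(in healthInputs, maxErrorRate float64) bool {
	if !in.WatchActive {
		return false
	}
	if in.UsesHTTP && !in.APIReachable {
		return false
	}
	if in.Processed == 0 {
		return true // nothing processed yet, so no error rate to judge
	}
	return float64(in.Errors)/float64(in.Processed) < maxErrorRate
}

func main() {
	in := healthInputs{WatchActive: true, UsesHTTP: true, APIReachable: true, Processed: 200, Errors: 3}
	fmt.Println(healthy(in, 0.05))
}
```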

Documentation

Overview

Package graphembedding provides the graph-embedding component for generating entity embeddings.

Overview

The graph-embedding component watches the ENTITY_STATES KV bucket and generates vector embeddings for entities, storing them in the EMBEDDINGS_CACHE KV bucket. These embeddings enable semantic similarity search and clustering.

Tier

Tier: STATISTICAL (Tier 1) with BM25, SEMANTIC (Tier 2) with HTTP embeddings. Not used in Structural (Tier 0) deployments.

Architecture

graph-embedding is a Tier 1+ component. It is not used in Structural-only deployments, but it is required for semantic search and community detection features.

                    ┌──────────────────┐
ENTITY_STATES ─────►│                  │
   (KV watch)       │  graph-embedding ├──► EMBEDDINGS_CACHE (KV)
                    │                  │
                    └────────┬─────────┘
                             │
                             ▼
                    ┌──────────────────┐
                    │  Embedding API   │
                    │  (HTTP/BM25)     │
                    └──────────────────┘

Features

  • Entity text extraction from configurable fields
  • HTTP embedding API integration (OpenAI-compatible)
  • BM25 fallback for offline/lightweight deployments
  • Batch processing for efficiency
  • Caching with configurable TTL

Configuration

The component is configured via JSON with the following structure:

{
  "ports": {
    "inputs": [
      {"name": "entity_watch", "subject": "ENTITY_STATES", "type": "kv-watch"}
    ],
    "outputs": [
      {"name": "embeddings", "subject": "EMBEDDINGS_CACHE", "type": "kv"}
    ]
  },
  "embedder_type": "http",
  "batch_size": 50,
  "cache_ttl": "1h"
}

Port Definitions

Inputs:

  • KV watch: ENTITY_STATES - watches for entity state changes

Outputs:

  • KV bucket: EMBEDDINGS_CACHE - stores vector embeddings keyed by entity ID

Embedder Types

  • http: Uses HTTP API (OpenAI-compatible) for embedding generation
  • bm25: Uses BM25 sparse vectors for lightweight deployments

Usage

Register the component with the component registry:

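import graphembedding "github.com/c360studio/semstreams/processor/graph-embedding"

// registerComponents adds the graph-embedding factory to an existing registry.
// registry is the application's *component.Registry; the import of the
// component package is omitted here.
func registerComponents(registry *component.Registry) error {
    // Register returns an error, so propagate it instead of discarding it.
    return graphembedding.Register(registry)
}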

Dependencies

Upstream:

  • graph-ingest: produces ENTITY_STATES that this component watches

Downstream:

  • graph-clustering: reads EMBEDDINGS_CACHE for semantic similarity in community detection
  • graph-gateway: reads EMBEDDINGS_CACHE for semantic search queries

Package graphembedding provides Prometheus metrics for the graph-embedding component.

Package graphembedding provides query handlers for the graph-embedding component.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateGraphEmbedding

func CreateGraphEmbedding(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateGraphEmbedding is the factory function for creating graph-embedding components

func Register

func Register(registry *component.Registry) error

Register registers the graph-embedding factory with the component registry

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the graph-embedding processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize validates configuration and sets up ports (no I/O)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing (must be initialized first)

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully shuts down the component
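
The method descriptions imply an ordering: Initialize, then Start, then Stop. The sketch below drives that sequence through a small local interface that copies the three documented signatures. The lifecycle interface, the run helper, and the recorder stand-in are hypothetical, and the sketch assumes Start returns once background processing has been launched, which this documentation does not confirm.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lifecycle lists the three Component methods used here, with the
// signatures shown in the documentation above.
type lifecycle interface {
	Initialize() error
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
}

// run initializes and starts c, waits for ctx to be cancelled, then stops c.
func run(ctx context.Context, c lifecycle, stopTimeout time.Duration) error {
	if err := c.Initialize(); err != nil {
		return fmt.Errorf("initialize: %w", err)
	}
	if err := c.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	<-ctx.Done()
	if err := c.Stop(stopTimeout); err != nil {
		return fmt.Errorf("stop: %w", err)
	}
	return nil
}

// recorder is a stand-in component that records the order of calls.
type recorder struct{ calls []string }

func (r *recorder) Initialize() error { r.calls = append(r.calls, "initialize"); return nil }

func (r *recorder) Start(context.Context) error { r.calls = append(r.calls, "start"); return nil }

func (r *recorder) Stop(time.Duration) error { r.calls = append(r.calls, "stop"); return nil }

// runDemo cancels the context up front to simulate an immediate shutdown.
func runDemo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	r := &recorder{}
	err := run(ctx, r, 5*time.Second)
	return r.calls, err
}

func main() {
	calls, err := runDemo()
	fmt.Println(calls, err)
}
```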

type Config

type Config struct {
	Ports        *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	EmbedderType string                `` /* 153-byte string literal not displayed */
	BatchSize    int                   `json:"batch_size" schema:"type:int,description:Batch size for embedding generation,category:advanced"`
	CacheTTLStr  string                `json:"cache_ttl" schema:"type:string,description:Cache TTL for embeddings (e.g. 15m or 1h),category:advanced"`

	// TextSuffixes controls which triple predicates are extracted for embedding.
	// Predicates ending with any of these suffixes will have their text values embedded.
	// When empty, defaults to: .title, .content, .description, .summary, .text, .name, .body, .abstract, .subject
	TextSuffixes []string `` /* 191-byte string literal not displayed */

	// Dependency startup configuration
	StartupAttempts int `` /* 130-byte string literal not displayed */
	StartupInterval int `` /* 134-byte string literal not displayed */
	CoalesceMs      int `` /* 141-byte string literal not displayed */
	// contains filtered or unexported fields
}

Config holds configuration for graph-embedding component
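
The TextSuffixes comment describes suffix matching on triple predicates. A minimal sketch of that selection follows, using the default suffixes listed in the comment. The triple type, the extractText helper, the restriction to string objects, and joining values with newlines are assumptions made for illustration; the package's own extraction code is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultTextSuffixes copies the defaults listed in the Config comment.
var defaultTextSuffixes = []string{
	".title", ".content", ".description", ".summary", ".text",
	".name", ".body", ".abstract", ".subject",
}

// triple is a simplified stand-in for an entity triple.
type triple struct {
	Predicate string
	Object    any
}

// extractText collects string objects whose predicate ends with one of the
// suffixes and joins them with newlines. Empty suffixes select the defaults.
func extractText(triples []triple, suffixes []string) string {
	if len(suffixes) == 0 {
		suffixes = defaultTextSuffixes
	}
	var parts []string
	for _, t := range triples {
		text, ok := t.Object.(string)
		if !ok || text == "" {
			continue
		}
		for _, suffix := range suffixes {
			if strings.HasSuffix(t.Predicate, suffix) {
				parts = append(parts, text)
				break
			}
		}
	}
	return strings.Join(parts, "\n")
}

func main() {
	triples := []triple{
		{Predicate: "sensor.meta.name", Object: "Cold storage thermometer"},
		{Predicate: "sensor.reading.value", Object: 4.2},
		{Predicate: "sensor.meta.description", Object: "Monitors aisle 3"},
		{Predicate: "sensor.meta.location", Object: "Aisle 3"},
	}
	fmt.Println(extractText(triples, nil))
}
```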

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a valid default configuration

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults sets default values for configuration

func (*Config) CacheTTL

func (c *Config) CacheTTL() time.Duration

CacheTTL returns the parsed cache TTL duration

func (*Config) Validate

func (c *Config) Validate() error

Validate implements component.Validatable interface

type SearchRequest

type SearchRequest struct {
	Query string `json:"query"`
	Limit int    `json:"limit"`
}

SearchRequest is the request format for text search queries

type SearchResponse

type SearchResponse struct {
	Query    string         `json:"query"`
	Results  []SearchResult `json:"results"`
	Duration string         `json:"duration"`
}

SearchResponse is the response format for text search queries

type SearchResult

type SearchResult struct {
	EntityID   string  `json:"entity_id"`
	Similarity float64 `json:"similarity"`
}

SearchResult represents a search result with relevance score

type SimilarEntity

type SimilarEntity struct {
	EntityID   string  `json:"entity_id"`
	Similarity float64 `json:"similarity"`
}

SimilarEntity represents an entity with similarity score

type SimilarRequest

type SimilarRequest struct {
	EntityID string `json:"entity_id"`
	Limit    int    `json:"limit"`
}

SimilarRequest is the request format for similar entity queries

type SimilarResponse

type SimilarResponse struct {
	EntityID string          `json:"entity_id"`
	Similar  []SimilarEntity `json:"similar"`
	Duration string          `json:"duration"`
}

SimilarResponse is the response format for similar entity queries
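
To show how a similar-entity query could be answered from stored vectors, the sketch below ranks entities by cosine similarity and applies the request limit. The similarEntity type copies the JSON fields of SimilarEntity; the mostSimilar helper, the in-memory map of vectors, and the choice of cosine similarity are assumptions, since the documentation does not say which measure the query handlers use.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// similarEntity mirrors the JSON shape of SimilarEntity.
type similarEntity struct {
	EntityID   string  `json:"entity_id"`
	Similarity float64 `json:"similarity"`
}

// cosine returns the cosine similarity of a and b, or 0 when the lengths
// differ or either vector has zero magnitude.
func cosine(a, b []float64) float64 {
	if len(a) != len(b) {
		return 0
	}
	var dot, normA, normB float64
	for i := range a {
		dot += a[i] * b[i]
		normA += a[i] * a[i]
		normB += b[i] * b[i]
	}
	if normA == 0 || normB == 0 {
		return 0
	}
	return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}

// mostSimilar ranks every other entity by similarity to target, highest
// first, breaking ties by entity ID. A positive limit truncates the result.
func mostSimilar(target string, vectors map[string][]float64, limit int) []similarEntity {
	targetVec, ok := vectors[target]
	if !ok {
		return nil
	}
	results := make([]similarEntity, 0, len(vectors))
	for id, vec := range vectors {
		if id == target {
			continue
		}
		results = append(results, similarEntity{EntityID: id, Similarity: cosine(targetVec, vec)})
	}
	sort.Slice(results, func(i, j int) bool {
		if results[i].Similarity != results[j].Similarity {
			return results[i].Similarity > results[j].Similarity
		}
		return results[i].EntityID < results[j].EntityID
	})
	if limit > 0 && len(results) > limit {
		results = results[:limit]
	}
	return results
}

func main() {
	vectors := map[string][]float64{
		"temp-001": {1, 0},
		"temp-002": {1, 0.1},
		"door-001": {0, 1},
	}
	fmt.Println(mostSimilar("temp-001", vectors, 2))
}
```

A linear scan like this is adequate for small entity sets; larger deployments would typically use an approximate nearest-neighbor index instead.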
