graphindex

package
v1.0.0-alpha.36 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 20 Imported by: 0

README

graph-index

Core relationship indexing component for the graph subsystem.

Overview

The graph-index component watches the ENTITY_STATES KV bucket and maintains four relationship indexes that enable efficient graph traversal and querying.

Architecture

                    ┌─────────────────┐
                    │                 ├──► OUTGOING_INDEX (KV)
ENTITY_STATES ─────►│   graph-index   ├──► INCOMING_INDEX (KV)
   (KV watch)       │                 ├──► ALIAS_INDEX (KV)
                    │                 ├──► PREDICATE_INDEX (KV)
                    └─────────────────┘

Indexes

Index Key Value Purpose
OUTGOING_INDEX entity_id relationships[] Find what an entity points to
INCOMING_INDEX entity_id relationships[] Find what points to an entity
ALIAS_INDEX alias_string entity_id Resolve aliases to entity IDs
PREDICATE_INDEX predicate entity_ids[] Find entities by relationship type

Configuration

{
  "type": "processor",
  "name": "graph-index",
  "enabled": true,
  "config": {
    "ports": {
      "inputs": [
        {
          "name": "entity_watch",
          "subject": "ENTITY_STATES",
          "type": "kv-watch"
        }
      ],
      "outputs": [
        {"name": "outgoing_index", "subject": "OUTGOING_INDEX", "type": "kv"},
        {"name": "incoming_index", "subject": "INCOMING_INDEX", "type": "kv"},
        {"name": "alias_index", "subject": "ALIAS_INDEX", "type": "kv"},
        {"name": "predicate_index", "subject": "PREDICATE_INDEX", "type": "kv"}
      ]
    },
    "workers": 4,
    "batch_size": 50
  }
}
Configuration Options
Option Type Default Description
ports object required Port configuration for inputs and outputs
workers int 4 Number of worker goroutines for index updates
batch_size int 50 Batch size for index operations

Ports

Inputs
Name Type Subject Description
entity_watch kv-watch ENTITY_STATES Watch entity state changes
Outputs
Name Type Subject Description
outgoing_index kv OUTGOING_INDEX Outgoing relationship index
incoming_index kv INCOMING_INDEX Incoming relationship index
alias_index kv ALIAS_INDEX Entity alias lookup
predicate_index kv PREDICATE_INDEX Predicate-based index

Index Structure

OUTGOING_INDEX

Maps entity ID to its outgoing relationships:

{
  "entity_id": "c360.logistics.warehouse.sensor.temperature.temp-001",
  "relationships": [
    {"predicate": "located_in", "object": "c360.logistics.warehouse.zone.cold-storage"},
    {"predicate": "type", "object": "temperature_sensor"}
  ]
}
INCOMING_INDEX

Maps entity ID to its incoming relationships:

{
  "entity_id": "c360.logistics.warehouse.zone.cold-storage",
  "relationships": [
    {"predicate": "located_in", "subject": "c360.logistics.warehouse.sensor.temperature.temp-001"},
    {"predicate": "located_in", "subject": "c360.logistics.warehouse.sensor.humidity.hum-001"}
  ]
}
ALIAS_INDEX

Maps alias strings to entity IDs:

{
  "temp-001": "c360.logistics.warehouse.sensor.temperature.temp-001",
  "cold-storage-temp": "c360.logistics.warehouse.sensor.temperature.temp-001"
}
PREDICATE_INDEX

Maps predicates to entity IDs that have that predicate:

{
  "located_in": ["c360.logistics...temp-001", "c360.logistics...hum-001"],
  "alert.active": ["c360.logistics...temp-001"]
}

Dependencies

Upstream
  • graph-ingest - produces ENTITY_STATES that this component watches
Downstream
  • graph-structural - watches OUTGOING_INDEX and INCOMING_INDEX
  • graph-gateway - reads indexes for query resolution

Metrics

Metric Type Description
graph_index_updates_total counter Total index updates processed
graph_index_latency_seconds histogram Index update latency
graph_index_batch_size histogram Batch sizes processed
graph_index_errors_total counter Total indexing errors

Health

The component reports healthy when:

  • KV watch subscription is active
  • All output KV buckets are accessible
  • Index update latency is within acceptable bounds

Documentation

Overview

Package graphindex provides the graph-index component for maintaining graph relationship indexes.

Package graphindex provides the graph-index component for maintaining graph relationship indexes.

Overview

The graph-index component watches the ENTITY_STATES KV bucket and maintains relationship indexes that enable efficient graph traversal and querying.

Tier

Tier: ALL TIERS (Tier 0, 1, 2) - Required for all deployments.

Architecture

graph-index is a core component required for all deployment tiers (Structural, Statistical, Semantic). It watches entity state changes and updates four index buckets in parallel.

                    ┌─────────────────┐
                    │                 ├──► OUTGOING_INDEX (KV)
ENTITY_STATES ─────►│   graph-index   ├──► INCOMING_INDEX (KV)
   (KV watch)       │                 ├──► ALIAS_INDEX (KV)
                    │                 ├──► PREDICATE_INDEX (KV)
                    └─────────────────┘

Indexes

The component maintains four relationship indexes:

  • OUTGOING_INDEX: Maps entity ID → outgoing relationships (subject → predicate → object)
  • INCOMING_INDEX: Maps entity ID → incoming relationships (object ← predicate ← subject)
  • ALIAS_INDEX: Maps alias strings → entity IDs for fast lookup
  • PREDICATE_INDEX: Maps predicate → entity IDs for predicate-based queries

Configuration

The component is configured via JSON with the following structure:

{
  "ports": {
    "inputs": [
      {"name": "entity_watch", "subject": "ENTITY_STATES", "type": "kv-watch"}
    ],
    "outputs": [
      {"name": "outgoing_index", "subject": "OUTGOING_INDEX", "type": "kv"},
      {"name": "incoming_index", "subject": "INCOMING_INDEX", "type": "kv"},
      {"name": "alias_index", "subject": "ALIAS_INDEX", "type": "kv"},
      {"name": "predicate_index", "subject": "PREDICATE_INDEX", "type": "kv"}
    ]
  },
  "workers": 4,
  "batch_size": 50
}

Port Definitions

Inputs:

  • KV watch: ENTITY_STATES - watches for entity state changes

Outputs:

  • KV bucket: OUTGOING_INDEX - outgoing relationship index
  • KV bucket: INCOMING_INDEX - incoming relationship index
  • KV bucket: ALIAS_INDEX - entity alias lookup index
  • KV bucket: PREDICATE_INDEX - predicate-based index

Usage

Register the component with the component registry:

import graphindex "github.com/c360studio/semstreams/processor/graph-index"

func init() {
    graphindex.Register(registry)
}

Dependencies

Upstream:

  • graph-ingest: produces ENTITY_STATES that this component watches

Downstream:

  • graph-clustering: reads OUTGOING_INDEX and INCOMING_INDEX for structural analysis
  • graph-gateway: reads indexes for query resolution

Package graphindex provides Prometheus metrics for graph-index component.

Package graphindex query handlers

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateGraphIndex

func CreateGraphIndex(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateGraphIndex is the factory function for creating graph-index components

func Register

func Register(registry *component.Registry) error

Register registers the graph-index factory with the component registry

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the graph-index processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) DeleteFromAliasIndex

func (c *Component) DeleteFromAliasIndex(ctx context.Context, alias string) error

DeleteFromAliasIndex deletes an alias from the alias index

func (*Component) DeleteFromIncomingIndex

func (c *Component) DeleteFromIncomingIndex(ctx context.Context, targetID, sourceID string) error

DeleteFromIncomingIndex deletes a specific incoming reference

func (*Component) DeleteFromIndexes

func (c *Component) DeleteFromIndexes(ctx context.Context, entityID string) error

DeleteFromIndexes deletes an entity from all indexes

func (*Component) DeleteFromPredicateIndex

func (c *Component) DeleteFromPredicateIndex(ctx context.Context, entityID, predicate string) error

DeleteFromPredicateIndex deletes an entity from the predicate index

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize validates configuration and sets up ports (no I/O)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions. Reads directly from config so ports are available before Initialize().

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions. Reads directly from config so ports are available before Initialize().

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing (must be initialized first)

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully shuts down the component

func (*Component) UpdateAliasIndex

func (c *Component) UpdateAliasIndex(ctx context.Context, alias, entityID string) error

UpdateAliasIndex updates the alias index for an entity

func (*Component) UpdateContextIndex

func (c *Component) UpdateContextIndex(ctx context.Context, entityID string, triples []message.Triple) error

UpdateContextIndex updates the context index for triples with a context value. This enables provenance queries like "all triples from hierarchy inference". The operation is idempotent - replaying the same update has no effect.

func (*Component) UpdateIncomingIndex

func (c *Component) UpdateIncomingIndex(ctx context.Context, targetID, sourceID, predicate string) error

UpdateIncomingIndex updates the incoming index for a relationship

func (*Component) UpdateOutgoingIndex

func (c *Component) UpdateOutgoingIndex(ctx context.Context, entityID, targetID, predicate string) error

UpdateOutgoingIndex updates the outgoing index for an entity relationship

func (*Component) UpdatePredicateIndex

func (c *Component) UpdatePredicateIndex(ctx context.Context, entityID, predicate string) error

UpdatePredicateIndex updates the predicate index for an entity

type Config

type Config struct {
	Ports     *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	Workers   int                   `json:"workers" schema:"type:int,description:Number of worker goroutines,category:advanced"`
	BatchSize int                   `json:"batch_size" schema:"type:int,description:Batch size for index updates,category:advanced"`

	// Dependency startup configuration
	StartupAttempts int `` /* 130-byte string literal not displayed */
	StartupInterval int `` /* 134-byte string literal not displayed */
}

Config holds configuration for graph-index component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a valid default configuration

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults sets default values for configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate implements component.Validatable interface

type ContextEntry

type ContextEntry struct {
	EntityID  string `json:"entity_id"`
	Predicate string `json:"predicate"`
}

ContextEntry represents an entry in the context index. Each entry tracks which entity+predicate pair has a triple with a specific context value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL