Documentation
Overview ¶
Package graphindex provides the graph-index component for maintaining graph relationship indexes.
The graph-index component watches the ENTITY_STATES KV bucket and maintains relationship indexes that enable efficient graph traversal and querying.
Tier ¶
Tier: ALL TIERS (Tier 0, 1, 2); required for all deployments.
Architecture ¶
graph-index is a core component required for all deployment tiers (Structural, Statistical, Semantic). It watches entity state changes and updates four index buckets in parallel.
                    ┌─────────────────┐
                    │                 ├──► OUTGOING_INDEX (KV)
ENTITY_STATES ─────►│   graph-index   ├──► INCOMING_INDEX (KV)
  (KV watch)        │                 ├──► ALIAS_INDEX (KV)
                    │                 ├──► PREDICATE_INDEX (KV)
                    └─────────────────┘
Indexes ¶
The component maintains four relationship indexes:
- OUTGOING_INDEX: Maps entity ID → outgoing relationships (subject → predicate → object)
- INCOMING_INDEX: Maps entity ID → incoming relationships (object ← predicate ← subject)
- ALIAS_INDEX: Maps alias strings → entity IDs for fast lookup
- PREDICATE_INDEX: Maps predicate → entity IDs for predicate-based queries
Configuration ¶
The component is configured via JSON with the following structure:
{
  "ports": {
    "inputs": [
      {"name": "entity_watch", "subject": "ENTITY_STATES", "type": "kv-watch"}
    ],
    "outputs": [
      {"name": "outgoing_index", "subject": "OUTGOING_INDEX", "type": "kv"},
      {"name": "incoming_index", "subject": "INCOMING_INDEX", "type": "kv"},
      {"name": "alias_index", "subject": "ALIAS_INDEX", "type": "kv"},
      {"name": "predicate_index", "subject": "PREDICATE_INDEX", "type": "kv"}
    ]
  },
  "workers": 4,
  "batch_size": 50
}
Port Definitions ¶
Inputs:
- KV watch: ENTITY_STATES - watches for entity state changes
Outputs:
- KV bucket: OUTGOING_INDEX - outgoing relationship index
- KV bucket: INCOMING_INDEX - incoming relationship index
- KV bucket: ALIAS_INDEX - entity alias lookup index
- KV bucket: PREDICATE_INDEX - predicate-based index
Usage ¶
Register the component with the component registry:
import graphindex "github.com/c360studio/semstreams/processor/graph-index"

// registry is the application's *component.Registry.
func init() {
	if err := graphindex.Register(registry); err != nil {
		panic(err)
	}
}
Dependencies ¶
Upstream:
- graph-ingest: produces ENTITY_STATES that this component watches
Downstream:
- graph-clustering: reads OUTGOING_INDEX and INCOMING_INDEX for structural analysis
- graph-gateway: reads indexes for query resolution
The package also provides Prometheus metrics and query handlers for the graph-index component.
Index ¶
- func CreateGraphIndex(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) DeleteFromAliasIndex(ctx context.Context, alias string) error
- func (c *Component) DeleteFromIncomingIndex(ctx context.Context, targetID, sourceID string) error
- func (c *Component) DeleteFromIndexes(ctx context.Context, entityID string) error
- func (c *Component) DeleteFromPredicateIndex(ctx context.Context, entityID, predicate string) error
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- func (c *Component) UpdateAliasIndex(ctx context.Context, alias, entityID string) error
- func (c *Component) UpdateContextIndex(ctx context.Context, entityID string, triples []message.Triple) error
- func (c *Component) UpdateIncomingIndex(ctx context.Context, targetID, sourceID, predicate string) error
- func (c *Component) UpdateOutgoingIndex(ctx context.Context, entityID, targetID, predicate string) error
- func (c *Component) UpdatePredicateIndex(ctx context.Context, entityID, predicate string) error
- type Config
  - func DefaultConfig() Config
  - func (c *Config) ApplyDefaults()
- type ContextEntry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateGraphIndex ¶
func CreateGraphIndex(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateGraphIndex is the factory function for creating graph-index components
Types ¶
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the graph-index processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
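func (*Component) DeleteFromAliasIndex ¶
func (c *Component) DeleteFromAliasIndex(ctx context.Context, alias string) error
DeleteFromAliasIndex deletes an alias from the alias index.
func (*Component) DeleteFromIncomingIndex ¶
func (c *Component) DeleteFromIncomingIndex(ctx context.Context, targetID, sourceID string) error
DeleteFromIncomingIndex deletes a specific incoming reference.
func (*Component) DeleteFromIndexes ¶
func (c *Component) DeleteFromIndexes(ctx context.Context, entityID string) error
DeleteFromIndexes deletes an entity from all indexes.
func (*Component) DeleteFromPredicateIndex ¶
func (c *Component) DeleteFromPredicateIndex(ctx context.Context, entityID, predicate string) error
DeleteFromPredicateIndex deletes an entity from the predicate index.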
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status
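func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize validates the configuration and sets up ports. It performs no I/O.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns the input port definitions. It reads directly from the config, so ports are available before Initialize is called.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns the output port definitions. It reads directly from the config, so ports are available before Initialize is called.
func (*Component) UpdateAliasIndex ¶
func (c *Component) UpdateAliasIndex(ctx context.Context, alias, entityID string) error
UpdateAliasIndex updates the alias index for an entity.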
func (*Component) UpdateContextIndex ¶
func (c *Component) UpdateContextIndex(ctx context.Context, entityID string, triples []message.Triple) error
UpdateContextIndex updates the context index for triples that carry a context value. This enables provenance queries such as "all triples from hierarchy inference". The operation is idempotent: replaying the same update has no additional effect.
func (*Component) UpdateIncomingIndex ¶
func (c *Component) UpdateIncomingIndex(ctx context.Context, targetID, sourceID, predicate string) error
UpdateIncomingIndex updates the incoming index for a single relationship. It delegates to updateIncomingIndexBatch so that tests and ad-hoc callers share the same merge logic as the batched path in processEntityUpdate.
type Config ¶
type Config struct {
	Ports     *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	Workers   int                   `json:"workers" schema:"type:int,description:Number of worker goroutines,category:advanced"`
	BatchSize int                   `json:"batch_size" schema:"type:int,description:Batch size for index updates,category:advanced"`

	// Dependency startup configuration
	StartupAttempts int `` /* 130-byte string literal not displayed */
	StartupInterval int `` /* 134-byte string literal not displayed */
	CoalesceMs      int `` /* 141-byte string literal not displayed */
}
Config holds the configuration for the graph-index component.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a valid default configuration
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults sets default values for configuration
type ContextEntry ¶
ContextEntry represents an entry in the context index. Each entry tracks which entity+predicate pair has a triple with a specific context value.