Documentation ¶
Overview ¶
Package graphingest provides the graph-ingest component for entity and triple ingestion.
The graph-ingest component is responsible for ingesting entities and triples into the graph system. It subscribes to JetStream subjects for incoming entity data and stores them in the ENTITY_STATES KV bucket.
Tier ¶
Tier: ALL TIERS (Tier 0, 1, 2) - Required for all deployments.
Architecture ¶
graph-ingest is a core component required for all deployment tiers (Structural, Statistical, Semantic). It serves as the entry point for all entity data flowing into the graph subsystem.
                     ┌─────────────────┐
objectstore.stored ──┤                 │
                     │  graph-ingest   ├──► ENTITY_STATES (KV)
sensor.processed   ──┤                 │
                     └─────────────────┘
Features ¶
- Entity CRUD operations (create, read, update, delete)
- Triple mutations (add, remove)
- Hierarchy inference (optional) - creates container entities based on 6-part entity ID structure
- JetStream subscription with at-least-once delivery semantics
- KV storage with atomic updates
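The at-least-once delivery item above implies that a message may arrive more than once, so the write path has to tolerate replays. The following is a minimal sketch of one way to get that property, assuming a "store first, acknowledge second" ordering and a keyed upsert. The `message` and `store` types are hypothetical in-memory stand-ins, not JetStream or package types, and this is not necessarily how graph-ingest itself is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a delivered JetStream message.
type message struct {
	EntityID string
	State    string
	acked    bool
}

// store is a hypothetical in-memory stand-in for the ENTITY_STATES bucket.
type store struct {
	entities map[string]string
	failNext bool // simulates one transient write error
}

// put upserts the state under the entity ID.
func (s *store) put(id, state string) error {
	if s.failNext {
		s.failNext = false
		return errors.New("transient write failure")
	}
	s.entities[id] = state
	return nil
}

// handle writes the entity and only then acknowledges the message.
// If the write fails, the message stays unacknowledged and is redelivered.
func handle(s *store, m *message) error {
	if err := s.put(m.EntityID, m.State); err != nil {
		return err
	}
	m.acked = true
	return nil
}

func main() {
	s := &store{entities: map[string]string{}, failNext: true}
	m := &message{EntityID: "e1", State: "v1"}

	// Simulate redelivery until the message is acknowledged.
	for attempt := 1; !m.acked; attempt++ {
		if err := handle(s, m); err != nil {
			fmt.Printf("attempt %d: %v, will be redelivered\n", attempt, err)
			continue
		}
		fmt.Printf("attempt %d: stored and acknowledged\n", attempt)
	}

	// A duplicate delivery after the ack rewrites the same key with the same value.
	_ = handle(s, m)
	fmt.Println("entities stored:", len(s.entities))
}
```

Because the write is keyed by entity ID, replaying the same message leaves the stored state unchanged, which is what makes redelivery safe in this sketch.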
Configuration ¶
The component is configured via JSON with the following structure:
{
  "ports": {
    "inputs": [
      {"name": "objectstore_in", "subject": "objectstore.stored.entity", "type": "jetstream"},
      {"name": "sensor_in", "subject": "sensor.processed.entity", "type": "jetstream"}
    ],
    "outputs": [
      {"name": "entity_states", "subject": "ENTITY_STATES", "type": "kv"}
    ]
  },
  "enable_hierarchy": true
}
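To make the shape of this configuration concrete, here is a small sketch that decodes it with encoding/json. The `port` and `ingestConfig` types are local stand-ins that mirror the JSON above; they are assumptions for illustration and are not the package's `Config` or `component.PortConfig` types. The "at least one input" check is likewise a hypothetical validation rule.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// port mirrors one entry of "inputs" or "outputs" (illustrative only).
type port struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Type    string `json:"type"`
}

// ingestConfig mirrors the top-level JSON object (illustrative only).
type ingestConfig struct {
	Ports struct {
		Inputs  []port `json:"inputs"`
		Outputs []port `json:"outputs"`
	} `json:"ports"`
	EnableHierarchy bool `json:"enable_hierarchy"`
}

const sampleConfig = `{
  "ports": {
    "inputs": [
      {"name": "objectstore_in", "subject": "objectstore.stored.entity", "type": "jetstream"},
      {"name": "sensor_in", "subject": "sensor.processed.entity", "type": "jetstream"}
    ],
    "outputs": [
      {"name": "entity_states", "subject": "ENTITY_STATES", "type": "kv"}
    ]
  },
  "enable_hierarchy": true
}`

// parseConfig decodes the raw JSON and applies a hypothetical sanity check.
func parseConfig(raw []byte) (ingestConfig, error) {
	var cfg ingestConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, fmt.Errorf("decode config: %w", err)
	}
	if len(cfg.Ports.Inputs) == 0 {
		return cfg, errors.New("config needs at least one input port")
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]byte(sampleConfig))
	if err != nil {
		panic(err)
	}
	for _, in := range cfg.Ports.Inputs {
		fmt.Printf("input %s <- %s (%s)\n", in.Name, in.Subject, in.Type)
	}
	for _, out := range cfg.Ports.Outputs {
		fmt.Printf("output %s -> %s (%s)\n", out.Name, out.Subject, out.Type)
	}
	fmt.Println("hierarchy enabled:", cfg.EnableHierarchy)
}
```

In a real deployment the raw JSON would be passed to CreateGraphIngest as a json.RawMessage rather than decoded by hand.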
Port Definitions ¶
Inputs:
- JetStream subscriptions for entity events (objectstore.stored.entity, sensor.processed.entity)
Outputs:
- KV bucket: ENTITY_STATES - stores entity state with triples
Hierarchy Inference ¶
When enable_hierarchy is true, the component automatically creates container entities based on the 6-part entity ID structure (org.platform.domain.system.type.instance). This creates edges for:
- Type containers (*.group)
- System containers (*.container)
- Domain containers (*.level)
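As an illustration of how container IDs could be derived from a 6-part entity ID, consider the sketch below. This documentation only gives the suffixes (`*.group`, `*.container`, `*.level`); the exact layout of each container ID used here is an assumption chosen to match those suffixes while keeping six parts, and `containerIDs` is a hypothetical helper, not a function of this package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// containerIDs derives type, system, and domain container IDs from an
// entity ID of the form org.platform.domain.system.type.instance.
//
// ASSUMPTION: the layouts below are illustrative; only the trailing
// ".group", ".container", and ".level" suffixes come from the docs.
func containerIDs(entityID string) (typeC, systemC, domainC string, err error) {
	parts := strings.Split(entityID, ".")
	if len(parts) != 6 {
		return "", "", "", fmt.Errorf("entity ID %q has %d parts, want 6", entityID, len(parts))
	}
	for _, p := range parts {
		if p == "" {
			return "", "", "", errors.New("entity ID contains an empty part")
		}
	}
	org, platform, domain, system, typ := parts[0], parts[1], parts[2], parts[3], parts[4]

	// Type container: replaces the instance with "group".
	typeC = strings.Join([]string{org, platform, domain, system, typ, "group"}, ".")
	// System container: groups all types under one system.
	systemC = strings.Join([]string{org, platform, domain, system, "group", "container"}, ".")
	// Domain container: groups all systems under one domain.
	domainC = strings.Join([]string{org, platform, domain, "group", "container", "level"}, ".")
	return typeC, systemC, domainC, nil
}

func main() {
	// The entity ID is a made-up example.
	typeC, systemC, domainC, err := containerIDs("acme.prod.robotics.gcs.drone.001")
	if err != nil {
		panic(err)
	}
	fmt.Println("type container:  ", typeC)
	fmt.Println("system container:", systemC)
	fmt.Println("domain container:", domainC)
}
```

Under these assumptions, every entity sharing the first five parts maps to the same type container, so the container only needs to be created once per type.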
Usage ¶
Register the component with the component registry:
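import graphingest "github.com/c360studio/semstreams/processor/graph-ingest"

func init() {
	// registry is the application's *component.Registry; Register returns an error.
	if err := graphingest.Register(registry); err != nil {
		panic(err)
	}
}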
Dependencies ¶
This component has no upstream graph component dependencies. It is the entry point for entity data; other graph components (graph-index, graph-embedding, etc.) watch its output KV bucket.
Package graphingest also provides mutation handlers for triple operations via NATS request/reply.
Package graphingest also provides query handlers.
Index ¶
- Constants
- func CreateGraphIngest(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type Component
- func (c *Component) AddTriple(ctx context.Context, triple message.Triple) error
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) CreateEntity(ctx context.Context, entity *graph.EntityState) error
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) DeleteEntity(ctx context.Context, entityID string) error
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) RemoveTriple(ctx context.Context, subject, predicate string) error
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- func (c *Component) UpdateEntity(ctx context.Context, entity *graph.EntityState) error
- type Config
Constants ¶
const (
	// SubjectTripleAdd is the NATS subject for add triple requests
	SubjectTripleAdd = "graph.mutation.triple.add"
	// SubjectTripleRemove is the NATS subject for remove triple requests
	SubjectTripleRemove = "graph.mutation.triple.remove"
)
Functions ¶
func CreateGraphIngest ¶
func CreateGraphIngest(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateGraphIngest is the factory function for creating graph-ingest components
Types ¶
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the graph-ingest processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) CreateEntity ¶
func (c *Component) CreateEntity(ctx context.Context, entity *graph.EntityState) error
CreateEntity creates a new entity in the graph
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) DeleteEntity ¶
func (c *Component) DeleteEntity(ctx context.Context, entityID string) error
DeleteEntity removes an entity from the graph
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status
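func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize validates configuration and sets up ports (no I/O)
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns input port definitions
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns output port definitions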
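func (*Component) RemoveTriple ¶
func (c *Component) RemoveTriple(ctx context.Context, subject, predicate string) error
RemoveTriple removes a triple from an entity, using compare-and-swap (CAS) for concurrency safety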
func (*Component) UpdateEntity ¶
func (c *Component) UpdateEntity(ctx context.Context, entity *graph.EntityState) error
UpdateEntity updates an existing entity
type Config ¶
type Config struct {
	Ports              *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	EnableHierarchy    bool                  `json:"enable_hierarchy" schema:"type:bool,description:Enable hierarchy inference,default:false,category:advanced"`
	EnableTypeSiblings *bool                 `` /* 162-byte string literal not displayed */
}
Config holds configuration for graph-ingest component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a valid default configuration
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults sets default values for configuration