graphingest

Package version: v1.0.0-alpha.27

Warning: This package is not in the latest version of its module.

Published: Mar 11, 2026 | License: MIT | Imports: 20 | Imported by: 0

README

graph-ingest

Entity and triple ingestion component for the graph subsystem.

Overview

The graph-ingest component is the entry point for all entity data flowing into the graph system. It subscribes to JetStream subjects for incoming entity events and stores them in the ENTITY_STATES KV bucket.

Architecture

                    ┌─────────────────┐
objectstore.stored ─┤                 │
                    │  graph-ingest   ├──► ENTITY_STATES (KV)
sensor.processed   ─┤                 │
                    └─────────────────┘

Features

  • Entity CRUD: Create, read, update, delete entity operations
  • Triple Mutations: Add and remove triples on entities
  • Hierarchy Inference (optional): Automatically creates container entities based on the 6-part entity ID structure when enable_hierarchy is true
  • At-Least-Once Delivery: JetStream subscription with proper acknowledgment

Configuration

{
  "type": "processor",
  "name": "graph-ingest",
  "enabled": true,
  "config": {
    "ports": {
      "inputs": [
        {
          "name": "objectstore_in",
          "subject": "objectstore.stored.entity",
          "type": "jetstream",
          "interface": "storage.stored.v1"
        },
        {
          "name": "sensor_in",
          "subject": "sensor.processed.entity",
          "type": "jetstream",
          "interface": "iot.sensor.v1"
        }
      ],
      "outputs": [
        {
          "name": "entity_states",
          "subject": "ENTITY_STATES",
          "type": "kv"
        }
      ]
    },
    "enable_hierarchy": true
  }
}

Configuration Options

Option            Type    Default   Description
ports             object  required  Port configuration for inputs and outputs
enable_hierarchy  bool    false     Enable automatic hierarchy inference
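In the example above, the component's own settings sit under the "config" key of an outer definition (type, name, enabled). CreateGraphIngest takes its configuration as a json.RawMessage, so one plausible way to hand it over is to decode the outer object and keep "config" undecoded. The sketch below does that with a hypothetical envelope type and splitEnvelope helper, neither of which is part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical mirror of the outer component definition;
// it is not a type exported by graphingest.
type envelope struct {
	Type    string          `json:"type"`
	Name    string          `json:"name"`
	Enabled bool            `json:"enabled"`
	Config  json.RawMessage `json:"config"` // kept as raw bytes, not decoded here
}

// splitEnvelope decodes the outer definition and returns it with the inner
// config left raw, i.e. the bytes a factory such as CreateGraphIngest would receive.
func splitEnvelope(data []byte) (envelope, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return envelope{}, fmt.Errorf("decode component definition: %w", err)
	}
	if len(env.Config) == 0 {
		return envelope{}, fmt.Errorf("component %q has no config object", env.Name)
	}
	return env, nil
}

func main() {
	def := []byte(`{"type":"processor","name":"graph-ingest","enabled":true,
		"config":{"enable_hierarchy":true}}`)
	env, err := splitEnvelope(def)
	if err != nil {
		panic(err)
	}
	fmt.Println(env.Name, env.Enabled, string(env.Config))
}
```

Keeping the inner object as raw bytes lets each component decode its own schema without the loader knowing about it.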

Ports

Inputs

Name            Type       Subject                    Description
objectstore_in  jetstream  objectstore.stored.entity  Stored document entity events
sensor_in       jetstream  sensor.processed.entity    Sensor entity events

Outputs

Name           Type  Subject        Description
entity_states  kv    ENTITY_STATES  Entity state storage

Hierarchy Inference

When enable_hierarchy is true, the component automatically creates container entities based on the 6-part entity ID structure:

org.platform.domain.system.type.instance
 │      │       │      │     │      │
 └──────┴───────┴──────┴─────┴──────┴─► Real entity
        │       │      │     │
        └───────┴──────┴─────┴─► Type container (*.group)
                │      │
                └──────┴─► System container (*.container)
                       │
                       └─► Domain container (*.level)

This creates edges that enable:

  • Efficient traversal by type, system, or domain
  • Automatic grouping without explicit relationship creation
  • Query-time aggregation by hierarchy level
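The diagram does not spell out exactly how container IDs are formed. As a rough illustration only, the sketch below parses a 6-part ID and derives type-, system-, and domain-level grouping keys by truncating the ID at each level, assuming that entities sharing a key belong under the same container. EntityID, ParseEntityID, the Key methods, and the sample IDs are all hypothetical, and the component's actual container IDs (*.group, *.container, *.level) are not reproduced.

```go
package main

import (
	"fmt"
	"strings"
)

// EntityID holds the six dot-separated parts of an entity ID (hypothetical type).
type EntityID struct {
	Org, Platform, Domain, System, Type, Instance string
}

// ParseEntityID splits org.platform.domain.system.type.instance and rejects
// IDs that do not have exactly six non-empty parts.
func ParseEntityID(id string) (EntityID, error) {
	parts := strings.Split(id, ".")
	if len(parts) != 6 {
		return EntityID{}, fmt.Errorf("entity ID %q has %d parts, want 6", id, len(parts))
	}
	for i, p := range parts {
		if p == "" {
			return EntityID{}, fmt.Errorf("entity ID %q: part %d is empty", id, i+1)
		}
	}
	return EntityID{parts[0], parts[1], parts[2], parts[3], parts[4], parts[5]}, nil
}

// Assumed grouping keys: the ID truncated after the type, system, or domain part.
func (e EntityID) TypeKey() string {
	return strings.Join([]string{e.Org, e.Platform, e.Domain, e.System, e.Type}, ".")
}

func (e EntityID) SystemKey() string {
	return strings.Join([]string{e.Org, e.Platform, e.Domain, e.System}, ".")
}

func (e EntityID) DomainKey() string {
	return strings.Join([]string{e.Org, e.Platform, e.Domain}, ".")
}

func main() {
	ids := []string{
		"acme.ops.robotics.fleet.drone.001",
		"acme.ops.robotics.fleet.drone.002",
		"acme.ops.robotics.fleet.rover.001",
	}
	byType := map[string][]string{}
	for _, s := range ids {
		id, err := ParseEntityID(s)
		if err != nil {
			panic(err)
		}
		byType[id.TypeKey()] = append(byType[id.TypeKey()], s)
	}
	fmt.Println(len(byType)) // prints 2: one drone group, one rover group
}
```

Grouping by key prefix is what makes "traversal by type, system, or domain" cheap: one lookup per level instead of a scan over every entity.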

Dependencies

Upstream
  • None (entry point component)
Downstream
  • graph-index - watches ENTITY_STATES for index updates
  • graph-embedding - watches ENTITY_STATES for embedding generation
  • graph-clustering - watches ENTITY_STATES for community detection triggers

Metrics

Metric                                    Type       Description
graph_ingest_entities_processed_total     counter    Total entities processed
graph_ingest_triples_added_total          counter    Total triples added
graph_ingest_errors_total                 counter    Total processing errors
graph_ingest_processing_duration_seconds  histogram  Processing time per entity

Health

The component reports healthy when:

  • JetStream subscription is active
  • KV bucket is accessible
  • No sustained error rate above threshold
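The threshold and window behind "sustained error rate" are not documented here. The sketch below shows one way the three conditions could be combined, computing the error rate as errors per processed entity between two readings of the counters from the Metrics table. Sample, HealthInputs, Healthy, and the maxErrorRate value are hypothetical.

```go
package main

import "fmt"

// Sample is one reading of two counters from the Metrics table.
type Sample struct {
	Processed uint64 // graph_ingest_entities_processed_total
	Errors    uint64 // graph_ingest_errors_total
}

// HealthInputs collects the three documented health conditions.
type HealthInputs struct {
	SubscriptionActive bool   // JetStream subscription is active
	KVAccessible       bool   // ENTITY_STATES bucket is accessible
	Start, End         Sample // counter readings at the edges of the window
}

// Healthy reports overall health and a reason. The error-rate definition and
// maxErrorRate are assumptions, not the component's documented behavior.
func Healthy(in HealthInputs, maxErrorRate float64) (bool, string) {
	if !in.SubscriptionActive {
		return false, "JetStream subscription inactive"
	}
	if !in.KVAccessible {
		return false, "ENTITY_STATES bucket not accessible"
	}
	// Counters only go backwards after a restart; skip such a window.
	if in.End.Processed < in.Start.Processed || in.End.Errors < in.Start.Errors {
		return true, "counters reset; window skipped"
	}
	processed := in.End.Processed - in.Start.Processed
	errs := in.End.Errors - in.Start.Errors
	if processed == 0 {
		if errs > 0 {
			return false, fmt.Sprintf("%d errors with no entities processed", errs)
		}
		return true, "idle"
	}
	rate := float64(errs) / float64(processed)
	if rate > maxErrorRate {
		return false, fmt.Sprintf("error rate %.3f above threshold %.3f", rate, maxErrorRate)
	}
	return true, "ok"
}

func main() {
	ok, reason := Healthy(HealthInputs{
		SubscriptionActive: true,
		KVAccessible:       true,
		Start:              Sample{Processed: 1000, Errors: 5},
		End:                Sample{Processed: 2000, Errors: 65},
	}, 0.05)
	fmt.Println(ok, reason)
}
```

Measuring over a window of counter deltas, rather than lifetime totals, is what keeps a single burst of old errors from marking the component unhealthy forever.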

Documentation

Overview

Package graphingest provides the graph-ingest component for entity and triple ingestion.

The graph-ingest component is responsible for ingesting entities and triples into the graph system. It subscribes to JetStream subjects for incoming entity data and stores them in the ENTITY_STATES KV bucket.

Tier

Tier: ALL TIERS (Tier 0, 1, 2) - Required for all deployments.

Architecture

graph-ingest is a core component required for all deployment tiers (Structural, Statistical, Semantic). It serves as the entry point for all entity data flowing into the graph subsystem.

                    ┌─────────────────┐
objectstore.stored ─┤                 │
                    │  graph-ingest   ├──► ENTITY_STATES (KV)
sensor.processed   ─┤                 │
                    └─────────────────┘

Features

  • Entity CRUD operations (create, read, update, delete)
  • Triple mutations (add, remove)
  • Hierarchy inference (optional) - creates container entities based on 6-part entity ID structure
  • JetStream subscription with at-least-once delivery semantics
  • KV storage with atomic updates

Configuration

The component is configured via JSON with the following structure:

{
  "ports": {
    "inputs": [
      {"name": "objectstore_in", "subject": "objectstore.stored.entity", "type": "jetstream"},
      {"name": "sensor_in", "subject": "sensor.processed.entity", "type": "jetstream"}
    ],
    "outputs": [
      {"name": "entity_states", "subject": "ENTITY_STATES", "type": "kv"}
    ]
  },
  "enable_hierarchy": true
}

Port Definitions

Inputs:

  • JetStream subscriptions for entity events (objectstore.stored.entity, sensor.processed.entity)

Outputs:

  • KV bucket: ENTITY_STATES - stores entity state with triples

Hierarchy Inference

When enable_hierarchy is true, the component automatically creates container entities based on the 6-part entity ID structure (org.platform.domain.system.type.instance). This creates edges for the following containers:

  • Type containers (*.group)
  • System containers (*.container)
  • Domain containers (*.level)

Usage

Register the component with the component registry:

import graphingest "github.com/c360studio/semstreams/processor/graph-ingest"

// registry is assumed to be an existing *component.Registry.
func init() {
    if err := graphingest.Register(registry); err != nil {
        panic(err)
    }
}

Dependencies

This component has no upstream graph component dependencies. It is the entry point for entity data and other graph components (graph-index, graph-embedding, etc.) watch its output KV bucket.

The package also contains mutation handlers that perform triple operations via NATS request/reply, and query handlers.

Index

Constants

const (
	// SubjectTripleAdd is the NATS subject for add triple requests
	SubjectTripleAdd = "graph.mutation.triple.add"
	// SubjectTripleRemove is the NATS subject for remove triple requests
	SubjectTripleRemove = "graph.mutation.triple.remove"
)
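The request/reply wire format for these subjects is not documented here. The sketch below builds a subject and JSON body for each mutation, with field names modeled on the AddTriple and RemoveTriple signatures; the request structs, their JSON field names, buildRequest, and the sample IDs are hypothetical. A client would then send the body on the returned subject with a NATS request call (for example, Request on a nats.go connection) and wait for the reply.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subjects copied from the package constants above.
const (
	SubjectTripleAdd    = "graph.mutation.triple.add"
	SubjectTripleRemove = "graph.mutation.triple.remove"
)

// Hypothetical request bodies; the real payload schema may differ.
type addTripleRequest struct {
	Subject   string `json:"subject"`
	Predicate string `json:"predicate"`
	Object    any    `json:"object"`
}

type removeTripleRequest struct {
	Subject   string `json:"subject"`
	Predicate string `json:"predicate"`
}

// buildRequest returns the NATS subject and JSON body for a mutation.
func buildRequest(op, subject, predicate string, object any) (string, []byte, error) {
	switch op {
	case "add":
		body, err := json.Marshal(addTripleRequest{Subject: subject, Predicate: predicate, Object: object})
		return SubjectTripleAdd, body, err
	case "remove":
		body, err := json.Marshal(removeTripleRequest{Subject: subject, Predicate: predicate})
		return SubjectTripleRemove, body, err
	default:
		return "", nil, fmt.Errorf("unknown mutation %q", op)
	}
}

func main() {
	subj, body, err := buildRequest("remove", "acme.ops.robotics.fleet.drone.001", "status.battery", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(subj, string(body))
}
```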

Variables

This section is empty.

Functions

func CreateGraphIngest

func CreateGraphIngest(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateGraphIngest is the factory function for creating graph-ingest components

func Register

func Register(registry *component.Registry) error

Register registers the graph-ingest factory with the component registry

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the graph-ingest processor

func (*Component) AddTriple

func (c *Component) AddTriple(ctx context.Context, triple message.Triple) error

AddTriple adds a triple to an entity using CAS for concurrency safety

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) CreateEntity

func (c *Component) CreateEntity(ctx context.Context, entity *graph.EntityState) error

CreateEntity creates a new entity in the graph

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) DeleteEntity

func (c *Component) DeleteEntity(ctx context.Context, entityID string) error

DeleteEntity removes an entity from the graph

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize validates configuration and sets up ports (no I/O)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions. Reads directly from config so ports are available before Initialize().

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions. Reads directly from config so ports are available before Initialize().

func (*Component) RemoveTriple

func (c *Component) RemoveTriple(ctx context.Context, subject, predicate string) error

RemoveTriple removes a triple from an entity using CAS for concurrency safety

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing (must be initialized first)

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully shuts down the component

func (*Component) UpdateEntity

func (c *Component) UpdateEntity(ctx context.Context, entity *graph.EntityState) error

UpdateEntity updates an existing entity

type Config

type Config struct {
	Ports              *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	EnableHierarchy    bool                  `json:"enable_hierarchy" schema:"type:bool,description:Enable hierarchy inference,default:false,category:advanced"`
	EnableTypeSiblings *bool                 `` /* 162-byte string literal not displayed */
}

Config holds configuration for graph-ingest component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a valid default configuration

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults sets default values for configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate implements the component.Validatable interface
