graphindextemporal

package
v1.0.0-alpha.46 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: MIT Imports: 17 Imported by: 0

README

graph-index-temporal

Temporal indexing component for the graph subsystem.

Overview

The graph-index-temporal component watches the ENTITY_STATES KV bucket and maintains a time-based index for entities. This enables efficient time-range queries.

Architecture

                    ┌──────────────────────┐
ENTITY_STATES ─────►│                      │
   (KV watch)       │  graph-index-temporal├──► TEMPORAL_INDEX (KV)
                    │                      │
                    └──────────────────────┘

Features

  • Configurable Resolution: minute, hour, or day granularity
  • Automatic Timestamp Extraction: Extracts timestamps from entity data
  • Time Bucket Keys: Efficient range queries using bucket keys
  • Batch Processing: Efficient bulk index updates

Configuration

{
  "type": "processor",
  "name": "graph-index-temporal",
  "enabled": true,
  "config": {
    "ports": {
      "inputs": [
        {
          "name": "entity_watch",
          "subject": "ENTITY_STATES",
          "type": "kv-watch"
        }
      ],
      "outputs": [
        {
          "name": "temporal_index",
          "subject": "TEMPORAL_INDEX",
          "type": "kv"
        }
      ]
    },
    "time_resolution": "hour",
    "workers": 4,
    "batch_size": 100
  }
}
Configuration Options
Option Type Default Description
ports object required Port configuration
time_resolution string "hour" Resolution: "minute", "hour", or "day"
workers int 4 Number of worker goroutines
batch_size int 100 Batch size for index updates

Ports

Inputs
Name Type Subject Description
entity_watch kv-watch ENTITY_STATES Watch entity state changes
Outputs
Name Type Subject Description
temporal_index kv TEMPORAL_INDEX Temporal index

Time Resolution Guide

Resolution Key Format Use Case
minute 2024-01-15T10:30 Real-time monitoring
hour 2024-01-15T10 Operational dashboards
day 2024-01-15 Historical analysis

Index Structure

The TEMPORAL_INDEX uses time bucket as key:

{
  "time_bucket": "2024-01-15T10",
  "entities": [
    {
      "entity_id": "c360.logistics.warehouse.sensor.temperature.temp-001",
      "timestamp": "2024-01-15T10:30:00Z",
      "event_type": "updated"
    }
  ]
}

Timestamp Extraction

The component extracts timestamps from:

  • Entity updated_at field
  • Entity created_at field
  • timestamp predicate in triples
  • observation.timestamp predicate

Temporal Queries

The gateway component uses this index for:

  • Time range: Find entities modified between two timestamps
  • Time bucket: Find entities in specific hour/day
  • Recent: Find entities modified in last N hours

Dependencies

Upstream
  • graph-ingest - produces ENTITY_STATES
Downstream
  • graph-gateway - queries temporal index

Metrics

Metric Type Description
graph_temporal_indexed_total counter Total entities indexed
graph_temporal_buckets_active gauge Active time buckets
graph_temporal_errors_total counter Indexing errors

Health

The component reports healthy when:

  • KV watch subscription is active
  • TEMPORAL_INDEX bucket is accessible
  • Index updates completing successfully

Documentation

Overview

Package graphindextemporal provides the graph-index-temporal component for temporal indexing.

Package graphindextemporal provides the graph-index-temporal component for temporal indexing.

Overview

The graph-index-temporal component watches the ENTITY_STATES KV bucket and maintains a time-based index for entities with temporal data. This enables efficient time-range queries like "find entities modified between dates" or "find entities by hour".

Architecture

graph-index-temporal is an optional component that can be enabled for deployments requiring time-based query capabilities.

                    ┌──────────────────────┐
ENTITY_STATES ─────►│                      │
   (KV watch)       │  graph-index-temporal├──► TEMPORAL_INDEX (KV)
                    │                      │
                    └──────────────────────┘

Features

  • Configurable time resolution (minute, hour, day)
  • Automatic extraction of timestamps from entity data
  • Batch processing for efficient index updates
  • Time bucket keys for range queries

Configuration

The component is configured via JSON with the following structure:

{
  "ports": {
    "inputs": [
      {"name": "entity_watch", "subject": "ENTITY_STATES", "type": "kv-watch"}
    ],
    "outputs": [
      {"name": "temporal_index", "subject": "TEMPORAL_INDEX", "type": "kv"}
    ]
  },
  "time_resolution": "hour",
  "workers": 4,
  "batch_size": 100
}

Time Resolution

The time_resolution setting controls the granularity of temporal indexing:

  • minute: Index by minute (YYYY-MM-DDTHH:MM)
  • hour: Index by hour (YYYY-MM-DDTHH) - default
  • day: Index by day (YYYY-MM-DD)

Port Definitions

Inputs:

  • KV watch: ENTITY_STATES - watches for entity state changes

Outputs:

  • KV bucket: TEMPORAL_INDEX - time-based index

Usage

Register the component with the component registry:

import graphindextemporal "github.com/c360studio/semstreams/processor/graph-index-temporal"

func init() {
    graphindextemporal.Register(registry)
}

Dependencies

Upstream:

  • graph-ingest: produces ENTITY_STATES that this component watches

Downstream:

  • graph-gateway: reads TEMPORAL_INDEX for time-range queries

Package graphindextemporal query handlers

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateGraphIndexTemporal

func CreateGraphIndexTemporal(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateGraphIndexTemporal is the factory function for creating graph-index-temporal components

func Register

func Register(registry *component.Registry) error

Register registers the graph-index-temporal factory with the component registry

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the graph-index-temporal processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize validates configuration and sets up ports (no I/O)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing (must be initialized first)

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully shuts down the component

type Config

type Config struct {
	Ports          *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	TimeResolution string                `json:"time_resolution" schema:"type:string,description:Time resolution (minute hour day),category:basic"`
	Workers        int                   `json:"workers" schema:"type:int,description:Number of worker goroutines,category:advanced"`
	BatchSize      int                   `json:"batch_size" schema:"type:int,description:Batch size for processing,category:advanced"`

	// Dependency startup configuration
	StartupAttempts int `` /* 130-byte string literal not displayed */
	StartupInterval int `` /* 134-byte string literal not displayed */
}

Config holds configuration for graph-index-temporal component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a valid default configuration

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults sets default values for configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate implements component.Validatable interface

type TemporalResult

type TemporalResult struct {
	ID   string `json:"id"`
	Type string `json:"type"`
}

TemporalResult represents an entity found in temporal search

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL