query

package
v1.0.0-alpha.28 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: MIT Imports: 20 Imported by: 0

README

Graph Query Library

A clean, read-only interface for querying graph data from NATS KV buckets. This package provides efficient graph traversal and entity queries with built-in caching.

Overview

The query library separates read operations from write operations in the graph system:

  • GraphProcessor: Writes entities and relationships to NATS KV
  • QueryClient: Reads and traverses the graph from NATS KV

Features

  • Direct NATS KV Access: Reads from shared buckets without processor dependency
  • Built-in Caching: LRU/TTL/Hybrid caching strategies for performance
  • Path Traversal: Bounded graph traversal with resource limits
  • Bidirectional Queries: Support for both outgoing and incoming edges
  • Batch Operations: Efficient multi-entity retrieval
  • Spatial Queries: Geohash-based entity location queries

Usage

Basic Setup
import "github.com/c360/semstreams/pkg/graph/query"

// Create client with default config
client, err := query.NewClient(ctx, natsClient, query.DefaultConfig())
if err != nil {
    return err
}
defer client.Close()

// Or with custom config
config := &query.Config{
    EntityCache: cache.Config{
        Strategy: cache.StrategyHybrid,
        MaxSize:  1000,
        TTL:      5 * time.Minute,
    },
}
client, err := query.NewClient(ctx, natsClient, config)
Entity Operations
// Get single entity
entity, err := client.GetEntity(ctx, "drone_001")

// Get entities by type
drones, err := client.GetEntitiesByType(ctx, "robotics.drone")

// Batch retrieval
ids := []string{"drone_001", "drone_002", "drone_003"}
entities, err := client.GetEntitiesBatch(ctx, ids)

// Count all entities
count, err := client.CountEntities(ctx)
Relationship Queries
// Get outgoing edges
edges, err := client.GetOutgoingEdges(ctx, "drone_001")

// Get incoming edges (who points to this entity)
incoming, err := client.GetIncomingEdges(ctx, "drone_001")

// Get all connected entities
connections, err := client.GetEntityConnections(ctx, "drone_001")

// Verify specific relationship
exists, err := client.VerifyRelationship(ctx, "drone_001", "gcs_001", "CONTROLLED_BY")
Path Traversal (PathRAG)

Basic Path Query:

// Configure bounded traversal
query := query.PathQuery{
    StartEntity:  "drone_001",
    MaxDepth:     3,              // Maximum hops
    MaxNodes:     100,            // Maximum nodes to visit
    MaxTime:      100 * time.Millisecond,
    PredicateFilter: []string{"NEAR", "MEMBER_OF"},
    DecayFactor:  0.8,            // Score decay per hop
    MaxPaths:     10,             // Limit result paths
}

// Execute traversal
result, err := client.ExecutePathQuery(ctx, query)

// Process results
for _, entity := range result.Entities {
    score := result.Scores[entity.Node.ID]
    fmt.Printf("Entity: %s, Score: %.2f\n", entity.Node.ID, score)
}

// Check if truncated
if result.Truncated {
    log.Println("Query hit resource limits")
}
Complex Path Traversal Scenarios

Scenario 1: Dependency Chain Analysis

Find all services affected by a configuration change:

query := query.PathQuery{
    StartEntity: "config.database.credentials",
    MaxDepth:    5,
    MaxNodes:    200,
    MaxTime:     500 * time.Millisecond,
    PredicateFilter: []string{"depends_on", "calls"},
    DecayFactor: 0.9,  // Services are important even if distant
    MaxPaths:    50,
}

result, err := client.ExecutePathQuery(ctx, query)
if err != nil {
    return fmt.Errorf("dependency analysis failed: %w", err)
}

// Group by depth for cascade visualization
byDepth := make(map[int][]string)
for _, path := range result.Paths {
    depth := len(path) - 1
    if depth > 0 {
        affectedService := path[len(path)-1]
        byDepth[depth] = append(byDepth[depth], affectedService)
    }
}

// Display cascade
for depth := 1; depth <= query.MaxDepth; depth++ {
    if services, ok := byDepth[depth]; ok {
        fmt.Printf("Depth %d (%d services affected):\n", depth, len(services))
        for _, svc := range services {
            score := result.Scores[svc]
            fmt.Printf("  - %s (score: %.2f)\n", svc, score)
        }
    }
}

Scenario 2: Spatial Mesh Network Discovery

Find all drones reachable via mesh network from base station:

query := query.PathQuery{
    StartEntity: "base.alpha",
    MaxDepth:    4,  // 4 hops max for mesh relay
    MaxNodes:    100,
    MaxTime:     300 * time.Millisecond,
    PredicateFilter: []string{"near", "communicates", "relays_to"},
    DecayFactor: 0.7,  // Strong decay (signal strength)
    MaxPaths:    20,
}

result, err := client.ExecutePathQuery(ctx, query)

// Find weakest links (lowest scoring paths)
pathScores := make([]struct {
    path  []string
    score float64
}, 0, len(result.Paths))

for _, path := range result.Paths {
    // Path score = minimum entity score in path
    minScore := 1.0
    for _, entityID := range path {
        if score := result.Scores[entityID]; score < minScore {
            minScore = score
        }
    }
    pathScores = append(pathScores, struct {
        path  []string
        score float64
    }{path, minScore})
}

// Sort by score (ascending = weakest first)
sort.Slice(pathScores, func(i, j int) bool {
    return pathScores[i].score < pathScores[j].score
})

// Report weakest communication paths
fmt.Println("Weakest mesh links (need relay boost):")
for i := 0; i < 5 && i < len(pathScores); i++ {
    path := pathScores[i].path
    score := pathScores[i].score
    fmt.Printf("  %s (signal strength: %.1f%%)\n",
        strings.Join(path, " → "), score*100)
}

Scenario 3: Incident Impact Radius

Trace cascading effects of a system failure:

query := query.PathQuery{
    StartEntity: "alert.network.outage.region-west",
    MaxDepth:    4,
    MaxNodes:    300,
    MaxTime:     1 * time.Second,
    PredicateFilter: []string{"triggers", "affects", "depends_on"},
    DecayFactor: 0.85,
    MaxPaths:    100,
}

result, err := client.ExecutePathQuery(ctx, query)

// Classify impact by entity type
impactByType := make(map[string][]string)
for _, entity := range result.Entities {
    if entity.Node.ID == query.StartEntity {
        continue // Skip starting entity
    }
    impactByType[entity.Node.Type] = append(
        impactByType[entity.Node.Type],
        entity.Node.ID,
    )
}

// Report impact summary
fmt.Println("Incident Impact Analysis:")
for entityType, affected := range impactByType {
    fmt.Printf("  %s: %d affected\n", entityType, len(affected))

    // Show top 3 most impacted (highest scores = closest)
    type scored struct {
        id    string
        score float64
    }
    scoredEntities := make([]scored, 0, len(affected))
    for _, id := range affected {
        scoredEntities = append(scoredEntities, scored{id, result.Scores[id]})
    }
    sort.Slice(scoredEntities, func(i, j int) bool {
        return scoredEntities[i].score > scoredEntities[j].score
    })

    for i := 0; i < 3 && i < len(scoredEntities); i++ {
        fmt.Printf("    - %s (severity: %.2f)\n",
            scoredEntities[i].id, scoredEntities[i].score)
    }
}

Scenario 4: Time-Bounded Discovery

Find as much as possible in limited time (real-time constraint):

query := query.PathQuery{
    StartEntity: "sensor.temp.critical",
    MaxDepth:    10,     // High depth
    MaxNodes:    10000,  // High limit
    MaxTime:     50 * time.Millisecond,  // PRIMARY constraint
    PredicateFilter: nil,    // Explore all predicates
    DecayFactor: 0.8,
    MaxPaths:    50,
}

result, err := client.ExecutePathQuery(ctx, query)

if result.Truncated {
    fmt.Printf("Discovery truncated after %dms\n",
        query.MaxTime.Milliseconds())
    fmt.Printf("Visited %d nodes (limit: %d)\n",
        len(result.Entities), query.MaxNodes)
}

// Even truncated results are useful for approximate context
fmt.Printf("Discovered %d related entities:\n", len(result.Entities))

Scenario 5: Multi-Relationship Pattern

Explore complex relationship patterns (e.g., "Find experts on this topic"):

// Find entities with multiple relationship types
query := query.PathQuery{
    StartEntity: "topic.graph-rag",
    MaxDepth:    3,
    MaxNodes:    150,
    MaxTime:     200 * time.Millisecond,
    EdgeFilter:  []string{
        "authored_by",      // Documents authored
        "contributed_to",   // Code contributions
        "references",       // Citations
        "implements",       // Implementations
    },
    DecayFactor: 0.9,
    MaxPaths:    30,
}

result, err := client.ExecutePathQuery(ctx, query)

// Count relationship types per entity
entityRelCounts := make(map[string]map[string]int)
for _, entity := range result.Entities {
    if entity.Node.ID == query.StartEntity {
        continue
    }

    relCounts := make(map[string]int)
    for _, edge := range entity.Edges {
        // Check if edge type in filter
        for _, filterType := range query.EdgeFilter {
            if edge.Predicate == filterType {
                relCounts[filterType]++
            }
        }
    }

    if len(relCounts) > 0 {
        entityRelCounts[entity.Node.ID] = relCounts
    }
}

// Find entities with most diverse relationships (likely experts)
type expertise struct {
    id         string
    diversity  int
    totalRels  int
    score      float64
}

experts := make([]expertise, 0)
for id, relCounts := range entityRelCounts {
    total := 0
    for _, count := range relCounts {
        total += count
    }
    experts = append(experts, expertise{
        id:        id,
        diversity: len(relCounts),
        totalRels: total,
        score:     result.Scores[id],
    })
}

// Sort by diversity first, then total relationships
sort.Slice(experts, func(i, j int) bool {
    if experts[i].diversity != experts[j].diversity {
        return experts[i].diversity > experts[j].diversity
    }
    return experts[i].totalRels > experts[j].totalRels
})

fmt.Println("Top experts on topic:")
for i := 0; i < 5 && i < len(experts); i++ {
    e := experts[i]
    fmt.Printf("  %s: %d rel types, %d total (score: %.2f)\n",
        e.id, e.diversity, e.totalRels, e.score)
}

Scenario 6: PredicateFilter Tuning

Compare results with different predicate filters:

baseQuery := query.PathQuery{
    StartEntity: "alert.security.breach",
    MaxDepth:    3,
    MaxNodes:    100,
    MaxTime:     200 * time.Millisecond,
    DecayFactor: 0.85,
}

// Try different filter strategies
filters := map[string][]string{
    "all":        nil,
    "direct":     []string{"triggers", "causes"},
    "indirect":   []string{"related_to", "similar_to"},
    "structural": []string{"depends_on", "contains"},
}

for name, filter := range filters {
    baseQuery.PredicateFilter = filter
    result, err := client.ExecutePathQuery(ctx, baseQuery)
    if err != nil {
        continue
    }

    fmt.Printf("Filter '%s': %d entities, %d paths\n",
        name, len(result.Entities), len(result.Paths))
}

Scenario 7: DecayFactor Comparison

Tune relevance decay for use case:

decayFactors := []float64{0.6, 0.75, 0.85, 0.95, 1.0}

for _, decay := range decayFactors {
    query := query.PathQuery{
        StartEntity:     "service.api",
        MaxDepth:        4,
        MaxNodes:        150,
        PredicateFilter: []string{"calls"},
        DecayFactor:     decay,
    }

    result, _ := client.ExecutePathQuery(ctx, query)

    // Calculate average score at each depth
    depthScores := make(map[int][]float64)
    for _, path := range result.Paths {
        for i, entityID := range path {
            depthScores[i] = append(depthScores[i], result.Scores[entityID])
        }
    }

    fmt.Printf("DecayFactor %.2f:\n", decay)
    for depth := 0; depth <= query.MaxDepth; depth++ {
        if scores, ok := depthScores[depth]; ok && len(scores) > 0 {
            avg := 0.0
            for _, s := range scores {
                avg += s
            }
            avg /= float64(len(scores))
            fmt.Printf("  Depth %d: avg score %.2f\n", depth, avg)
        }
    }
}

See PathRAG Documentation for comprehensive guide.

Advanced Queries
// Query with criteria
criteria := map[string]any{
    "type":     "robotics.drone",
    "armed":    true,
    "battery":  75.0,
}
entities, err := client.QueryEntities(ctx, criteria)

// Spatial queries
geohash := "gbsuv7"  // San Francisco area
nearbyEntities, err := client.GetEntitiesInRegion(ctx, geohash)
Cache Management
// Get cache statistics
stats := client.GetCacheStats()
fmt.Printf("Cache hit rate: %.2f%% (%d hits, %d misses)\n", 
    stats.HitRate*100, stats.Hits, stats.Misses)

// Clear cache
err := client.Clear()

Component Integration

The query library is used by components that need to read graph data:

// In ContextProcessor
func NewFactory(natsClient *natsclient.Client, objectStore *objectstore.Store) component.Factory {
    return func(ctx context.Context, config map[string]any) (component.Discoverable, error) {
        // Create query client internally
        queryClient, err := query.NewClient(ctx, natsClient, query.DefaultConfig())
        if err != nil {
            return nil, fmt.Errorf("failed to create query client: %w", err)
        }
        
        return NewProcessor(natsClient, objectStore, queryClient, &config)
    }
}

Configuration

Default Configuration
config := query.DefaultConfig()
// Returns:
// - EntityCache: Hybrid strategy, 1000 items, 5min TTL
// - EntityStates bucket: 24h TTL, 3 history, 1 replica
// - SpatialIndex bucket: 1h TTL, 1 history, 1 replica  
// - IncomingIndex bucket: 24h TTL, 1 history, 1 replica
Custom Configuration
config := &query.Config{
    EntityCache: cache.Config{
        Strategy:        cache.StrategyLRU,
        MaxSize:        5000,
        TTL:            10 * time.Minute,
        CleanupInterval: 1 * time.Minute,
        EnableStats:    true,
    },
    EntityStates: struct {
        TTL      time.Duration
        History  uint8
        Replicas int
    }{
        TTL:      7 * 24 * time.Hour,  // 1 week
        History:  5,
        Replicas: 3,
    },
}

Performance Considerations

  1. Caching: The hybrid cache strategy provides optimal performance for most use cases
  2. Batch Operations: Use GetEntitiesBatch() for multiple entities
  3. Path Limits: Always set reasonable limits on PathQuery to prevent runaway traversals
  4. Context Timeouts: Use context with timeout for all operations

Testing

// Create mock client for testing
type mockClient struct {
    entities map[string]*gtypes.EntityState
}

func (m *mockClient) GetEntity(ctx context.Context, id string) (*gtypes.EntityState, error) {
    entity, exists := m.entities[id]
    if !exists {
        return nil, fmt.Errorf("entity not found")
    }
    return entity, nil
}

// Use in tests
mock := &mockClient{
    entities: map[string]*gtypes.EntityState{
        "test_001": testEntity,
    },
}

Thread Safety

All Client methods are thread-safe and can be called concurrently. The implementation uses:

  • Atomic operations for statistics
  • Mutex protection for bucket initialization
  • Thread-safe cache implementation

Error Handling

The library returns descriptive errors:

  • Entity not found: Returns nil entity, no error
  • Network issues: Wrapped errors with context
  • Invalid parameters: Validation errors with details
  • Resource limits: Sets Truncated flag in results

Best Practices

  1. Reuse Clients: Create one client and share across goroutines
  2. Set Timeouts: Always use context with timeout for queries
  3. Monitor Cache: Check cache stats periodically for tuning
  4. Validate Parameters: Check query parameters before execution
  5. Handle Truncation: Check Truncated flag in path results

Documentation

Overview

Package query provides a clean interface for reading graph data from NATS KV buckets.

Overview

The query package extracts read operations from the graph processor, enabling reusable graph queries with caching. It provides entity retrieval, relationship traversal, spatial queries, and bounded path queries suitable for resource-constrained edge devices.

All queries read from NATS KV buckets (ENTITY_STATES, SPATIAL_INDEX, INCOMING_INDEX) with an integrated cache layer for performance. The client manages bucket initialization lazily on first use.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        Query Client                             │
│  - Entity caching (hybrid LRU+TTL)                              │
│  - Lazy bucket initialization                                   │
│  - Bounded path traversal                                       │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌────────────────────┬────────────────────┬───────────────────────┐
│   ENTITY_STATES    │   SPATIAL_INDEX    │   INCOMING_INDEX      │
│   (entity data)    │   (geohash→IDs)    │   (target→sources)    │
└────────────────────┴────────────────────┴───────────────────────┘

Usage

Create a query client:

config := query.DefaultConfig()
client, err := query.NewClient(ctx, natsClient, config)

Basic entity operations:

// Get single entity
entity, err := client.GetEntity(ctx, "org.platform.domain.system.type.instance")

// Get multiple entities
entities, err := client.GetEntitiesBatch(ctx, []string{id1, id2, id3})

// Get entities by type (extracted from ID)
drones, err := client.GetEntitiesByType(ctx, "drone")

Relationship queries:

// Get outgoing relationships by predicate
targets, err := client.GetOutgoingRelationships(ctx, entityID, "controls")

// Get incoming relationships (from INCOMING_INDEX)
sources, err := client.GetIncomingRelationships(ctx, entityID)

// Get all connected entities (both directions)
connected, err := client.GetEntityConnections(ctx, entityID)

// Verify specific relationship exists
exists, err := client.VerifyRelationship(ctx, fromID, toID, "controls")

Bounded path traversal for edge devices:

result, err := client.ExecutePathQuery(ctx, query.PathQuery{
    StartEntity:     "org.platform.domain.system.drone.001",
    MaxDepth:        3,           // Maximum hops
    MaxNodes:        100,         // Maximum entities to visit
    MaxTime:         5*time.Second,
    PredicateFilter: []string{"controls", "monitors"}, // Empty = all
    DecayFactor:     0.8,         // Relevance decay per hop
    MaxPaths:        50,          // Limit path explosion
})

Spatial queries:

// Get entities in geohash region
entities, err := client.GetEntitiesInRegion(ctx, "9q8yy")

Path Query

The PathQuery performs bounded graph traversal with configurable limits:

Parameters:

  • StartEntity: Entity ID to begin traversal from
  • MaxDepth: Maximum hop count (prevents infinite recursion)
  • MaxNodes: Maximum entities to visit (memory bound)
  • MaxTime: Query timeout (prevents runaway queries)
  • PredicateFilter: Relationship predicates to follow (empty = all)
  • DecayFactor: Score decay per hop (0.0-1.0)
  • MaxPaths: Maximum complete paths to track (0 = unlimited)

The result includes:

  • Entities: All visited entity states
  • Paths: Complete paths from start to leaves
  • Scores: Relevance scores with decay applied
  • Truncated: Whether query hit resource limits

Caching

The client uses a hybrid cache (LRU + TTL) for entity states:

EntityCache:
    Strategy:        "hybrid"     # LRU eviction with TTL expiry
    MaxSize:         1000         # Maximum cached entities
    TTL:             5m           # Time-to-live
    CleanupInterval: 1m           # Background cleanup interval

Cache statistics are available via GetCacheStats().

Configuration

Client configuration:

EntityCache:              # See cache.Config
EntityStates:
    TTL:      24h         # KV bucket TTL
    History:  3           # Version history
    Replicas: 1           # Replication factor
SpatialIndex:
    TTL:      1h          # Spatial data TTL
    History:  1
    Replicas: 1
IncomingIndex:
    TTL:      24h         # Incoming edge TTL
    History:  1
    Replicas: 1

Thread Safety

The Client is safe for concurrent use. Bucket initialization uses mutex protection, and cache operations are atomic.

Metrics

When created with NewClientWithMetrics, the client exports cache metrics under the "query_client" namespace.

See Also

Related packages:

Package query provides a clean interface for reading graph data from NATS KV buckets. This package extracts query operations from GraphProcessor to enable reusable graph queries.

Index

Constants

View Source
const (
	AggregationCount = "count"
	AggregationAvg   = "avg"
	AggregationSum   = "sum"
	AggregationMin   = "min"
	AggregationMax   = "max"
)

Aggregation type constants for aggregation queries.

View Source
const DefaultMaxCommunities = 5

DefaultMaxCommunities is the default number of communities to search.

Variables

This section is empty.

Functions

This section is empty.

Types

type CacheStats

type CacheStats struct {
	Hits        int64     `json:"hits"`
	Misses      int64     `json:"misses"`
	Size        int       `json:"size"`
	HitRate     float64   `json:"hit_rate"`
	LastCleared time.Time `json:"last_cleared"`
}

CacheStats provides statistics about query cache performance

type ClassificationResult

type ClassificationResult struct {
	Tier       int            // Which tier produced the result (0=keyword, 1=BM25, 2=neural)
	Intent     string         // Classified intent (from embedding match, empty for keyword)
	Options    map[string]any // SearchOptions hints (from keyword or embedding example)
	Confidence float64        // Confidence score (1.0 for keywords, similarity for embedding)
}

ClassificationResult contains the classification output from the tiered classifier chain.

type Classifier

type Classifier interface {
	// ClassifyQuery analyzes a query string and returns SearchOptions.
	ClassifyQuery(ctx context.Context, query string) *SearchOptions
}

Classifier analyzes natural language queries to extract search intent.

type ClassifierChain

type ClassifierChain struct {
	// contains filtered or unexported fields
}

ClassifierChain orchestrates tiered query classification.

Routing logic:

  • T0 (keyword): Always try first - keyword patterns bypass embedding
  • T1/T2 (embedding): Only if no keyword match AND embedding != nil
  • Default: Return T0 result with empty options if no tier matches

Thread-safe for concurrent ClassifyQuery calls.

func NewClassifierChain

func NewClassifierChain(keyword *KeywordClassifier, embedding *EmbeddingClassifier, llmClassifiers ...*LLMClassifier) *ClassifierChain

NewClassifierChain creates a classifier chain with keyword and optional embedding/LLM classifiers.

All parameters may be nil. Chain will route queries through available tiers.

func (*ClassifierChain) ClassifyQuery

func (c *ClassifierChain) ClassifyQuery(ctx context.Context, query string) *ClassificationResult

ClassifyQuery classifies a query through the tier chain.

Returns result from first tier that produces a match:

  • T0: Keyword patterns (temporal, spatial, similarity, path, zone)
  • T1/T2: Embedding similarity (if available and no keyword match)
  • Default: T0 result with no filters if no tier matches

Returns nil if chain is nil or context cancelled.

type Client

type Client interface {
	// Basic entity operations
	GetEntity(ctx context.Context, entityID string) (*gtypes.EntityState, error)
	GetEntitiesByType(ctx context.Context, entityType string) ([]*gtypes.EntityState, error)
	GetEntitiesBatch(ctx context.Context, entityIDs []string) ([]*gtypes.EntityState, error)

	// Entity listing and counting
	ListEntities(ctx context.Context) ([]string, error)
	CountEntities(ctx context.Context) (int, error)

	// Graph traversal and path queries
	ExecutePathQuery(ctx context.Context, query PathQuery) (*PathResult, error)

	// Relationship queries (using triples)
	GetIncomingRelationships(ctx context.Context, entityID string) ([]string, error)
	GetOutgoingRelationships(ctx context.Context, entityID string, predicate string) ([]string, error)
	GetEntityConnections(ctx context.Context, entityID string) ([]*gtypes.EntityState, error)
	VerifyRelationship(ctx context.Context, fromID, toID, predicate string) (bool, error)
	CountIncomingRelationships(ctx context.Context, entityID string) (int, error)

	// Search and filtering
	QueryEntities(ctx context.Context, criteria map[string]any) ([]*gtypes.EntityState, error)

	// Spatial queries
	GetEntitiesInRegion(ctx context.Context, geohash string) ([]*gtypes.EntityState, error)

	// Predicate queries
	GetEntitiesByPredicate(ctx context.Context, predicate string) ([]string, error)
	ListPredicates(ctx context.Context) ([]gtypes.PredicateSummary, error)
	GetPredicateStats(ctx context.Context, predicate string, sampleLimit int) (*gtypes.PredicateStatsData, error)
	QueryCompoundPredicates(ctx context.Context, query gtypes.CompoundPredicateQuery) ([]string, error)

	// Cache and metrics
	GetCacheStats() CacheStats
	Clear() error
	Close() error
}

Client defines the interface for querying graph data. All methods read from NATS KV buckets and support caching for performance.

func NewClient

func NewClient(ctx context.Context, nc *natsclient.Client, config *Config) (Client, error)

NewClient creates a new query client with the given NATS client and configuration

func NewClientWithMetrics

func NewClientWithMetrics(
	ctx context.Context,
	nc *natsclient.Client,
	config *Config,
	metricsRegistry *metric.MetricsRegistry,
) (Client, error)

NewClientWithMetrics creates a new query client with the given NATS client, configuration, and optional metrics

type Config

type Config struct {
	// EntityCache configuration
	EntityCache cache.Config `json:"entity_cache"`

	// KV Bucket configurations (should match GraphProcessor config)
	EntityStates struct {
		TTL      time.Duration `json:"ttl"`
		History  uint8         `json:"history"`
		Replicas int           `json:"replicas"`
	} `json:"entity_states"`

	SpatialIndex struct {
		TTL      time.Duration `json:"ttl"`
		History  uint8         `json:"history"`
		Replicas int           `json:"replicas"`
	} `json:"spatial_index"`

	IncomingIndex struct {
		TTL      time.Duration `json:"ttl"`
		History  uint8         `json:"history"`
		Replicas int           `json:"replicas"`
	} `json:"incoming_index"`
}

Config contains configuration for the Client

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a sensible default configuration

type DomainExamples

type DomainExamples struct {
	Domain   string    `json:"domain"`
	Version  string    `json:"version"`
	Examples []Example `json:"examples"`
}

DomainExamples represents a collection of examples for a domain.

func LoadAllDomainExamples

func LoadAllDomainExamples(filePaths []string) ([]*DomainExamples, error)

LoadAllDomainExamples loads and aggregates multiple domain example files.

func LoadDomainExamples

func LoadDomainExamples(filePath string) (*DomainExamples, error)

LoadDomainExamples loads query examples from a JSON file.

type Embedder

type Embedder interface {
	// Embed generates a vector for a single text.
	Embed(text string) ([]float32, error)

	// EmbedBatch generates vectors for multiple texts.
	EmbedBatch(texts []string) ([][]float32, error)
}

Embedder interface for vector generation.

This is a simplified interface for the query classifier that supports both single and batch embedding operations.

type EmbeddingClassifier

type EmbeddingClassifier struct {
	// contains filtered or unexported fields
}

EmbeddingClassifier classifies queries by finding similar domain examples.

The classifier starts with BM25 vectors (warm cache - no external service needed) and can be upgraded to neural vectors later when embeddings are available.

Thread-safe for concurrent FindBestMatch calls during vector upgrades.

func NewEmbeddingClassifier

func NewEmbeddingClassifier(domains []*DomainExamples, threshold float64) *EmbeddingClassifier

NewEmbeddingClassifier creates classifier with BM25 warm cache.

Generates BM25 vectors for all examples immediately (no external service needed). Returns a classifier ready for statistical similarity matching.

func (*EmbeddingClassifier) FindBestMatch

func (c *EmbeddingClassifier) FindBestMatch(ctx context.Context, query string) (*Example, float64)

FindBestMatch finds the most similar example to the query.

Returns nil if no match above threshold or context cancelled. Thread-safe - uses read lock to allow concurrent calls during vector upgrades.

func (*EmbeddingClassifier) Threshold

func (c *EmbeddingClassifier) Threshold() float64

Threshold returns the similarity threshold.

func (*EmbeddingClassifier) UpgradeVectors

func (c *EmbeddingClassifier) UpgradeVectors(embedder Embedder) error

UpgradeVectors replaces current vectors with neural vectors from new embedder.

Thread-safe - uses write lock to prevent concurrent reads during upgrade. On error, preserves old vectors (rollback).

type Example

type Example struct {
	Query   string         `json:"query"`   // Natural language query
	Intent  string         `json:"intent"`  // Intent category
	Options map[string]any `json:"options"` // SearchOptions hints (optional)
	Vector  []float32      `json:"-"`       // Runtime field - embedding vector (not serialized)
}

Example represents a single domain query example for intent classification.

type KeywordClassifier

type KeywordClassifier struct{}

KeywordClassifier implements regex-based natural language query classification. Uses pattern matching to extract temporal, spatial, and intent information.

func NewKeywordClassifier

func NewKeywordClassifier() *KeywordClassifier

NewKeywordClassifier creates a new keyword-based classifier.

func (*KeywordClassifier) ClassifyQuery

func (k *KeywordClassifier) ClassifyQuery(_ context.Context, query string) *SearchOptions

ClassifyQuery analyzes a natural language query and populates SearchOptions. Detects temporal, spatial, similarity, path, and aggregation intents. Always returns non-nil SearchOptions with the original query preserved.

type LLMClassifier

type LLMClassifier struct {
	// contains filtered or unexported fields
}

LLMClassifier classifies queries by asking an LLM to return structured SearchOptions.

It is the T3 tier in the ClassifierChain — called only when T0 (keyword) and T1/T2 (embedding) tiers produce no confident match. When the LLM call fails or returns unparseable JSON, LLMClassifier returns an error so the chain can fall back gracefully rather than silently returning a wrong classification.

func NewLLMClassifier

func NewLLMClassifier(client LLMClient, domains []*DomainExamples) *LLMClassifier

NewLLMClassifier creates an LLMClassifier with an LLM backend and optional domain examples.

domains may be nil or empty; when provided, a few representative examples are included in the prompt as few-shot context to improve classification accuracy.

func (*LLMClassifier) ClassifyQuery

func (c *LLMClassifier) ClassifyQuery(ctx context.Context, query string) (*ClassificationResult, error)

ClassifyQuery classifies a natural language query using an LLM and returns a ClassificationResult at Tier 3.

Returns an error if:

  • The LLM call fails (network, auth, quota)
  • The LLM response is not valid JSON
  • Context is cancelled before the call completes

type LLMClient

type LLMClient interface {
	// ClassifyQuery sends a structured classification prompt to the LLM and returns
	// the raw JSON response string.
	ClassifyQuery(ctx context.Context, prompt string) (string, error)
}

LLMClient is the minimal interface for sending a classification prompt to an LLM backend.

The returned string must be valid JSON matching llmResponse. Implementations are responsible for retries, timeouts, and backend-specific authentication.

type LLMClientAdapter

type LLMClientAdapter struct {
	// contains filtered or unexported fields
}

LLMClientAdapter adapts a graph/llm.Client to the query.LLMClient interface.

This allows the existing OpenAI-compatible LLM infrastructure (ollama, seminstruct, shimmy, OpenAI) to be used as the T3 classifier backend.

func NewLLMClientAdapter

func NewLLMClientAdapter(client llm.Client) *LLMClientAdapter

NewLLMClientAdapter wraps a graph/llm.Client for use as a query.LLMClient.

func (*LLMClientAdapter) ClassifyQuery

func (a *LLMClientAdapter) ClassifyQuery(ctx context.Context, prompt string) (string, error)

ClassifyQuery sends the classification prompt to the LLM and returns the raw response.

type PathQuery

type PathQuery struct {
	// StartEntity is the entity ID to begin traversal from
	StartEntity string `json:"start_entity"`

	// MaxDepth is the hard limit on traversal depth (number of hops)
	MaxDepth int `json:"max_depth"`

	// MaxNodes is the maximum number of nodes to visit during traversal
	MaxNodes int `json:"max_nodes"`

	// MaxTime is the timeout for query execution
	MaxTime time.Duration `json:"max_time"`

	// PredicateFilter specifies which predicates to follow (empty means all relationship predicates)
	PredicateFilter []string `json:"predicate_filter,omitempty"`

	// DecayFactor is the relevance decay with distance (0.0-1.0)
	// Score = initial_score * (DecayFactor ^ depth)
	DecayFactor float64 `json:"decay_factor"`

	// MaxPaths is the maximum number of complete paths to track (0 = unlimited)
	// This prevents exponential path growth in dense graphs
	MaxPaths int `json:"max_paths"`
}

PathQuery defines a bounded graph traversal query for edge devices.

type PathResult

type PathResult struct {
	// Entities contains all entities visited during traversal
	Entities []*gtypes.EntityState `json:"entities"`

	// Paths contains sequences of entity IDs representing traversal paths
	// Each path is from StartEntity to a leaf or depth-limited entity
	Paths [][]string `json:"paths"`

	// Scores contains relevance scores for each entity (with decay applied)
	Scores map[string]float64 `json:"scores"`

	// Truncated indicates if the query hit resource limits
	Truncated bool `json:"truncated"`
}

PathResult contains the results of a bounded graph traversal.

type SearchOptions

type SearchOptions struct {
	Query             string         `json:"query,omitempty"`
	GeoBounds         *SpatialBounds `json:"geo_bounds,omitempty"`
	TimeRange         *TimeRange     `json:"time_range,omitempty"`
	Predicates        []string       `json:"predicates,omitempty"`
	Types             []string       `json:"types,omitempty"`
	RequireAllFilters bool           `json:"require_all_filters,omitempty"`
	UseEmbeddings     bool           `json:"use_embeddings,omitempty"`
	Strategy          SearchStrategy `json:"strategy,omitempty"`
	Limit             int            `json:"limit,omitempty"`
	Level             int            `json:"level,omitempty"`
	MaxCommunities    int            `json:"max_communities,omitempty"`
	PathIntent        bool           `json:"path_intent,omitempty"`
	PathStartNode     string         `json:"path_start_node,omitempty"`
	PathPredicates    []string       `json:"path_predicates,omitempty"`
	// AggregationType specifies the type of aggregation (count, avg, sum, min, max).
	// Use the AggregationCount / AggregationAvg / … constants.
	AggregationType string `json:"aggregation_type,omitempty"`
	// AggregationField is the property or attribute to aggregate on (e.g. "temperature").
	// Empty means aggregate over entity count.
	AggregationField string `json:"aggregation_field,omitempty"`
	// RankingIntent is true when the query requests a ranked/top-N result set.
	RankingIntent bool `json:"ranking_intent,omitempty"`
}

SearchOptions provides declarative search configuration.

func (*SearchOptions) HasIndexFilters

func (o *SearchOptions) HasIndexFilters() bool

HasIndexFilters returns true if any index filters are specified.

func (*SearchOptions) InferStrategy

func (o *SearchOptions) InferStrategy() SearchStrategy

InferStrategy determines the best search strategy based on options.

func (*SearchOptions) SetDefaults

func (o *SearchOptions) SetDefaults()

SetDefaults applies default values for unset options.

type SearchStrategy

type SearchStrategy string

SearchStrategy represents the type of search to perform.

const (
	StrategyGraphRAG         SearchStrategy = "graphrag"
	StrategyGeoGraphRAG      SearchStrategy = "geo_graphrag"
	StrategyTemporalGraphRAG SearchStrategy = "temporal_graphrag"
	StrategyHybridGraphRAG   SearchStrategy = "hybrid_graphrag"
	StrategyPathRAG          SearchStrategy = "pathrag"
	StrategySemantic         SearchStrategy = "semantic"
	StrategyExact            SearchStrategy = "exact"
	StrategyAggregation      SearchStrategy = "aggregation"
)

Search strategy constants for query routing.

type SpatialBounds

type SpatialBounds struct {
	North float64 `json:"north"`
	South float64 `json:"south"`
	East  float64 `json:"east"`
	West  float64 `json:"west"`
}

SpatialBounds represents geographic bounding box.

type TimeRange

type TimeRange struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

TimeRange represents temporal query bounds.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL