Documentation ¶
Overview ¶
Package query provides a clean interface for reading graph data from NATS KV buckets.
The query package extracts read operations from the graph processor, enabling reusable graph queries with caching. It provides entity retrieval, relationship traversal, spatial queries, and bounded path queries suitable for resource-constrained edge devices.
All queries read from NATS KV buckets (ENTITY_STATES, SPATIAL_INDEX, INCOMING_INDEX) with an integrated cache layer for performance. The client manages bucket initialization lazily on first use.
Architecture ¶
┌─────────────────────────────────────────────────────────────────┐
│ Query Client │
│ - Entity caching (hybrid LRU+TTL) │
│ - Lazy bucket initialization │
│ - Bounded path traversal │
└─────────────────────────────────────────────────────────────────┘
↓
┌────────────────────┬────────────────────┬───────────────────────┐
│ ENTITY_STATES │ SPATIAL_INDEX │ INCOMING_INDEX │
│ (entity data) │ (geohash→IDs) │ (target→sources) │
└────────────────────┴────────────────────┴───────────────────────┘
Usage ¶
Create a query client:
config := query.DefaultConfig()
client, err := query.NewClient(ctx, natsClient, config)
Basic entity operations:
// Get single entity
entity, err := client.GetEntity(ctx, "org.platform.domain.system.type.instance")
// Get multiple entities
entities, err := client.GetEntitiesBatch(ctx, []string{id1, id2, id3})
// Get entities by type (extracted from ID)
drones, err := client.GetEntitiesByType(ctx, "drone")
Relationship queries:
// Get outgoing relationships by predicate
targets, err := client.GetOutgoingRelationships(ctx, entityID, "controls")

// Get incoming relationships (from INCOMING_INDEX)
sources, err := client.GetIncomingRelationships(ctx, entityID)

// Get all connected entities (both directions)
connected, err := client.GetEntityConnections(ctx, entityID)

// Verify specific relationship exists
exists, err := client.VerifyRelationship(ctx, fromID, toID, "controls")
Bounded path traversal for edge devices:
result, err := client.ExecutePathQuery(ctx, query.PathQuery{
StartEntity: "org.platform.domain.system.drone.001",
MaxDepth: 3, // Maximum hops
MaxNodes: 100, // Maximum entities to visit
MaxTime: 5 * time.Second, // Query timeout
PredicateFilter: []string{"controls", "monitors"}, // Empty = all
DecayFactor: 0.8, // Relevance decay per hop
MaxPaths: 50, // Limit path explosion
})
Spatial queries:
// Get entities in geohash region
entities, err := client.GetEntitiesInRegion(ctx, "9q8yy")
Path Query ¶
The PathQuery performs bounded graph traversal with configurable limits:
Parameters:
- StartEntity: Entity ID to begin traversal from
- MaxDepth: Maximum hop count (prevents infinite recursion)
- MaxNodes: Maximum entities to visit (memory bound)
- MaxTime: Query timeout (prevents runaway queries)
- PredicateFilter: Relationship predicates to follow (empty = all)
- DecayFactor: Score decay per hop (0.0-1.0)
- MaxPaths: Maximum complete paths to track (0 = unlimited)
The result includes:
- Entities: All visited entity states
- Paths: Complete paths from start to leaves
- Scores: Relevance scores with decay applied
- Truncated: Whether query hit resource limits
Caching ¶
The client uses a hybrid cache (LRU + TTL) for entity states:
EntityCache:
Strategy: "hybrid" # LRU eviction with TTL expiry
MaxSize: 1000 # Maximum cached entities
TTL: 5m # Time-to-live
CleanupInterval: 1m # Background cleanup interval
Cache statistics are available via GetCacheStats().
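As a rough sketch of how the reported statistics relate, HitRate is the fraction of lookups served from cache. The struct below mirrors the documented CacheStats fields locally; the derivation is an assumption about how the client computes the rate, not code from the package.

```go
package main

import "fmt"

// CacheStats mirrors the documented statistics fields (local copy).
type CacheStats struct {
	Hits    int64
	Misses  int64
	Size    int
	HitRate float64
}

// hitRate derives HitRate from Hits and Misses, returning 0 when
// no lookups have been recorded (avoids division by zero).
func hitRate(hits, misses int64) float64 {
	total := hits + misses
	if total == 0 {
		return 0
	}
	return float64(hits) / float64(total)
}

func main() {
	stats := CacheStats{Hits: 90, Misses: 10, Size: 100}
	stats.HitRate = hitRate(stats.Hits, stats.Misses)
	fmt.Printf("hit rate: %.0f%%\n", stats.HitRate*100)
}
```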
Configuration ¶
Client configuration:
EntityCache: # See cache.Config
EntityStates:
TTL: 24h # KV bucket TTL
History: 3 # Version history
Replicas: 1 # Replication factor
SpatialIndex:
TTL: 1h # Spatial data TTL
History: 1
Replicas: 1
IncomingIndex:
TTL: 24h # Incoming edge TTL
History: 1
Replicas: 1
Thread Safety ¶
The Client is safe for concurrent use. Bucket initialization uses mutex protection, and cache operations are atomic.
Metrics ¶
When created with NewClientWithMetrics, the client exports cache metrics under the "query_client" namespace.
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph: EntityState type
- github.com/c360studio/semstreams/graph/datamanager: Entity persistence
- github.com/c360studio/semstreams/pkg/cache: Cache implementation
- github.com/c360studio/semstreams/natsclient: NATS KV operations
Index ¶
- Constants
- type CacheStats
- type ClassificationResult
- type Classifier
- type ClassifierChain
- type Client
- type Config
- type DomainExamples
- type Embedder
- type EmbeddingClassifier
- type Example
- type KeywordClassifier
- type LLMClassifier
- type LLMClient
- type LLMClientAdapter
- type PathQuery
- type PathResult
- type SearchOptions
- type SearchStrategy
- type SpatialBounds
- type TimeRange
Constants ¶
const (
	AggregationCount = "count"
	AggregationAvg   = "avg"
	AggregationSum   = "sum"
	AggregationMin   = "min"
	AggregationMax   = "max"
)
Aggregation type constants for aggregation queries.
const DefaultMaxCommunities = 5
DefaultMaxCommunities is the default number of communities to search.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CacheStats ¶
type CacheStats struct {
Hits int64 `json:"hits"`
Misses int64 `json:"misses"`
Size int `json:"size"`
HitRate float64 `json:"hit_rate"`
LastCleared time.Time `json:"last_cleared"`
}
CacheStats provides statistics about query cache performance.
type ClassificationResult ¶
type ClassificationResult struct {
Tier int // Which tier produced the result (0=keyword, 1=BM25, 2=neural)
Intent string // Classified intent (from embedding match, empty for keyword)
Options map[string]any // SearchOptions hints (from keyword or embedding example)
Confidence float64 // Confidence score (1.0 for keywords, similarity for embedding)
}
ClassificationResult contains the classification output from the tiered classifier chain.
type Classifier ¶
type Classifier interface {
// ClassifyQuery analyzes a query string and returns SearchOptions.
ClassifyQuery(ctx context.Context, query string) *SearchOptions
}
Classifier analyzes natural language queries to extract search intent.
type ClassifierChain ¶
type ClassifierChain struct {
// contains filtered or unexported fields
}
ClassifierChain orchestrates tiered query classification.
Routing logic:
- T0 (keyword): Always try first - keyword patterns bypass embedding
- T1/T2 (embedding): Only if no keyword match AND embedding != nil
- Default: Return T0 result with empty options if no tier matches
Thread-safe for concurrent ClassifyQuery calls.
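The routing rules above can be sketched with stand-in types. Everything below (the trimmed structs, the `classify` helper, and the "filters present means a keyword match" criterion) is illustrative, not the package's implementation.

```go
package main

import "fmt"

// Trimmed stand-ins for the documented types.
type SearchOptions struct {
	Query      string
	Predicates []string
}

type ClassificationResult struct {
	Tier       int
	Confidence float64
	Options    *SearchOptions
}

// classify sketches the documented tier routing: keyword patterns
// first, then embedding (if present), then a default T0 result.
func classify(
	query string,
	keyword func(string) *SearchOptions,
	embedding func(string) *ClassificationResult,
) *ClassificationResult {
	// T0: keyword patterns always run first and bypass embedding.
	if opts := keyword(query); opts != nil && len(opts.Predicates) > 0 {
		return &ClassificationResult{Tier: 0, Confidence: 1.0, Options: opts}
	}
	// T1/T2: embedding only when no keyword match AND embedding != nil.
	if embedding != nil {
		if res := embedding(query); res != nil {
			return res
		}
	}
	// Default: T0 result with empty options.
	return &ClassificationResult{Tier: 0, Options: &SearchOptions{Query: query}}
}

func main() {
	res := classify("drones controlled by tower",
		func(q string) *SearchOptions {
			return &SearchOptions{Query: q, Predicates: []string{"controls"}}
		},
		nil,
	)
	fmt.Println("tier:", res.Tier, "confidence:", res.Confidence)
}
```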
func NewClassifierChain ¶
func NewClassifierChain(keyword *KeywordClassifier, embedding *EmbeddingClassifier, llmClassifiers ...*LLMClassifier) *ClassifierChain
NewClassifierChain creates a classifier chain with keyword and optional embedding/LLM classifiers.
All parameters may be nil. Chain will route queries through available tiers.
func (*ClassifierChain) ClassifyQuery ¶
func (c *ClassifierChain) ClassifyQuery(ctx context.Context, query string) *ClassificationResult
ClassifyQuery classifies a query through the tier chain.
Returns result from first tier that produces a match:
- T0: Keyword patterns (temporal, spatial, similarity, path, zone)
- T1/T2: Embedding similarity (if available and no keyword match)
- Default: T0 result with no filters if no tier matches
Returns nil if chain is nil or context cancelled.
type Client ¶
type Client interface {
// Basic entity operations
GetEntity(ctx context.Context, entityID string) (*gtypes.EntityState, error)
GetEntitiesByType(ctx context.Context, entityType string) ([]*gtypes.EntityState, error)
GetEntitiesBatch(ctx context.Context, entityIDs []string) ([]*gtypes.EntityState, error)
// Entity listing and counting
ListEntities(ctx context.Context) ([]string, error)
CountEntities(ctx context.Context) (int, error)
// Graph traversal and path queries
ExecutePathQuery(ctx context.Context, query PathQuery) (*PathResult, error)
// Relationship queries (using triples)
GetIncomingRelationships(ctx context.Context, entityID string) ([]string, error)
GetOutgoingRelationships(ctx context.Context, entityID string, predicate string) ([]string, error)
GetEntityConnections(ctx context.Context, entityID string) ([]*gtypes.EntityState, error)
VerifyRelationship(ctx context.Context, fromID, toID, predicate string) (bool, error)
CountIncomingRelationships(ctx context.Context, entityID string) (int, error)
// Search and filtering
QueryEntities(ctx context.Context, criteria map[string]any) ([]*gtypes.EntityState, error)
// Spatial queries
GetEntitiesInRegion(ctx context.Context, geohash string) ([]*gtypes.EntityState, error)
// Predicate queries
GetEntitiesByPredicate(ctx context.Context, predicate string) ([]string, error)
ListPredicates(ctx context.Context) ([]gtypes.PredicateSummary, error)
GetPredicateStats(ctx context.Context, predicate string, sampleLimit int) (*gtypes.PredicateStatsData, error)
QueryCompoundPredicates(ctx context.Context, query gtypes.CompoundPredicateQuery) ([]string, error)
// Cache and metrics
GetCacheStats() CacheStats
Clear() error
Close() error
}
Client defines the interface for querying graph data. All methods read from NATS KV buckets and support caching for performance.
func NewClientWithMetrics ¶
func NewClientWithMetrics(
	ctx context.Context,
	nc *natsclient.Client,
	config *Config,
	metricsRegistry *metric.MetricsRegistry,
) (Client, error)
NewClientWithMetrics creates a new query client with the given NATS client, configuration, and optional metrics registry.
type Config ¶
type Config struct {
// EntityCache configuration
EntityCache cache.Config `json:"entity_cache"`
// KV Bucket configurations (should match GraphProcessor config)
EntityStates struct {
TTL time.Duration `json:"ttl"`
History uint8 `json:"history"`
Replicas int `json:"replicas"`
} `json:"entity_states"`
SpatialIndex struct {
TTL time.Duration `json:"ttl"`
History uint8 `json:"history"`
Replicas int `json:"replicas"`
} `json:"spatial_index"`
IncomingIndex struct {
TTL time.Duration `json:"ttl"`
History uint8 `json:"history"`
Replicas int `json:"replicas"`
} `json:"incoming_index"`
}
Config contains configuration for the Client.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a sensible default configuration.
type DomainExamples ¶
type DomainExamples struct {
Domain string `json:"domain"`
Version string `json:"version"`
Examples []Example `json:"examples"`
}
DomainExamples represents a collection of examples for a domain.
func LoadAllDomainExamples ¶
func LoadAllDomainExamples(filePaths []string) ([]*DomainExamples, error)
LoadAllDomainExamples loads and aggregates multiple domain example files.
func LoadDomainExamples ¶
func LoadDomainExamples(filePath string) (*DomainExamples, error)
LoadDomainExamples loads query examples from a JSON file.
type Embedder ¶
type Embedder interface {
// Embed generates a vector for a single text.
Embed(text string) ([]float32, error)
// EmbedBatch generates vectors for multiple texts.
EmbedBatch(texts []string) ([][]float32, error)
}
Embedder interface for vector generation.
This is a simplified interface for the query classifier that supports both single and batch embedding operations.
type EmbeddingClassifier ¶
type EmbeddingClassifier struct {
// contains filtered or unexported fields
}
EmbeddingClassifier classifies queries by finding similar domain examples.
The classifier starts with BM25 vectors (warm cache - no external service needed) and can be upgraded to neural vectors later when embeddings are available.
Thread-safe for concurrent FindBestMatch calls during vector upgrades.
func NewEmbeddingClassifier ¶
func NewEmbeddingClassifier(domains []*DomainExamples, threshold float64) *EmbeddingClassifier
NewEmbeddingClassifier creates classifier with BM25 warm cache.
Generates BM25 vectors for all examples immediately (no external service needed). Returns a classifier ready for statistical similarity matching.
func (*EmbeddingClassifier) FindBestMatch ¶
FindBestMatch finds the most similar example to the query.
Returns nil if no match above threshold or context cancelled. Thread-safe - uses read lock to allow concurrent calls during vector upgrades.
func (*EmbeddingClassifier) Threshold ¶
func (c *EmbeddingClassifier) Threshold() float64
Threshold returns the similarity threshold.
func (*EmbeddingClassifier) UpgradeVectors ¶
func (c *EmbeddingClassifier) UpgradeVectors(embedder Embedder) error
UpgradeVectors replaces current vectors with neural vectors from new embedder.
Thread-safe - uses write lock to prevent concurrent reads during upgrade. On error, preserves old vectors (rollback).
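The upgrade-with-rollback behavior can be sketched with a minimal local type: compute the replacement vectors before taking the write lock, so a failing embedder leaves the old vectors untouched and readers are blocked only for the swap itself. This illustrates the documented contract, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type embedder interface {
	EmbedBatch(texts []string) ([][]float32, error)
}

// classifier sketches the documented locking discipline.
type classifier struct {
	mu      sync.RWMutex
	texts   []string // immutable after construction
	vectors [][]float32
}

// UpgradeVectors replaces the vectors with the embedder's output.
// On error the old vectors are preserved (implicit rollback).
func (c *classifier) UpgradeVectors(e embedder) error {
	next, err := e.EmbedBatch(c.texts)
	if err != nil {
		return err // old vectors untouched
	}
	c.mu.Lock()
	c.vectors = next
	c.mu.Unlock()
	return nil
}

// stubEmbedder is a test double standing in for a neural backend.
type stubEmbedder struct{ fail bool }

func (s stubEmbedder) EmbedBatch(texts []string) ([][]float32, error) {
	if s.fail {
		return nil, errors.New("embedding backend unavailable")
	}
	out := make([][]float32, len(texts))
	for i := range texts {
		out[i] = []float32{float32(i)}
	}
	return out, nil
}

func main() {
	c := &classifier{texts: []string{"a", "b"}, vectors: [][]float32{{9}, {9}}}
	if err := c.UpgradeVectors(stubEmbedder{fail: true}); err != nil {
		fmt.Println("kept old vectors:", c.vectors[0][0]) // rollback: still 9
	}
	_ = c.UpgradeVectors(stubEmbedder{})
	fmt.Println("upgraded:", c.vectors[1][0]) // now 1
}
```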
type Example ¶
type Example struct {
Query string `json:"query"` // Natural language query
Intent string `json:"intent"` // Intent category
Options map[string]any `json:"options"` // SearchOptions hints (optional)
Vector []float32 `json:"-"` // Runtime field - embedding vector (not serialized)
}
Example represents a single domain query example for intent classification.
type KeywordClassifier ¶
type KeywordClassifier struct{}
KeywordClassifier implements regex-based natural language query classification. Uses pattern matching to extract temporal, spatial, and intent information.
func NewKeywordClassifier ¶
func NewKeywordClassifier() *KeywordClassifier
NewKeywordClassifier creates a new keyword-based classifier.
func (*KeywordClassifier) ClassifyQuery ¶
func (k *KeywordClassifier) ClassifyQuery(_ context.Context, query string) *SearchOptions
ClassifyQuery analyzes a natural language query and populates SearchOptions. Detects temporal, spatial, similarity, path, and aggregation intents. Always returns non-nil SearchOptions with the original query preserved.
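A toy version of the regex-based detection, assuming simple temporal and spatial patterns; the real classifier's patterns are not published in this documentation.

```go
package main

import (
	"fmt"
	"regexp"
)

// Illustrative patterns only; the package's actual regexes may differ.
var (
	temporalPat = regexp.MustCompile(`(?i)\b(last|past)\s+\d+\s+(minutes?|hours?|days?)\b`)
	spatialPat  = regexp.MustCompile(`(?i)\b(near|within|inside)\b`)
)

// detectIntents reports which intent families a query triggers.
func detectIntents(query string) []string {
	var intents []string
	if temporalPat.MatchString(query) {
		intents = append(intents, "temporal")
	}
	if spatialPat.MatchString(query) {
		intents = append(intents, "spatial")
	}
	return intents
}

func main() {
	fmt.Println(detectIntents("drones near the depot in the last 2 hours"))
}
```

In the real classifier these detections would populate the TimeRange, GeoBounds, and Strategy fields of SearchOptions rather than a string slice.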
type LLMClassifier ¶
type LLMClassifier struct {
// contains filtered or unexported fields
}
LLMClassifier classifies queries by asking an LLM to return structured SearchOptions.
It is the T3 tier in the ClassifierChain — called only when T0 (keyword) and T1/T2 (embedding) tiers produce no confident match. When the LLM call fails or returns unparseable JSON, LLMClassifier returns an error so the chain can fall back gracefully rather than silently returning a wrong classification.
func NewLLMClassifier ¶
func NewLLMClassifier(client LLMClient, domains []*DomainExamples) *LLMClassifier
NewLLMClassifier creates an LLMClassifier with an LLM backend and optional domain examples.
domains may be nil or empty; when provided, a few representative examples are included in the prompt as few-shot context to improve classification accuracy.
func (*LLMClassifier) ClassifyQuery ¶
func (c *LLMClassifier) ClassifyQuery(ctx context.Context, query string) (*ClassificationResult, error)
ClassifyQuery classifies a natural language query using an LLM and returns a ClassificationResult at Tier 3.
Returns an error if:
- The LLM call fails (network, auth, quota)
- The LLM response is not valid JSON
- Context is cancelled before the call completes
type LLMClient ¶
type LLMClient interface {
// ClassifyQuery sends a structured classification prompt to the LLM and returns
// the raw JSON response string.
ClassifyQuery(ctx context.Context, prompt string) (string, error)
}
LLMClient is the minimal interface for sending a classification prompt to an LLM backend.
The returned string must be valid JSON matching llmResponse. Implementations are responsible for retries, timeouts, and backend-specific authentication.
type LLMClientAdapter ¶
type LLMClientAdapter struct {
// contains filtered or unexported fields
}
LLMClientAdapter adapts a graph/llm.Client to the query.LLMClient interface.
This allows the existing OpenAI-compatible LLM infrastructure (ollama, seminstruct, shimmy, OpenAI) to be used as the T3 classifier backend.
func NewLLMClientAdapter ¶
func NewLLMClientAdapter(client llm.Client) *LLMClientAdapter
NewLLMClientAdapter wraps a graph/llm.Client for use as a query.LLMClient.
func (*LLMClientAdapter) ClassifyQuery ¶
ClassifyQuery sends the classification prompt to the LLM and returns the raw response.
type PathQuery ¶
type PathQuery struct {
// StartEntity is the entity ID to begin traversal from
StartEntity string `json:"start_entity"`
// MaxDepth is the hard limit on traversal depth (number of hops)
MaxDepth int `json:"max_depth"`
// MaxNodes is the maximum number of nodes to visit during traversal
MaxNodes int `json:"max_nodes"`
// MaxTime is the timeout for query execution
MaxTime time.Duration `json:"max_time"`
// PredicateFilter specifies which predicates to follow (empty means all relationship predicates)
PredicateFilter []string `json:"predicate_filter,omitempty"`
// DecayFactor is the relevance decay with distance (0.0-1.0)
// Score = initial_score * (DecayFactor ^ depth)
DecayFactor float64 `json:"decay_factor"`
// MaxPaths is the maximum number of complete paths to track (0 = unlimited)
// This prevents exponential path growth in dense graphs
MaxPaths int `json:"max_paths"`
}
PathQuery defines a bounded graph traversal query for edge devices.
type PathResult ¶
type PathResult struct {
// Entities contains all entities visited during traversal
Entities []*gtypes.EntityState `json:"entities"`
// Paths contains sequences of entity IDs representing traversal paths
// Each path is from StartEntity to a leaf or depth-limited entity
Paths [][]string `json:"paths"`
// Scores contains relevance scores for each entity (with decay applied)
Scores map[string]float64 `json:"scores"`
// Truncated indicates if the query hit resource limits
Truncated bool `json:"truncated"`
}
PathResult contains the results of a bounded graph traversal.
type SearchOptions ¶
type SearchOptions struct {
Query string `json:"query,omitempty"`
GeoBounds *SpatialBounds `json:"geo_bounds,omitempty"`
TimeRange *TimeRange `json:"time_range,omitempty"`
Predicates []string `json:"predicates,omitempty"`
Types []string `json:"types,omitempty"`
RequireAllFilters bool `json:"require_all_filters,omitempty"`
UseEmbeddings bool `json:"use_embeddings,omitempty"`
Strategy SearchStrategy `json:"strategy,omitempty"`
Limit int `json:"limit,omitempty"`
Level int `json:"level,omitempty"`
MaxCommunities int `json:"max_communities,omitempty"`
PathIntent bool `json:"path_intent,omitempty"`
PathStartNode string `json:"path_start_node,omitempty"`
PathPredicates []string `json:"path_predicates,omitempty"`
// AggregationType specifies the type of aggregation (count, avg, sum, min, max).
// Use the AggregationCount / AggregationAvg / … constants.
AggregationType string `json:"aggregation_type,omitempty"`
// AggregationField is the property or attribute to aggregate on (e.g. "temperature").
// Empty means aggregate over entity count.
AggregationField string `json:"aggregation_field,omitempty"`
// RankingIntent is true when the query requests a ranked/top-N result set.
RankingIntent bool `json:"ranking_intent,omitempty"`
}
SearchOptions provides declarative search configuration.
func (*SearchOptions) HasIndexFilters ¶
func (o *SearchOptions) HasIndexFilters() bool
HasIndexFilters returns true if any index filters are specified.
func (*SearchOptions) InferStrategy ¶
func (o *SearchOptions) InferStrategy() SearchStrategy
InferStrategy determines the best search strategy based on options.
func (*SearchOptions) SetDefaults ¶
func (o *SearchOptions) SetDefaults()
SetDefaults applies default values for unset options.
type SearchStrategy ¶
type SearchStrategy string
SearchStrategy represents the type of search to perform.
const (
	StrategyGraphRAG         SearchStrategy = "graphrag"
	StrategyGeoGraphRAG      SearchStrategy = "geo_graphrag"
	StrategyTemporalGraphRAG SearchStrategy = "temporal_graphrag"
	StrategyHybridGraphRAG   SearchStrategy = "hybrid_graphrag"
	StrategyPathRAG          SearchStrategy = "pathrag"
	StrategySemantic         SearchStrategy = "semantic"
	StrategyExact            SearchStrategy = "exact"
	StrategyAggregation      SearchStrategy = "aggregation"
)
Search strategy constants for query routing.
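For intuition, here is one plausible mapping from active filters to these strategy constants. InferStrategy's actual rules are not specified in this documentation, so the switch below is an assumption, not the package's logic.

```go
package main

import "fmt"

type SearchStrategy string

// Subset of the documented strategy constants.
const (
	StrategyGraphRAG         SearchStrategy = "graphrag"
	StrategyGeoGraphRAG      SearchStrategy = "geo_graphrag"
	StrategyTemporalGraphRAG SearchStrategy = "temporal_graphrag"
	StrategyHybridGraphRAG   SearchStrategy = "hybrid_graphrag"
)

// inferStrategy sketches a plausible filter-to-strategy mapping:
// spatial and temporal filters together imply the hybrid strategy,
// either alone implies its specialized strategy, and plain queries
// fall back to the base GraphRAG strategy.
func inferStrategy(hasGeo, hasTime bool) SearchStrategy {
	switch {
	case hasGeo && hasTime:
		return StrategyHybridGraphRAG
	case hasGeo:
		return StrategyGeoGraphRAG
	case hasTime:
		return StrategyTemporalGraphRAG
	default:
		return StrategyGraphRAG
	}
}

func main() {
	fmt.Println(inferStrategy(true, false))
}
```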