Overview ¶
Package graphquery implements the query coordinator component for the graph subsystem. It orchestrates queries across graph-ingest and graph-index components and provides PathRAG traversal capabilities.
The graphquery package orchestrates queries across graph subsystem components (graph-ingest, graph-index, graph-embedding, graph-clustering) and provides advanced retrieval capabilities:
- PathRAG: Graph traversal with path tracking and relevance scoring
- GraphRAG: Community-aware retrieval (local and global search)
- Static routing: Query-type-to-subject mapping for distributed components
The component handles graceful degradation when optional features (like community detection) are unavailable, enabling partial functionality during cluster startup.
Architecture ¶
┌───────────────────────────────────────────────────────────────────────────┐
│                           graph-query Component                           │
├──────────────────┬──────────────────┬──────────────────┬──────────────────┤
│ StaticRouter     │ PathSearcher     │ CommunityCache   │ GraphRAG         │
│ (query routing)  │ (BFS traversal)  │ (KV watcher)     │ (search)         │
└──────────────────┴──────────────────┴──────────────────┴──────────────────┘
          ↓                  ↓                  ↓                  ↓
┌──────────────────┬──────────────────┬──────────────────┬──────────────────┐
│ graph-ingest     │ graph-index      │ graph-embed      │ graph-clustering │
│ (entity data)    │ (relationships)  │ (semantic)       │ (communities)    │
└──────────────────┴──────────────────┴──────────────────┴──────────────────┘
Usage ¶
The component is created via the factory pattern:
err := graphquery.Register(registry)
Or configured in a flow definition:
{
"type": "processor",
"name": "graph-query",
"config": {
"ports": {
"inputs": [
{"name": "query_entity", "type": "nats-request", "subject": "graph.query.entity"},
{"name": "query_relationships", "type": "nats-request", "subject": "graph.query.relationships"},
{"name": "query_path_search", "type": "nats-request", "subject": "graph.query.pathSearch"}
]
},
"query_timeout": "5s",
"max_depth": 10
}
}
Query Types ¶
PathRAG (graph.query.pathSearch):
BFS-based graph traversal with path tracking and relevance scoring:
{
"start_entity": "org.platform.domain.system.type.instance",
"max_depth": 3,
"max_nodes": 100,
"include_siblings": false
}
Response includes discovered entities, full paths from start, and decay-weighted scores.
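The request bodies above are plain JSON. As a minimal sketch (assuming a Go client and the standard encoding/json package), the struct below copies the documented PathSearchRequest fields so the example compiles on its own; buildPathSearchPayload is a hypothetical helper, and the NATS request/reply call that would carry the payload is omitted. Fields tagged omitempty (direction, predicates, timeout, max_paths) are dropped when left at their zero value.

```go
package main

import "encoding/json"

// PathSearchRequest copies the documented request type so this sketch is
// self-contained.
type PathSearchRequest struct {
	StartEntity     string   `json:"start_entity"`
	MaxDepth        int      `json:"max_depth"`
	MaxNodes        int      `json:"max_nodes"`
	IncludeSiblings bool     `json:"include_siblings"`
	Direction       string   `json:"direction,omitempty"`
	Predicates      []string `json:"predicates,omitempty"`
	Timeout         string   `json:"timeout,omitempty"`
	MaxPaths        int      `json:"max_paths,omitempty"`
}

// buildPathSearchPayload (hypothetical) encodes a body for graph.query.pathSearch.
// Optional fields left at their zero value are omitted by the omitempty tags.
func buildPathSearchPayload(start string, depth, nodes int) ([]byte, error) {
	req := PathSearchRequest{
		StartEntity: start,
		MaxDepth:    depth,
		MaxNodes:    nodes,
		Direction:   "outgoing",
	}
	return json.Marshal(req)
}
```

Calling buildPathSearchPayload("org.platform.domain.system.type.instance", 3, 100) produces the fields of the pathSearch example above plus "direction":"outgoing".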
Local Search (graph.query.localSearch):
Community-scoped search for entities related to a starting entity:
{
"entity_id": "org.platform.domain.system.type.instance",
"query": "navigation system",
"level": 0
}
Returns entities from the same community that match the query.
Global Search (graph.query.globalSearch):
Tiered search across all communities:
{
"query": "autonomous navigation",
"level": 1,
"max_communities": 5
}
Uses semantic search first (if available), then falls back to text-based scoring.
Static Routing ¶
Query types are routed to fixed NATS subjects:
Entity queries       → graph.ingest.query.*
Relationship queries → graph.index.query.*
Semantic queries     → graph.embedding.query.*
Community queries    → graph.clustering.query.*
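Static routing amounts to a fixed lookup table. The sketch below is hypothetical: the query-type strings ("entity.get", "community.byLevel") and the rule that the part after the first dot becomes the subject suffix are illustrative assumptions, since only the subject prefixes are documented. It does mirror the documented Route contract of returning an empty string for unknown types and for a nil receiver.

```go
package main

import "strings"

// staticRouter is a hypothetical stand-in for StaticRouter: a fixed table
// from query-type prefix to the owning component's subject prefix.
type staticRouter struct {
	routes map[string]string
}

func newStaticRouter() *staticRouter {
	return &staticRouter{routes: map[string]string{
		"entity":       "graph.ingest.query.",
		"relationship": "graph.index.query.",
		"semantic":     "graph.embedding.query.",
		"community":    "graph.clustering.query.",
	}}
}

// Route maps a query type such as "entity.get" to "graph.ingest.query.get".
// Unknown or malformed types, and a nil receiver, yield "".
func (r *staticRouter) Route(queryType string) string {
	if r == nil {
		return ""
	}
	kind, op, ok := strings.Cut(queryType, ".")
	if !ok || op == "" {
		return ""
	}
	prefix, known := r.routes[kind]
	if !known {
		return ""
	}
	return prefix + op
}
```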
Graceful Degradation ¶
The component uses resource.Watcher for optional dependencies:
- If the COMMUNITY_INDEX bucket is unavailable at startup, PathRAG still works but GraphRAG is disabled
- When the bucket becomes available later, GraphRAG is enabled automatically
- Lifecycle reporting tracks degraded states for observability
Configuration ¶
Configuration options:
QueryTimeout: 5s          # Timeout for inter-component requests
MaxDepth: 10              # Maximum BFS traversal depth
StartupAttempts: 10       # Attempts to find optional buckets at startup
StartupInterval: 500ms    # Interval between startup attempts
RecheckInterval: 5s       # Interval for rechecking missing buckets
PathRAG Algorithm ¶
The PathSearcher uses BFS with parent tracking:
- Verify start entity exists via graph-ingest
- BFS traversal following outgoing relationships via graph-index
- Track parent info for each discovered entity
- Calculate relevance scores with exponential decay (0.8 per hop by default)
- Reconstruct full paths from start to each discovered entity
Limits prevent unbounded traversal:
- MaxDepth: stops traversal at depth limit
- MaxNodes: stops after visiting N entities
GraphRAG Search ¶
Global search uses a tiered approach:
- Tier 1: Semantic search via graph-embedding (embedding similarity)
- Tier 2: Text-based scoring of community summaries
- Load entities from top-N matching communities
- Filter entities by query terms
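Tier 2 scoring can be sketched with the ScoreWeightSummary (2.0) and ScoreWeightKeyword (1.5) constants. The exact formula is not documented; this hypothetical version adds the summary weight for each query term found in the summary text and the keyword weight for each term that equals a community keyword, then keeps the top maxCommunities non-zero scores.

```go
package main

import (
	"sort"
	"strings"
)

const (
	scoreWeightSummary = 2.0 // same value as ScoreWeightSummary
	scoreWeightKeyword = 1.5 // same value as ScoreWeightKeyword
)

// communityText is a hypothetical view of a community summary.
type communityText struct {
	ID       string
	Summary  string
	Keywords []string
}

// scoreCommunity sums weighted matches for lower-cased query terms.
func scoreCommunity(c communityText, terms []string) float64 {
	summary := strings.ToLower(c.Summary)
	score := 0.0
	for _, t := range terms {
		if strings.Contains(summary, t) {
			score += scoreWeightSummary
		}
		for _, k := range c.Keywords {
			if strings.EqualFold(k, t) {
				score += scoreWeightKeyword
				break // count each term at most once per community
			}
		}
	}
	return score
}

// topCommunities returns the IDs of the best-scoring communities, highest first.
func topCommunities(cs []communityText, query string, maxCommunities int) []string {
	terms := strings.Fields(strings.ToLower(query))
	type scored struct {
		id    string
		score float64
	}
	var ranked []scored
	for _, c := range cs {
		if s := scoreCommunity(c, terms); s > 0 {
			ranked = append(ranked, scored{c.ID, s})
		}
	}
	sort.SliceStable(ranked, func(i, j int) bool { return ranked[i].score > ranked[j].score })
	if len(ranked) > maxCommunities {
		ranked = ranked[:maxCommunities]
	}
	ids := make([]string, len(ranked))
	for i, r := range ranked {
		ids[i] = r.id
	}
	return ids
}
```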
Local search:
- Look up the entity's community in the cache
- Fall back to a storage query on a cache miss
- Fall back to semantic search if the entity has no community
- Load and filter community members
Thread Safety ¶
The Component is safe for concurrent use. Query handlers process requests concurrently via NATS subscription workers.
Metrics ¶
The package exports Prometheus metrics:
- graph_query_duration_seconds: Query latency histogram
- graph_query_cache_hits_total: Community cache hits
- graph_query_cache_misses_total: Community cache misses
- graph_query_storage_hits_total: Storage fallback hits
- graph_query_storage_misses_total: Storage fallback misses
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph: EntityState, Graphable interface
- github.com/c360studio/semstreams/graph/clustering: Community detection
- github.com/c360studio/semstreams/graph/embedding: Semantic search
- github.com/c360studio/semstreams/pkg/resource: Resource availability watching
Index ¶
- Constants
- func CreateGraphQuery(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type AnswerSynthesizer
- type CommunityCache
- func (c *CommunityCache) GetAllCommunities() []*clustering.Community
- func (c *CommunityCache) GetCommunitiesByLevel(level int) []*clustering.Community
- func (c *CommunityCache) GetCommunity(id string) *clustering.Community
- func (c *CommunityCache) GetEntityCommunity(entityID string, level int) *clustering.Community
- func (c *CommunityCache) IsReady() bool
- func (c *CommunityCache) Stats() CommunityStats
- func (c *CommunityCache) Stop()
- func (c *CommunityCache) WatchAndSync(ctx context.Context, bucket jetstream.KeyValue) error
- type CommunityStats
- type CommunitySummary
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type EntityDigest
- type GlobalSearchRequest
- type GlobalSearchResponse
- type HierarchyChild
- type LLMAnswerSynthesizer
- type LocalSearchRequest
- type LocalSearchResponse
- type PathEntity
- type PathRAGResult
- type PathSearchRequest
- type PathSearchResponse
- type PathSearcher
- type PathStep
- type Relationship
- type RelationshipEntry
- type SemanticHit
- type Source
- type StaticRouter
- type TemplateAnswerSynthesizer
Constants ¶
const (
	// DefaultMaxCommunities is the default number of communities to search in GlobalSearch
	DefaultMaxCommunities = 5

	// MaxTotalEntitiesInSearch limits the total number of entities that can be loaded
	// across all communities in GlobalSearch to prevent unbounded memory usage
	MaxTotalEntitiesInSearch = 10000

	// MinSemanticRelevance is the minimum similarity score for semantic search results.
	// Hits below this threshold are filtered out to prevent returning irrelevant entities
	// that happen to have weak textual overlap with the query.
	MinSemanticRelevance = 0.5

	// MinBM25Relevance is the minimum similarity score for BM25 statistical
	// embedding search results. Lower than MinSemanticRelevance because BM25
	// feature-hashed vectors produce inherently lower cosine similarity scores
	// (typical range 0.0-0.73) compared to neural embeddings (0.5-1.0).
	MinBM25Relevance = 0.4

	// MinTextRelevance is the minimum score for text-based entity matching.
	// Scored as proportion of query terms that match (0.0-1.0). Entities below
	// this threshold are excluded. Default 0.3 = at least 30% of terms must match.
	MinTextRelevance = 0.3

	// DefaultSummarizeThreshold auto-summarizes globalSearch results when entity
	// count exceeds this value. Returns community summaries + entity IDs instead
	// of full entity triples. Set to 0 to disable and always return full entities.
	DefaultSummarizeThreshold = 50

	// ScoreWeightSummary is the weight for summary text matches in community scoring
	ScoreWeightSummary = 2.0

	// ScoreWeightKeyword is the weight for keyword matches in community scoring
	ScoreWeightKeyword = 1.5

	// MaxAnswerClusters limits the number of communities included in answer synthesis
	MaxAnswerClusters = 5
)
GraphRAG search constants
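MinTextRelevance is described as the proportion of query terms that match. A minimal sketch of that score, assuming whitespace tokenization and case-insensitive substring matching (the real matcher may tokenize differently); textRelevance and keep are hypothetical names.

```go
package main

import "strings"

const minTextRelevance = 0.3 // same value as MinTextRelevance

// textRelevance returns the fraction of query terms that occur in text.
func textRelevance(query, text string) float64 {
	terms := strings.Fields(strings.ToLower(query))
	if len(terms) == 0 {
		return 0
	}
	lower := strings.ToLower(text)
	matched := 0
	for _, t := range terms {
		if strings.Contains(lower, t) {
			matched++
		}
	}
	return float64(matched) / float64(len(terms))
}

// keep reports whether an entity passes the threshold ("at least 30%").
func keep(query, text string) bool {
	return textRelevance(query, text) >= minTextRelevance
}
```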
const (
	DirectionOutgoing = "outgoing" // Follow edges from entity to targets (default)
	DirectionIncoming = "incoming" // Follow edges from sources to entity
	DirectionBoth     = "both"     // Follow edges in both directions
)
Direction constants for path traversal
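The direction constants decide which end of an edge counts as a neighbor, and PathSearchRequest.Predicates narrows the edge set. The sketch below is a hypothetical illustration over an in-memory triple list; an empty direction defaults to outgoing, and an empty predicate list allows all predicates, matching the request field comments.

```go
package main

const (
	DirectionOutgoing = "outgoing"
	DirectionIncoming = "incoming"
	DirectionBoth     = "both"
)

// triple is a hypothetical in-memory relationship.
type triple struct {
	From, Predicate, To string
}

// neighbors returns the one-hop neighbors of id for the given direction,
// keeping only edges whose predicate is in predicates (empty = all).
func neighbors(edges []triple, id, direction string, predicates []string) []string {
	allowed := make(map[string]bool, len(predicates))
	for _, p := range predicates {
		allowed[p] = true
	}
	if direction == "" {
		direction = DirectionOutgoing
	}
	var out []string
	for _, e := range edges {
		if len(allowed) > 0 && !allowed[e.Predicate] {
			continue
		}
		if (direction == DirectionOutgoing || direction == DirectionBoth) && e.From == id {
			out = append(out, e.To)
		}
		if (direction == DirectionIncoming || direction == DirectionBoth) && e.To == id {
			out = append(out, e.From)
		}
	}
	return out
}
```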
const DefaultAnswerSynthesisTimeout = 15 * time.Second
DefaultAnswerSynthesisTimeout caps a single answer-synthesis LLM call when no operator-configured timeout is supplied. Sized to leave substantial budget for the rest of the response path under typical graph-gateway request deadlines (30-60s); slow upstream models that need longer should configure capability.timeout or endpoint.request_timeout explicitly. The bounded sub-timeout is what makes the template-fallback path transparent to the HTTP layer — without it, a slow LLM can eat the entire request ctx and the fallback runs after the gateway has already returned an error to the HTTP client.
const MinSemanticRelevancePure = 0.5
MinSemanticRelevancePure is the minimum similarity score for pure semantic strategy queries (e.g. "find similar to X"). Higher than MinSemanticRelevance because pure semantic queries should only return genuinely similar entities.
Variables ¶
This section is empty.
Functions ¶
func CreateGraphQuery ¶
func CreateGraphQuery(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateGraphQuery creates a new graph query coordinator component
Types ¶
type AnswerSynthesizer ¶
type AnswerSynthesizer interface {
// Synthesize produces an answer to the query based on community summaries.
// Returns the answer text and the model name used (empty for template fallback).
Synthesize(ctx context.Context, query string, summaries []CommunitySummary, totalEntities int) (answer string, model string, err error)
// Close releases any resources held by the synthesizer.
Close() error
}
AnswerSynthesizer produces a natural language answer from community summaries in response to a globalSearch query.
type CommunityCache ¶
type CommunityCache struct {
// contains filtered or unexported fields
}
CommunityCache maintains an in-memory cache of communities from the COMMUNITY_INDEX KV bucket. It watches the bucket for changes and updates the cache in real time. This is a consumer-owned cache: graph-query owns and manages its own view of community data.
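The cache needs a reverse index to answer GetEntityCommunity without scanning every community. This hypothetical sketch keeps communities by ID plus an (entity, level) to community map under a sync.RWMutex; put and remove stand in for the KV put and delete events a watcher would deliver. The real fields are unexported and may be laid out differently.

```go
package main

import (
	"fmt"
	"sync"
)

// cachedCommunity is a hypothetical, trimmed community record.
type cachedCommunity struct {
	ID      string
	Level   int
	Members []string
}

type communityIndex struct {
	mu       sync.RWMutex
	byID     map[string]*cachedCommunity
	byMember map[string]string // "entityID@level" -> community ID
}

func newCommunityIndex() *communityIndex {
	return &communityIndex{
		byID:     map[string]*cachedCommunity{},
		byMember: map[string]string{},
	}
}

func memberKey(entityID string, level int) string {
	return fmt.Sprintf("%s@%d", entityID, level)
}

// put applies a KV put event: drop the previous version, then index members.
func (x *communityIndex) put(c *cachedCommunity) {
	x.mu.Lock()
	defer x.mu.Unlock()
	x.removeLocked(c.ID)
	x.byID[c.ID] = c
	for _, m := range c.Members {
		x.byMember[memberKey(m, c.Level)] = c.ID
	}
}

// remove applies a KV delete event.
func (x *communityIndex) remove(id string) {
	x.mu.Lock()
	defer x.mu.Unlock()
	x.removeLocked(id)
}

// removeLocked assumes x.mu is held for writing.
func (x *communityIndex) removeLocked(id string) {
	old, ok := x.byID[id]
	if !ok {
		return
	}
	for _, m := range old.Members {
		k := memberKey(m, old.Level)
		if x.byMember[k] == id {
			delete(x.byMember, k)
		}
	}
	delete(x.byID, id)
}

// entityCommunity mirrors GetEntityCommunity: nil when not found.
func (x *communityIndex) entityCommunity(entityID string, level int) *cachedCommunity {
	x.mu.RLock()
	defer x.mu.RUnlock()
	id, ok := x.byMember[memberKey(entityID, level)]
	if !ok {
		return nil
	}
	return x.byID[id]
}
```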
func NewCommunityCache ¶
func NewCommunityCache(logger *slog.Logger) *CommunityCache
NewCommunityCache creates a new community cache.
func (*CommunityCache) GetAllCommunities ¶
func (c *CommunityCache) GetAllCommunities() []*clustering.Community
GetAllCommunities retrieves all communities regardless of level. Returns empty slice if no communities exist.
func (*CommunityCache) GetCommunitiesByLevel ¶
func (c *CommunityCache) GetCommunitiesByLevel(level int) []*clustering.Community
GetCommunitiesByLevel retrieves all communities at a specific level. Returns empty slice if no communities exist at that level.
func (*CommunityCache) GetCommunity ¶
func (c *CommunityCache) GetCommunity(id string) *clustering.Community
GetCommunity retrieves a community by ID. Returns nil if not found.
func (*CommunityCache) GetEntityCommunity ¶
func (c *CommunityCache) GetEntityCommunity(entityID string, level int) *clustering.Community
GetEntityCommunity retrieves the community containing an entity at a specific level. Returns nil if the entity is not in any community at that level.
func (*CommunityCache) IsReady ¶
func (c *CommunityCache) IsReady() bool
IsReady returns true if the initial sync from KV is complete.
func (*CommunityCache) Stats ¶
func (c *CommunityCache) Stats() CommunityStats
Stats returns cache statistics.
func (*CommunityCache) WatchAndSync ¶
func (c *CommunityCache) WatchAndSync(ctx context.Context, bucket jetstream.KeyValue) error
WatchAndSync starts watching the COMMUNITY_INDEX KV bucket and syncs changes to the cache. This method blocks until the context is cancelled.
type CommunityStats ¶
type CommunityStats struct {
TotalCommunities int `json:"total_communities"`
TotalEntities int `json:"total_entities"`
ByLevel map[int]int `json:"by_level"`
Ready bool `json:"ready"`
}
CommunityStats provides cache statistics.
type CommunitySummary ¶
type CommunitySummary struct {
CommunityID string `json:"community_id"`
Summary string `json:"summary"`
Keywords []string `json:"keywords"`
Level int `json:"level"`
Relevance float64 `json:"relevance"`
MemberCount int `json:"member_count,omitempty"` // total entities in community
Entities []EntityDigest `json:"entities,omitempty"` // representative entity digests
}
CommunitySummary represents a community's summary used in global search
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the graph query coordinator
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the JSON schema for the component's configuration
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns the component's data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the component's health status
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize initializes the component
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns the component's input ports
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns the component's output ports (none for query coordinator)
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration,category:basic"`
QueryTimeout time.Duration `json:"query_timeout,omitempty" schema:"type:duration,description:Timeout for query operations,default:5s,category:basic"`
MaxDepth int `` /* 136-byte string literal not displayed */
StartupAttempts int `` /* 141-byte string literal not displayed */
StartupInterval time.Duration `` /* 134-byte string literal not displayed */
RecheckInterval time.Duration `` /* 137-byte string literal not displayed */
MinSemanticRelevance float64 `` /* 165-byte string literal not displayed */
MinBM25Relevance float64 `` /* 159-byte string literal not displayed */
MinTextRelevance float64 `` /* 144-byte string literal not displayed */
}
Config defines the configuration for the graph-query coordinator component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a default configuration for the graph-query coordinator
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults applies default values to the configuration
func (Config) Schema ¶
func (c Config) Schema() component.ConfigSchema
Schema returns the configuration schema for the component
type EntityDigest ¶
type EntityDigest struct {
ID string `json:"id"`
Type string `json:"type"` // extracted from 5th segment of entity ID
Label string `json:"label,omitempty"` // human-readable name from key predicates
Relevance float64 `json:"relevance,omitempty"` // semantic similarity score when available
}
EntityDigest provides lightweight, agent-readable context for an entity without requiring a full EntityState load.
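EntityDigest.Type is taken from the fifth dot-separated segment of the entity ID, which for the documented org.platform.domain.system.type.instance layout is the type segment. A minimal sketch; digestType is a hypothetical name, and returning an empty string for IDs with fewer than five segments is an assumption.

```go
package main

import "strings"

// digestType returns the 5th dot-separated segment of an entity ID.
func digestType(entityID string) string {
	parts := strings.Split(entityID, ".")
	if len(parts) < 5 {
		return "" // assumed fallback for short or malformed IDs
	}
	return parts[4]
}
```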
type GlobalSearchRequest ¶
type GlobalSearchRequest struct {
Query string `json:"query"`
Level int `json:"level"`
MaxCommunities int `json:"max_communities"`
SummarizeThreshold *int `json:"summarize_threshold,omitempty"` // Auto-summarize when results exceed this (default: 50, -1=disabled)
IncludeSummaries *bool `json:"include_summaries,omitempty"` // Include community summaries (default: true)
IncludeRelationships bool `json:"include_relationships,omitempty"` // Include relationships between entities (default: false)
IncludeSources bool `json:"include_sources,omitempty"` // Include source attribution (default: false)
}
GlobalSearchRequest is the request format for global search
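The optional pointer fields let a request distinguish "not set" from an explicit value. A hypothetical sketch of resolving them: a nil SummarizeThreshold selects DefaultSummarizeThreshold (50), -1 disables summarization, and a nil IncludeSummaries means true. Treating a threshold of 0 as disabled as well is an assumption borrowed from the DefaultSummarizeThreshold comment.

```go
package main

const defaultSummarizeThreshold = 50 // same value as DefaultSummarizeThreshold

// resolveSummarize reports whether a response with count entities should be
// auto-summarized, given the request's optional SummarizeThreshold.
func resolveSummarize(threshold *int, count int) bool {
	t := defaultSummarizeThreshold
	if threshold != nil {
		t = *threshold
	}
	if t <= 0 {
		return false // -1 disables; 0 treated as disabled (assumption)
	}
	return count > t
}

// includeSummaries resolves the optional IncludeSummaries flag (default true).
func includeSummaries(flag *bool) bool {
	return flag == nil || *flag
}
```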
type GlobalSearchResponse ¶
type GlobalSearchResponse struct {
Strategy string `json:"strategy,omitempty"` // which strategy handled this query
Entities []*gtypes.EntityState `json:"entities"`
EntityIDs []string `json:"entity_ids,omitempty"` // IDs only (when summarized)
EntityDigests []EntityDigest `json:"entity_digests,omitempty"` // lightweight entity context
Summarized bool `json:"summarized,omitempty"` // true when auto-summarized
CommunitySummaries []CommunitySummary `json:"community_summaries,omitempty"`
Relationships []Relationship `json:"relationships,omitempty"`
Sources []Source `json:"sources,omitempty"`
Count int `json:"count"`
DurationMs int64 `json:"duration_ms"`
Answer string `json:"answer,omitempty"`
AnswerModel string `json:"answer_model,omitempty"`
}
GlobalSearchResponse is the response format for global search
type HierarchyChild ¶
type HierarchyChild struct {
Prefix string `json:"prefix"`
Name string `json:"name"`
Count int `json:"count"`
}
HierarchyChild represents a child node in the hierarchy
type LLMAnswerSynthesizer ¶
type LLMAnswerSynthesizer struct {
// contains filtered or unexported fields
}
LLMAnswerSynthesizer uses an LLM to produce query-focused answers from community summaries. Falls back to template synthesis on LLM error or timeout — see Synthesize for the bounded-timeout contract.
func NewLLMAnswerSynthesizer ¶
func NewLLMAnswerSynthesizer(client llm.Client, modelName string, logger *slog.Logger, timeout time.Duration) *LLMAnswerSynthesizer
NewLLMAnswerSynthesizer creates an LLM-backed answer synthesizer with a bounded per-call LLM timeout. timeout=0 selects DefaultAnswerSynthesisTimeout (15s). Operators configure the value via capability.timeout (preferred, applies to every endpoint that handles the capability) or endpoint.request_timeout (applies only to the configured endpoint); see component.initAnswerSynthesizer for the resolution order.
func (*LLMAnswerSynthesizer) Close ¶
func (s *LLMAnswerSynthesizer) Close() error
Close releases the LLM client resources.
func (*LLMAnswerSynthesizer) Synthesize ¶
func (s *LLMAnswerSynthesizer) Synthesize(ctx context.Context, query string, summaries []CommunitySummary, totalEntities int) (string, string, error)
Synthesize produces a query-focused answer by sending community summaries to the LLM. On LLM failure OR sub-timeout, falls back to template synthesis and logs the error internally — the fallback path is transparent to the HTTP layer and never propagates an error.
The LLM call runs under a sub-context bounded by s.timeout, NOT under the parent ctx directly. This is the load-bearing detail: a slow upstream model (semspec hit this with seminstruct as answer_synthesis) can otherwise consume the entire gateway HTTP request budget; the fallback then runs after the gateway has already returned an error to the client. Bounding here leaves margin for the rest of the response path (response marshalling, NATS reply) so the template fallback actually reaches the HTTP layer.
If the parent ctx has less budget than s.timeout, the parent's deadline wins (context.WithTimeout uses the earlier of the two). If the parent ctx is already cancelled, the LLM call returns immediately with the inherited error and we still fall back to template.
type LocalSearchRequest ¶
type LocalSearchRequest struct {
EntityID string `json:"entity_id"`
Query string `json:"query"`
Level int `json:"level"`
}
LocalSearchRequest is the request format for local search
type LocalSearchResponse ¶
type LocalSearchResponse struct {
Entities []*gtypes.EntityState `json:"entities"`
CommunityID string `json:"communityId"`
Count int `json:"count"`
DurationMs int64 `json:"durationMs"`
CommunitySummary string `json:"community_summary,omitempty"`
Keywords []string `json:"keywords,omitempty"`
MemberCount int `json:"member_count,omitempty"`
EntityDigests []EntityDigest `json:"entity_digests,omitempty"`
Answer string `json:"answer,omitempty"`
AnswerModel string `json:"answer_model,omitempty"`
}
LocalSearchResponse is the response format for local search
type PathEntity ¶
type PathEntity struct {
ID string `json:"id"`
Type string `json:"type"`
Score float64 `json:"score"`
}
PathEntity represents a discovered entity with relevance score
type PathRAGResult ¶
type PathRAGResult struct {
Entities []*gtypes.EntityState
Truncated bool
}
PathRAGResult wraps the result of a PathRAG query for use in handleGlobalSearch. It contains EntityState objects rather than PathEntity for consistency with GlobalSearchResponse.
type PathSearchRequest ¶
type PathSearchRequest struct {
StartEntity string `json:"start_entity"`
MaxDepth int `json:"max_depth"`
MaxNodes int `json:"max_nodes"`
IncludeSiblings bool `json:"include_siblings"`
Direction string `json:"direction,omitempty"` // "outgoing" (default), "incoming", "both"
Predicates []string `json:"predicates,omitempty"` // Filter to specific predicates (empty = all)
Timeout string `json:"timeout,omitempty"` // Request timeout e.g. "5s" (0 = default)
MaxPaths int `json:"max_paths,omitempty"` // Limit number of paths returned (0 = unlimited)
}
PathSearchRequest defines the request schema for path search queries
type PathSearchResponse ¶
type PathSearchResponse struct {
Entities []PathEntity `json:"entities"`
Paths [][]PathStep `json:"paths"` // Each path is a sequence of steps from start to entity
Truncated bool `json:"truncated"`
}
PathSearchResponse defines the response for path search
type PathSearcher ¶
type PathSearcher struct {
// contains filtered or unexported fields
}
PathSearcher executes PathRAG traversal with proper path tracking
func NewPathSearcher ¶
func NewPathSearcher(nats natsRequester, timeout time.Duration, maxDepth int, logger *slog.Logger) *PathSearcher
NewPathSearcher creates a new PathSearcher instance
func (*PathSearcher) Search ¶
func (p *PathSearcher) Search(ctx context.Context, req PathSearchRequest) (*PathSearchResponse, error)
Search performs BFS traversal with path tracking
type PathStep ¶
type PathStep struct {
From string `json:"from"`
Predicate string `json:"predicate"`
To string `json:"to"`
}
PathStep represents a single edge traversal
type Relationship ¶
type Relationship struct {
FromEntityID string `json:"from_entity_id"`
ToEntityID string `json:"to_entity_id"`
Predicate string `json:"predicate"`
}
Relationship represents a relationship between two entities in search results
type RelationshipEntry ¶
type RelationshipEntry struct {
ToEntityID string `json:"to_entity_id"`
Predicate string `json:"predicate"`
}
RelationshipEntry represents an outgoing relationship from graph-index
type SemanticHit ¶
SemanticHit represents a search result with semantic similarity score
type Source ¶
type Source struct {
EntityID string `json:"entity_id"`
CommunityID string `json:"community_id,omitempty"`
Relevance float64 `json:"relevance"`
}
Source represents source attribution for search results
type StaticRouter ¶
type StaticRouter struct {
// contains filtered or unexported fields
}
StaticRouter routes queries to known graph component subjects. Routing is based on query type string, not runtime discovery.
func NewStaticRouter ¶
func NewStaticRouter(logger *slog.Logger) *StaticRouter
NewStaticRouter creates a router with static routes for all known query types.
func (*StaticRouter) Route ¶
func (r *StaticRouter) Route(queryType string) string
Route returns the NATS subject for a given query type. Returns empty string if the query type is unknown or receiver is nil.
type TemplateAnswerSynthesizer ¶
type TemplateAnswerSynthesizer struct{}
TemplateAnswerSynthesizer produces answers from community summaries using string templates. No LLM call required — used as fallback when no answer_synthesis endpoint is configured.
func (*TemplateAnswerSynthesizer) Close ¶
func (s *TemplateAnswerSynthesizer) Close() error
Close is a no-op for the template synthesizer.
func (*TemplateAnswerSynthesizer) Synthesize ¶
func (s *TemplateAnswerSynthesizer) Synthesize(_ context.Context, _ string, summaries []CommunitySummary, totalEntities int) (string, string, error)
Synthesize produces a template-based answer.