Overview ¶
Package graphquery implements the query coordinator component for the graph subsystem.
The graphquery package orchestrates queries across graph subsystem components (graph-ingest, graph-index, graph-embedding, graph-clustering) and provides advanced retrieval capabilities:
- PathRAG: Graph traversal with path tracking and relevance scoring
- GraphRAG: Community-aware retrieval (local and global search)
- Static routing: Query-type-to-subject mapping for distributed components
The component handles graceful degradation when optional features (like community detection) are unavailable, enabling partial functionality during cluster startup.
Architecture ¶
┌───────────────────────────────────────────────────────────────────────────┐
│                           graph-query Component                           │
├────────────────┬────────────────┬────────────────┬────────────────────────┤
│  StaticRouter  │  PathSearcher  │ CommunityCache │        GraphRAG        │
│ (query routing)│ (BFS traversal)│  (KV watcher)  │        (search)        │
└────────────────┴────────────────┴────────────────┴────────────────────────┘
        ↓                ↓                ↓                    ↓
┌────────────────┬────────────────┬────────────────┬────────────────────────┐
│  graph-ingest  │  graph-index   │  graph-embed   │    graph-clustering    │
│ (entity data)  │ (relationships)│   (semantic)   │     (communities)      │
└────────────────┴────────────────┴────────────────┴────────────────────────┘
Usage ¶
The component is created via the factory pattern:
err := graphquery.Register(registry)
Or configured in a flow definition:
{
  "type": "processor",
  "name": "graph-query",
  "config": {
    "ports": {
      "inputs": [
        {"name": "query_entity", "type": "nats-request", "subject": "graph.query.entity"},
        {"name": "query_relationships", "type": "nats-request", "subject": "graph.query.relationships"},
        {"name": "query_path_search", "type": "nats-request", "subject": "graph.query.pathSearch"}
      ]
    },
    "query_timeout": "5s",
    "max_depth": 10
  }
}
Query Types ¶
PathRAG (graph.query.pathSearch):
BFS-based graph traversal with path tracking and relevance scoring:
{
  "start_entity": "org.platform.domain.system.type.instance",
  "max_depth": 3,
  "max_nodes": 100,
  "include_siblings": false
}
The response includes the discovered entities, the full path from the start entity to each of them, and decay-weighted relevance scores.
Local Search (graph.query.localSearch):
Community-scoped search for entities related to a starting entity:
{
  "entity_id": "org.platform.domain.system.type.instance",
  "query": "navigation system",
  "level": 0
}
Returns entities from the same community that match the query.
Global Search (graph.query.globalSearch):
Tiered search across all communities:
{
  "query": "autonomous navigation",
  "level": 1,
  "max_communities": 5
}
Uses semantic search first (if available), then falls back to text-based scoring.
Static Routing ¶
Query types are routed to fixed NATS subjects:
Entity queries       → graph.ingest.query.*
Relationship queries → graph.index.query.*
Semantic queries     → graph.embedding.query.*
Community queries    → graph.clustering.query.*
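Static routing amounts to a fixed lookup table built once at construction time. The sketch below is a minimal illustration, not the package's StaticRouter: the query-type names ("entity", "relationships", ...) and the subject suffixes are hypothetical, and only the subject prefixes come from the mapping above. It follows the documented Route contract, returning an empty string for unknown types or a nil receiver.

```go
package main

import "fmt"

// staticRouter is a hypothetical stand-in for StaticRouter: a fixed table
// from query type to NATS subject that never changes at runtime.
type staticRouter struct {
	routes map[string]string
}

func newStaticRouter() *staticRouter {
	return &staticRouter{routes: map[string]string{
		// Query type names and subject suffixes are illustrative assumptions.
		"entity":        "graph.ingest.query.entity",
		"relationships": "graph.index.query.relationships",
		"semantic":      "graph.embedding.query.semantic",
		"community":     "graph.clustering.query.community",
	}}
}

// Route returns the subject for queryType, or "" if the type is unknown
// or the receiver is nil.
func (r *staticRouter) Route(queryType string) string {
	if r == nil {
		return ""
	}
	return r.routes[queryType] // missing keys yield ""
}

func main() {
	r := newStaticRouter()
	fmt.Println(r.Route("relationships"))  // graph.index.query.relationships
	fmt.Printf("%q\n", r.Route("unknown")) // ""
}
```

Because the table is never mutated after construction, concurrent Route calls need no locking.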
Graceful Degradation ¶
The component uses resource.Watcher for optional dependencies:
- If the COMMUNITY_INDEX bucket is unavailable at startup, PathRAG still works but GraphRAG is disabled
- When the bucket becomes available later, GraphRAG is enabled automatically
- Lifecycle reporting tracks degraded states for observability
Configuration ¶
Configuration options:
QueryTimeout: 5s         # Timeout for inter-component requests
MaxDepth: 10             # Maximum BFS traversal depth
StartupAttempts: 10      # Attempts to find optional buckets at startup
StartupInterval: 500ms   # Interval between startup attempts
RecheckInterval: 5s      # Interval for rechecking missing buckets
PathRAG Algorithm ¶
The PathSearcher uses BFS with parent tracking:
- Verify start entity exists via graph-ingest
- BFS traversal following outgoing relationships via graph-index
- Track parent info for each discovered entity
- Calculate relevance scores with exponential decay (0.8 per hop by default)
- Reconstruct full paths from start to each discovered entity
Limits prevent unbounded traversal:
- MaxDepth: stops traversal at depth limit
- MaxNodes: stops after visiting N entities
GraphRAG Search ¶
Global search uses a tiered approach:
- Tier 1: Semantic search via graph-embedding (embedding similarity)
- Tier 2: Text-based scoring of community summaries
- Load entities from top-N matching communities
- Filter entities by query terms
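Tier 2 can be sketched as term matching weighted by this package's ScoreWeightSummary (2.0) and ScoreWeightKeyword (1.5) constants. The exact formula below (one summary hit per query term found as a substring, one keyword hit per keyword equal to a term, case-insensitive) is an assumption, not the package's implementation, and Tier 1 semantic search is not modeled.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

const (
	scoreWeightSummary = 2.0 // mirrors ScoreWeightSummary
	scoreWeightKeyword = 1.5 // mirrors ScoreWeightKeyword
)

type community struct {
	ID       string
	Summary  string
	Keywords []string
}

// scoreCommunity adds scoreWeightSummary for each query term found in the
// summary and scoreWeightKeyword for each keyword equal to a query term.
func scoreCommunity(c community, query string) float64 {
	summary := strings.ToLower(c.Summary)
	score := 0.0
	for _, term := range strings.Fields(strings.ToLower(query)) {
		if strings.Contains(summary, term) {
			score += scoreWeightSummary
		}
		for _, kw := range c.Keywords {
			if strings.EqualFold(kw, term) {
				score += scoreWeightKeyword
			}
		}
	}
	return score
}

// topCommunities returns up to n community IDs with a positive score, best first.
func topCommunities(cs []community, query string, n int) []string {
	type scored struct {
		id    string
		score float64
	}
	var ranked []scored
	for _, c := range cs {
		if s := scoreCommunity(c, query); s > 0 {
			ranked = append(ranked, scored{c.ID, s})
		}
	}
	sort.SliceStable(ranked, func(i, j int) bool { return ranked[i].score > ranked[j].score })
	var ids []string
	for i := 0; i < len(ranked) && i < n; i++ {
		ids = append(ids, ranked[i].id)
	}
	return ids
}

func main() {
	cs := []community{
		{"c1", "Autonomous navigation stack", []string{"navigation", "lidar"}},
		{"c2", "Battery and power systems", []string{"power"}},
		{"c3", "Navigation UI widgets", nil},
	}
	fmt.Println(topCommunities(cs, "autonomous navigation", 5)) // [c1 c3]
}
```

Weighting summary hits above keyword hits favors communities whose generated description mentions the query directly.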
Local search:
- Look up the entity's community in the cache
- Fall back to a storage query on a cache miss
- Fall back to semantic search if the entity has no community
- Load and filter community members
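The local-search fallback order can be sketched as a chain of lookups. The cache, storage, and semantic functions are hypothetical stand-ins for the community cache, the storage query, and graph-embedding search; only their order comes from the list above. Loading and filtering community members is omitted.

```go
package main

import "fmt"

// lookupFn returns the community ID of an entity at a level, if known.
type lookupFn func(entityID string, level int) (string, bool)

// localResult records which tier answered (hypothetical type).
type localResult struct {
	CommunityID string   // set when a community was found
	Via         string   // "cache", "storage" or "semantic"
	Entities    []string // semantic hits when no community exists
}

// resolveLocal tries the cache, then storage, then semantic search.
func resolveLocal(entityID string, level int, cache, storage lookupFn,
	semantic func(entityID string) []string) localResult {
	if id, ok := cache(entityID, level); ok {
		return localResult{CommunityID: id, Via: "cache"}
	}
	if id, ok := storage(entityID, level); ok {
		return localResult{CommunityID: id, Via: "storage"}
	}
	return localResult{Via: "semantic", Entities: semantic(entityID)}
}

func main() {
	cache := func(string, int) (string, bool) { return "", false } // always a miss
	storage := func(id string, _ int) (string, bool) {
		if id == "e1" {
			return "comm-7", true
		}
		return "", false
	}
	semantic := func(string) []string { return []string{"e9"} }

	fmt.Println(resolveLocal("e1", 0, cache, storage, semantic)) // {comm-7 storage []}
	fmt.Println(resolveLocal("e2", 0, cache, storage, semantic)) // { semantic [e9]}
}
```

Recording which tier answered maps naturally onto the cache and storage hit/miss counters listed under Metrics.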
Thread Safety ¶
The Component is safe for concurrent use. Query handlers process requests concurrently via NATS subscription workers.
Metrics ¶
The package exports Prometheus metrics:
- graph_query_duration_seconds: Query latency histogram
- graph_query_cache_hits_total: Community cache hits
- graph_query_cache_misses_total: Community cache misses
- graph_query_storage_hits_total: Storage fallback hits
- graph_query_storage_misses_total: Storage fallback misses
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph: EntityState, Graphable interface
- github.com/c360studio/semstreams/graph/clustering: Community detection
- github.com/c360studio/semstreams/graph/embedding: Semantic search
- github.com/c360studio/semstreams/pkg/resource: Resource availability watching
Index ¶
- Constants
- func CreateGraphQuery(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type CommunityCache
- func (c *CommunityCache) GetAllCommunities() []*clustering.Community
- func (c *CommunityCache) GetCommunitiesByLevel(level int) []*clustering.Community
- func (c *CommunityCache) GetCommunity(id string) *clustering.Community
- func (c *CommunityCache) GetEntityCommunity(entityID string, level int) *clustering.Community
- func (c *CommunityCache) IsReady() bool
- func (c *CommunityCache) Stats() CommunityStats
- func (c *CommunityCache) Stop()
- func (c *CommunityCache) WatchAndSync(ctx context.Context, bucket jetstream.KeyValue) error
- type CommunityStats
- type CommunitySummary
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type GlobalSearchRequest
- type GlobalSearchResponse
- type HierarchyChild
- type LocalSearchRequest
- type LocalSearchResponse
- type PathEntity
- type PathRAGResult
- type PathSearchRequest
- type PathSearchResponse
- type PathSearcher
- type PathStep
- type Relationship
- type RelationshipEntry
- type SemanticHit
- type Source
- type StaticRouter
Constants ¶
const (
	// DefaultMaxCommunities is the default number of communities to search in GlobalSearch
	DefaultMaxCommunities = 5

	// MaxTotalEntitiesInSearch limits the total number of entities that can be loaded
	// across all communities in GlobalSearch to prevent unbounded memory usage
	MaxTotalEntitiesInSearch = 10000

	// ScoreWeightSummary is the weight for summary text matches in community scoring
	ScoreWeightSummary = 2.0

	// ScoreWeightKeyword is the weight for keyword matches in community scoring
	ScoreWeightKeyword = 1.5
)
GraphRAG search constants
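MaxTotalEntitiesInSearch bounds memory when GlobalSearch loads members from several communities. A minimal sketch of that cap, assuming members are collected community by community in ranked order and loading simply stops at the limit; the members map, the collectMembers helper, and the defensive de-duplication are hypothetical.

```go
package main

import "fmt"

const maxTotalEntitiesInSearch = 10000 // mirrors MaxTotalEntitiesInSearch

// collectMembers gathers entity IDs from communities in ranked order,
// skipping duplicates, and stops once limit IDs are collected. The second
// result reports whether any members were left out.
func collectMembers(ranked []string, members map[string][]string, limit int) ([]string, bool) {
	seen := make(map[string]bool)
	var out []string
	for _, cid := range ranked {
		for _, eid := range members[cid] {
			if seen[eid] {
				continue
			}
			if len(out) >= limit {
				return out, true
			}
			seen[eid] = true
			out = append(out, eid)
		}
	}
	return out, false
}

func main() {
	members := map[string][]string{"c1": {"a", "b"}, "c2": {"b", "c", "d"}}
	ids, truncated := collectMembers([]string{"c1", "c2"}, members, 3)
	fmt.Println(ids, truncated) // [a b c] true
	ids, truncated = collectMembers([]string{"c1", "c2"}, members, maxTotalEntitiesInSearch)
	fmt.Println(ids, truncated) // [a b c d] false
}
```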
const (
	DirectionOutgoing = "outgoing" // Follow edges from entity to targets (default)
	DirectionIncoming = "incoming" // Follow edges from sources to entity
	DirectionBoth     = "both"     // Follow edges in both directions
)
Direction constants for path traversal
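How the three direction values, together with the PathSearchRequest predicate filter, select neighbors can be sketched over a flat edge list. The triple type and neighbors function are hypothetical; only the constant values, the outgoing default, and the "empty = all" predicate rule come from this page.

```go
package main

import "fmt"

const (
	directionOutgoing = "outgoing" // mirrors DirectionOutgoing
	directionIncoming = "incoming" // mirrors DirectionIncoming
	directionBoth     = "both"     // mirrors DirectionBoth
)

type triple struct{ From, Predicate, To string }

// neighbors returns entities adjacent to id in the requested direction,
// keeping only the listed predicates (an empty list keeps all). An empty
// direction is treated as outgoing, the documented default.
func neighbors(edges []triple, id, direction string, predicates []string) []string {
	allowed := map[string]bool{}
	for _, p := range predicates {
		allowed[p] = true
	}
	if direction == "" {
		direction = directionOutgoing
	}
	var out []string
	for _, e := range edges {
		if len(allowed) > 0 && !allowed[e.Predicate] {
			continue
		}
		if e.From == id && (direction == directionOutgoing || direction == directionBoth) {
			out = append(out, e.To)
		}
		if e.To == id && (direction == directionIncoming || direction == directionBoth) {
			out = append(out, e.From)
		}
	}
	return out
}

func main() {
	edges := []triple{{"a", "uses", "b"}, {"c", "owns", "a"}}
	fmt.Println(neighbors(edges, "a", directionOutgoing, nil))          // [b]
	fmt.Println(neighbors(edges, "a", directionIncoming, nil))          // [c]
	fmt.Println(neighbors(edges, "a", directionBoth, []string{"owns"})) // [c]
}
```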
Variables ¶
This section is empty.
Functions ¶
func CreateGraphQuery ¶
func CreateGraphQuery(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateGraphQuery creates a new graph query coordinator component
Types ¶
type CommunityCache ¶
type CommunityCache struct {
	// contains filtered or unexported fields
}
CommunityCache maintains an in-memory cache of communities from COMMUNITY_INDEX KV. It watches the KV bucket for changes and updates the cache in real-time. This is a consumer-owned cache - graph-query owns and manages its own view of community data.
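A minimal sketch of such a consumer-owned cache, assuming a read-write mutex guarding an ID index plus an (entity, level) index that is kept in step with each put or delete. The community type and the put/remove/entityCommunity methods are hypothetical simplifications of clustering.Community and of the cache's real KV update handling.

```go
package main

import (
	"fmt"
	"sync"
)

type community struct {
	ID      string
	Level   int
	Members []string
}

type entityKey struct {
	entity string
	level  int
}

type communityCache struct {
	mu       sync.RWMutex
	byID     map[string]*community
	byEntity map[entityKey]string // (entity, level) -> community ID
}

func newCommunityCache() *communityCache {
	return &communityCache{byID: map[string]*community{}, byEntity: map[entityKey]string{}}
}

// put handles a KV put: insert or replace, fixing the membership index.
func (c *communityCache) put(cm *community) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.removeLocked(cm.ID) // drop stale memberships of the old version
	c.byID[cm.ID] = cm
	for _, e := range cm.Members {
		c.byEntity[entityKey{e, cm.Level}] = cm.ID
	}
}

// remove handles a KV delete.
func (c *communityCache) remove(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.removeLocked(id)
}

func (c *communityCache) removeLocked(id string) {
	old, ok := c.byID[id]
	if !ok {
		return
	}
	for _, e := range old.Members {
		k := entityKey{e, old.Level}
		if c.byEntity[k] == id {
			delete(c.byEntity, k)
		}
	}
	delete(c.byID, id)
}

// entityCommunity mirrors GetEntityCommunity: nil if not found.
func (c *communityCache) entityCommunity(entity string, level int) *community {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.byID[c.byEntity[entityKey{entity, level}]]
}

func main() {
	cache := newCommunityCache()
	cache.put(&community{ID: "c1", Level: 0, Members: []string{"e1", "e2"}})
	fmt.Println(cache.entityCommunity("e1", 0).ID) // c1
	cache.remove("c1")
	fmt.Println(cache.entityCommunity("e1", 0) == nil) // true
}
```

The read lock lets many query handlers read concurrently while the watcher goroutine takes the write lock only briefly per update.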
func NewCommunityCache ¶
func NewCommunityCache(logger *slog.Logger) *CommunityCache
NewCommunityCache creates a new community cache.
func (*CommunityCache) GetAllCommunities ¶
func (c *CommunityCache) GetAllCommunities() []*clustering.Community
GetAllCommunities retrieves all communities regardless of level. Returns empty slice if no communities exist.
func (*CommunityCache) GetCommunitiesByLevel ¶
func (c *CommunityCache) GetCommunitiesByLevel(level int) []*clustering.Community
GetCommunitiesByLevel retrieves all communities at a specific level. Returns empty slice if no communities exist at that level.
func (*CommunityCache) GetCommunity ¶
func (c *CommunityCache) GetCommunity(id string) *clustering.Community
GetCommunity retrieves a community by ID. Returns nil if not found.
func (*CommunityCache) GetEntityCommunity ¶
func (c *CommunityCache) GetEntityCommunity(entityID string, level int) *clustering.Community
GetEntityCommunity retrieves the community containing an entity at a specific level. Returns nil if the entity is not in any community at that level.
func (*CommunityCache) IsReady ¶
func (c *CommunityCache) IsReady() bool
IsReady returns true if the initial sync from KV is complete.
func (*CommunityCache) Stats ¶
func (c *CommunityCache) Stats() CommunityStats
Stats returns cache statistics.
func (*CommunityCache) WatchAndSync ¶
func (c *CommunityCache) WatchAndSync(ctx context.Context, bucket jetstream.KeyValue) error
WatchAndSync starts watching the COMMUNITY_INDEX KV bucket and syncs changes to the cache. This method blocks until the context is cancelled.
type CommunityStats ¶
type CommunityStats struct {
	TotalCommunities int         `json:"total_communities"`
	TotalEntities    int         `json:"total_entities"`
	ByLevel          map[int]int `json:"by_level"`
	Ready            bool        `json:"ready"`
}
CommunityStats provides cache statistics.
type CommunitySummary ¶
type CommunitySummary struct {
	CommunityID string   `json:"community_id"`
	Summary     string   `json:"summary"`
	Keywords    []string `json:"keywords"`
	Level       int      `json:"level"`
	Relevance   float64  `json:"relevance"`
}
CommunitySummary represents a community's summary used in global search
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the graph query coordinator
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the JSON schema for the component's configuration
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns the component's data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the component's health status
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize initializes the component
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns the component's input ports
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns the component's output ports (none for the query coordinator)
type Config ¶
type Config struct {
	Ports           *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration,category:basic"`
	QueryTimeout    time.Duration         `json:"query_timeout,omitempty" schema:"type:duration,description:Timeout for query operations,default:5s,category:basic"`
	MaxDepth        int                   `` /* 136-byte string literal not displayed */
	StartupAttempts int                   `` /* 141-byte string literal not displayed */
	StartupInterval time.Duration         `` /* 134-byte string literal not displayed */
	RecheckInterval time.Duration         `` /* 137-byte string literal not displayed */
}
Config defines the configuration for the graph-query coordinator component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a default configuration for the graph-query coordinator
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults applies default values to the configuration
func (Config) Schema ¶
func (c Config) Schema() component.ConfigSchema
Schema returns the configuration schema for the component
type GlobalSearchRequest ¶
type GlobalSearchRequest struct {
	Query                string `json:"query"`
	Level                int    `json:"level"`
	MaxCommunities       int    `json:"max_communities"`
	IncludeSummaries     *bool  `json:"include_summaries,omitempty"`     // Include community summaries (default: true)
	IncludeRelationships bool   `json:"include_relationships,omitempty"` // Include relationships between entities (default: false)
	IncludeSources       bool   `json:"include_sources,omitempty"`       // Include source attribution (default: false)
}
GlobalSearchRequest is the request format for global search
type GlobalSearchResponse ¶
type GlobalSearchResponse struct {
	Entities           []*gtypes.EntityState `json:"entities"`
	CommunitySummaries []CommunitySummary    `json:"community_summaries,omitempty"`
	Relationships      []Relationship        `json:"relationships,omitempty"`
	Sources            []Source              `json:"sources,omitempty"`
	Count              int                   `json:"count"`
	DurationMs         int64                 `json:"duration_ms"`
	Answer             string                `json:"answer,omitempty"`
	AnswerModel        string                `json:"answer_model,omitempty"`
}
GlobalSearchResponse is the response format for global search
type HierarchyChild ¶
type HierarchyChild struct {
	Prefix string `json:"prefix"`
	Name   string `json:"name"`
	Count  int    `json:"count"`
}
HierarchyChild represents a child node in the hierarchy
type LocalSearchRequest ¶
type LocalSearchRequest struct {
	EntityID string `json:"entity_id"`
	Query    string `json:"query"`
	Level    int    `json:"level"`
}
LocalSearchRequest is the request format for local search
type LocalSearchResponse ¶
type LocalSearchResponse struct {
	Entities    []*gtypes.EntityState `json:"entities"`
	CommunityID string                `json:"communityId"`
	Count       int                   `json:"count"`
	DurationMs  int64                 `json:"durationMs"`
}
LocalSearchResponse is the response format for local search
type PathEntity ¶
type PathEntity struct {
	ID    string  `json:"id"`
	Type  string  `json:"type"`
	Score float64 `json:"score"`
}
PathEntity represents a discovered entity with relevance score
type PathRAGResult ¶
type PathRAGResult struct {
	Entities  []*gtypes.EntityState
	Truncated bool
}
PathRAGResult wraps the result of a PathRAG query for use in handleGlobalSearch. It contains EntityState objects rather than PathEntity for consistency with GlobalSearchResponse.
type PathSearchRequest ¶
type PathSearchRequest struct {
	StartEntity     string   `json:"start_entity"`
	MaxDepth        int      `json:"max_depth"`
	MaxNodes        int      `json:"max_nodes"`
	IncludeSiblings bool     `json:"include_siblings"`
	Direction       string   `json:"direction,omitempty"`  // "outgoing" (default), "incoming", "both"
	Predicates      []string `json:"predicates,omitempty"` // Filter to specific predicates (empty = all)
	Timeout         string   `json:"timeout,omitempty"`    // Request timeout e.g. "5s" (0 = default)
	MaxPaths        int      `json:"max_paths,omitempty"`  // Limit number of paths returned (0 = unlimited)
}
PathSearchRequest defines the request schema for path search queries
type PathSearchResponse ¶
type PathSearchResponse struct {
	Entities  []PathEntity `json:"entities"`
	Paths     [][]PathStep `json:"paths"` // Each path is a sequence of steps from start to entity
	Truncated bool         `json:"truncated"`
}
PathSearchResponse defines the response for path search
type PathSearcher ¶
type PathSearcher struct {
	// contains filtered or unexported fields
}
PathSearcher executes PathRAG traversal with proper path tracking
func NewPathSearcher ¶
func NewPathSearcher(nats natsRequester, timeout time.Duration, maxDepth int, logger *slog.Logger) *PathSearcher
NewPathSearcher creates a new PathSearcher instance
func (*PathSearcher) Search ¶
func (p *PathSearcher) Search(ctx context.Context, req PathSearchRequest) (*PathSearchResponse, error)
Search performs BFS traversal with path tracking
type PathStep ¶
type PathStep struct {
	From      string `json:"from"`
	Predicate string `json:"predicate"`
	To        string `json:"to"`
}
PathStep represents a single edge traversal
type Relationship ¶
type Relationship struct {
	FromEntityID string `json:"from_entity_id"`
	ToEntityID   string `json:"to_entity_id"`
	Predicate    string `json:"predicate"`
}
Relationship represents a relationship between two entities in search results
type RelationshipEntry ¶
type RelationshipEntry struct {
	ToEntityID string `json:"to_entity_id"`
	Predicate  string `json:"predicate"`
}
RelationshipEntry represents an outgoing relationship from graph-index
type SemanticHit ¶
SemanticHit represents a search result with semantic similarity score
type Source ¶
type Source struct {
	EntityID    string  `json:"entity_id"`
	CommunityID string  `json:"community_id,omitempty"`
	Relevance   float64 `json:"relevance"`
}
Source represents source attribution for search results
type StaticRouter ¶
type StaticRouter struct {
	// contains filtered or unexported fields
}
StaticRouter routes queries to known graph component subjects. Routing is based on query type string, not runtime discovery.
func NewStaticRouter ¶
func NewStaticRouter(logger *slog.Logger) *StaticRouter
NewStaticRouter creates a router with static routes for all known query types.
func (*StaticRouter) Route ¶
func (r *StaticRouter) Route(queryType string) string
Route returns the NATS subject for a given query type. Returns empty string if the query type is unknown or receiver is nil.