Documentation ¶
Overview ¶
Package clustering provides community detection algorithms and graph clustering for discovering structural patterns in the knowledge graph.
The clustering package implements the Label Propagation Algorithm (LPA) for detecting communities of related entities. It supports hierarchical clustering with multiple granularity levels, enabling both fine-grained local communities and coarse-grained global clusters.
Detected communities are enriched with statistical summaries (TF-IDF keyword extraction) immediately, with optional LLM enhancement performed asynchronously. PageRank is used to identify representative entities within each community.
Architecture ¶
Graph Provider
↓
┌──────────────────────────────────────────────────────────────┐
│ LPA Detector │
├──────────────────────────────────────────────────────────────┤
│ Level 0 (fine) → Level 1 (mid) → Level 2 (coarse) │
└──────────────────────────────────────────────────────────────┘
↓
┌────────────────────┐ ┌────────────────────────────────────┐
│ Progressive │───→│ COMMUNITY_INDEX KV │
│ Summarizer │ │ - {level}.{community_id} │
│ (statistical) │ │ - entity.{level}.{entity_id} │
└────────────────────┘ └────────────────────────────────────┘
↓
┌───────────────────────┐
│ Enhancement Worker │
│ (async LLM via KV │
│ watcher) │
└───────────────────────┘
Usage ¶
Configure and run community detection:
// Create storage backed by NATS KV
storage := clustering.NewNATSCommunityStorage(communityBucket)
// Create LPA detector with progressive summarization
detector := clustering.NewLPADetector(graphProvider, storage).
WithLevels(3).
WithMaxIterations(100).
WithProgressiveSummarization(
clustering.NewProgressiveSummarizer(),
entityProvider,
)
// Run detection (typically after graph updates)
communities, err := detector.DetectCommunities(ctx)
// communities[0] = fine-grained, communities[1] = mid, communities[2] = coarse
Query communities and entities:
// Get community for an entity at specific level
community, err := detector.GetEntityCommunity(ctx, entityID, 0)
// Get all communities at a level
allLevel0, err := detector.GetCommunitiesByLevel(ctx, 0)
// Infer relationships from community co-membership
triples, err := detector.InferRelationshipsFromCommunities(ctx, 0, clustering.DefaultInferenceConfig())
Label Propagation Algorithm ¶
LPA iteratively assigns entities to communities based on neighbor voting:
- Each entity starts with its own unique label
- Entities adopt the most frequent label among their neighbors (weighted by edge weight)
- Process continues until convergence or max iterations reached
- Shuffled processing order reduces oscillation
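The steps above can be sketched on a toy graph. This is a minimal illustration, not the package's implementation: the `Edge` type, the `propagateLabels` function, the seed parameter, and the tie-breaking rule (smaller label wins) are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// Edge is a hypothetical undirected weighted edge used only in this sketch.
type Edge struct {
	From, To string
	Weight   float64
}

// propagateLabels runs a simplified label propagation and returns node -> label.
// It assumes every edge endpoint is listed in nodes.
func propagateLabels(nodes []string, edges []Edge, maxIter int, seed int64) map[string]string {
	// Build a symmetric weighted adjacency map.
	adj := make(map[string]map[string]float64)
	link := func(a, b string, w float64) {
		if adj[a] == nil {
			adj[a] = make(map[string]float64)
		}
		adj[a][b] += w
	}
	for _, e := range edges {
		link(e.From, e.To, e.Weight)
		link(e.To, e.From, e.Weight)
	}

	// Each node starts with its own unique label.
	labels := make(map[string]string, len(nodes))
	for _, n := range nodes {
		labels[n] = n
	}

	order := append([]string(nil), nodes...)
	rng := rand.New(rand.NewSource(seed))
	for iter := 0; iter < maxIter; iter++ {
		// Shuffle the processing order on every pass to reduce oscillation.
		rng.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] })
		changed := false
		for _, n := range order {
			// Neighbors vote for their current label, weighted by edge weight.
			votes := make(map[string]float64)
			for nb, w := range adj[n] {
				votes[labels[nb]] += w
			}
			best, bestW := labels[n], -1.0
			for label, w := range votes {
				// Highest vote wins; ties go to the smaller label so the
				// result does not depend on map iteration order.
				if w > bestW || (w == bestW && label < best) {
					best, bestW = label, w
				}
			}
			if best != labels[n] {
				labels[n] = best
				changed = true
			}
		}
		if !changed {
			break // converged: a full pass changed nothing
		}
	}
	return labels
}

func main() {
	nodes := []string{"a", "b", "c", "d", "e", "f"}
	edges := []Edge{
		{"a", "b", 1}, {"b", "c", 1}, {"a", "c", 1}, // first triangle
		{"d", "e", 1}, {"e", "f", 1}, {"d", "f", 1}, // second triangle
	}
	labels := propagateLabels(nodes, edges, 100, 42)
	sort.Strings(nodes)
	for _, n := range nodes {
		fmt.Printf("%s -> %s\n", n, labels[n])
	}
}
```

On two disconnected triangles, each triangle ends up sharing a single label, and the two labels differ because no edge carries votes between them.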
Hierarchical levels are computed by treating communities from level N as super-nodes for level N+1 detection.
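One way to picture the super-node step is to collapse each community into a single node and sum the weights of edges that cross community boundaries. The sketch below assumes exactly that; the `Edge` type and `collapse` function are hypothetical, and the package may aggregate weights or treat intra-community edges differently.

```go
package main

import "fmt"

// Edge is a hypothetical undirected weighted edge used only in this sketch.
type Edge struct {
	From, To string
	Weight   float64
}

// collapse turns level-N edges into level-N+1 edges: every community label
// becomes a super-node, weights between two communities are summed, and
// edges inside a single community are dropped.
func collapse(edges []Edge, labels map[string]string) []Edge {
	type pair struct{ a, b string }
	sums := make(map[pair]float64)
	var keys []pair // first-seen order keeps the output deterministic
	for _, e := range edges {
		a, b := labels[e.From], labels[e.To]
		if a == b {
			continue // intra-community edge
		}
		if b < a {
			a, b = b, a // normalize so X-Y and Y-X share one key
		}
		k := pair{a, b}
		if _, seen := sums[k]; !seen {
			keys = append(keys, k)
		}
		sums[k] += e.Weight
	}
	out := make([]Edge, 0, len(keys))
	for _, k := range keys {
		out = append(out, Edge{From: k.a, To: k.b, Weight: sums[k]})
	}
	return out
}

func main() {
	labels := map[string]string{"a": "X", "b": "X", "c": "Y", "d": "Y"}
	edges := []Edge{{"a", "b", 1}, {"b", "c", 2}, {"a", "d", 0.5}, {"c", "d", 1}}
	for _, e := range collapse(edges, labels) {
		fmt.Printf("%s - %s (%.1f)\n", e.From, e.To, e.Weight)
	}
}
```

Running label propagation again on the collapsed edge list would then yield the next, coarser level.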
Community Summarization ¶
Three summarization strategies are available:
StatisticalSummarizer:
- TF-IDF-like keyword extraction from entity types and properties
- PageRank-based representative entity selection
- Template-based summary generation
- Always available, no external dependencies
LLMSummarizer:
- OpenAI-compatible LLM for natural language summaries
- Works with shimmy, OpenAI, Ollama, vLLM, etc.
- Falls back to statistical on service unavailability
ProgressiveSummarizer:
- Statistical summary immediately available
- LLM enhancement performed asynchronously
- Best for user-facing applications needing fast initial response
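The TF-IDF-like keyword extraction listed under StatisticalSummarizer can be illustrated with a small ranking function. This is a sketch under assumptions: the `topKeywords` function, the smoothed IDF formula, and the idea of treating each community as one "document" of terms are choices made for the example, not the package's documented scoring.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// topKeywords ranks the terms of one community against a corpus of
// communities. Terms frequent in the target but rare elsewhere score highest.
func topKeywords(target []string, corpus [][]string, k int) []string {
	// Document frequency: in how many communities each term appears.
	df := make(map[string]int)
	for _, doc := range corpus {
		seen := make(map[string]bool)
		for _, t := range doc {
			if !seen[t] {
				seen[t] = true
				df[t]++
			}
		}
	}

	// Term frequency inside the target community.
	tf := make(map[string]int)
	for _, t := range target {
		tf[t]++
	}

	n := float64(len(corpus))
	score := make(map[string]float64, len(tf))
	terms := make([]string, 0, len(tf))
	for t, c := range tf {
		// Smoothed IDF so a term missing from the corpus cannot divide by zero.
		idf := math.Log(1 + n/float64(1+df[t]))
		score[t] = float64(c) * idf
		terms = append(terms, t)
	}

	// Highest score first; alphabetical order breaks ties deterministically.
	sort.Slice(terms, func(i, j int) bool {
		if score[terms[i]] != score[terms[j]] {
			return score[terms[i]] > score[terms[j]]
		}
		return terms[i] < terms[j]
	})
	if k >= 0 && k < len(terms) {
		terms = terms[:k]
	}
	return terms
}

func main() {
	corpus := [][]string{
		{"drone", "sensor", "drone", "robot"},
		{"robot", "arm"},
		{"robot", "camera"},
	}
	fmt.Println(topKeywords(corpus[0], corpus, 2))
}
```

In this toy corpus "robot" appears in every community, so it ranks below "drone" and "sensor" for the first community even though it is present there.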
Enhancement Worker ¶
The EnhancementWorker watches COMMUNITY_INDEX KV for communities with status="statistical" and asynchronously generates LLM summaries:
worker, err := clustering.NewEnhancementWorker(&clustering.EnhancementWorkerConfig{
LLMSummarizer: llmSummarizer,
Storage: storage,
Provider: graphProvider,
Querier: queryManager,
CommunityBucket: communityBucket,
})
worker.Start(ctx)
defer worker.Stop()
PageRank ¶
PageRank identifies influential entities within communities:
config := clustering.DefaultPageRankConfig()
config.TopN = 10
result, err := clustering.ComputePageRankForCommunity(ctx, provider, memberIDs, config)
// result.Ranked = top 10 entities by PageRank score
// result.Scores = map of entity ID to normalized score
Default configuration:
- Iterations: 20
- DampingFactor: 0.85
- Tolerance: 1e-6 (convergence threshold)
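To show how the three defaults interact, here is a minimal power-iteration PageRank. It is a generic textbook sketch, not the package's code: the `pageRank` function, the adjacency-map input, and the uniform redistribution of dangling-node mass are assumptions.

```go
package main

import (
	"fmt"
	"math"
)

// pageRank runs power iteration over a directed adjacency map.
// It stops after `iterations` passes or once the total score change
// between two passes drops below `tol`. Edge targets are assumed to be
// listed in nodes.
func pageRank(nodes []string, out map[string][]string, iterations int, damping, tol float64) map[string]float64 {
	scores := make(map[string]float64, len(nodes))
	if len(nodes) == 0 {
		return scores
	}
	n := float64(len(nodes))
	for _, v := range nodes {
		scores[v] = 1 / n // uniform start
	}
	for it := 0; it < iterations; it++ {
		// Mass held by nodes without outgoing edges is spread uniformly.
		dangling := 0.0
		for _, v := range nodes {
			if len(out[v]) == 0 {
				dangling += scores[v]
			}
		}
		base := (1-damping)/n + damping*dangling/n
		next := make(map[string]float64, len(nodes))
		for _, v := range nodes {
			next[v] = base
		}
		// Each node passes a damped share of its score to its targets.
		for _, v := range nodes {
			if len(out[v]) == 0 {
				continue
			}
			share := damping * scores[v] / float64(len(out[v]))
			for _, w := range out[v] {
				next[w] += share
			}
		}
		delta := 0.0
		for _, v := range nodes {
			delta += math.Abs(next[v] - scores[v])
		}
		scores = next
		if delta < tol {
			break // converged
		}
	}
	return scores
}

func main() {
	nodes := []string{"hub", "a", "b", "c"}
	out := map[string][]string{
		"hub": {"a", "b", "c"},
		"a":   {"hub"}, "b": {"hub"}, "c": {"hub"},
	}
	scores := pageRank(nodes, out, 20, 0.85, 1e-6)
	for _, v := range nodes {
		fmt.Printf("%s: %.4f\n", v, scores[v])
	}
}
```

In this star-shaped example the hub receives links from every spoke and ends up with the highest score, which is the property used to pick representative entities.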
Configuration ¶
LPA detector configuration:
MaxIterations: 100 # Maximum iterations (limit: 10000)
Levels: 3 # Hierarchical levels (limit: 10)
LLM summary transfer between detection runs:
SummaryTransferThreshold: 0.8 # Jaccard overlap for preserving LLM summaries
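The Jaccard overlap behind this threshold is the size of the intersection of two member sets divided by the size of their union. The sketch below shows the arithmetic; the `jaccard` and `shouldTransfer` helpers are hypothetical, and whether the package compares with `>=` or `>` is an assumption.

```go
package main

import "fmt"

// transferThreshold mirrors the documented SummaryTransferThreshold value.
const transferThreshold = 0.8

// jaccard returns |A ∩ B| / |A ∪ B| for two member lists, ignoring duplicates.
func jaccard(a, b []string) float64 {
	setA := make(map[string]struct{}, len(a))
	for _, x := range a {
		setA[x] = struct{}{}
	}
	setB := make(map[string]struct{}, len(b))
	for _, x := range b {
		setB[x] = struct{}{}
	}
	if len(setA) == 0 && len(setB) == 0 {
		return 0 // two empty sets: define the overlap as zero
	}
	inter := 0
	for x := range setB {
		if _, ok := setA[x]; ok {
			inter++
		}
	}
	union := len(setA) + len(setB) - inter
	return float64(inter) / float64(union)
}

// shouldTransfer reports whether an archived community overlaps enough with
// a newly detected one to keep its LLM summary (assumed >= comparison).
func shouldTransfer(archived, detected []string) bool {
	return jaccard(archived, detected) >= transferThreshold
}

func main() {
	archived := []string{"a", "b", "c", "d", "e"}
	detected := []string{"a", "b", "c", "d", "e", "f"}
	fmt.Printf("overlap=%.3f transfer=%v\n", jaccard(archived, detected), shouldTransfer(archived, detected))
}
```

Gaining one member in a five-member community gives an overlap of 5/6, which clears the threshold; replacing one of three members gives 2/4, which does not.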
Inference configuration for relationship generation:
MinCommunitySize: 2 # Minimum size for inference
MaxInferredPerCommunity: 50 # Limit to prevent O(n²) explosion
PageRank configuration:
Iterations: 20 # Max iterations
DampingFactor: 0.85 # Random walk continuation probability
Tolerance: 1e-6 # Convergence threshold
TopN: 0 # Return all (or limit to top N)
Storage ¶
Communities are stored in the COMMUNITY_INDEX KV bucket:
{level}.{community_id} → Community JSON
entity.{level}.{entity_id} → Community ID (for entity lookup)
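The two key patterns can be written out as small helpers. This only illustrates the layout shown above; `communityKey` and `entityKey` are hypothetical names, and any escaping the package applies to IDs is not covered here.

```go
package main

import "fmt"

// communityKey builds a key of the form {level}.{community_id}.
func communityKey(level int, communityID string) string {
	return fmt.Sprintf("%d.%s", level, communityID)
}

// entityKey builds a key of the form entity.{level}.{entity_id}.
func entityKey(level int, entityID string) string {
	return fmt.Sprintf("entity.%d.%s", level, entityID)
}

func main() {
	fmt.Println(communityKey(0, "abc"))
	fmt.Println(entityKey(1, "org.platform.domain.system.type.instance"))
}
```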
Storage can optionally create member_of triples:
config := clustering.CommunityStorageConfig{
CreateTriples: true,
TriplePredicate: "graph.community.member_of",
}
storage := clustering.NewNATSCommunityStorageWithConfig(kv, config)
Thread Safety ¶
LPADetector, NATSCommunityStorage, and EnhancementWorker are safe for concurrent use. The enhancement worker supports pause/resume for coordinated graph updates:
worker.Pause() // Stop processing new communities
// ... perform graph updates ...
worker.Resume() // Continue processing
Metrics ¶
The clustering package exports Prometheus metrics under the semstreams_clustering namespace:
- communities_detected_total: Communities detected by level
- detection_duration_seconds: Detection run duration
- enhancement_latency_seconds: LLM enhancement duration
- enhancement_queue_depth: Pending enhancements
- enhancement_success_total: Successful LLM enhancements
- enhancement_failed_total: Failed LLM enhancements
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph: Core graph types and Provider interface
- github.com/c360studio/semstreams/graph/llm: LLM integration for summarization
- github.com/c360studio/semstreams/graph/inference: Anomaly detection triggered by clustering
Package clustering provides graph clustering algorithms and community detection.
Package clustering provides community detection algorithms and graph providers.
Index ¶
- Constants
- func ComputeRepresentativeEntities(ctx context.Context, provider Provider, communityMembers []string, topN int) ([]string, map[string]float64, error)
- type Community
- type CommunityDetector
- type CommunityStorage
- type CommunityStorageConfig
- type CommunitySummarizer
- type Direction
- type EnhancementMetrics
- func (m *EnhancementMetrics) DecQueueDepth()
- func (m *EnhancementMetrics) IncQueueDepth()
- func (m *EnhancementMetrics) RecordEnhancementFailed(latencySeconds float64)
- func (m *EnhancementMetrics) RecordEnhancementStart()
- func (m *EnhancementMetrics) RecordEnhancementSuccess(latencySeconds float64)
- func (m *EnhancementMetrics) SetQueueDepth(depth int)
- type EnhancementWorker
- type EnhancementWorkerConfig
- type EntityIDProvider
- func (p *EntityIDProvider) ClearCache()
- func (p *EntityIDProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)
- func (p *EntityIDProvider) GetCacheStats() (prefixes int, entities int)
- func (p *EntityIDProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)
- func (p *EntityIDProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
- func (p *EntityIDProvider) GetSiblingEdgeMetrics() (successes, errors int64)
- type EntityIDProviderConfig
- type EntityProvider
- type EntityQuerier
- type InferenceConfig
- type InferredTriple
- type LLMSummarizer
- type LLMSummarizerConfig
- type LLMSummarizerOption
- type LPADetector
- func (d *LPADetector) DetectCommunities(ctx context.Context) (map[int][]*Community, error)
- func (d *LPADetector) GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
- func (d *LPADetector) GetCommunity(ctx context.Context, id string) (*Community, error)
- func (d *LPADetector) GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
- func (d *LPADetector) InferRelationshipsFromCommunities(ctx context.Context, level int, config InferenceConfig) ([]InferredTriple, error)
- func (d *LPADetector) SetEntityProvider(provider EntityProvider)
- func (d *LPADetector) UpdateCommunities(ctx context.Context, _ []string) error
- func (d *LPADetector) WithLevels(levels int) *LPADetector
- func (d *LPADetector) WithLogger(logger *slog.Logger) *LPADetector
- func (d *LPADetector) WithMaxIterations(max int) *LPADetector
- func (d *LPADetector) WithProgressiveSummarization(summarizer CommunitySummarizer, entityProvider EntityProvider) *LPADetector
- func (d *LPADetector) WithSummarizer(summarizer CommunitySummarizer) *LPADetector
- type NATSCommunityStorage
- func (s *NATSCommunityStorage) Clear(ctx context.Context) error
- func (s *NATSCommunityStorage) DeleteCommunity(ctx context.Context, id string) error
- func (s *NATSCommunityStorage) GetAllCommunities(ctx context.Context) ([]*Community, error)
- func (s *NATSCommunityStorage) GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
- func (s *NATSCommunityStorage) GetCommunity(ctx context.Context, id string) (*Community, error)
- func (s *NATSCommunityStorage) GetCreatedTriples() []message.Triple
- func (s *NATSCommunityStorage) GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
- func (s *NATSCommunityStorage) SaveCommunity(ctx context.Context, community *Community) error
- type PageRankConfig
- type PageRankResult
- type PredicateProvider
- type ProgressiveSummarizer
- type Provider
- type QueryManagerProvider
- func (p *QueryManagerProvider) GetAllEntityIDs(_ context.Context) ([]string, error)
- func (p *QueryManagerProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)
- func (p *QueryManagerProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
- type Relationship
- type RelationshipQuerier
- type SemanticProvider
- func (p *SemanticProvider) ClearCache()
- func (p *SemanticProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)
- func (p *SemanticProvider) GetCacheStats() (entities int, edges int)
- func (p *SemanticProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)
- func (p *SemanticProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
- type SemanticProviderConfig
- type SimilarityFinder
- type StatisticalSummarizer
Constants ¶
const (
	// DefaultMaxIterations is the default maximum iteration count
	DefaultMaxIterations = 100
	// MaxIterationsLimit is the maximum allowed iteration count
	MaxIterationsLimit = 10000
	// DefaultLevels is the default number of hierarchical levels
	DefaultLevels = 3
	// MaxLevelsLimit is the maximum allowed hierarchical levels
	MaxLevelsLimit = 10
	// SummaryTransferThreshold is the minimum Jaccard overlap for transferring LLM summaries
	// between archived and newly detected communities
	SummaryTransferThreshold = 0.8
)
const (
	// CommunityBucket is the NATS KV bucket for storing communities
	CommunityBucket = "COMMUNITY_INDEX"
	// MaxCommunityLevels is the maximum hierarchy depth for scanning
	MaxCommunityLevels = 10
)
Variables ¶
This section is empty.
Functions ¶
func ComputeRepresentativeEntities ¶
func ComputeRepresentativeEntities(ctx context.Context, provider Provider, communityMembers []string, topN int) ([]string, map[string]float64, error)
ComputeRepresentativeEntities computes representative entities for a community using PageRank. It returns the top N entities by PageRank score.
Types ¶
type Community ¶
type Community struct {
// ID is the unique identifier for this community
ID string `json:"id"`
// Level indicates the hierarchy level (0=bottom, 1=mid, 2=top)
Level int `json:"level"`
// Members contains the entity IDs belonging to this community
Members []string `json:"members"`
// ParentID references the parent community at the next level up (nil for top level)
ParentID *string `json:"parent_id,omitempty"`
// StatisticalSummary is the fast statistical baseline summary (always present)
// Generated using TF-IDF keyword extraction and template-based summarization
StatisticalSummary string `json:"statistical_summary,omitempty"`
// LLMSummary is the enhanced LLM-generated summary (populated asynchronously)
// Empty until LLM enhancement completes successfully
LLMSummary string `json:"llm_summary,omitempty"`
// Keywords are extracted key terms representing this community's themes
// e.g., ["autonomous", "navigation", "sensor-fusion"]
Keywords []string `json:"keywords,omitempty"`
// RepEntities contains IDs of representative entities within this community
// These entities best exemplify the community's characteristics
RepEntities []string `json:"rep_entities,omitempty"`
// SummaryStatus tracks the summarization state
// Values: "statistical" (initial), "llm-enhanced" (enhanced), "llm-failed" (enhancement failed)
SummaryStatus string `json:"summary_status,omitempty"`
// Metadata stores additional community properties
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Community represents a detected community/cluster in the graph
type CommunityDetector ¶
type CommunityDetector interface {
// DetectCommunities runs community detection on the entire graph
// Returns communities organized by hierarchical level
DetectCommunities(ctx context.Context) (map[int][]*Community, error)
// UpdateCommunities incrementally updates communities based on recent graph changes
// entityIDs are entities that have been added/modified since last detection
UpdateCommunities(ctx context.Context, entityIDs []string) error
// GetCommunity retrieves a specific community by ID
GetCommunity(ctx context.Context, id string) (*Community, error)
// GetEntityCommunity returns the community containing the given entity
// level specifies which hierarchical level to query (0=bottom, 1=mid, 2=top)
GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
// GetCommunitiesByLevel returns all communities at a specific hierarchical level
GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
// InferRelationshipsFromCommunities generates inferred triples from community co-membership.
// For each community with >= minCommunitySize members, creates bidirectional
// "inferred.clustered_with" triples between members.
InferRelationshipsFromCommunities(ctx context.Context, level int, config InferenceConfig) ([]InferredTriple, error)
}
CommunityDetector performs community detection on a graph
type CommunityStorage ¶
type CommunityStorage interface {
// SaveCommunity persists a community
SaveCommunity(ctx context.Context, community *Community) error
// GetCommunity retrieves a community by ID
GetCommunity(ctx context.Context, id string) (*Community, error)
// GetCommunitiesByLevel retrieves all communities at a level
GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
// GetEntityCommunity retrieves the community for an entity at a level
GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
// DeleteCommunity removes a community
DeleteCommunity(ctx context.Context, id string) error
// Clear removes all communities (useful for full recomputation)
Clear(ctx context.Context) error
// GetAllCommunities returns all communities across all levels
// Used for archiving enhanced communities before Clear()
GetAllCommunities(ctx context.Context) ([]*Community, error)
}
CommunityStorage abstracts persistence layer for communities
type CommunityStorageConfig ¶
type CommunityStorageConfig struct {
// CreateTriples enables creation of member_of triples during SaveCommunity
CreateTriples bool
// TriplePredicate specifies the predicate to use for community membership triples
// Default: "graph.community.member_of"
TriplePredicate string
}
CommunityStorageConfig configures community storage behavior
type CommunitySummarizer ¶
type CommunitySummarizer interface {
// SummarizeCommunity generates a summary for a community
// Returns updated Community with Summary, Keywords, RepEntities, and Summarizer fields populated
SummarizeCommunity(ctx context.Context, community *Community, entities []*gtypes.EntityState) (*Community, error)
}
CommunitySummarizer generates summaries for communities
type Direction ¶
type Direction string
Direction represents the direction of relationship traversal. Local copy to avoid import cycle with querymanager.
type EnhancementMetrics ¶
type EnhancementMetrics struct {
// contains filtered or unexported fields
}
EnhancementMetrics provides Prometheus metrics for LLM community enhancement
func NewEnhancementMetrics ¶
func NewEnhancementMetrics(component string, registry *metric.MetricsRegistry) *EnhancementMetrics
NewEnhancementMetrics creates a new EnhancementMetrics instance using MetricsRegistry
func (*EnhancementMetrics) DecQueueDepth ¶
func (m *EnhancementMetrics) DecQueueDepth()
DecQueueDepth decrements the queue depth by 1
func (*EnhancementMetrics) IncQueueDepth ¶
func (m *EnhancementMetrics) IncQueueDepth()
IncQueueDepth increments the queue depth by 1
func (*EnhancementMetrics) RecordEnhancementFailed ¶
func (m *EnhancementMetrics) RecordEnhancementFailed(latencySeconds float64)
RecordEnhancementFailed records a failed enhancement with latency
func (*EnhancementMetrics) RecordEnhancementStart ¶
func (m *EnhancementMetrics) RecordEnhancementStart()
RecordEnhancementStart records the start of an enhancement attempt
func (*EnhancementMetrics) RecordEnhancementSuccess ¶
func (m *EnhancementMetrics) RecordEnhancementSuccess(latencySeconds float64)
RecordEnhancementSuccess records a successful enhancement with latency
func (*EnhancementMetrics) SetQueueDepth ¶
func (m *EnhancementMetrics) SetQueueDepth(depth int)
SetQueueDepth sets the current queue depth
type EnhancementWorker ¶
type EnhancementWorker struct {
// contains filtered or unexported fields
}
EnhancementWorker handles asynchronous LLM enhancement of community summaries via KV watch
func NewEnhancementWorker ¶
func NewEnhancementWorker(config *EnhancementWorkerConfig) (*EnhancementWorker, error)
NewEnhancementWorker creates a new enhancement worker
func (*EnhancementWorker) IsPaused ¶
func (w *EnhancementWorker) IsPaused() bool
IsPaused returns whether the worker is currently paused.
func (*EnhancementWorker) Pause ¶
func (w *EnhancementWorker) Pause()
Pause stops processing new communities while allowing in-flight work to complete. Safe to call multiple times. Returns immediately after signaling workers to pause.
func (*EnhancementWorker) Resume ¶
func (w *EnhancementWorker) Resume()
Resume allows processing to continue after a Pause. Safe to call multiple times.
func (*EnhancementWorker) Start ¶
func (w *EnhancementWorker) Start(ctx context.Context) error
Start begins watching for communities needing LLM enhancement
func (*EnhancementWorker) Stop ¶
func (w *EnhancementWorker) Stop() error
Stop gracefully stops the enhancement worker
func (*EnhancementWorker) WithWorkers ¶
func (w *EnhancementWorker) WithWorkers(n int) *EnhancementWorker
WithWorkers sets the number of concurrent workers. Must be called before Start(). Has no effect if worker is already started.
type EnhancementWorkerConfig ¶
type EnhancementWorkerConfig struct {
LLMSummarizer *LLMSummarizer
Storage CommunityStorage
Provider Provider
Querier EntityQuerier
CommunityBucket jetstream.KeyValue
Logger *slog.Logger
Registry *metric.MetricsRegistry // Optional: for LLM enhancement metrics
}
EnhancementWorkerConfig holds configuration for the enhancement worker
type EntityIDProvider ¶
type EntityIDProvider struct {
// contains filtered or unexported fields
}
EntityIDProvider wraps a base Provider and adds virtual edges based on EntityID hierarchy. This enables LPA clustering to find communities using the 6-part EntityID structure even when explicit relationship triples don't exist.
EntityID format: org.platform.domain.system.type.instance
Entities with the same 5-part TypePrefix (org.platform.domain.system.type) are considered siblings.
Virtual edges are computed on-demand and cached for performance. These edges are NOT persisted - they're ephemeral hints for the clustering algorithm.
Explicit edges (from base provider) always take precedence over virtual edges.
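The sibling rule described above can be sketched as a prefix comparison. The `typePrefix` and `areSiblings` helpers below are hypothetical and only illustrate the 6-part format; how the package validates or caches IDs is not shown.

```go
package main

import (
	"fmt"
	"strings"
)

// typePrefix returns the first five dot-separated parts of a 6-part EntityID.
// The second result is false when the ID is not a well-formed 6-part ID.
func typePrefix(entityID string) (string, bool) {
	parts := strings.Split(entityID, ".")
	if len(parts) != 6 {
		return "", false
	}
	for _, p := range parts {
		if p == "" {
			return "", false
		}
	}
	return strings.Join(parts[:5], "."), true
}

// areSiblings reports whether two distinct entities share a type prefix.
func areSiblings(a, b string) bool {
	pa, okA := typePrefix(a)
	pb, okB := typePrefix(b)
	return okA && okB && a != b && pa == pb
}

func main() {
	// The IDs are made-up examples in the documented format.
	fmt.Println(areSiblings("acme.fleet.robotics.nav.drone.001", "acme.fleet.robotics.nav.drone.002"))
	fmt.Println(areSiblings("acme.fleet.robotics.nav.drone.001", "acme.fleet.robotics.nav.rover.001"))
}
```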
func NewEntityIDProvider ¶
func NewEntityIDProvider(base Provider, config EntityIDProviderConfig, logger *slog.Logger) *EntityIDProvider
NewEntityIDProvider creates a Provider that augments explicit edges with virtual edges based on EntityID hierarchy (6-part dotted format).
Parameters:
- base: The underlying Provider for explicit edges (also used to list all entities)
- config: Configuration for edge weights and limits
- logger: Optional logger for observability (can be nil)
func (*EntityIDProvider) ClearCache ¶
func (p *EntityIDProvider) ClearCache()
ClearCache clears the type prefix cache and propagates to wrapped providers. Call this when entities are added/removed.
func (*EntityIDProvider) GetAllEntityIDs ¶
func (p *EntityIDProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)
GetAllEntityIDs delegates to the base provider
func (*EntityIDProvider) GetCacheStats ¶
func (p *EntityIDProvider) GetCacheStats() (prefixes int, entities int)
GetCacheStats returns statistics about the type prefix cache for monitoring.
func (*EntityIDProvider) GetEdgeWeight ¶
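func (p *EntityIDProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)
GetEdgeWeight returns the weight of an edge between two entities.
- For explicit edges: delegates to the base provider
- For sibling edges: returns the configured sibling weight (default 0.7)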
Explicit edges always take precedence - if base returns weight > 0, that's used directly. Sibling edge weight is only used when no explicit edge exists.
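The precedence rule reduces to a short decision. The `resolveWeight` function below is a hypothetical illustration; returning 0 when neither kind of edge exists is an assumption.

```go
package main

import "fmt"

// resolveWeight applies the documented precedence: an explicit weight above
// zero is used directly, otherwise the sibling weight applies to siblings.
func resolveWeight(explicit float64, siblings bool, siblingWeight float64) float64 {
	if explicit > 0 {
		return explicit
	}
	if siblings {
		return siblingWeight
	}
	return 0 // assumed result when no edge of either kind exists
}

func main() {
	fmt.Println(resolveWeight(1.0, true, 0.7)) // explicit edge wins
	fmt.Println(resolveWeight(0, true, 0.7))   // falls back to sibling weight
	fmt.Println(resolveWeight(0, false, 0.7))  // no edge
}
```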
func (*EntityIDProvider) GetNeighbors ¶
func (p *EntityIDProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
GetNeighbors returns both explicit neighbors and sibling neighbors from EntityID hierarchy. Sibling neighbors are entities that share the same 5-part type prefix.
Direction parameter is respected for explicit edges but ignored for sibling edges (sibling relationships are symmetric).
func (*EntityIDProvider) GetSiblingEdgeMetrics ¶
func (p *EntityIDProvider) GetSiblingEdgeMetrics() (successes, errors int64)
GetSiblingEdgeMetrics returns metrics for sibling edge operations.
type EntityIDProviderConfig ¶
type EntityIDProviderConfig struct {
// SiblingWeight is the edge weight for sibling relationships.
// Higher values = stronger connection influence in LPA.
// Recommended: 0.7 (lower than explicit edges at 1.0)
SiblingWeight float64
// MaxSiblings limits sibling neighbors per entity to control
// computation cost during LPA iterations.
// Recommended: 10
MaxSiblings int
// IncludeSiblings enables sibling edge discovery.
// Set to false to disable EntityID-based edges entirely.
IncludeSiblings bool
}
EntityIDProviderConfig holds configuration for EntityIDProvider
func DefaultEntityIDProviderConfig ¶
func DefaultEntityIDProviderConfig() EntityIDProviderConfig
DefaultEntityIDProviderConfig returns sensible defaults for clustering
type EntityProvider ¶
type EntityProvider interface {
GetEntities(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
}
EntityProvider interface for fetching full entity states for summarization
type EntityQuerier ¶
type EntityQuerier interface {
GetEntities(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
}
EntityQuerier provides minimal interface for querying entities. This interface exists to avoid import cycle with querymanager package.
type InferenceConfig ¶
type InferenceConfig struct {
// MinCommunitySize is the minimum community size for generating inferences
// Singleton communities (size=1) never produce inferences
MinCommunitySize int
// MaxInferredPerCommunity limits inferred relationships per community
// Prevents O(n²) explosion in large communities
MaxInferredPerCommunity int
}
InferenceConfig holds configuration for relationship inference
func DefaultInferenceConfig ¶
func DefaultInferenceConfig() InferenceConfig
DefaultInferenceConfig returns sensible defaults for relationship inference
type InferredTriple ¶
type InferredTriple struct {
Subject string
Predicate string
Object string
Source string
Confidence float64
Timestamp time.Time
CommunityID string // Community that produced this inference
Level int // Hierarchical level
}
InferredTriple represents a relationship inferred from community detection. This is a lightweight struct for returning inference results. The caller converts these to message.Triple for persistence.
type LLMSummarizer ¶
type LLMSummarizer struct {
// Client is the LLM client for making chat completion requests.
Client llm.Client
// FallbackSummarizer is used if LLM service is unavailable.
FallbackSummarizer *StatisticalSummarizer
// MaxTokens limits the response length (default: 150).
MaxTokens int
// ContentFetcher optionally fetches entity content (title, abstract) for richer prompts.
// If nil, prompts use only entity IDs and triple-derived keywords.
ContentFetcher llm.ContentFetcher
}
LLMSummarizer implements CommunitySummarizer using an OpenAI-compatible LLM API. This summarizer calls an external LLM service for higher quality natural language summaries.
It works with any OpenAI-compatible backend:
- shimmy (recommended for local inference)
- OpenAI cloud
- Ollama, vLLM, etc.
func NewLLMSummarizer ¶
func NewLLMSummarizer(cfg LLMSummarizerConfig, opts ...LLMSummarizerOption) (*LLMSummarizer, error)
NewLLMSummarizer creates an LLM-based summarizer with the given configuration. Optional functional options can be provided to configure additional features.
func (*LLMSummarizer) SummarizeCommunity ¶
func (s *LLMSummarizer) SummarizeCommunity(ctx context.Context, community *Community, entities []*gtypes.EntityState) (*Community, error)
SummarizeCommunity generates an LLM-based summary of the community. Implements CommunitySummarizer interface with 3-param signature. Content fetching happens internally using the optional ContentFetcher.
type LLMSummarizerConfig ¶
type LLMSummarizerConfig struct {
// Client is the LLM client (required).
Client llm.Client
// MaxTokens limits the response length (default: 150).
MaxTokens int
}
LLMSummarizerConfig configures the LLM summarizer.
type LLMSummarizerOption ¶
type LLMSummarizerOption func(*LLMSummarizer) error
LLMSummarizerOption configures an LLMSummarizer using the functional options pattern. Options return errors for validation (following natsclient pattern).
func WithContentFetcher ¶
func WithContentFetcher(fetcher llm.ContentFetcher) LLMSummarizerOption
WithContentFetcher sets the ContentFetcher for enriching prompts with entity content. If not set, prompts use only entity IDs and triple-derived keywords.
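For readers unfamiliar with error-returning functional options, here is a self-contained sketch of the pattern. All names (`summarizer`, `option`, `withFetcher`, `newSummarizer`) are hypothetical stand-ins; only the 150-token default is taken from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// summarizer is a hypothetical stand-in for a configurable component.
type summarizer struct {
	maxTokens int
	fetcher   string
}

// option mutates a summarizer and may reject invalid input.
type option func(*summarizer) error

// withFetcher sets the fetcher name and validates it.
func withFetcher(name string) option {
	return func(s *summarizer) error {
		if name == "" {
			return errors.New("fetcher name must not be empty")
		}
		s.fetcher = name
		return nil
	}
}

// newSummarizer applies defaults first, then each option in order.
// The first failing option aborts construction.
func newSummarizer(maxTokens int, opts ...option) (*summarizer, error) {
	if maxTokens <= 0 {
		maxTokens = 150 // documented default response length
	}
	s := &summarizer{maxTokens: maxTokens}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, fmt.Errorf("apply option: %w", err)
		}
	}
	return s, nil
}

func main() {
	s, err := newSummarizer(0, withFetcher("content-store"))
	fmt.Println(s.maxTokens, s.fetcher, err)

	_, err = newSummarizer(0, withFetcher(""))
	fmt.Println(err)
}
```

Returning an error from each option lets the constructor surface invalid configuration immediately instead of failing later at call time.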
type LPADetector ¶
type LPADetector struct {
// contains filtered or unexported fields
}
LPADetector implements community detection using Label Propagation Algorithm
func NewLPADetector ¶
func NewLPADetector(provider Provider, storage CommunityStorage) *LPADetector
NewLPADetector creates a new Label Propagation Algorithm detector
func (*LPADetector) DetectCommunities ¶
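func (d *LPADetector) DetectCommunities(ctx context.Context) (map[int][]*Community, error)
DetectCommunities runs full community detection across all hierarchical levels
func (*LPADetector) GetCommunitiesByLevel ¶
func (d *LPADetector) GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
GetCommunitiesByLevel returns all communities at a level
func (*LPADetector) GetCommunity ¶
func (d *LPADetector) GetCommunity(ctx context.Context, id string) (*Community, error)
GetCommunity retrieves a community by ID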
func (*LPADetector) GetEntityCommunity ¶
func (d *LPADetector) GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
GetEntityCommunity returns the community for an entity at a specific level
func (*LPADetector) InferRelationshipsFromCommunities ¶
func (d *LPADetector) InferRelationshipsFromCommunities(ctx context.Context, level int, config InferenceConfig) ([]InferredTriple, error)
InferRelationshipsFromCommunities generates inferred triples from community co-membership. For each community with >= minCommunitySize members, this creates bidirectional "inferred.clustered_with" triples between members.
Parameters:
- level: Hierarchical level to process (0 = most granular)
- config: Inference configuration (min size, max pairs)
Returns triples suitable for persistence via graph.mutation.triple.add. The caller is responsible for persisting these triples.
Confidence scoring:
- Base confidence: 0.5 (inferred relationships)
- Adjusted by community tightness: +0.0 to +0.3 based on internal similarity
- Final range: 0.5-0.8 for inferred relationships
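The pairing and confidence rules can be sketched together. This is an illustration under assumptions: the `inferred` type and `inferPairs` function are hypothetical, tightness is taken as a value in [0, 1] that scales the +0.3 adjustment linearly, and the cap is assumed to count individual triples rather than pairs.

```go
package main

import "fmt"

// inferred is a trimmed, hypothetical version of an inferred triple.
type inferred struct {
	Subject, Predicate, Object string
	Confidence                 float64
}

// inferPairs emits bidirectional "inferred.clustered_with" triples between
// community members, stopping once maxTriples have been produced.
func inferPairs(members []string, minSize, maxTriples int, tightness float64) []inferred {
	if len(members) < minSize || len(members) < 2 {
		return nil // too small; singletons never produce inferences
	}
	// Clamp tightness to [0, 1] so confidence stays within 0.5-0.8.
	if tightness < 0 {
		tightness = 0
	}
	if tightness > 1 {
		tightness = 1
	}
	conf := 0.5 + 0.3*tightness

	var out []inferred
	for i := 0; i < len(members); i++ {
		for j := i + 1; j < len(members); j++ {
			// Emit both directions of each unordered pair.
			for _, p := range [][2]string{{members[i], members[j]}, {members[j], members[i]}} {
				if len(out) >= maxTriples {
					return out // cap reached; bounds the O(n²) growth
				}
				out = append(out, inferred{p[0], "inferred.clustered_with", p[1], conf})
			}
		}
	}
	return out
}

func main() {
	for _, t := range inferPairs([]string{"a", "b", "c"}, 2, 50, 0.5) {
		fmt.Printf("%s %s %s (%.2f)\n", t.Subject, t.Predicate, t.Object, t.Confidence)
	}
}
```

Three members yield three unordered pairs and therefore six triples; an odd cap in this sketch would cut a pair in half, which a production implementation would likely avoid.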
func (*LPADetector) SetEntityProvider ¶
func (d *LPADetector) SetEntityProvider(provider EntityProvider)
SetEntityProvider sets the entity provider for fetching entities during summarization. This method supports deferred initialization - call after the entity provider becomes available. Both summarizer and entityProvider must be set for summarization to occur.
func (*LPADetector) UpdateCommunities ¶
func (d *LPADetector) UpdateCommunities(ctx context.Context, _ []string) error
UpdateCommunities incrementally updates communities based on changed entities
func (*LPADetector) WithLevels ¶
func (d *LPADetector) WithLevels(levels int) *LPADetector
WithLevels sets the number of hierarchical levels with validation
func (*LPADetector) WithLogger ¶
func (d *LPADetector) WithLogger(logger *slog.Logger) *LPADetector
WithLogger sets the logger for the detector
func (*LPADetector) WithMaxIterations ¶
func (d *LPADetector) WithMaxIterations(max int) *LPADetector
WithMaxIterations sets the maximum iteration count with validation
func (*LPADetector) WithProgressiveSummarization ¶
func (d *LPADetector) WithProgressiveSummarization(summarizer CommunitySummarizer, entityProvider EntityProvider) *LPADetector
WithProgressiveSummarization enables progressive summarization with LLM enhancement.
- summarizer: generates statistical summaries immediately
- entityProvider: fetches full entity states for summarization
Note: EnhancementWorker watches the COMMUNITY_INDEX KV bucket for async LLM enhancement (no NATS events needed).
func (*LPADetector) WithSummarizer ¶
func (d *LPADetector) WithSummarizer(summarizer CommunitySummarizer) *LPADetector
WithSummarizer sets the summarizer without requiring an entity provider. Use SetEntityProvider() later to enable summarization once the provider is available. This supports deferred initialization patterns where the entity provider isn't available at detector creation time.
type NATSCommunityStorage ¶
type NATSCommunityStorage struct {
// contains filtered or unexported fields
}
NATSCommunityStorage implements CommunityStorage using NATS KV
func NewNATSCommunityStorage ¶
func NewNATSCommunityStorage(kv jetstream.KeyValue) *NATSCommunityStorage
NewNATSCommunityStorage creates a new NATS-backed community storage with default configuration (no triple creation)
func NewNATSCommunityStorageWithConfig ¶
func NewNATSCommunityStorageWithConfig(kv jetstream.KeyValue, config CommunityStorageConfig) *NATSCommunityStorage
NewNATSCommunityStorageWithConfig creates a new NATS-backed community storage with custom configuration for triple creation
func (*NATSCommunityStorage) Clear ¶
func (s *NATSCommunityStorage) Clear(ctx context.Context) error
Clear removes all communities and entity mappings. This is a best-effort operation - context cancellation during cleanup is ignored since partial cleanup is acceptable during shutdown.
func (*NATSCommunityStorage) DeleteCommunity ¶
func (s *NATSCommunityStorage) DeleteCommunity(ctx context.Context, id string) error
DeleteCommunity removes a community
func (*NATSCommunityStorage) GetAllCommunities ¶
func (s *NATSCommunityStorage) GetAllCommunities(ctx context.Context) ([]*Community, error)
GetAllCommunities returns all communities across all levels. It is used by the LPA detector to archive enhanced communities before Clear().
func (*NATSCommunityStorage) GetCommunitiesByLevel ¶
func (s *NATSCommunityStorage) GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
GetCommunitiesByLevel retrieves all communities at a level
func (*NATSCommunityStorage) GetCommunity ¶
GetCommunity retrieves a community by ID. Since community IDs no longer embed the level, this scans all levels to find the community.
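As a rough illustration of why a lookup by bare ID has to scan, the sketch below probes a `{level}.{community_id}` key for each level in turn. It is a minimal sketch under assumptions: the in-memory map stands in for the COMMUNITY_INDEX bucket, and `findCommunity`, `errNotFound`, and the `maxLevels` parameter are hypothetical names, not part of this package. The actual implementation may enumerate keys differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel used only in this sketch.
var errNotFound = errors.New("community not found")

// findCommunity probes the key "{level}.{id}" for each level until one exists.
// kv is an in-memory stand-in for the KV bucket (an assumption of this sketch).
func findCommunity(kv map[string]string, id string, maxLevels int) (string, int, error) {
	for level := 0; level < maxLevels; level++ {
		key := fmt.Sprintf("%d.%s", level, id)
		if value, ok := kv[key]; ok {
			return value, level, nil
		}
	}
	return "", -1, errNotFound
}

func main() {
	kv := map[string]string{
		"0.c-a1": "fine-grained community",
		"2.c-z9": "coarse community",
	}
	value, level, err := findCommunity(kv, "c-z9", 3)
	fmt.Println(value, level, err)
}
```

The cost of such a scan grows with the number of levels, so callers that already know the level may prefer GetCommunitiesByLevel or GetEntityCommunity.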
func (*NATSCommunityStorage) GetCreatedTriples ¶
func (s *NATSCommunityStorage) GetCreatedTriples() []message.Triple
GetCreatedTriples returns all triples created during SaveCommunity operations. This method is primarily for testing and verification purposes.
func (*NATSCommunityStorage) GetEntityCommunity ¶
func (s *NATSCommunityStorage) GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
GetEntityCommunity retrieves the community for an entity at a level
func (*NATSCommunityStorage) SaveCommunity ¶
func (s *NATSCommunityStorage) SaveCommunity(ctx context.Context, community *Community) error
SaveCommunity persists a community to NATS KV
type PageRankConfig ¶
type PageRankConfig struct {
// Iterations is the number of iterations to run (default: 20)
Iterations int
// DampingFactor is the probability of continuing the random walk (default: 0.85)
DampingFactor float64
// Tolerance is the convergence threshold (default: 1e-6)
Tolerance float64
// TopN is the number of top-ranked nodes to return (0 = all)
TopN int
}
PageRankConfig holds configuration for PageRank computation
func DefaultPageRankConfig ¶
func DefaultPageRankConfig() PageRankConfig
DefaultPageRankConfig returns the standard PageRank configuration
type PageRankResult ¶
type PageRankResult struct {
// Scores maps entity ID to PageRank score
Scores map[string]float64
// Ranked contains entity IDs sorted by PageRank score (descending)
Ranked []string
// Iterations is the actual number of iterations run
Iterations int
// Converged indicates whether the algorithm converged before max iterations
Converged bool
}
PageRankResult holds the results of PageRank computation
func ComputePageRank ¶
func ComputePageRank(ctx context.Context, provider Provider, config PageRankConfig) (*PageRankResult, error)
ComputePageRank computes PageRank scores for all nodes in the graph
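To make the PageRankConfig and PageRankResult fields concrete, here is a minimal power-iteration sketch over a plain adjacency map. It is not this package's implementation: `pageRank` and `ranked` are hypothetical helpers, the graph is a map rather than a Provider, and the handling of dangling nodes (spreading their rank uniformly) is an assumption. The parameters mirror Iterations, DampingFactor, Tolerance, and TopN.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// pageRank runs power iteration over out (node -> outgoing neighbors).
// Every neighbor must also appear as a key in out.
// It returns the scores, the iterations actually run, and whether it converged.
func pageRank(out map[string][]string, maxIter int, damping, tol float64) (map[string]float64, int, bool) {
	n := float64(len(out))
	scores := make(map[string]float64, len(out))
	if len(out) == 0 {
		return scores, 0, true
	}
	for id := range out {
		scores[id] = 1 / n
	}
	for iter := 1; iter <= maxIter; iter++ {
		// Rank held by nodes without out-edges is spread uniformly (assumption).
		dangling := 0.0
		for id, nbrs := range out {
			if len(nbrs) == 0 {
				dangling += scores[id]
			}
		}
		base := (1-damping)/n + damping*dangling/n
		next := make(map[string]float64, len(out))
		for id := range out {
			next[id] = base
		}
		for id, nbrs := range out {
			if len(nbrs) == 0 {
				continue
			}
			share := damping * scores[id] / float64(len(nbrs))
			for _, nb := range nbrs {
				next[nb] += share
			}
		}
		// L1 change between successive score vectors.
		delta := 0.0
		for id := range out {
			delta += math.Abs(next[id] - scores[id])
		}
		scores = next
		if delta < tol {
			return scores, iter, true
		}
	}
	return scores, maxIter, false
}

// ranked returns IDs sorted by score (descending); topN == 0 keeps all.
func ranked(scores map[string]float64, topN int) []string {
	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		if scores[ids[i]] != scores[ids[j]] {
			return scores[ids[i]] > scores[ids[j]]
		}
		return ids[i] < ids[j]
	})
	if topN > 0 && topN < len(ids) {
		ids = ids[:topN]
	}
	return ids
}

func main() {
	graph := map[string][]string{"a": {"c"}, "b": {"c"}, "c": {"a"}}
	scores, iterations, converged := pageRank(graph, 200, 0.85, 1e-6)
	fmt.Println(ranked(scores, 0), iterations, converged)
}
```

With a tight tolerance such as 1e-6, a small iteration cap can be reached before convergence, which is the case the Converged field is there to report.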
func ComputePageRankForCommunity ¶
func ComputePageRankForCommunity(ctx context.Context, provider Provider, communityMembers []string, config PageRankConfig) (*PageRankResult, error)
ComputePageRankForCommunity computes PageRank for the entities within a community. This is more efficient than full-graph PageRank for large graphs.
type PredicateProvider ¶
type PredicateProvider struct {
// contains filtered or unexported fields
}
PredicateProvider implements Provider for entities matching a predicate. This is more practical for real-world use: cluster entities of specific types.
func NewPredicateProvider ¶
func NewPredicateProvider(qm RelationshipQuerier, predicate string) *PredicateProvider
NewPredicateProvider creates a Provider for entities matching a predicate. It caches the valid entity set at construction time for performance.
func (*PredicateProvider) GetAllEntityIDs ¶
func (p *PredicateProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)
GetAllEntityIDs returns all entities matching the predicate
func (*PredicateProvider) GetEdgeWeight ¶
func (p *PredicateProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)
GetEdgeWeight returns the weight of an edge (unweighted: 1.0 or 0.0)
func (*PredicateProvider) GetNeighbors ¶
func (p *PredicateProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
GetNeighbors returns entity IDs connected to the given entity
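The PredicateProvider behavior described above (an entity set cached at construction, and unweighted edges reported as 1.0 or 0.0) can be sketched roughly as follows. This is a simplified model under assumptions: `predicateGraph` and its methods are hypothetical, the adjacency map stands in for RelationshipQuerier, and filtering both endpoints against the cached set is a guess at the intended semantics rather than documented behavior.

```go
package main

import "fmt"

// predicateGraph is a hypothetical stand-in for a predicate-filtered provider.
type predicateGraph struct {
	valid map[string]struct{} // entity set cached once, at construction
	edges map[string][]string // adjacency of the full, unfiltered graph
}

// newPredicateGraph caches the matching entity IDs as a set for O(1) lookups.
func newPredicateGraph(matching []string, edges map[string][]string) *predicateGraph {
	valid := make(map[string]struct{}, len(matching))
	for _, id := range matching {
		valid[id] = struct{}{}
	}
	return &predicateGraph{valid: valid, edges: edges}
}

// neighbors returns only neighbors that are also in the cached set.
func (g *predicateGraph) neighbors(id string) []string {
	var out []string
	if _, ok := g.valid[id]; !ok {
		return out
	}
	for _, nb := range g.edges[id] {
		if _, ok := g.valid[nb]; ok {
			out = append(out, nb)
		}
	}
	return out
}

// edgeWeight is unweighted: 1.0 when a filtered edge exists, otherwise 0.0.
func (g *predicateGraph) edgeWeight(from, to string) float64 {
	for _, nb := range g.neighbors(from) {
		if nb == to {
			return 1.0
		}
	}
	return 0.0
}

func main() {
	edges := map[string][]string{"a": {"b", "x"}, "b": {"a"}}
	g := newPredicateGraph([]string{"a", "b"}, edges)
	fmt.Println(g.neighbors("a"), g.edgeWeight("a", "b"), g.edgeWeight("a", "x"))
}
```

Because the set is captured once, entities that start matching the predicate later are not visible to such a provider until it is rebuilt.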
type ProgressiveSummarizer ¶
type ProgressiveSummarizer struct {
// contains filtered or unexported fields
}
ProgressiveSummarizer provides progressive enhancement: a statistical summary immediately, with LLM enhancement performed asynchronously via events.
func NewProgressiveSummarizer ¶
func NewProgressiveSummarizer() *ProgressiveSummarizer
NewProgressiveSummarizer creates a progressive summarizer with default settings
func (*ProgressiveSummarizer) SummarizeCommunity ¶
func (s *ProgressiveSummarizer) SummarizeCommunity( ctx context.Context, community *Community, entities []*gtypes.EntityState, ) (*Community, error)
SummarizeCommunity generates an immediate statistical summary. The caller is responsible for saving the community and publishing the community.detected event for async LLM enhancement.
type Provider ¶
Provider is an alias to the shared interface in graph package. Abstracts the graph data source for community detection.
type QueryManagerProvider ¶
type QueryManagerProvider struct {
// contains filtered or unexported fields
}
QueryManagerProvider implements Provider using QueryManager
func NewQueryManagerProvider ¶
func NewQueryManagerProvider(qm RelationshipQuerier) *QueryManagerProvider
NewQueryManagerProvider creates a Provider backed by QueryManager
func (*QueryManagerProvider) GetAllEntityIDs ¶
func (p *QueryManagerProvider) GetAllEntityIDs(_ context.Context) ([]string, error)
GetAllEntityIDs returns all entity IDs in the graph
func (*QueryManagerProvider) GetEdgeWeight ¶
func (p *QueryManagerProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)
GetEdgeWeight returns the weight of an edge between two entities
func (*QueryManagerProvider) GetNeighbors ¶
func (p *QueryManagerProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
GetNeighbors returns entity IDs connected to the given entity
type Relationship ¶
type Relationship struct {
Subject string
Predicate string
Object interface{}
FromEntityID string // Source entity
ToEntityID string // Target entity
Weight float64 // For weighted edges
}
Relationship represents an edge between entities. Local copy to avoid import cycle with querymanager.
type RelationshipQuerier ¶
type RelationshipQuerier interface {
EntityQuerier
QueryRelationships(ctx context.Context, entityID string, direction Direction) ([]*Relationship, error)
QueryByPredicate(ctx context.Context, predicate string) ([]string, error)
}
RelationshipQuerier provides minimal interface for querying relationships. This interface exists to avoid import cycle with querymanager package.
type SemanticProvider ¶
type SemanticProvider struct {
// contains filtered or unexported fields
}
SemanticProvider wraps a base Provider and adds virtual edges based on embedding similarity. This enables LPA clustering to find communities even when explicit relationship triples don't exist.
Virtual edges are computed on-demand using cosine similarity between entity embeddings. These edges are NOT persisted - they're ephemeral hints for the clustering algorithm.
Explicit edges (from base provider) always take precedence over virtual edges, and explicit edge weights are preserved as-is (typically confidence-based).
func NewSemanticProvider ¶
func NewSemanticProvider( base Provider, finder SimilarityFinder, config SemanticProviderConfig, logger *slog.Logger, ) *SemanticProvider
NewSemanticProvider creates a Provider that augments explicit edges with virtual edges based on embedding similarity.
Parameters:
- base: The underlying Provider for explicit edges
- finder: SimilarityFinder (typically IndexManager) for similarity lookup
- config: Configuration for similarity threshold and limits
- logger: Optional logger for observability (can be nil)
func (*SemanticProvider) ClearCache ¶
func (p *SemanticProvider) ClearCache()
ClearCache clears the similarity cache and propagates to wrapped providers. Call this between clustering runs to ensure fresh similarity data.
func (*SemanticProvider) GetAllEntityIDs ¶
func (p *SemanticProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)
GetAllEntityIDs delegates to the base provider
func (*SemanticProvider) GetCacheStats ¶
func (p *SemanticProvider) GetCacheStats() (entities int, edges int)
GetCacheStats returns statistics about the similarity cache for monitoring.
func (*SemanticProvider) GetEdgeWeight ¶
GetEdgeWeight returns the weight of an edge between two entities.
- For explicit edges: delegates to base provider (uses Triple.Confidence)
- For virtual edges: returns cached similarity score
Explicit edges always take precedence - if base returns weight > 0, that's used directly. Virtual edge weight is only used when no explicit edge exists.
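The precedence rule for edge weights can be illustrated with a small sketch. The maps below are hypothetical stand-ins for the base provider and the similarity cache, and `edge`, `pair`, and `edgeWeight` are names invented for this example. Storing similarity under an order-independent key is an assumption that follows from similarity being symmetric.

```go
package main

import "fmt"

// edge identifies a directed explicit edge.
type edge struct{ from, to string }

// pair builds an order-independent key for symmetric similarity scores.
func pair(a, b string) edge {
	if a > b {
		a, b = b, a
	}
	return edge{a, b}
}

// edgeWeight returns the explicit weight when one exists (> 0),
// and only otherwise falls back to the cached similarity score.
func edgeWeight(explicit, similarity map[edge]float64, from, to string) float64 {
	if w := explicit[edge{from, to}]; w > 0 {
		return w
	}
	return similarity[pair(from, to)]
}

func main() {
	explicit := map[edge]float64{{"a", "b"}: 0.9}
	similarity := map[edge]float64{pair("a", "b"): 0.7, pair("a", "c"): 0.65}
	fmt.Println(edgeWeight(explicit, similarity, "a", "b")) // explicit weight is used
	fmt.Println(edgeWeight(explicit, similarity, "c", "a")) // falls back to similarity
}
```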
func (*SemanticProvider) GetNeighbors ¶
func (p *SemanticProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)
GetNeighbors returns both explicit neighbors and virtual neighbors from embedding similarity. Virtual neighbors are added when:
- Similarity exceeds threshold (default 0.6)
- Entity has embeddings available
- Not already an explicit neighbor
Direction parameter is respected for explicit edges but ignored for virtual edges (semantic similarity is symmetric).
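The three conditions for virtual neighbors can be sketched as a merge step. This is a simplified model, not the package's code: embeddings are held in a plain map instead of being looked up through a SimilarityFinder, `cosine` and `neighborsWithVirtual` are hypothetical helpers, and treating a score equal to the threshold as a match is an assumption.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// cosine returns the cosine similarity of two equal-length vectors (0 if undefined).
func cosine(a, b []float64) float64 {
	if len(a) != len(b) {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// neighborsWithVirtual appends up to maxVirtual similar entities to the explicit
// neighbors, skipping the entity itself and anything already explicit.
func neighborsWithVirtual(id string, explicit []string, emb map[string][]float64, threshold float64, maxVirtual int) []string {
	result := append([]string(nil), explicit...)
	self, ok := emb[id]
	if !ok {
		return result // no embedding available, so no virtual edges
	}
	seen := map[string]bool{id: true}
	for _, e := range explicit {
		seen[e] = true
	}
	type hit struct {
		id    string
		score float64
	}
	var hits []hit
	for other, vec := range emb {
		if seen[other] {
			continue
		}
		if s := cosine(self, vec); s >= threshold {
			hits = append(hits, hit{other, s})
		}
	}
	// Strongest matches first; ties broken by ID for determinism.
	sort.Slice(hits, func(i, j int) bool {
		if hits[i].score != hits[j].score {
			return hits[i].score > hits[j].score
		}
		return hits[i].id < hits[j].id
	})
	if len(hits) > maxVirtual {
		hits = hits[:maxVirtual]
	}
	for _, h := range hits {
		result = append(result, h.id)
	}
	return result
}

func main() {
	emb := map[string][]float64{
		"a": {1, 0}, "b": {1, 0.1}, "c": {0, 1}, "d": {0.9, 0.1},
	}
	fmt.Println(neighborsWithVirtual("a", []string{"b"}, emb, 0.6, 5))
}
```

Capping the number of virtual neighbors keeps the per-entity work bounded during each LPA iteration, which is the cost MaxVirtualNeighbors is described as controlling.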
type SemanticProviderConfig ¶
type SemanticProviderConfig struct {
// SimilarityThreshold is the minimum cosine similarity for virtual edges.
// Higher values = fewer but stronger virtual connections.
// Recommended: 0.6 for clustering (stricter than 0.3 used in search)
SimilarityThreshold float64
// MaxVirtualNeighbors limits virtual neighbors per entity to control
// computation cost during LPA iterations.
// Recommended: 5
MaxVirtualNeighbors int
}
SemanticProviderConfig holds configuration for SemanticProvider
func DefaultSemanticProviderConfig ¶
func DefaultSemanticProviderConfig() SemanticProviderConfig
DefaultSemanticProviderConfig returns sensible defaults for clustering
type SimilarityFinder ¶
type SimilarityFinder interface {
// FindSimilarEntities returns entities similar to the given entity.
// threshold: minimum cosine similarity (0.0-1.0)
// limit: maximum results to return
FindSimilarEntities(ctx context.Context, entityID string, threshold float64, limit int) ([]gtypes.SimilarityHit, error)
}
SimilarityFinder abstracts the IndexManager's similarity lookup. This interface allows SemanticProvider to work with IndexManager without creating import cycles and enables easier testing.
type StatisticalSummarizer ¶
type StatisticalSummarizer struct {
// MaxKeywords limits the number of keywords extracted
MaxKeywords int
// MaxRepEntities limits the number of representative entities
MaxRepEntities int
}
StatisticalSummarizer implements CommunitySummarizer using statistical methods. This is the default summarizer; it does not require external LLM services.
func NewStatisticalSummarizer ¶
func NewStatisticalSummarizer() *StatisticalSummarizer
NewStatisticalSummarizer creates a statistical summarizer with default settings
func (*StatisticalSummarizer) SummarizeCommunity ¶
func (s *StatisticalSummarizer) SummarizeCommunity( ctx context.Context, community *Community, entities []*gtypes.EntityState, ) (*Community, error)
SummarizeCommunity generates a statistical summary of the community
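The package overview mentions TF-IDF keyword extraction for statistical summaries. The sketch below shows one common way to do that and how a MaxKeywords-style limit would apply. It is an assumption-heavy illustration: `topKeywords` is a hypothetical helper, entities are reduced to plain text strings instead of gtypes.EntityState, whitespace tokenization and the smoothed IDF formula are choices made for this example, and the actual scoring in StatisticalSummarizer may differ.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"strings"
)

// topKeywords scores each term by its frequency inside the community,
// weighted by a smoothed inverse document frequency over the whole corpus.
func topKeywords(community, corpus []string, maxKeywords int) []string {
	// Document frequency: in how many corpus documents each term appears.
	df := map[string]int{}
	for _, doc := range corpus {
		seen := map[string]bool{}
		for _, w := range strings.Fields(strings.ToLower(doc)) {
			if !seen[w] {
				seen[w] = true
				df[w]++
			}
		}
	}
	// Term frequency across the community's documents.
	tf := map[string]int{}
	for _, doc := range community {
		for _, w := range strings.Fields(strings.ToLower(doc)) {
			tf[w]++
		}
	}
	n := float64(len(corpus))
	score := make(map[string]float64, len(tf))
	terms := make([]string, 0, len(tf))
	for w, count := range tf {
		idf := math.Log((1+n)/(1+float64(df[w]))) + 1 // smoothed IDF
		score[w] = float64(count) * idf
		terms = append(terms, w)
	}
	// Highest score first; ties broken alphabetically for determinism.
	sort.Slice(terms, func(i, j int) bool {
		if score[terms[i]] != score[terms[j]] {
			return score[terms[i]] > score[terms[j]]
		}
		return terms[i] < terms[j]
	})
	if maxKeywords > 0 && maxKeywords < len(terms) {
		terms = terms[:maxKeywords]
	}
	return terms
}

func main() {
	corpus := []string{
		"sensor temperature reading", "sensor pressure reading",
		"drone battery status", "drone battery charge", "drone flight battery log",
	}
	community := corpus[2:] // the three drone entities
	fmt.Println(topKeywords(community, corpus, 2))
}
```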