clustering

package
v1.0.0-alpha.20
Published: Mar 9, 2026 License: MIT Imports: 21 Imported by: 0

README

Clustering Package

Community detection and hierarchical clustering algorithms for the SemStreams entity graph.

Purpose

This package provides graph clustering for the entity graph. It uses the Label Propagation Algorithm (LPA) to detect communities of related entities and PageRank to pick representative entities within each community. Detected communities are enriched with statistical summaries and optionally enhanced with LLM-generated descriptions.

Key Types

Community

Represents a detected cluster of related entities in the graph:

type Community struct {
    ID                 string              // Unique community identifier
    Level              int                 // Hierarchy level (0=bottom, 1=mid, 2=top)
    Members            []string            // Entity IDs in this community
    ParentID           *string             // Parent community at next level (nil for top)
    StatisticalSummary string              // Fast baseline summary (always present)
    LLMSummary         string              // Enhanced description (populated async)
    Keywords           []string            // Key terms representing themes
    RepEntities        []string            // Representative entity IDs
    SummaryStatus      string              // "statistical", "llm-enhanced", or "llm-failed"
    Metadata           map[string]interface{} // Additional properties
}

Note on field access: This package uses direct field access, not getter methods. This is idiomatic Go - simply reference community.ID, community.Members, etc.

CommunityDetector

Interface for running community detection on the entity graph:

type CommunityDetector interface {
    DetectCommunities(ctx context.Context) (map[int][]*Community, error)
    UpdateCommunities(ctx context.Context, entityIDs []string) error
    GetCommunity(ctx context.Context, id string) (*Community, error)
    GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)
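    GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)
    InferRelationshipsFromCommunities(ctx context.Context, level int, config InferenceConfig) ([]InferredTriple, error)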
}

Configuration

Configure clustering behavior using builder pattern methods:

detector := clustering.NewLPADetector(provider, storage).
    WithMaxIterations(100).     // Maximum LPA iterations
    WithLevels(3).               // Hierarchy levels (0=bottom, 1=mid, 2=top)
    WithProgressiveSummarization(summarizer, entityProvider)

Available configuration methods:

  • WithMaxIterations(int): Set max iteration count (default: 100, max: 10000)
  • WithLevels(int): Set hierarchy depth (default: 3, max: 10)
  • WithProgressiveSummarization(CommunitySummarizer, EntityProvider): Enable summarization

Usage Example

// Create graph provider for specific entity types
provider := clustering.NewPredicateGraphProvider(queryManager, "robotics.drone")

// Create storage backend
storage := clustering.NewNATSStorage(js, "COMMUNITIES")

// Initialize detector
detector := clustering.NewLPADetector(provider, storage)

// Detect communities
communities, err := detector.DetectCommunities(ctx)
if err != nil {
    return fmt.Errorf("community detection failed: %w", err)
}

// Process results by hierarchy level
for level, comms := range communities {
    log.Printf("Level %d: %d communities", level, len(comms))
    for _, comm := range comms {
        // Direct field access - no getters needed
        log.Printf("  Community %s: %d members", comm.ID, len(comm.Members))
        log.Printf("    Keywords: %v", comm.Keywords)
        log.Printf("    Summary: %s", comm.StatisticalSummary)
    }
}

Integration with QueryManager

The package defines local interfaces to avoid import cycles with querymanager:

type RelationshipQuerier interface {
    EntityQuerier
    QueryRelationships(ctx context.Context, entityID string, direction Direction) ([]*Relationship, error)
    QueryByPredicate(ctx context.Context, predicate string) ([]string, error)
}

Two graph provider implementations are available:

QueryManagerGraphProvider

Direct integration with QueryManager for neighborhood queries:

provider := clustering.NewQueryManagerGraphProvider(queryManager)

Limitation: Does not support full graph scans (GetAllEntityIDs returns an error).

PredicateGraphProvider

Recommended for real-world use - clusters entities matching a specific predicate:

// Cluster only drone entities
provider := clustering.NewPredicateGraphProvider(queryManager, "robotics.drone.type")

Caches the entity set at construction for efficient repeated queries.

Algorithms

Label Propagation (LPA)

Detects communities by iteratively propagating labels through the graph until convergence:

  1. Each entity starts with a unique label
  2. Each entity adopts the most common label among its neighbors
  3. Steps repeat until labels are stable or the maximum iteration count is reached

Produces bottom-level communities, which are then hierarchically aggregated.

PageRank

Identifies representative entities within each community based on graph centrality. Representatives best exemplify the community's characteristics.

Summarization

Two-tier approach for community descriptions:

  1. Statistical Summary (instant): TF-IDF keyword extraction + template-based generation
  2. LLM Enhancement (async): Optional enrichment via background workers

Check community.SummaryStatus to determine which summary type is available.
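
As a minimal sketch of that check, the helper below picks which text to display based on SummaryStatus. The local Community type mirrors only the three summary fields, and bestSummary is a hypothetical helper, not part of the package API.

```go
package main

import "fmt"

// Community mirrors only the summary-related fields of clustering.Community.
type Community struct {
	StatisticalSummary string
	LLMSummary         string
	SummaryStatus      string
}

// bestSummary is a hypothetical helper: it prefers the LLM summary when the
// status reports a successful enhancement and otherwise falls back to the
// statistical summary, which the package documents as always present.
func bestSummary(c Community) string {
	if c.SummaryStatus == "llm-enhanced" && c.LLMSummary != "" {
		return c.LLMSummary
	}
	return c.StatisticalSummary
}

func main() {
	c := Community{
		StatisticalSummary: "12 drone entities; keywords: navigation, battery",
		SummaryStatus:      "statistical",
	}
	fmt.Println(bestSummary(c))
}
```

Falling back on "llm-failed" as well as "statistical" keeps callers from ever showing an empty description.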

Storage

Communities are persisted in NATS KV buckets with configurable retention:

storage := clustering.NewNATSStorage(js, "COMMUNITIES")

Supports incremental updates: only the affected communities are recomputed when the graph changes.

Package Location

Previously located at pkg/graphclustering/, this package was moved to processor/graph/clustering/ per ADR-PACKAGE-RESPONSIBILITIES-CONSOLIDATION. The move eliminated import cycles and clarified that clustering is graph processing logic, not a standalone reusable library.

All graph processing capabilities now live under processor/graph/:

  • processor/graph/ - Main processor and mutations
  • processor/graph/querymanager/ - Query execution
  • processor/graph/indexmanager/ - Indexing operations
  • processor/graph/clustering/ - Community detection (this package)
  • processor/graph/embedding/ - Vector embeddings

Documentation

Overview

Package clustering provides community detection algorithms and graph clustering for discovering structural patterns in the knowledge graph.

The clustering package implements the Label Propagation Algorithm (LPA) for detecting communities of related entities. It supports hierarchical clustering with multiple granularity levels, enabling both fine-grained local communities and coarse-grained global clusters.

Detected communities are enriched with statistical summaries (TF-IDF keyword extraction) immediately, with optional LLM enhancement performed asynchronously. PageRank is used to identify representative entities within each community.

Architecture

                                Graph Provider
                                      ↓
┌──────────────────────────────────────────────────────────────┐
│                      LPA Detector                            │
├──────────────────────────────────────────────────────────────┤
│  Level 0 (fine)  →  Level 1 (mid)  →  Level 2 (coarse)      │
└──────────────────────────────────────────────────────────────┘
                                      ↓
┌────────────────────┐    ┌────────────────────────────────────┐
│ Progressive        │───→│ COMMUNITY_INDEX KV                 │
│ Summarizer         │    │ - {level}.{community_id}           │
│ (statistical)      │    │ - entity.{level}.{entity_id}       │
└────────────────────┘    └────────────────────────────────────┘
                                      ↓
                          ┌───────────────────────┐
                          │ Enhancement Worker    │
                          │ (async LLM via KV     │
                          │  watcher)             │
                          └───────────────────────┘

Usage

Configure and run community detection:

// Create storage backed by NATS KV
storage := clustering.NewNATSCommunityStorage(communityBucket)

// Create LPA detector with progressive summarization
detector := clustering.NewLPADetector(graphProvider, storage).
    WithLevels(3).
    WithMaxIterations(100).
    WithProgressiveSummarization(
        clustering.NewProgressiveSummarizer(),
        entityProvider,
    )

// Run detection (typically after graph updates)
communities, err := detector.DetectCommunities(ctx)
// communities[0] = fine-grained, communities[1] = mid, communities[2] = coarse

Query communities and entities:

// Get community for an entity at specific level
community, err := detector.GetEntityCommunity(ctx, entityID, 0)

// Get all communities at a level
allLevel0, err := detector.GetCommunitiesByLevel(ctx, 0)

// Infer relationships from community co-membership
triples, err := detector.InferRelationshipsFromCommunities(ctx, 0, clustering.DefaultInferenceConfig())

Label Propagation Algorithm

LPA iteratively assigns entities to communities based on neighbor voting:

  1. Each entity starts with its own unique label
  2. Entities adopt the most frequent label among their neighbors (weighted by edge weight)
  3. Process continues until convergence or max iterations reached
  4. Shuffled processing order reduces oscillation

Hierarchical levels are computed by treating communities from level N as super-nodes for level N+1 detection.
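
The four steps above can be sketched on a plain in-memory graph. This is an illustration under stated assumptions, not the package's implementation: the Graph type, the propagateLabels function, the seeded shuffle, and the tie-break rule (lexicographically smallest label wins) are all hypothetical, and the graph is assumed to be symmetric with every node present as a key.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// Graph maps each node to its neighbors and the weight of the connecting edge.
type Graph map[string]map[string]float64

// propagateLabels runs label propagation and returns a node -> label map.
// Ties between equally weighted labels go to the lexicographically smallest
// label (an assumption made here to keep the sketch deterministic).
func propagateLabels(g Graph, maxIterations int, seed int64) map[string]string {
	nodes := make([]string, 0, len(g))
	labels := make(map[string]string, len(g))
	for n := range g {
		nodes = append(nodes, n)
		labels[n] = n // step 1: every node starts with its own label
	}
	sort.Strings(nodes) // stable starting order so a given seed is reproducible
	rng := rand.New(rand.NewSource(seed))

	for iter := 0; iter < maxIterations; iter++ {
		// step 4: visit nodes in a freshly shuffled order each round
		rng.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
		changed := false
		for _, n := range nodes {
			// step 2: add up edge weights per neighbor label
			votes := make(map[string]float64)
			for nb, w := range g[n] {
				votes[labels[nb]] += w
			}
			if len(votes) == 0 {
				continue // isolated node keeps its own label
			}
			best, bestW := "", -1.0
			for label, w := range votes {
				if w > bestW || (w == bestW && label < best) {
					best, bestW = label, w
				}
			}
			if best != labels[n] {
				labels[n] = best
				changed = true
			}
		}
		if !changed {
			break // step 3: converged before reaching maxIterations
		}
	}
	return labels
}

func main() {
	// Two disconnected triangles; each should end up as one community.
	g := Graph{
		"a": {"b": 1, "c": 1}, "b": {"a": 1, "c": 1}, "c": {"a": 1, "b": 1},
		"d": {"e": 1, "f": 1}, "e": {"d": 1, "f": 1}, "f": {"d": 1, "e": 1},
	}
	fmt.Println(propagateLabels(g, 100, 1))
}
```

Nodes that share a final label form one community. Running the same routine on a graph whose nodes are those communities would give the next, coarser level.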

Community Summarization

Three summarization strategies are available:

StatisticalSummarizer:

  • TF-IDF-like keyword extraction from entity types and properties
  • PageRank-based representative entity selection
  • Template-based summary generation
  • Always available, no external dependencies

LLMSummarizer:

  • OpenAI-compatible LLM for natural language summaries
  • Works with shimmy, OpenAI, Ollama, vLLM, etc.
  • Falls back to statistical on service unavailability

ProgressiveSummarizer:

  • Statistical summary immediately available
  • LLM enhancement performed asynchronously
  • Best for user-facing applications needing fast initial response

Enhancement Worker

The EnhancementWorker watches COMMUNITY_INDEX KV for communities with status="statistical" and asynchronously generates LLM summaries:

worker, err := clustering.NewEnhancementWorker(&clustering.EnhancementWorkerConfig{
    LLMSummarizer:   llmSummarizer,
    Storage:         storage,
    Provider:        graphProvider,
    Querier:         queryManager,
    CommunityBucket: communityBucket,
})

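if err != nil {
    return fmt.Errorf("create enhancement worker: %w", err)
}

// Start returns an error, so check it before deferring Stop
if err := worker.Start(ctx); err != nil {
    return fmt.Errorf("start enhancement worker: %w", err)
}
defer worker.Stop()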

PageRank

PageRank identifies influential entities within communities:

config := clustering.DefaultPageRankConfig()
config.TopN = 10

result, err := clustering.ComputePageRankForCommunity(ctx, provider, memberIDs, config)
// result.Ranked = top 10 entities by PageRank score
// result.Scores = map of entity ID to normalized score

Default configuration:

  • Iterations: 20
  • DampingFactor: 0.85
  • Tolerance: 1e-6 (convergence threshold)
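
To show how these three defaults interact, here is a minimal power-iteration sketch. It is not the package's ComputePageRankForCommunity. The pageRank function and its adjacency-list input are hypothetical, and it assumes every node appears as a key in the map.

```go
package main

import (
	"fmt"
	"math"
)

// pageRank runs power iteration over out (node -> outgoing neighbors).
// Assumption: every node, including pure sinks, is a key in out.
func pageRank(out map[string][]string, iterations int, damping, tolerance float64) map[string]float64 {
	n := float64(len(out))
	scores := make(map[string]float64, len(out))
	for node := range out {
		scores[node] = 1 / n // start from a uniform distribution
	}
	for i := 0; i < iterations; i++ {
		next := make(map[string]float64, len(out))
		dangling := 0.0
		for node, targets := range out {
			if len(targets) == 0 {
				dangling += scores[node] // sinks spread their score evenly
				continue
			}
			share := scores[node] / float64(len(targets))
			for _, t := range targets {
				next[t] += share
			}
		}
		delta := 0.0
		for node := range out {
			v := (1-damping)/n + damping*(next[node]+dangling/n)
			delta += math.Abs(v - scores[node])
			next[node] = v
		}
		scores = next
		if delta < tolerance {
			break // total change fell below the convergence threshold
		}
	}
	return scores
}

func main() {
	// Hub "h" links to three spokes and every spoke links back to the hub.
	out := map[string][]string{
		"h": {"a", "b", "c"},
		"a": {"h"}, "b": {"h"}, "c": {"h"},
	}
	scores := pageRank(out, 20, 0.85, 1e-6)
	fmt.Printf("hub=%.3f spoke=%.3f\n", scores["h"], scores["a"])
}
```

The iteration cap bounds the cost per community, while the tolerance lets well-behaved graphs stop early.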

Configuration

LPA detector configuration:

MaxIterations:    100       # Maximum iterations (limit: 10000)
Levels:           3         # Hierarchical levels (limit: 10)

LLM summary transfer between detection runs:

SummaryTransferThreshold: 0.8    # Jaccard overlap for preserving LLM summaries
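
The Jaccard overlap behind this threshold is the size of the intersection of two member sets divided by the size of their union. The sketch below assumes the comparison is inclusive (overlap >= 0.8). The jaccard and canTransferSummary helpers are hypothetical names, not package API.

```go
package main

import "fmt"

// summaryTransferThreshold mirrors clustering.SummaryTransferThreshold.
const summaryTransferThreshold = 0.8

// jaccard returns |A ∩ B| / |A ∪ B| for two member lists, ignoring duplicates.
func jaccard(a, b []string) float64 {
	setA := make(map[string]bool, len(a))
	for _, id := range a {
		setA[id] = true
	}
	setB := make(map[string]bool, len(b))
	intersection := 0
	for _, id := range b {
		if setB[id] {
			continue // already counted this member
		}
		setB[id] = true
		if setA[id] {
			intersection++
		}
	}
	union := len(setA) + len(setB) - intersection
	if union == 0 {
		return 0
	}
	return float64(intersection) / float64(union)
}

// canTransferSummary reports whether an archived community's LLM summary
// could be carried over to a newly detected one (inclusive comparison assumed).
func canTransferSummary(archived, detected []string) bool {
	return jaccard(archived, detected) >= summaryTransferThreshold
}

func main() {
	archived := []string{"e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"}
	detected := archived[:9] // one member left the community
	fmt.Println(canTransferSummary(archived, detected))
}
```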

Inference configuration for relationship generation:

MinCommunitySize:        2     # Minimum size for inference
MaxInferredPerCommunity: 50    # Limit to prevent O(n²) explosion
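
A community of n members has n(n-1) ordered pairs, which is why a cap is needed. The sketch below shows one way the two settings could gate pair generation. The inferPairs function and the triple type are hypothetical, and it assumes the cap counts directed triples rather than unordered pairs.

```go
package main

import "fmt"

// triple is a reduced stand-in for clustering.InferredTriple.
type triple struct{ Subject, Predicate, Object string }

// inferPairs emits bidirectional "inferred.clustered_with" triples between
// members, skipping communities below minSize and stopping once adding
// another pair would exceed maxInferred directed triples.
func inferPairs(members []string, minSize, maxInferred int) []triple {
	if len(members) < minSize || len(members) < 2 {
		return nil // singletons never produce inferences
	}
	var out []triple
	for i := 0; i < len(members); i++ {
		for j := i + 1; j < len(members); j++ {
			if len(out)+2 > maxInferred {
				return out
			}
			out = append(out,
				triple{members[i], "inferred.clustered_with", members[j]},
				triple{members[j], "inferred.clustered_with", members[i]},
			)
		}
	}
	return out
}

func main() {
	members := []string{"drone.001", "drone.002", "drone.003"}
	for _, t := range inferPairs(members, 2, 50) {
		fmt.Println(t.Subject, t.Predicate, t.Object)
	}
}
```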

PageRank configuration:

Iterations:    20        # Max iterations
DampingFactor: 0.85      # Random walk continuation probability
Tolerance:     1e-6      # Convergence threshold
TopN:          0         # Return all (or limit to top N)

Storage

Communities are stored in the COMMUNITY_INDEX KV bucket:

{level}.{community_id}        → Community JSON
entity.{level}.{entity_id}   → Community ID (for entity lookup)
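
The two key patterns can be built with small helpers like the ones below. The function names and the sample community ID are hypothetical. Only the key layout comes from the documentation above.

```go
package main

import "fmt"

// communityKey builds the "{level}.{community_id}" key for a community record.
func communityKey(level int, communityID string) string {
	return fmt.Sprintf("%d.%s", level, communityID)
}

// entityKey builds the "entity.{level}.{entity_id}" key used to look up
// the community an entity belongs to at a given level.
func entityKey(level int, entityID string) string {
	return fmt.Sprintf("entity.%d.%s", level, entityID)
}

func main() {
	fmt.Println(communityKey(0, "comm-42"))
	fmt.Println(entityKey(0, "acme.ops.robotics.gcs.drone.001"))
}
```

Because the level comes first in both patterns, all keys for one level share a common prefix.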

Storage can optionally create member_of triples:

config := clustering.CommunityStorageConfig{
    CreateTriples:   true,
    TriplePredicate: "graph.community.member_of",
}
storage := clustering.NewNATSCommunityStorageWithConfig(kv, config)

Thread Safety

LPADetector, NATSCommunityStorage, and EnhancementWorker are safe for concurrent use. The enhancement worker supports pause/resume for coordinated graph updates:

worker.Pause()    // Stop processing new communities
// ... perform graph updates ...
worker.Resume()   // Continue processing

Metrics

The clustering package exports Prometheus metrics under the semstreams_clustering namespace:

  • communities_detected_total: Communities detected by level
  • detection_duration_seconds: Detection run duration
  • enhancement_latency_seconds: LLM enhancement duration
  • enhancement_queue_depth: Pending enhancements
  • enhancement_success_total: Successful LLM enhancements
  • enhancement_failed_total: Failed LLM enhancements

See Also

Related packages:

Package clustering provides graph clustering algorithms and community detection.

Package clustering provides community detection algorithms and graph providers.

Index

Constants

const (
	// DefaultMaxIterations is the default maximum iteration count
	DefaultMaxIterations = 100

	// MaxIterationsLimit is the maximum allowed iteration count
	MaxIterationsLimit = 10000

	// DefaultLevels is the default number of hierarchical levels
	DefaultLevels = 3

	// MaxLevelsLimit is the maximum allowed hierarchical levels
	MaxLevelsLimit = 10

	// SummaryTransferThreshold is the minimum Jaccard overlap for transferring LLM summaries
	// between archived and newly detected communities
	SummaryTransferThreshold = 0.8
)
const (
	// CommunityBucket is the NATS KV bucket for storing communities
	CommunityBucket = "COMMUNITY_INDEX"

	// MaxCommunityLevels is the maximum hierarchy depth for scanning
	MaxCommunityLevels = 10
)

Functions

func ComputeRepresentativeEntities

func ComputeRepresentativeEntities(ctx context.Context, provider Provider, communityMembers []string, topN int) ([]string, map[string]float64, error)

ComputeRepresentativeEntities computes representative entities for a community using PageRank. It returns the top N entities by PageRank score.

Types

type Community

type Community struct {
	// ID is the unique identifier for this community
	ID string `json:"id"`

	// Level indicates the hierarchy level (0=bottom, 1=mid, 2=top)
	Level int `json:"level"`

	// Members contains the entity IDs belonging to this community
	Members []string `json:"members"`

	// ParentID references the parent community at the next level up (nil for top level)
	ParentID *string `json:"parent_id,omitempty"`

	// StatisticalSummary is the fast statistical baseline summary (always present)
	// Generated using TF-IDF keyword extraction and template-based summarization
	StatisticalSummary string `json:"statistical_summary,omitempty"`

	// LLMSummary is the enhanced LLM-generated summary (populated asynchronously)
	// Empty until LLM enhancement completes successfully
	LLMSummary string `json:"llm_summary,omitempty"`

	// Keywords are extracted key terms representing this community's themes
	// e.g., ["autonomous", "navigation", "sensor-fusion"]
	Keywords []string `json:"keywords,omitempty"`

	// RepEntities contains IDs of representative entities within this community
	// These entities best exemplify the community's characteristics
	RepEntities []string `json:"rep_entities,omitempty"`

	// SummaryStatus tracks the summarization state
	// Values: "statistical" (initial), "llm-enhanced" (enhanced), "llm-failed" (enhancement failed)
	SummaryStatus string `json:"summary_status,omitempty"`

	// Metadata stores additional community properties
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}

Community represents a detected community/cluster in the graph

type CommunityDetector

type CommunityDetector interface {
	// DetectCommunities runs community detection on the entire graph
	// Returns communities organized by hierarchical level
	DetectCommunities(ctx context.Context) (map[int][]*Community, error)

	// UpdateCommunities incrementally updates communities based on recent graph changes
	// entityIDs are entities that have been added/modified since last detection
	UpdateCommunities(ctx context.Context, entityIDs []string) error

	// GetCommunity retrieves a specific community by ID
	GetCommunity(ctx context.Context, id string) (*Community, error)

	// GetEntityCommunity returns the community containing the given entity
	// level specifies which hierarchical level to query (0=bottom, 1=mid, 2=top)
	GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)

	// GetCommunitiesByLevel returns all communities at a specific hierarchical level
	GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)

	// InferRelationshipsFromCommunities generates inferred triples from community co-membership.
	// For each community with at least config.MinCommunitySize members, creates bidirectional
	// "inferred.clustered_with" triples between members.
	InferRelationshipsFromCommunities(ctx context.Context, level int, config InferenceConfig) ([]InferredTriple, error)
}

CommunityDetector performs community detection on a graph

type CommunityStorage

type CommunityStorage interface {
	// SaveCommunity persists a community
	SaveCommunity(ctx context.Context, community *Community) error

	// GetCommunity retrieves a community by ID
	GetCommunity(ctx context.Context, id string) (*Community, error)

	// GetCommunitiesByLevel retrieves all communities at a level
	GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)

	// GetEntityCommunity retrieves the community for an entity at a level
	GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)

	// DeleteCommunity removes a community
	DeleteCommunity(ctx context.Context, id string) error

	// Clear removes all communities (useful for full recomputation)
	Clear(ctx context.Context) error

	// GetAllCommunities returns all communities across all levels
	// Used for archiving enhanced communities before Clear()
	GetAllCommunities(ctx context.Context) ([]*Community, error)
}

CommunityStorage abstracts persistence layer for communities

type CommunityStorageConfig

type CommunityStorageConfig struct {
	// CreateTriples enables creation of member_of triples during SaveCommunity
	CreateTriples bool

	// TriplePredicate specifies the predicate to use for community membership triples
	// Default: "graph.community.member_of"
	TriplePredicate string
}

CommunityStorageConfig configures community storage behavior

type CommunitySummarizer

type CommunitySummarizer interface {
	// SummarizeCommunity generates a summary for a community
	// Returns the updated Community with its summary text, Keywords, RepEntities, and SummaryStatus fields populated
	SummarizeCommunity(ctx context.Context, community *Community, entities []*gtypes.EntityState) (*Community, error)
}

CommunitySummarizer generates summaries for communities

type Direction

type Direction string

Direction represents the direction of relationship traversal. Local copy to avoid import cycle with querymanager.

const (
	DirectionOutgoing Direction = "outgoing"
	DirectionIncoming Direction = "incoming"
	DirectionBoth     Direction = "both"
)

Direction constants for relationship traversal.

type EnhancementMetrics

type EnhancementMetrics struct {
	// contains filtered or unexported fields
}

EnhancementMetrics provides Prometheus metrics for LLM community enhancement

func NewEnhancementMetrics

func NewEnhancementMetrics(component string, registry *metric.MetricsRegistry) *EnhancementMetrics

NewEnhancementMetrics creates a new EnhancementMetrics instance using MetricsRegistry

func (*EnhancementMetrics) DecQueueDepth

func (m *EnhancementMetrics) DecQueueDepth()

DecQueueDepth decrements the queue depth by 1

func (*EnhancementMetrics) IncQueueDepth

func (m *EnhancementMetrics) IncQueueDepth()

IncQueueDepth increments the queue depth by 1

func (*EnhancementMetrics) RecordEnhancementFailed

func (m *EnhancementMetrics) RecordEnhancementFailed(latencySeconds float64)

RecordEnhancementFailed records a failed enhancement with latency

func (*EnhancementMetrics) RecordEnhancementStart

func (m *EnhancementMetrics) RecordEnhancementStart()

RecordEnhancementStart records the start of an enhancement attempt

func (*EnhancementMetrics) RecordEnhancementSuccess

func (m *EnhancementMetrics) RecordEnhancementSuccess(latencySeconds float64)

RecordEnhancementSuccess records a successful enhancement with latency

func (*EnhancementMetrics) SetQueueDepth

func (m *EnhancementMetrics) SetQueueDepth(depth int)

SetQueueDepth sets the current queue depth

type EnhancementWorker

type EnhancementWorker struct {
	// contains filtered or unexported fields
}

EnhancementWorker handles asynchronous LLM enhancement of community summaries via KV watch

func NewEnhancementWorker

func NewEnhancementWorker(config *EnhancementWorkerConfig) (*EnhancementWorker, error)

NewEnhancementWorker creates a new enhancement worker

func (*EnhancementWorker) IsPaused

func (w *EnhancementWorker) IsPaused() bool

IsPaused returns whether the worker is currently paused.

func (*EnhancementWorker) Pause

func (w *EnhancementWorker) Pause()

Pause stops processing new communities while allowing in-flight work to complete. Safe to call multiple times. Returns immediately after signaling workers to pause.

func (*EnhancementWorker) Resume

func (w *EnhancementWorker) Resume()

Resume allows processing to continue after a Pause. Safe to call multiple times.

func (*EnhancementWorker) Start

func (w *EnhancementWorker) Start(ctx context.Context) error

Start begins watching for communities needing LLM enhancement

func (*EnhancementWorker) Stop

func (w *EnhancementWorker) Stop() error

Stop gracefully stops the enhancement worker

func (*EnhancementWorker) WithWorkers

func (w *EnhancementWorker) WithWorkers(n int) *EnhancementWorker

WithWorkers sets the number of concurrent workers. Must be called before Start(). Has no effect if worker is already started.

type EnhancementWorkerConfig

type EnhancementWorkerConfig struct {
	LLMSummarizer   *LLMSummarizer
	Storage         CommunityStorage
	Provider        Provider
	Querier         EntityQuerier
	CommunityBucket jetstream.KeyValue
	Logger          *slog.Logger
	Registry        *metric.MetricsRegistry // Optional: for LLM enhancement metrics
}

EnhancementWorkerConfig holds configuration for the enhancement worker

type EntityIDProvider

type EntityIDProvider struct {
	// contains filtered or unexported fields
}

EntityIDProvider wraps a base Provider and adds virtual edges based on EntityID hierarchy. This enables LPA clustering to find communities using the 6-part EntityID structure even when explicit relationship triples don't exist.

EntityID format: org.platform.domain.system.type.instance

Entities with the same 5-part TypePrefix (org.platform.domain.system.type) are considered siblings.

Virtual edges are computed on-demand and cached for performance. These edges are NOT persisted - they're ephemeral hints for the clustering algorithm.

Explicit edges (from base provider) always take precedence over virtual edges.
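
A minimal sketch of the sibling rule, assuming IDs are split on dots and that malformed IDs (anything other than six parts) never count as siblings. The typePrefix and areSiblings helpers and the sample IDs are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// typePrefix returns the first five dot-separated parts of a 6-part EntityID.
// The second result is false when the ID does not have exactly six parts.
func typePrefix(entityID string) (string, bool) {
	parts := strings.Split(entityID, ".")
	if len(parts) != 6 {
		return "", false
	}
	return strings.Join(parts[:5], "."), true
}

// areSiblings reports whether two distinct entities share a type prefix.
func areSiblings(a, b string) bool {
	pa, okA := typePrefix(a)
	pb, okB := typePrefix(b)
	return okA && okB && a != b && pa == pb
}

func main() {
	fmt.Println(areSiblings(
		"acme.ops.robotics.gcs.drone.001",
		"acme.ops.robotics.gcs.drone.002",
	))
}
```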

func NewEntityIDProvider

func NewEntityIDProvider(
	base Provider,
	config EntityIDProviderConfig,
	logger *slog.Logger,
) *EntityIDProvider

NewEntityIDProvider creates a Provider that augments explicit edges with virtual edges based on EntityID hierarchy (6-part dotted format).

Parameters:

  • base: The underlying Provider for explicit edges (also used to list all entities)
  • config: Configuration for edge weights and limits
  • logger: Optional logger for observability (can be nil)

func (*EntityIDProvider) ClearCache

func (p *EntityIDProvider) ClearCache()

ClearCache clears the type prefix cache and propagates to wrapped providers. Call this when entities are added/removed.

func (*EntityIDProvider) GetAllEntityIDs

func (p *EntityIDProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)

GetAllEntityIDs delegates to the base provider

func (*EntityIDProvider) GetCacheStats

func (p *EntityIDProvider) GetCacheStats() (prefixes int, entities int)

GetCacheStats returns statistics about the type prefix cache for monitoring.

func (*EntityIDProvider) GetEdgeWeight

func (p *EntityIDProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)

GetEdgeWeight returns the weight of an edge between two entities.

  • For explicit edges: delegates to the base provider
  • For sibling edges: returns the configured sibling weight (default 0.7)

Explicit edges always take precedence - if base returns weight > 0, that's used directly. Sibling edge weight is only used when no explicit edge exists.
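
The precedence rule can be written as a small pure function. This is a sketch with a hypothetical helper, resolveEdgeWeight. Returning 0 when there is neither an explicit nor a sibling edge is an assumption.

```go
package main

import "fmt"

// resolveEdgeWeight applies the documented precedence: a positive explicit
// weight wins, and the sibling weight is used only when no explicit edge exists.
func resolveEdgeWeight(explicit float64, siblings bool, siblingWeight float64) float64 {
	if explicit > 0 {
		return explicit
	}
	if siblings {
		return siblingWeight
	}
	return 0 // assumed: no edge of either kind
}

func main() {
	fmt.Println(resolveEdgeWeight(1.0, true, 0.7)) // explicit edge between siblings
	fmt.Println(resolveEdgeWeight(0, true, 0.7))   // siblings without an explicit edge
}
```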

func (*EntityIDProvider) GetNeighbors

func (p *EntityIDProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)

GetNeighbors returns both explicit neighbors and sibling neighbors from EntityID hierarchy. Sibling neighbors are entities that share the same 5-part type prefix.

Direction parameter is respected for explicit edges but ignored for sibling edges (sibling relationships are symmetric).

func (*EntityIDProvider) GetSiblingEdgeMetrics

func (p *EntityIDProvider) GetSiblingEdgeMetrics() (successes, errors int64)

GetSiblingEdgeMetrics returns metrics for sibling edge operations.

type EntityIDProviderConfig

type EntityIDProviderConfig struct {
	// SiblingWeight is the edge weight for sibling relationships.
	// Higher values = stronger connection influence in LPA.
	// Recommended: 0.7 (lower than explicit edges at 1.0)
	SiblingWeight float64

	// MaxSiblings limits sibling neighbors per entity to control
	// computation cost during LPA iterations.
	// Recommended: 10
	MaxSiblings int

	// IncludeSiblings enables sibling edge discovery.
	// Set to false to disable EntityID-based edges entirely.
	IncludeSiblings bool
}

EntityIDProviderConfig holds configuration for EntityIDProvider

func DefaultEntityIDProviderConfig

func DefaultEntityIDProviderConfig() EntityIDProviderConfig

DefaultEntityIDProviderConfig returns sensible defaults for clustering

type EntityProvider

type EntityProvider interface {
	GetEntities(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
}

EntityProvider interface for fetching full entity states for summarization

type EntityQuerier

type EntityQuerier interface {
	GetEntities(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
}

EntityQuerier provides minimal interface for querying entities. This interface exists to avoid import cycle with querymanager package.

type InferenceConfig

type InferenceConfig struct {
	// MinCommunitySize is the minimum community size for generating inferences
	// Singleton communities (size=1) never produce inferences
	MinCommunitySize int

	// MaxInferredPerCommunity limits inferred relationships per community
	// Prevents O(n²) explosion in large communities
	MaxInferredPerCommunity int
}

InferenceConfig holds configuration for relationship inference

func DefaultInferenceConfig

func DefaultInferenceConfig() InferenceConfig

DefaultInferenceConfig returns sensible defaults for relationship inference

type InferredTriple

type InferredTriple struct {
	Subject     string
	Predicate   string
	Object      string
	Source      string
	Confidence  float64
	Timestamp   time.Time
	CommunityID string // Community that produced this inference
	Level       int    // Hierarchical level
}

InferredTriple represents a relationship inferred from community detection. This is a lightweight struct for returning inference results. The caller converts these to message.Triple for persistence.

type LLMSummarizer

type LLMSummarizer struct {
	// Client is the LLM client for making chat completion requests.
	Client llm.Client

	// FallbackSummarizer is used if LLM service is unavailable.
	FallbackSummarizer *StatisticalSummarizer

	// MaxTokens limits the response length (default: 150).
	MaxTokens int

	// ContentFetcher optionally fetches entity content (title, abstract) for richer prompts.
	// If nil, prompts use only entity IDs and triple-derived keywords.
	ContentFetcher llm.ContentFetcher
}

LLMSummarizer implements CommunitySummarizer using an OpenAI-compatible LLM API. This summarizer calls an external LLM service for higher quality natural language summaries.

It works with any OpenAI-compatible backend:

  • shimmy (recommended for local inference)
  • OpenAI cloud
  • Ollama, vLLM, etc.

func NewLLMSummarizer

func NewLLMSummarizer(cfg LLMSummarizerConfig, opts ...LLMSummarizerOption) (*LLMSummarizer, error)

NewLLMSummarizer creates an LLM-based summarizer with the given configuration. Optional functional options can be provided to configure additional features.

func (*LLMSummarizer) SummarizeCommunity

func (s *LLMSummarizer) SummarizeCommunity(
	ctx context.Context,
	community *Community,
	entities []*gtypes.EntityState,
) (*Community, error)

SummarizeCommunity generates an LLM-based summary of the community. Implements CommunitySummarizer interface with 3-param signature. Content fetching happens internally using the optional ContentFetcher.

type LLMSummarizerConfig

type LLMSummarizerConfig struct {
	// Client is the LLM client (required).
	Client llm.Client

	// MaxTokens limits the response length (default: 150).
	MaxTokens int
}

LLMSummarizerConfig configures the LLM summarizer.

type LLMSummarizerOption

type LLMSummarizerOption func(*LLMSummarizer) error

LLMSummarizerOption configures an LLMSummarizer using the functional options pattern. Options return errors for validation (following natsclient pattern).

func WithContentFetcher

func WithContentFetcher(fetcher llm.ContentFetcher) LLMSummarizerOption

WithContentFetcher sets the ContentFetcher for enriching prompts with entity content. If not set, prompts use only entity IDs and triple-derived keywords.

type LPADetector

type LPADetector struct {
	// contains filtered or unexported fields
}

LPADetector implements community detection using Label Propagation Algorithm

func NewLPADetector

func NewLPADetector(provider Provider, storage CommunityStorage) *LPADetector

NewLPADetector creates a new Label Propagation Algorithm detector

func (*LPADetector) DetectCommunities

func (d *LPADetector) DetectCommunities(ctx context.Context) (map[int][]*Community, error)

DetectCommunities runs full community detection across all hierarchical levels

func (*LPADetector) GetCommunitiesByLevel

func (d *LPADetector) GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)

GetCommunitiesByLevel returns all communities at a level

func (*LPADetector) GetCommunity

func (d *LPADetector) GetCommunity(ctx context.Context, id string) (*Community, error)

GetCommunity retrieves a community by ID

func (*LPADetector) GetEntityCommunity

func (d *LPADetector) GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)

GetEntityCommunity returns the community for an entity at a specific level

func (*LPADetector) InferRelationshipsFromCommunities

func (d *LPADetector) InferRelationshipsFromCommunities(
	ctx context.Context,
	level int,
	config InferenceConfig,
) ([]InferredTriple, error)

InferRelationshipsFromCommunities generates inferred triples from community co-membership. For each community with at least config.MinCommunitySize members, this creates bidirectional "inferred.clustered_with" triples between members.

Parameters:

  • level: Hierarchical level to process (0 = most granular)
  • config: Inference configuration (min size, max pairs)

Returns triples suitable for persistence via graph.mutation.triple.add. The caller is responsible for persisting these triples.

Confidence scoring:

  • Base confidence: 0.5 (inferred relationships)
  • Adjusted by community tightness: +0.0 to +0.3 based on internal similarity
  • Final range: 0.5-0.8 for inferred relationships
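The scoring rule above can be sketched roughly as follows. This is not the package's implementation: `inferredTriple`, `confidenceFor`, and `inferPairs` are hypothetical names, the linear mapping from tightness to the +0.0 to +0.3 bonus is an assumption, and how the package actually measures "internal similarity" is not documented here.

```go
package main

import "fmt"

// inferredTriple is a hypothetical stand-in for the package's InferredTriple;
// the real type's fields are not shown in this documentation.
type inferredTriple struct {
	Subject    string
	Predicate  string
	Object     string
	Confidence float64
}

// confidenceFor maps a tightness value in [0, 1] onto the documented
// 0.5-0.8 range: base 0.5 plus a bonus of up to 0.3.
// The linear scaling is an assumption made for illustration.
func confidenceFor(tightness float64) float64 {
	if tightness < 0 {
		tightness = 0
	}
	if tightness > 1 {
		tightness = 1
	}
	return 0.5 + 0.3*tightness
}

// inferPairs emits bidirectional "inferred.clustered_with" triples for every
// member pair of one community. Communities smaller than minSize are skipped,
// and maxPairs (0 = unlimited) caps the number of pairs considered.
func inferPairs(members []string, minSize, maxPairs int, tightness float64) []inferredTriple {
	if len(members) < minSize {
		return nil
	}
	conf := confidenceFor(tightness)
	var out []inferredTriple
	pairs := 0
	for i := 0; i < len(members); i++ {
		for j := i + 1; j < len(members); j++ {
			if maxPairs > 0 && pairs >= maxPairs {
				return out
			}
			out = append(out,
				inferredTriple{Subject: members[i], Predicate: "inferred.clustered_with", Object: members[j], Confidence: conf},
				inferredTriple{Subject: members[j], Predicate: "inferred.clustered_with", Object: members[i], Confidence: conf},
			)
			pairs++
		}
	}
	return out
}

func main() {
	for _, t := range inferPairs([]string{"e1", "e2", "e3"}, 2, 0, 0.5) {
		fmt.Printf("%s -[%s]-> %s (%.2f)\n", t.Subject, t.Predicate, t.Object, t.Confidence)
	}
}
```

Because the number of pairs grows quadratically with community size, a cap such as the "max pairs" setting keeps the output bounded for large communities.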

func (*LPADetector) SetEntityProvider

func (d *LPADetector) SetEntityProvider(provider EntityProvider)

SetEntityProvider sets the entity provider for fetching entities during summarization. This method supports deferred initialization - call after the entity provider becomes available. Both summarizer and entityProvider must be set for summarization to occur.

func (*LPADetector) UpdateCommunities

func (d *LPADetector) UpdateCommunities(ctx context.Context, _ []string) error

UpdateCommunities incrementally updates communities based on changed entities

func (*LPADetector) WithLevels

func (d *LPADetector) WithLevels(levels int) *LPADetector

WithLevels sets the number of hierarchical levels with validation

func (*LPADetector) WithLogger

func (d *LPADetector) WithLogger(logger *slog.Logger) *LPADetector

WithLogger sets the logger for the detector

func (*LPADetector) WithMaxIterations

func (d *LPADetector) WithMaxIterations(max int) *LPADetector

WithMaxIterations sets the maximum iteration count with validation

func (*LPADetector) WithProgressiveSummarization

func (d *LPADetector) WithProgressiveSummarization(
	summarizer CommunitySummarizer,
	entityProvider EntityProvider,
) *LPADetector

WithProgressiveSummarization enables progressive summarization with LLM enhancement.

Parameters:

  • summarizer: generates statistical summaries immediately
  • entityProvider: fetches full entity states for summarization

Note: EnhancementWorker watches COMMUNITY_INDEX KV for async LLM enhancement (no NATS events needed).

func (*LPADetector) WithSummarizer

func (d *LPADetector) WithSummarizer(summarizer CommunitySummarizer) *LPADetector

WithSummarizer sets the summarizer without requiring an entity provider. Use SetEntityProvider() later to enable summarization once the provider is available. This supports deferred initialization patterns where the entity provider isn't available at detector creation time.

type NATSCommunityStorage

type NATSCommunityStorage struct {
	// contains filtered or unexported fields
}

NATSCommunityStorage implements CommunityStorage using NATS KV

func NewNATSCommunityStorage

func NewNATSCommunityStorage(kv jetstream.KeyValue) *NATSCommunityStorage

NewNATSCommunityStorage creates a new NATS-backed community storage with default configuration (no triple creation)

func NewNATSCommunityStorageWithConfig

func NewNATSCommunityStorageWithConfig(kv jetstream.KeyValue, config CommunityStorageConfig) *NATSCommunityStorage

NewNATSCommunityStorageWithConfig creates a new NATS-backed community storage with custom configuration for triple creation

func (*NATSCommunityStorage) Clear

func (s *NATSCommunityStorage) Clear(ctx context.Context) error

Clear removes all communities and entity mappings. This is a best-effort operation - context cancellation during cleanup is ignored since partial cleanup is acceptable during shutdown.

func (*NATSCommunityStorage) DeleteCommunity

func (s *NATSCommunityStorage) DeleteCommunity(ctx context.Context, id string) error

DeleteCommunity removes a community

func (*NATSCommunityStorage) GetAllCommunities

func (s *NATSCommunityStorage) GetAllCommunities(ctx context.Context) ([]*Community, error)

GetAllCommunities returns all communities across all levels. Used by the LPA detector to archive enhanced communities before Clear().

func (*NATSCommunityStorage) GetCommunitiesByLevel

func (s *NATSCommunityStorage) GetCommunitiesByLevel(ctx context.Context, level int) ([]*Community, error)

GetCommunitiesByLevel retrieves all communities at a level

func (*NATSCommunityStorage) GetCommunity

func (s *NATSCommunityStorage) GetCommunity(ctx context.Context, id string) (*Community, error)

GetCommunity retrieves a community by ID. Since community IDs no longer embed the level, this scans all levels to find the community.

func (*NATSCommunityStorage) GetCreatedTriples

func (s *NATSCommunityStorage) GetCreatedTriples() []message.Triple

GetCreatedTriples returns all triples created during SaveCommunity operations. This method is primarily for testing and verification purposes.

func (*NATSCommunityStorage) GetEntityCommunity

func (s *NATSCommunityStorage) GetEntityCommunity(ctx context.Context, entityID string, level int) (*Community, error)

GetEntityCommunity retrieves the community for an entity at a level

func (*NATSCommunityStorage) SaveCommunity

func (s *NATSCommunityStorage) SaveCommunity(ctx context.Context, community *Community) error

SaveCommunity persists a community to NATS KV

type PageRankConfig

type PageRankConfig struct {
	// Iterations is the number of iterations to run (default: 20)
	Iterations int

	// DampingFactor is the probability of continuing the random walk (default: 0.85)
	DampingFactor float64

	// Tolerance is the convergence threshold (default: 1e-6)
	Tolerance float64

	// TopN is the number of top-ranked nodes to return (0 = all)
	TopN int
}

PageRankConfig holds configuration for PageRank computation

func DefaultPageRankConfig

func DefaultPageRankConfig() PageRankConfig

DefaultPageRankConfig returns the standard PageRank configuration

type PageRankResult

type PageRankResult struct {
	// Scores maps entity ID to PageRank score
	Scores map[string]float64

	// Ranked contains entity IDs sorted by PageRank score (descending)
	Ranked []string

	// Iterations is the actual number of iterations run
	Iterations int

	// Converged indicates whether the algorithm converged before max iterations
	Converged bool
}

PageRankResult holds the results of PageRank computation

func ComputePageRank

func ComputePageRank(ctx context.Context, provider Provider, config PageRankConfig) (*PageRankResult, error)

ComputePageRank computes PageRank scores for all nodes in the graph
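For orientation, the sketch below shows a generic PageRank power iteration whose parameters mirror the PageRankConfig fields (iterations, damping factor, tolerance) and whose return values mirror PageRankResult. It is not the package's implementation: `pageRank` is a hypothetical function, the graph is a plain adjacency map rather than a Provider, and spreading the rank of dangling nodes uniformly is an assumption.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// pageRank runs power iteration over a directed graph given as
// node -> outgoing neighbors. Every node must appear as a key in out.
// It stops after the given number of iterations, or earlier once the
// summed absolute score change falls below tolerance.
func pageRank(out map[string][]string, iterations int, damping, tolerance float64) (scores map[string]float64, ranked []string, ran int, converged bool) {
	n := float64(len(out))
	scores = make(map[string]float64, len(out))
	for id := range out {
		scores[id] = 1 / n // start from a uniform distribution
	}

	for ran < iterations {
		next := make(map[string]float64, len(out))
		dangling := 0.0
		for id, targets := range out {
			if len(targets) == 0 {
				dangling += scores[id] // no out-links: rank is spread uniformly below
				continue
			}
			share := scores[id] / float64(len(targets))
			for _, t := range targets {
				next[t] += share
			}
		}

		delta := 0.0
		for id := range out {
			next[id] = (1-damping)/n + damping*(next[id]+dangling/n)
			delta += math.Abs(next[id] - scores[id])
		}
		scores = next
		ran++
		if delta < tolerance {
			converged = true
			break
		}
	}

	// Rank IDs by score, descending; break ties by ID for stable output.
	ranked = make([]string, 0, len(scores))
	for id := range scores {
		ranked = append(ranked, id)
	}
	sort.Slice(ranked, func(i, j int) bool {
		if scores[ranked[i]] != scores[ranked[j]] {
			return scores[ranked[i]] > scores[ranked[j]]
		}
		return ranked[i] < ranked[j]
	})
	return scores, ranked, ran, converged
}

func main() {
	graph := map[string][]string{
		"a": {"c"},
		"b": {"c"},
		"c": {"a"},
	}
	// 200 iterations rather than the documented default of 20: this small
	// cyclic graph oscillates and needs more rounds to reach the tolerance.
	scores, ranked, ran, converged := pageRank(graph, 200, 0.85, 1e-6)
	for _, id := range ranked {
		fmt.Printf("%s %.4f\n", id, scores[id])
	}
	fmt.Println("iterations:", ran, "converged:", converged)
}
```

The Iterations and Converged values are worth checking together: a run that uses the full iteration budget without converging may need a higher limit or a looser tolerance.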

func ComputePageRankForCommunity

func ComputePageRankForCommunity(ctx context.Context, provider Provider, communityMembers []string, config PageRankConfig) (*PageRankResult, error)

ComputePageRankForCommunity computes PageRank for entities within a community. This is more efficient than full-graph PageRank for large graphs.

type PredicateProvider

type PredicateProvider struct {
	// contains filtered or unexported fields
}

PredicateProvider implements Provider for entities matching a predicate. This is more practical for real-world use, where only entities of specific types are clustered.

func NewPredicateProvider

func NewPredicateProvider(qm RelationshipQuerier, predicate string) *PredicateProvider

NewPredicateProvider creates a Provider for entities matching a predicate. It caches the valid entity set at construction time for performance.

func (*PredicateProvider) GetAllEntityIDs

func (p *PredicateProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)

GetAllEntityIDs returns all entities matching the predicate

func (*PredicateProvider) GetEdgeWeight

func (p *PredicateProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)

GetEdgeWeight returns the weight of an edge (unweighted: 1.0 or 0.0)

func (*PredicateProvider) GetNeighbors

func (p *PredicateProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)

GetNeighbors returns entity IDs connected to the given entity

type ProgressiveSummarizer

type ProgressiveSummarizer struct {
	// contains filtered or unexported fields
}

ProgressiveSummarizer provides progressive enhancement - statistical summary immediately, LLM enhancement asynchronously via events

func NewProgressiveSummarizer

func NewProgressiveSummarizer() *ProgressiveSummarizer

NewProgressiveSummarizer creates a progressive summarizer with default settings

func (*ProgressiveSummarizer) SummarizeCommunity

func (s *ProgressiveSummarizer) SummarizeCommunity(
	ctx context.Context,
	community *Community,
	entities []*gtypes.EntityState,
) (*Community, error)

SummarizeCommunity generates an immediate statistical summary. The caller is responsible for saving the community and publishing the community.detected event for async LLM enhancement.

type Provider

type Provider = gtypes.Provider

Provider is an alias to the shared interface in graph package. Abstracts the graph data source for community detection.

type QueryManagerProvider

type QueryManagerProvider struct {
	// contains filtered or unexported fields
}

QueryManagerProvider implements Provider using QueryManager

func NewQueryManagerProvider

func NewQueryManagerProvider(qm RelationshipQuerier) *QueryManagerProvider

NewQueryManagerProvider creates a Provider backed by QueryManager

func (*QueryManagerProvider) GetAllEntityIDs

func (p *QueryManagerProvider) GetAllEntityIDs(_ context.Context) ([]string, error)

GetAllEntityIDs returns all entity IDs in the graph

func (*QueryManagerProvider) GetEdgeWeight

func (p *QueryManagerProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)

GetEdgeWeight returns the weight of an edge between two entities

func (*QueryManagerProvider) GetNeighbors

func (p *QueryManagerProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)

GetNeighbors returns entity IDs connected to the given entity

type Relationship

type Relationship struct {
	Subject      string
	Predicate    string
	Object       interface{}
	FromEntityID string  // Source entity
	ToEntityID   string  // Target entity
	Weight       float64 // For weighted edges
}

Relationship represents an edge between entities. Local copy to avoid import cycle with querymanager.

type RelationshipQuerier

type RelationshipQuerier interface {
	EntityQuerier
	QueryRelationships(ctx context.Context, entityID string, direction Direction) ([]*Relationship, error)
	QueryByPredicate(ctx context.Context, predicate string) ([]string, error)
}

RelationshipQuerier provides minimal interface for querying relationships. This interface exists to avoid import cycle with querymanager package.

type SemanticProvider

type SemanticProvider struct {
	// contains filtered or unexported fields
}

SemanticProvider wraps a base Provider and adds virtual edges based on embedding similarity. This enables LPA clustering to find communities even when explicit relationship triples don't exist.

Virtual edges are computed on-demand using cosine similarity between entity embeddings. These edges are NOT persisted - they're ephemeral hints for the clustering algorithm.
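As a reminder of what the similarity test involves, here is a minimal cosine similarity sketch. `cosineSimilarity` is a hypothetical helper, not part of this package (which obtains similarity scores through SimilarityFinder), and treating the threshold as inclusive is an assumption.

```go
package main

import (
	"fmt"
	"math"
)

// cosineSimilarity returns the cosine of the angle between two vectors.
// It returns 0 when the lengths differ, the vectors are empty, or either
// vector has zero magnitude.
func cosineSimilarity(a, b []float64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	var dot, normA, normB float64
	for i := range a {
		dot += a[i] * b[i]
		normA += a[i] * a[i]
		normB += b[i] * b[i]
	}
	if normA == 0 || normB == 0 {
		return 0
	}
	return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}

func main() {
	// 0.6 is the clustering threshold recommended in SemanticProviderConfig.
	const threshold = 0.6
	sim := cosineSimilarity([]float64{1, 0, 1}, []float64{1, 1, 1})
	fmt.Printf("similarity=%.3f virtualEdge=%t\n", sim, sim >= threshold)
}
```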

Explicit edges (from base provider) always take precedence over virtual edges, and explicit edge weights are preserved as-is (typically confidence-based).

func NewSemanticProvider

func NewSemanticProvider(
	base Provider,
	finder SimilarityFinder,
	config SemanticProviderConfig,
	logger *slog.Logger,
) *SemanticProvider

NewSemanticProvider creates a Provider that augments explicit edges with virtual edges based on embedding similarity.

Parameters:

  • base: The underlying Provider for explicit edges
  • finder: SimilarityFinder (typically IndexManager) for similarity lookup
  • config: Configuration for similarity threshold and limits
  • logger: Optional logger for observability (can be nil)

func (*SemanticProvider) ClearCache

func (p *SemanticProvider) ClearCache()

ClearCache clears the similarity cache and propagates to wrapped providers. Call this between clustering runs to ensure fresh similarity data.

func (*SemanticProvider) GetAllEntityIDs

func (p *SemanticProvider) GetAllEntityIDs(ctx context.Context) ([]string, error)

GetAllEntityIDs delegates to the base provider

func (*SemanticProvider) GetCacheStats

func (p *SemanticProvider) GetCacheStats() (entities int, edges int)

GetCacheStats returns statistics about the similarity cache for monitoring.

func (*SemanticProvider) GetEdgeWeight

func (p *SemanticProvider) GetEdgeWeight(ctx context.Context, fromID, toID string) (float64, error)

GetEdgeWeight returns the weight of an edge between two entities.

  • For explicit edges: delegates to the base provider (uses Triple.Confidence)
  • For virtual edges: returns the cached similarity score

Explicit edges always take precedence - if base returns weight > 0, that's used directly. Virtual edge weight is only used when no explicit edge exists.
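The precedence rule can be illustrated with a small sketch. The names `edgeKey` and `resolveWeight` are hypothetical, and the two maps stand in for the base provider and the similarity cache; checking both orientations of a virtual edge is an assumption based on similarity being symmetric.

```go
package main

import "fmt"

// edgeKey identifies an ordered pair of entity IDs in the maps below.
type edgeKey struct{ from, to string }

// resolveWeight returns the explicit weight when it is positive and
// otherwise falls back to the cached similarity score, if any.
func resolveWeight(explicit, virtual map[edgeKey]float64, from, to string) float64 {
	if w := explicit[edgeKey{from, to}]; w > 0 {
		return w // explicit edges always win, weight preserved as-is
	}
	if w, ok := virtual[edgeKey{from, to}]; ok {
		return w
	}
	// Similarity is symmetric, so the pair may be cached in either order.
	return virtual[edgeKey{to, from}]
}

func main() {
	explicit := map[edgeKey]float64{{"a", "b"}: 0.9}
	virtual := map[edgeKey]float64{{"a", "b"}: 0.7, {"a", "c"}: 0.65}

	fmt.Println(resolveWeight(explicit, virtual, "a", "b")) // explicit edge exists
	fmt.Println(resolveWeight(explicit, virtual, "c", "a")) // virtual edge only
	fmt.Println(resolveWeight(explicit, virtual, "a", "z")) // no edge at all
}
```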

func (*SemanticProvider) GetNeighbors

func (p *SemanticProvider) GetNeighbors(ctx context.Context, entityID string, direction string) ([]string, error)

GetNeighbors returns both explicit neighbors and virtual neighbors from embedding similarity. Virtual neighbors are added when:

  • Similarity exceeds threshold (default 0.6)
  • Entity has embeddings available
  • Not already an explicit neighbor

Direction parameter is respected for explicit edges but ignored for virtual edges (semantic similarity is symmetric).
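A rough sketch of the merge described above follows. It is illustrative only: `similarityHit` is a hypothetical stand-in for gtypes.SimilarityHit (whose fields are not shown in this documentation), `mergeNeighbors` is a hypothetical helper, and the inclusive threshold comparison is an assumption.

```go
package main

import "fmt"

// similarityHit is a hypothetical stand-in for gtypes.SimilarityHit.
type similarityHit struct {
	EntityID   string
	Similarity float64
}

// mergeNeighbors returns the explicit neighbors followed by up to maxVirtual
// virtual neighbors whose similarity meets the threshold and that are not
// already present in the result.
func mergeNeighbors(explicit []string, hits []similarityHit, threshold float64, maxVirtual int) []string {
	seen := make(map[string]bool, len(explicit))
	merged := make([]string, 0, len(explicit))
	for _, id := range explicit {
		if !seen[id] {
			seen[id] = true
			merged = append(merged, id)
		}
	}

	added := 0
	for _, h := range hits {
		if added >= maxVirtual {
			break // cap virtual neighbors to bound the cost of LPA iterations
		}
		if h.Similarity < threshold || seen[h.EntityID] {
			continue
		}
		seen[h.EntityID] = true
		merged = append(merged, h.EntityID)
		added++
	}
	return merged
}

func main() {
	explicit := []string{"a", "b"}
	hits := []similarityHit{
		{"b", 0.9},  // already an explicit neighbor
		{"c", 0.7},  // accepted
		{"d", 0.5},  // below threshold
		{"e", 0.65}, // accepted
		{"f", 0.8},  // dropped: virtual neighbor limit reached
	}
	fmt.Println(mergeNeighbors(explicit, hits, 0.6, 2))
}
```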

type SemanticProviderConfig

type SemanticProviderConfig struct {
	// SimilarityThreshold is the minimum cosine similarity for virtual edges.
	// Higher values = fewer but stronger virtual connections.
	// Recommended: 0.6 for clustering (stricter than 0.3 used in search)
	SimilarityThreshold float64

	// MaxVirtualNeighbors limits virtual neighbors per entity to control
	// computation cost during LPA iterations.
	// Recommended: 5
	MaxVirtualNeighbors int
}

SemanticProviderConfig holds configuration for SemanticProvider

func DefaultSemanticProviderConfig

func DefaultSemanticProviderConfig() SemanticProviderConfig

DefaultSemanticProviderConfig returns sensible defaults for clustering

type SimilarityFinder

type SimilarityFinder interface {
	// FindSimilarEntities returns entities similar to the given entity.
	// threshold: minimum cosine similarity (0.0-1.0)
	// limit: maximum results to return
	FindSimilarEntities(ctx context.Context, entityID string, threshold float64, limit int) ([]gtypes.SimilarityHit, error)
}

SimilarityFinder abstracts the IndexManager's similarity lookup. This interface allows SemanticProvider to work with IndexManager without creating import cycles and enables easier testing.

type StatisticalSummarizer

type StatisticalSummarizer struct {
	// MaxKeywords limits the number of keywords extracted
	MaxKeywords int

	// MaxRepEntities limits the number of representative entities
	MaxRepEntities int
}

StatisticalSummarizer implements CommunitySummarizer using statistical methods. This is the default summarizer and does not require external LLM services.

func NewStatisticalSummarizer

func NewStatisticalSummarizer() *StatisticalSummarizer

NewStatisticalSummarizer creates a statistical summarizer with default settings

func (*StatisticalSummarizer) SummarizeCommunity

func (s *StatisticalSummarizer) SummarizeCommunity(
	ctx context.Context,
	community *Community,
	entities []*gtypes.EntityState,
) (*Community, error)

SummarizeCommunity generates a statistical summary of the community
