Documentation
Overview ¶
Package graphclustering provides the graph-clustering component for community detection, structural analysis, and anomaly detection.
The graph-clustering component performs community detection on the entity graph using the Label Propagation Algorithm (LPA), computes structural indices (k-core levels and pivot distances), and detects anomalies within community contexts. It can optionally enhance community descriptions using an LLM.
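For readers unfamiliar with LPA, the following is a minimal, generic sketch of asynchronous label propagation in Go. It is not the package's implementation. The names labelPropagation and communities are hypothetical, maxIterations and minSize play the roles of the max_iterations and min_community_size settings, and the graph is assumed to be an undirected adjacency list with symmetric entries. Ties are broken by the smallest label so the output is deterministic; many LPA implementations break ties randomly instead.

```go
package main

import (
	"fmt"
	"sort"
)

// labelPropagation runs asynchronous LPA on an undirected adjacency list.
// Every node starts with its own ID as label; on each pass, nodes are visited
// in sorted order and adopt the most frequent label among their neighbors,
// updating in place. Ties go to the lexicographically smallest label. The loop
// stops when a pass changes nothing or after maxIterations passes.
func labelPropagation(adj map[string][]string, maxIterations int) map[string]string {
	nodes := make([]string, 0, len(adj))
	for n := range adj {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes)

	labels := make(map[string]string, len(nodes))
	for _, n := range nodes {
		labels[n] = n
	}

	for iter := 0; iter < maxIterations; iter++ {
		changed := false
		for _, n := range nodes {
			counts := make(map[string]int)
			for _, nb := range adj[n] {
				counts[labels[nb]]++
			}
			if len(counts) == 0 {
				continue // isolated nodes keep their own label
			}
			best, bestCount := "", -1
			for label, c := range counts {
				if c > bestCount || (c == bestCount && label < best) {
					best, bestCount = label, c
				}
			}
			if labels[n] != best {
				labels[n] = best
				changed = true
			}
		}
		if !changed {
			break
		}
	}
	return labels
}

// communities groups nodes by final label and drops groups smaller than
// minSize, in the spirit of min_community_size.
func communities(labels map[string]string, minSize int) map[string][]string {
	groups := make(map[string][]string)
	for n, label := range labels {
		groups[label] = append(groups[label], n)
	}
	for label, members := range groups {
		if len(members) < minSize {
			delete(groups, label)
			continue
		}
		sort.Strings(members)
	}
	return groups
}

func main() {
	// Two disconnected triangles plus an isolated node.
	adj := map[string][]string{
		"a": {"b", "c"}, "b": {"a", "c"}, "c": {"a", "b"},
		"x": {"y", "z"}, "y": {"x", "z"}, "z": {"x", "y"},
		"solo": {},
	}
	labels := labelPropagation(adj, 100)
	fmt.Println(communities(labels, 2))
}
```

The isolated node keeps its own label and is then dropped by the minimum-size filter, so only the two triangles survive as communities.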
Tier ¶
Tier: STATISTICAL (Tier 1) without LLM enhancement, SEMANTIC (Tier 2) with it. The component is not used in STRUCTURAL (Tier 0) deployments.
Architecture ¶
graph-clustering is a Tier 1+ component. It watches ENTITY_STATES for change events and triggers community detection when a configurable interval elapses or a configurable number of changes accumulates.
                    ┌───────────────────┐
ENTITY_STATES ─────►│                   │
(KV watch)          │  graph-clustering ├──► COMMUNITY_INDEX (KV)
                    │                   ├──► STRUCTURAL_INDEX (KV)
                    │                   ├──► ANOMALY_INDEX (KV)
                    └─────────┬─────────┘
                              │ (reads)
              ┌───────────────┼───────────────┐
              ▼               ▼               ▼
       OUTGOING_INDEX  INCOMING_INDEX  graph-embedding
                                        (query path)
Features ¶
- Label Propagation Algorithm (LPA) for community detection
- Configurable detection interval and batch thresholds
- Optional LLM-based community summarization
- Structural index computation (k-core decomposition, pivot distances)
- Anomaly detection within community contexts
- Semantic gap detection via graph-embedding query path
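The k-core level (core number) of a node is the largest k such that the node belongs to a subgraph in which every node has at least k neighbors. A minimal peeling sketch follows. coreNumbers is a hypothetical name, the graph is assumed simple and undirected with symmetric adjacency entries, and the quadratic minimum search is chosen for clarity rather than speed (bucket-based variants run in linear time).

```go
package main

import "fmt"

// coreNumbers returns the core number of every node in a simple undirected
// graph given as a symmetric adjacency list. It peels the graph by repeatedly
// removing a remaining node of minimum current degree; a node's core number is
// the largest such minimum degree observed up to and including its removal.
func coreNumbers(adj map[string][]string) map[string]int {
	degree := make(map[string]int, len(adj))
	for n, nbs := range adj {
		degree[n] = len(nbs)
	}
	removed := make(map[string]bool, len(adj))
	core := make(map[string]int, len(adj))
	k := 0
	for len(removed) < len(adj) {
		// Linear scan for the remaining node with the smallest degree;
		// ties go to the smallest name so results are deterministic.
		next, best := "", -1
		for n, d := range degree {
			if removed[n] {
				continue
			}
			if best == -1 || d < best || (d == best && n < next) {
				next, best = n, d
			}
		}
		if best > k {
			k = best
		}
		core[next] = k
		removed[next] = true
		for _, nb := range adj[next] {
			if !removed[nb] {
				degree[nb]--
			}
		}
	}
	return core
}

func main() {
	// A triangle a-b-c with a pendant node d attached to a.
	adj := map[string][]string{
		"a": {"b", "c", "d"},
		"b": {"a", "c"},
		"c": {"a", "b"},
		"d": {"a"},
	}
	fmt.Println(coreNumbers(adj)) // prints: map[a:2 b:2 c:2 d:1]
}
```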
Configuration ¶
The component is configured via JSON with the following structure:
{
  "ports": {
    "inputs": [
      {"name": "entity_watch", "subject": "ENTITY_STATES", "type": "kv-watch"}
    ],
    "outputs": [
      {"name": "communities", "subject": "COMMUNITY_INDEX", "type": "kv"},
      {"name": "structural", "subject": "STRUCTURAL_INDEX", "type": "kv"},
      {"name": "anomalies", "subject": "ANOMALY_INDEX", "type": "kv"}
    ]
  },
  "detection_interval": "30s",
  "batch_size": 100,
  "enable_llm": true,
  "llm_endpoint": "http://seminstruct:8083/v1",
  "min_community_size": 2,
  "max_iterations": 100,
  "enable_structural": true,
  "pivot_count": 16,
  "max_hop_distance": 10,
  "enable_anomaly_detection": true,
  "anomaly_config": {
    "enabled": true,
    "core_anomaly": {"enabled": true, "min_core_level": 2},
    "semantic_gap": {"enabled": true, "similarity_threshold": 0.7},
    "virtual_edges": {
      "auto_apply": {"enabled": false, "min_confidence": 0.95},
      "review_queue": {"enabled": false, "min_confidence": 0.7, "max_confidence": 0.95}
    }
  }
}
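The virtual_edges block describes two confidence bands. The sketch below shows one plausible way to route a proposed edge under those settings. The Band and Route types and routeVirtualEdge are hypothetical. Treating the review band as half-open [min_confidence, max_confidence), so that a score of exactly 0.95 is auto-applied rather than queued, is an assumption, as is discarding high-confidence edges when auto_apply is disabled.

```go
package main

import "fmt"

// Band mirrors one entry under anomaly_config.virtual_edges (hypothetical type).
type Band struct {
	Enabled       bool
	MinConfidence float64
	MaxConfidence float64 // used by the review queue only
}

// Route is the hypothetical outcome for a proposed virtual edge.
type Route string

const (
	AutoApply Route = "auto_apply"
	Review    Route = "review_queue"
	Discard   Route = "discard"
)

// routeVirtualEdge picks a route for an edge with confidence conf. The review
// band is treated as half-open [min, max), so a score equal to the auto-apply
// threshold is never placed in both bands (an assumption).
func routeVirtualEdge(conf float64, auto, review Band) Route {
	if auto.Enabled && conf >= auto.MinConfidence {
		return AutoApply
	}
	if review.Enabled && conf >= review.MinConfidence && conf < review.MaxConfidence {
		return Review
	}
	return Discard
}

func main() {
	// Thresholds from the example configuration, with both bands switched on.
	auto := Band{Enabled: true, MinConfidence: 0.95}
	review := Band{Enabled: true, MinConfidence: 0.7, MaxConfidence: 0.95}
	for _, c := range []float64{0.99, 0.95, 0.8, 0.4} {
		fmt.Printf("%.2f -> %s\n", c, routeVirtualEdge(c, auto, review))
	}
}
```

With the example configuration as written (both bands disabled), every proposed edge in this sketch would be discarded.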
Detection Cycle ¶
When triggered, the component runs through these phases:
- Community Detection (LPA) → COMMUNITY_INDEX
- Structural Computation (if enabled) → STRUCTURAL_INDEX
- Anomaly Detection (if enabled; requires structural computation) → ANOMALY_INDEX
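The phase ordering can be sketched by modelling each phase as a function that returns an error. runCycle and phase are hypothetical. Gating anomaly detection on structural computation follows the Config field comments (anomaly detection requires EnableStructural); stopping at the first failing phase is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// phase is one hypothetical step of a detection cycle.
type phase struct {
	name    string
	enabled bool
	run     func() error
}

// runCycle runs the enabled phases in order and returns the names of the
// phases that completed. Anomaly detection only runs when structural
// computation is also enabled. Returning at the first error is an assumption.
func runCycle(enableStructural, enableAnomaly bool, detect, structural, anomalies func() error) ([]string, error) {
	phases := []phase{
		{name: "communities", enabled: true, run: detect},
		{name: "structural", enabled: enableStructural, run: structural},
		{name: "anomalies", enabled: enableStructural && enableAnomaly, run: anomalies},
	}
	var ran []string
	for _, p := range phases {
		if !p.enabled {
			continue
		}
		if err := p.run(); err != nil {
			return ran, fmt.Errorf("%s phase: %w", p.name, err)
		}
		ran = append(ran, p.name)
	}
	return ran, nil
}

func main() {
	ok := func() error { return nil }
	fail := func() error { return errors.New("index unavailable") }

	ran, err := runCycle(false, true, ok, ok, ok)
	fmt.Println(ran, err) // anomaly detection skipped: structural is off

	ran, err = runCycle(true, true, ok, fail, ok)
	fmt.Println(ran, err) // stops after the structural phase fails
}
```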
Scheduling ¶
Community detection is triggered by:
- Timer: Runs every detection_interval
- Batch threshold: Runs when batch_size entity changes accumulate
Whichever comes first triggers detection.
Port Definitions ¶
Inputs:
- KV watch: ENTITY_STATES - watches for entity changes and counts them toward the batch_size threshold
Outputs:
- KV bucket: COMMUNITY_INDEX - stores detected communities
- KV bucket: STRUCTURAL_INDEX - stores k-core levels and pivot distances
- KV bucket: ANOMALY_INDEX - stores detected anomalies
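Pivot distances are hop counts from a small set of pivot nodes to every other node, computed by breadth-first search. A common use, assumed here, is a triangle-inequality lower bound on the distance between two nodes. In the sketch, selectPivots, bfsDistances, and lowerBound are hypothetical; count and maxHop stand in for pivot_count and max_hop_distance, and choosing the highest-degree nodes as pivots is an assumed strategy, not the package's documented one.

```go
package main

import (
	"fmt"
	"sort"
)

// selectPivots picks the count highest-degree nodes, breaking ties by name.
// This selection strategy is an assumption made for the sketch.
func selectPivots(adj map[string][]string, count int) []string {
	nodes := make([]string, 0, len(adj))
	for n := range adj {
		nodes = append(nodes, n)
	}
	sort.Slice(nodes, func(i, j int) bool {
		if len(adj[nodes[i]]) != len(adj[nodes[j]]) {
			return len(adj[nodes[i]]) > len(adj[nodes[j]])
		}
		return nodes[i] < nodes[j]
	})
	if count > len(nodes) {
		count = len(nodes)
	}
	return nodes[:count]
}

// bfsDistances returns hop counts from src; nodes farther than maxHop are absent.
func bfsDistances(adj map[string][]string, src string, maxHop int) map[string]int {
	dist := map[string]int{src: 0}
	frontier := []string{src}
	for hop := 1; hop <= maxHop && len(frontier) > 0; hop++ {
		var next []string
		for _, u := range frontier {
			for _, v := range adj[u] {
				if _, seen := dist[v]; !seen {
					dist[v] = hop
					next = append(next, v)
				}
			}
		}
		frontier = next
	}
	return dist
}

// lowerBound applies |d(p,u) - d(p,v)| <= d(u,v): the largest gap over all
// pivots that reached both nodes is a lower bound on their true distance.
func lowerBound(pivotDist map[string]map[string]int, u, v string) int {
	best := 0
	for _, d := range pivotDist {
		du, okU := d[u]
		dv, okV := d[v]
		if !okU || !okV {
			continue
		}
		gap := du - dv
		if gap < 0 {
			gap = -gap
		}
		if gap > best {
			best = gap
		}
	}
	return best
}

func main() {
	// A path graph a-b-c-d-e.
	adj := map[string][]string{
		"a": {"b"}, "b": {"a", "c"}, "c": {"b", "d"}, "d": {"c", "e"}, "e": {"d"},
	}
	pivotDist := make(map[string]map[string]int)
	for _, p := range selectPivots(adj, 16) { // pivot_count
		pivotDist[p] = bfsDistances(adj, p, 10) // max_hop_distance
	}
	fmt.Println("lower bound a..e:", lowerBound(pivotDist, "a", "e"))
}
```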
Usage ¶
Register the component with the component registry:
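import graphclustering "github.com/c360studio/semstreams/processor/graph-clustering"

// registry is the application's *component.Registry, defined elsewhere.
func init() {
	// Register returns an error, which should not be discarded.
	if err := graphclustering.Register(registry); err != nil {
		panic(err)
	}
}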
Dependencies ¶
Upstream (reads during detection):
- graph-ingest: ENTITY_STATES, watched for change events
- graph-index: OUTGOING_INDEX and INCOMING_INDEX, read for graph structure
- graph-embedding: queried for similar entities via NATS request/reply
Downstream:
- graph-gateway: reads COMMUNITY_INDEX, STRUCTURAL_INDEX, and ANOMALY_INDEX to serve queries
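Query Handlers ¶
The package also serves community queries using the request and response types listed below (CommunityRequest, EntityRequest, LevelRequest, MembersRequest, and their responses). Other files in the package provide the structural analysis integration and an embedding-based similarity search used during anomaly detection.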
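Similarity search over embeddings usually ranks candidates by cosine similarity. In the package, similar entities come from graph-embedding over NATS request/reply; the sketch below replaces that call with an in-memory map. The names cosine, similarAbove, semanticGaps, and Match are hypothetical, and reading a "semantic gap" as a pair that is similar (score at or above similarity_threshold) but not directly connected is an assumption.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// cosine returns the cosine similarity of a and b, or 0 when the vectors
// differ in length or either has zero magnitude.
func cosine(a, b []float64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// Match is a hypothetical search result.
type Match struct {
	ID    string
	Score float64
}

// similarAbove returns entities whose similarity to queryID meets threshold,
// best first, excluding queryID itself.
func similarAbove(queryID string, emb map[string][]float64, threshold float64) []Match {
	q := emb[queryID]
	var out []Match
	for id, v := range emb {
		if id == queryID {
			continue
		}
		if s := cosine(q, v); s >= threshold {
			out = append(out, Match{ID: id, Score: s})
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Score > out[j].Score })
	return out
}

// semanticGaps keeps matches that have no direct edge to queryID.
func semanticGaps(queryID string, matches []Match, adj map[string][]string) []Match {
	linked := make(map[string]bool)
	for _, n := range adj[queryID] {
		linked[n] = true
	}
	var gaps []Match
	for _, m := range matches {
		if !linked[m.ID] {
			gaps = append(gaps, m)
		}
	}
	return gaps
}

func main() {
	emb := map[string][]float64{
		"a": {1, 0}, "b": {1, 0.1}, "c": {0, 1}, "d": {0.9, 0.1},
	}
	adj := map[string][]string{"a": {"b"}, "b": {"a"}}
	matches := similarAbove("a", emb, 0.7) // similarity_threshold
	for _, m := range semanticGaps("a", matches, adj) {
		fmt.Printf("possible gap: a ~ %s (%.3f)\n", m.ID, m.Score)
	}
}
```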
Index ¶
- func CreateGraphClustering(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type CommunityRequest
- type CommunityResponse
- type Component
  - func (c *Component) ConfigSchema() component.ConfigSchema
  - func (c *Component) DataFlow() component.FlowMetrics
  - func (c *Component) Health() component.HealthStatus
  - func (c *Component) Initialize() error
  - func (c *Component) InputPorts() []component.Port
  - func (c *Component) Meta() component.Metadata
  - func (c *Component) OutputPorts() []component.Port
  - func (c *Component) Start(ctx context.Context) error
  - func (c *Component) Stop(timeout time.Duration) error
- type Config
  - func DefaultConfig() Config
  - func (c *Config) ApplyDefaults()
- type EntityRequest
- type EntityResponse
- type LevelRequest
- type LevelResponse
- type MembersRequest
- type MembersResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateGraphClustering ¶
func CreateGraphClustering(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateGraphClustering is the factory function for creating graph-clustering components
Types ¶
type CommunityRequest ¶
type CommunityRequest struct {
	ID string `json:"id"`
}
CommunityRequest is the request format for community query
type CommunityResponse ¶
type CommunityResponse struct {
	Community *clustering.Community `json:"community"`
}
CommunityResponse is the response format for community query
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the graph-clustering processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status
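func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize validates configuration and sets up ports (no I/O)
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns input port definitions
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns output port definitions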
type Config ¶
type Config struct {
	Ports                *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	DetectionIntervalStr string                `` /* 132-byte string literal not displayed */
	BatchSize            int                   `json:"batch_size" schema:"type:int,description:Event count threshold for triggering detection,category:basic"`
	EnableLLM            bool                  `json:"enable_llm" schema:"type:bool,description:Enable LLM-based community summarization,category:advanced"`
	LLMEndpoint          string                `json:"llm_endpoint" schema:"type:string,description:URL for LLM endpoint (required if enable_llm is true),category:advanced"`
	LLMModel             string                `json:"llm_model" schema:"type:string,description:Model name for LLM service (e.g. mistral-7b-instruct),category:advanced"`
	EnhancementWorkers   int                   `` /* 133-byte string literal not displayed */
	MinCommunitySize     int                   `json:"min_community_size" schema:"type:int,description:Minimum number of entities to form a community,category:advanced"`
	MaxIterations        int                   `json:"max_iterations" schema:"type:int,description:Maximum iterations for LPA algorithm,category:advanced"`
	// Structural analysis (optional, enables anomaly detection)
	EnableStructural bool `` /* 137-byte string literal not displayed */
	PivotCount       int  `json:"pivot_count" schema:"type:int,description:Number of pivot nodes for distance indexing (default 16),category:advanced"`
	MaxHopDistance   int  `json:"max_hop_distance" schema:"type:int,description:Maximum BFS traversal depth (default 10),category:advanced"`
	// Anomaly detection (optional, requires EnableStructural)
	EnableAnomalyDetection bool             `` /* 134-byte string literal not displayed */
	AnomalyConfig          inference.Config `json:"anomaly_config" schema:"type:object,description:Configuration for anomaly detection,category:advanced"`
	// Dependency startup configuration
	StartupAttempts int `` /* 130-byte string literal not displayed */
	StartupInterval int `` /* 134-byte string literal not displayed */
	// contains filtered or unexported fields
}
Config holds configuration for the graph-clustering component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a valid default configuration
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults sets default values for configuration
func (*Config) DetectionInterval ¶
DetectionInterval returns the parsed detection interval duration
type EntityRequest ¶
EntityRequest is the request format for entity community query
type EntityResponse ¶
type EntityResponse struct {
	EntityID  string                `json:"entity_id"`
	Level     int                   `json:"level"`
	Community *clustering.Community `json:"community"`
}
EntityResponse is the response format for entity community query
type LevelRequest ¶
type LevelRequest struct {
	Level int `json:"level"`
}
LevelRequest is the request format for level query
type LevelResponse ¶
type LevelResponse struct {
	Level       int                     `json:"level"`
	Communities []*clustering.Community `json:"communities"`
	Count       int                     `json:"count"`
}
LevelResponse is the response format for level query
type MembersRequest ¶
type MembersRequest struct {
	CommunityID string `json:"community_id"`
}
MembersRequest is the request format for members query
type MembersResponse ¶
type MembersResponse struct {
	CommunityID string   `json:"community_id"`
	Members     []string `json:"members"`
	Count       int      `json:"count"`
}
MembersResponse is the response format for members query
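The query types are plain JSON structs. The sketch copies MembersRequest and MembersResponse locally so it compiles on its own, and shows one way a members query could be answered. handleMembers is hypothetical, the in-memory map stands in for COMMUNITY_INDEX, and the transport and subject used by the real handlers are not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of MembersRequest and MembersResponse as documented above.
type MembersRequest struct {
	CommunityID string `json:"community_id"`
}

type MembersResponse struct {
	CommunityID string   `json:"community_id"`
	Members     []string `json:"members"`
	Count       int      `json:"count"`
}

// handleMembers decodes a request, looks the community up in an in-memory
// index (a stand-in for COMMUNITY_INDEX), and encodes the response with Count
// derived from the member list.
func handleMembers(payload []byte, index map[string][]string) ([]byte, error) {
	var req MembersRequest
	if err := json.Unmarshal(payload, &req); err != nil {
		return nil, fmt.Errorf("decode request: %w", err)
	}
	members, ok := index[req.CommunityID]
	if !ok {
		return nil, fmt.Errorf("community %q not found", req.CommunityID)
	}
	return json.Marshal(MembersResponse{
		CommunityID: req.CommunityID,
		Members:     members,
		Count:       len(members),
	})
}

func main() {
	index := map[string][]string{"c1": {"e1", "e2"}}
	out, err := handleMembers([]byte(`{"community_id":"c1"}`), index)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Deriving Count from the member slice keeps the two fields from drifting apart, which is one reason to compute it at response time rather than store it.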