Documentation ¶
Overview ¶
Package structural provides structural graph indexing algorithms for query optimization and inference detection.
The structural package implements two complementary indexing strategies that enable efficient graph queries and anomaly detection:
- K-core decomposition: identifies the dense backbone of the graph
- Pivot-based distance indexing: enables O(1) distance estimation for a fixed pivot count (each estimate scans one distance per pivot)
These indices support query optimization (filtering noise, pruning traversals) and structural inference (detecting anomalies like semantic-structural gaps).
Architecture ¶
Graph Provider
      ↓
┌────────────────────────────────────────────────────────────────┐
│                      Structural Computers                      │
├───────────────────────────┬────────────────────────────────────┤
│       KCoreComputer       │           PivotComputer            │
│    (peeling algorithm)    │          (PageRank + BFS)          │
└───────────────────────────┴────────────────────────────────────┘
      ↓
┌────────────────────────────────────────────────────────────────┐
│                      STRUCTURAL_INDEX KV                       │
│  structural.kcore._meta        structural.pivot._meta          │
│  structural.kcore.entity.{id}  structural.pivot.entity.{id}    │
│  structural.kcore.bucket.{k}                                   │
└────────────────────────────────────────────────────────────────┘
K-Core Decomposition ¶
K-core decomposition identifies nested subgraphs where each node has at least k neighbors within the same subgraph. Higher core numbers indicate more central, densely connected nodes.
Use cases:
- Filter noise: exclude low-core (peripheral) nodes from search results
- Hub detection: high-core nodes form the graph backbone
- Anomaly detection: core demotion signals structural changes
Algorithm (peeling):
- Initialize degree for all vertices
- Repeatedly remove the vertex with minimum degree
- The core number of a vertex is its degree when removed
Time complexity: O(V + E)
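The peeling steps above can be sketched as follows. This is a simplified illustration, not the package's implementation: the helper name coreNumbers and the map[string][]string graph representation are assumptions made for the example, and the graph is assumed to be undirected with every vertex present as a key. To stay short, the sketch scans for the minimum-degree vertex on every step, which costs O(V²); the O(V + E) bound quoted above requires bucketed degrees.

```go
package main

import "fmt"

// coreNumbers is a hypothetical helper (not part of the structural package).
// It peels an undirected graph, given as a symmetric adjacency map in which
// every vertex appears as a key, and returns each vertex's core number.
func coreNumbers(adj map[string][]string) map[string]int {
	// Step 1: initialize the degree of every vertex.
	degree := make(map[string]int, len(adj))
	for v, nbrs := range adj {
		degree[v] = len(nbrs)
	}

	core := make(map[string]int, len(adj))
	removed := make(map[string]bool, len(adj))
	k := 0
	for len(removed) < len(adj) {
		// Step 2: find the remaining vertex with minimum degree.
		// A linear scan keeps the sketch short.
		minV, minD, found := "", 0, false
		for v := range adj {
			if removed[v] {
				continue
			}
			if !found || degree[v] < minD {
				minV, minD, found = v, degree[v], true
			}
		}
		// Step 3: the core number is the degree at removal, never lower
		// than the level already reached by earlier removals.
		if minD > k {
			k = minD
		}
		core[minV] = k
		removed[minV] = true
		for _, n := range adj[minV] {
			if _, ok := adj[n]; ok && !removed[n] {
				degree[n]--
			}
		}
	}
	return core
}

func main() {
	// Triangle a-b-c with a pendant vertex d attached to a.
	adj := map[string][]string{
		"a": {"b", "c", "d"},
		"b": {"a", "c"},
		"c": {"a", "b"},
		"d": {"a"},
	}
	core := coreNumbers(adj)
	fmt.Println(core["a"], core["b"], core["c"], core["d"]) // prints: 2 2 2 1
}
```

In the example graph the triangle forms the 2-core, while the pendant vertex d only belongs to the 1-core.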
Usage:
computer := structural.NewKCoreComputer(provider, logger)
index, err := computer.Compute(ctx)
// Filter results to include only hub entities
filtered := index.FilterByMinCore(entityIDs, 3)
// Get all entities in core level 5+
hubs := index.GetEntitiesAboveCore(5)
Pivot-Based Distance Indexing ¶
The pivot index pre-computes shortest path distances from every node to a small set of "pivot" nodes selected by PageRank centrality. Distance between any two nodes can then be bounded using the triangle inequality:
lower_bound = max |d(A,pivot) - d(B,pivot)| over all pivots
upper_bound = min (d(A,pivot) + d(B,pivot)) over all pivots
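As a rough illustration of these bounds, the sketch below computes them from two distance vectors aligned to the same pivot order. The function name estimate is hypothetical, the graph is assumed to be undirected (so distances are symmetric), and skipping pivots that are unreachable from either side is an assumption about how the sentinel is handled rather than a description of the package's code.

```go
package main

import "fmt"

// maxHopDistance mirrors the documented MaxHopDistance sentinel (255).
const maxHopDistance = 255

// estimate is a hypothetical stand-in for the bound computation. a and b are
// distance vectors where a[i] and b[i] refer to the same pivot.
func estimate(a, b []int) (lower, upper int) {
	lower, upper = 0, maxHopDistance
	shared := false
	for i := 0; i < len(a) && i < len(b); i++ {
		da, db := a[i], b[i]
		if da >= maxHopDistance || db >= maxHopDistance {
			continue // pivot unreachable from at least one side (assumed skip)
		}
		shared = true
		diff := da - db
		if diff < 0 {
			diff = -diff
		}
		if diff > lower {
			lower = diff // max |d(A,pivot) - d(B,pivot)|
		}
		if sum := da + db; sum < upper {
			upper = sum // min (d(A,pivot) + d(B,pivot))
		}
	}
	if !shared {
		// No pivot reaches both nodes: treat the pair as disconnected.
		return maxHopDistance, maxHopDistance
	}
	return lower, upper
}

func main() {
	a := []int{1, 4, maxHopDistance}
	b := []int{3, 2, 5}
	lower, upper := estimate(a, b)
	fmt.Println(lower, upper) // prints: 2 4
}
```

Here the first pivot gives |1 - 3| = 2 and 1 + 3 = 4, the second gives |4 - 2| = 2 and 4 + 2 = 6, and the third is skipped, so the true distance lies between 2 and 4 hops.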
Use cases:
- Multi-hop filtering: quickly determine if two entities are within N hops
- PathRAG optimization: prune candidates before expensive traversal
- Semantic-structural gap detection: find semantically similar but structurally distant pairs
Algorithm:
- Select k pivot nodes using PageRank (central nodes make better pivots)
- Run BFS from each pivot to compute distances to all reachable nodes
- Store distance vectors for each node
Time complexity: O(k × (V + E)) where k = pivot count
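The BFS and storage steps can be sketched like this, under the assumption that the pivots have already been chosen and are passed in. The helper names bfsDistances and distanceVectors and the adjacency-map graph are illustrative only; the package's own types and Provider interface are not used here.

```go
package main

import "fmt"

// unreachable mirrors the documented MaxHopDistance sentinel (255).
const unreachable = 255

// bfsDistances returns hop counts from source to every reachable vertex.
func bfsDistances(adj map[string][]string, source string) map[string]int {
	dist := map[string]int{source: 0}
	queue := []string{source}
	for len(queue) > 0 {
		v := queue[0]
		queue = queue[1:]
		for _, n := range adj[v] {
			if _, seen := dist[n]; !seen {
				dist[n] = dist[v] + 1
				queue = append(queue, n)
			}
		}
	}
	return dist
}

// distanceVectors builds one vector per vertex; entry i is the distance to
// pivots[i], or the sentinel when that pivot cannot be reached.
func distanceVectors(adj map[string][]string, pivots []string) map[string][]int {
	vectors := make(map[string][]int, len(adj))
	for v := range adj {
		vec := make([]int, len(pivots))
		for i := range vec {
			vec[i] = unreachable
		}
		vectors[v] = vec
	}
	// One BFS per pivot gives the O(k × (V + E)) total cost.
	for i, p := range pivots {
		for v, d := range bfsDistances(adj, p) {
			if vec, ok := vectors[v]; ok && d < unreachable {
				vec[i] = d
			}
		}
	}
	return vectors
}

func main() {
	// Path a-b-c plus an isolated vertex d.
	adj := map[string][]string{
		"a": {"b"},
		"b": {"a", "c"},
		"c": {"b"},
		"d": {},
	}
	vectors := distanceVectors(adj, []string{"a"})
	fmt.Println(vectors["a"], vectors["b"], vectors["c"], vectors["d"])
	// prints: [0] [1] [2] [255]
}
```

Capping stored distances below the sentinel keeps every entry representable in a single byte, which matches the rationale given for MaxHopDistance.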
Usage:
computer := structural.NewPivotComputer(provider, 16, logger)
index, err := computer.Compute(ctx)
// Estimate distance between two entities
lower, upper := index.EstimateDistance(entityA, entityB)
// Quick check if within N hops
if index.IsWithinHops(entityA, entityB, 3) {
// Proceed with expensive traversal
}
// Get candidates potentially within N hops
candidates := index.GetReachableCandidates(source, maxHops)
Storage ¶
Indices are stored in the STRUCTURAL_INDEX NATS KV bucket with the following key patterns:
structural.kcore._meta → KCore metadata (max_core, computed_at)
structural.kcore.entity.{id} → Per-entity core number
structural.kcore.bucket.{k} → Entity IDs at core level k
structural.pivot._meta → Pivot metadata (pivots list, computed_at)
structural.pivot.entity.{id} → Per-entity distance vector
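The key patterns above could be produced by small helpers along these lines. The function and constant names are hypothetical, and the sketch uses entity IDs verbatim; how the package treats IDs containing characters that a key-value store may reject is not described here.

```go
package main

import (
	"fmt"
	"strconv"
)

// Metadata keys, copied from the documented key patterns.
const (
	kcoreMetaKey = "structural.kcore._meta"
	pivotMetaKey = "structural.pivot._meta"
)

// kcoreEntityKey builds the per-entity core number key.
func kcoreEntityKey(id string) string {
	return "structural.kcore.entity." + id
}

// kcoreBucketKey builds the key listing entity IDs at core level k.
func kcoreBucketKey(k int) string {
	return "structural.kcore.bucket." + strconv.Itoa(k)
}

// pivotEntityKey builds the per-entity distance vector key.
func pivotEntityKey(id string) string {
	return "structural.pivot.entity." + id
}

func main() {
	fmt.Println(kcoreMetaKey)
	fmt.Println(kcoreEntityKey("e1"))
	fmt.Println(kcoreBucketKey(3))
	fmt.Println(pivotMetaKey)
	fmt.Println(pivotEntityKey("e1"))
}
```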
Usage:
storage := structural.NewNATSStructuralIndexStorage(kvBucket)
// Save indices
err := storage.SaveKCoreIndex(ctx, kcoreIndex)
err = storage.SavePivotIndex(ctx, pivotIndex)
// Load indices
kcoreIndex, err = storage.GetKCoreIndex(ctx)
pivotIndex, err = storage.GetPivotIndex(ctx)
Configuration ¶
Default parameters:
MaxHopDistance: 255 # Sentinel for unreachable nodes
DefaultPivotCount: 16 # Landmark nodes (10-20 is typical)
DefaultPageRankIterations: 20 # PageRank convergence iterations
DefaultPageRankDamping: 0.85 # Random walk continuation probability
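To show where the iteration count and damping factor enter, here is a plain power-iteration PageRank followed by a top-k selection. It is a sketch under assumptions: the names pageRank and topPivots are hypothetical, the graph is an adjacency map in which every vertex is a key, and rank from vertices without outgoing edges is spread uniformly, which may differ from what the package does.

```go
package main

import (
	"fmt"
	"sort"
)

// pageRank runs a fixed number of power iterations with the given damping.
func pageRank(adj map[string][]string, iterations int, damping float64) map[string]float64 {
	n := float64(len(adj))
	rank := make(map[string]float64, len(adj))
	for v := range adj {
		rank[v] = 1 / n
	}
	for it := 0; it < iterations; it++ {
		next := make(map[string]float64, len(adj))
		dangling := 0.0
		for v, nbrs := range adj {
			if len(nbrs) == 0 {
				dangling += rank[v] // no outgoing edges: spread uniformly
				continue
			}
			share := rank[v] / float64(len(nbrs))
			for _, u := range nbrs {
				next[u] += share
			}
		}
		for v := range adj {
			next[v] = (1-damping)/n + damping*(next[v]+dangling/n)
		}
		rank = next
	}
	return rank
}

// topPivots returns the k highest-ranked vertices, ties broken by ID.
func topPivots(rank map[string]float64, k int) []string {
	ids := make([]string, 0, len(rank))
	for v := range rank {
		ids = append(ids, v)
	}
	sort.Slice(ids, func(i, j int) bool {
		if rank[ids[i]] != rank[ids[j]] {
			return rank[ids[i]] > rank[ids[j]]
		}
		return ids[i] < ids[j]
	})
	if k > len(ids) {
		k = len(ids)
	}
	return ids[:k]
}

func main() {
	// Star graph: every leaf links to the hub and back.
	adj := map[string][]string{
		"hub": {"a", "b", "c"},
		"a":   {"hub"},
		"b":   {"hub"},
		"c":   {"hub"},
	}
	rank := pageRank(adj, 20, 0.85) // documented defaults
	fmt.Println(topPivots(rank, 1)) // prints: [hub]
}
```

With the defaults of 20 iterations and 0.85 damping, the hub of the star ends up with the highest rank and would be chosen as the first pivot.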
Thread Safety ¶
KCoreComputer, PivotComputer, and NATSStructuralIndexStorage are safe for concurrent use. The index types (KCoreIndex, PivotIndex) are safe for concurrent reads but not concurrent writes.
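One way to work within this contract is to treat a computed index as immutable and publish each recomputation by swapping a pointer, so readers never observe a write in progress. The sketch below assumes Go 1.19 or later for atomic.Pointer; the kcoreIndex and indexHolder types are simplified stand-ins invented for the example and are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// kcoreIndex is a simplified stand-in for the package's KCoreIndex.
type kcoreIndex struct {
	CoreNumbers map[string]int
}

// indexHolder is a hypothetical wrapper that hands out the latest index.
type indexHolder struct {
	current atomic.Pointer[kcoreIndex]
}

// Publish replaces the visible index; the old one is never mutated.
func (h *indexHolder) Publish(idx *kcoreIndex) {
	h.current.Store(idx)
}

// Core reads from whichever index is current; 0 means unknown.
func (h *indexHolder) Core(id string) int {
	idx := h.current.Load()
	if idx == nil {
		return 0
	}
	return idx.CoreNumbers[id]
}

func main() {
	var h indexHolder
	h.Publish(&kcoreIndex{CoreNumbers: map[string]int{"a": 2}})

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = h.Core("a") // concurrent reads of an immutable index
		}()
	}
	// A recomputation publishes a brand-new index instead of editing the old one.
	h.Publish(&kcoreIndex{CoreNumbers: map[string]int{"a": 3}})
	wg.Wait()
	fmt.Println(h.Core("a")) // prints: 3
}
```

A sync.RWMutex around a shared index would also work; the pointer swap simply avoids holding a lock while queries run.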
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph: Provider interface and graph types
- github.com/c360studio/semstreams/graph/inference: Anomaly detection using structural indices
- github.com/c360studio/semstreams/graph/clustering: Community detection (triggers recomputation)
Index ¶
- Constants
- type Indices
- type KCoreComputer
- type KCoreIndex
- type NATSStructuralIndexStorage
- func (s *NATSStructuralIndexStorage) Clear(ctx context.Context) error
- func (s *NATSStructuralIndexStorage) GetKCoreIndex(ctx context.Context) (*KCoreIndex, error)
- func (s *NATSStructuralIndexStorage) GetPivotIndex(ctx context.Context) (*PivotIndex, error)
- func (s *NATSStructuralIndexStorage) SaveKCoreIndex(ctx context.Context, index *KCoreIndex) error
- func (s *NATSStructuralIndexStorage) SavePivotIndex(ctx context.Context, index *PivotIndex) error
- type PivotComputer
- type PivotIndex
- type Provider
- type Storage
Constants ¶
const (
// MaxHopDistance is the sentinel value indicating unreachable nodes
// in pivot distance vectors. Using 255 allows single-byte storage.
MaxHopDistance = 255
// DefaultPivotCount is the default number of landmark nodes for distance indexing.
// Values between 10-20 typically provide good accuracy/storage tradeoff.
DefaultPivotCount = 16
// DefaultPageRankIterations is the number of iterations for PageRank pivot selection.
DefaultPageRankIterations = 20
// DefaultPageRankDamping is the damping factor for PageRank (probability of following links).
DefaultPageRankDamping = 0.85
)
const (
// StructuralIndexBucket is the NATS KV bucket for storing structural indices.
StructuralIndexBucket = "STRUCTURAL_INDEX"
)
Types ¶
type Indices ¶
type Indices struct {
KCore *KCoreIndex `json:"kcore,omitempty"`
Pivot *PivotIndex `json:"pivot,omitempty"`
}
Indices bundles both indices together for convenience.
type KCoreComputer ¶
type KCoreComputer struct {
// contains filtered or unexported fields
}
KCoreComputer computes k-core decomposition of a graph.
K-core decomposition uses the "peeling" algorithm:
- Initialize degree for all vertices
- Repeatedly remove the vertex with minimum degree
- The core number of a vertex is its degree when removed
Time complexity: O(V + E)
Space complexity: O(V)
func NewKCoreComputer ¶
func NewKCoreComputer(provider Provider, logger *slog.Logger) *KCoreComputer
NewKCoreComputer creates a new k-core computer.
func (*KCoreComputer) Compute ¶
func (c *KCoreComputer) Compute(ctx context.Context) (*KCoreIndex, error)
Compute performs k-core decomposition on the graph. Returns a KCoreIndex with core numbers for all entities.
func (*KCoreComputer) ComputeIncremental ¶
func (c *KCoreComputer) ComputeIncremental(ctx context.Context, _ []string) (*KCoreIndex, error)
ComputeIncremental updates k-core index for changed entities. For now, this performs a full recomputation. True incremental k-core algorithms exist but are complex and may not be worth the effort given typical graph sizes and computation frequency.
type KCoreIndex ¶
type KCoreIndex struct {
// CoreNumbers maps entity ID to its core number.
// Core number k means the entity has at least k neighbors also in core k or higher.
CoreNumbers map[string]int `json:"core_numbers"`
// MaxCore is the highest core number found in the graph (the innermost core).
MaxCore int `json:"max_core"`
// CoreBuckets groups entities by core number for efficient filtering.
// Key: core number, Value: slice of entity IDs in that core.
CoreBuckets map[int][]string `json:"core_buckets"`
// ComputedAt records when the index was computed.
ComputedAt time.Time `json:"computed_at"`
// EntityCount is the total number of entities in the index.
EntityCount int `json:"entity_count"`
}
KCoreIndex stores k-core decomposition results.
K-core decomposition identifies nested subgraphs where each node has at least k neighbors within the same subgraph. Higher core numbers indicate more central, densely connected nodes.
Use cases:
- Filter noise: exclude low-core (peripheral) nodes from search results
- Hub detection: high-core nodes form the graph backbone (hubs)
- Anomaly detection: core demotion signals structural changes
func (*KCoreIndex) FilterByMinCore ¶
func (idx *KCoreIndex) FilterByMinCore(entityIDs []string, minCore int) []string
FilterByMinCore returns only entities with core number >= minCore. Useful for excluding peripheral/leaf nodes from query results. Returns nil if the index is nil, or the original slice if minCore <= 0.
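The documented behavior can be mirrored in a few lines. In this sketch the function filterByMinCore is hypothetical and operates on a bare core-number map rather than on *KCoreIndex; a nil map stands in for a nil index, and preserving input order is an assumption.

```go
package main

import "fmt"

// filterByMinCore keeps the IDs whose core number is at least minCore.
// IDs missing from the map count as core 0, matching GetCore's description.
func filterByMinCore(coreNumbers map[string]int, entityIDs []string, minCore int) []string {
	if coreNumbers == nil {
		return nil // stands in for a nil index
	}
	if minCore <= 0 {
		return entityIDs // nothing to filter
	}
	out := make([]string, 0, len(entityIDs))
	for _, id := range entityIDs {
		if coreNumbers[id] >= minCore {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	cores := map[string]int{"hub": 4, "mid": 2, "leaf": 1}
	ids := []string{"leaf", "hub", "unknown", "mid"}
	fmt.Println(filterByMinCore(cores, ids, 2)) // prints: [hub mid]
}
```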
func (*KCoreIndex) GetCore ¶
func (idx *KCoreIndex) GetCore(entityID string) int
GetCore returns the core number for an entity. Returns 0 if the entity is not in the index.
func (*KCoreIndex) GetEntitiesAboveCore ¶
func (idx *KCoreIndex) GetEntitiesAboveCore(minCore int) []string
GetEntitiesAboveCore returns all entities with core number >= minCore.
func (*KCoreIndex) GetEntitiesInCore ¶
func (idx *KCoreIndex) GetEntitiesInCore(core int) []string
GetEntitiesInCore returns all entities with exactly the specified core number.
type NATSStructuralIndexStorage ¶
type NATSStructuralIndexStorage struct {
// contains filtered or unexported fields
}
NATSStructuralIndexStorage implements the Storage interface using NATS KV.
func NewNATSStructuralIndexStorage ¶
func NewNATSStructuralIndexStorage(kv jetstream.KeyValue) *NATSStructuralIndexStorage
NewNATSStructuralIndexStorage creates a new NATS-backed structural index storage.
func (*NATSStructuralIndexStorage) Clear ¶
func (s *NATSStructuralIndexStorage) Clear(ctx context.Context) error
Clear removes all structural index data from NATS KV.
func (*NATSStructuralIndexStorage) GetKCoreIndex ¶
func (s *NATSStructuralIndexStorage) GetKCoreIndex(ctx context.Context) (*KCoreIndex, error)
GetKCoreIndex retrieves the k-core index from NATS KV.
func (*NATSStructuralIndexStorage) GetPivotIndex ¶
func (s *NATSStructuralIndexStorage) GetPivotIndex(ctx context.Context) (*PivotIndex, error)
GetPivotIndex retrieves the pivot index from NATS KV.
func (*NATSStructuralIndexStorage) SaveKCoreIndex ¶
func (s *NATSStructuralIndexStorage) SaveKCoreIndex(ctx context.Context, index *KCoreIndex) error
SaveKCoreIndex persists the k-core index to NATS KV. Stores metadata separately from per-entity core numbers for efficient lookups.
func (*NATSStructuralIndexStorage) SavePivotIndex ¶
func (s *NATSStructuralIndexStorage) SavePivotIndex(ctx context.Context, index *PivotIndex) error
SavePivotIndex persists the pivot index to NATS KV.
type PivotComputer ¶
type PivotComputer struct {
// contains filtered or unexported fields
}
PivotComputer computes pivot-based distance index for a graph.
The algorithm:
- Select k pivot nodes using PageRank (central nodes make better pivots)
- Run BFS from each pivot to compute distances to all reachable nodes
- Store distance vectors for each node
Distance between any two nodes can then be bounded using triangle inequality:
lower = max |d(A,pivot) - d(B,pivot)| over all pivots
upper = min (d(A,pivot) + d(B,pivot)) over all pivots
Time complexity: O(k * (V + E)) where k = pivot count
Space complexity: O(V * k)
func NewPivotComputer ¶
func NewPivotComputer(provider Provider, pivotCount int, logger *slog.Logger) *PivotComputer
NewPivotComputer creates a new pivot computer.
func (*PivotComputer) Compute ¶
func (c *PivotComputer) Compute(ctx context.Context) (*PivotIndex, error)
Compute builds the pivot index for the graph.
func (*PivotComputer) ComputeIncremental ¶
func (c *PivotComputer) ComputeIncremental(ctx context.Context, _ []string) (*PivotIndex, error)
ComputeIncremental updates pivot index for changed entities. For now, this performs a full recomputation.
type PivotIndex ¶
type PivotIndex struct {
// Pivots is the ordered list of pivot entity IDs (selected by PageRank).
// The order is significant: DistanceVectors[entity][i] corresponds to Pivots[i].
Pivots []string `json:"pivots"`
// DistanceVectors maps entity ID to its distance vector.
// Vector[i] = shortest path distance to Pivots[i].
// Value of MaxHopDistance (255) indicates unreachable.
DistanceVectors map[string][]int `json:"distance_vectors"`
// ComputedAt records when the index was computed.
ComputedAt time.Time `json:"computed_at"`
// EntityCount is the total number of entities in the index.
EntityCount int `json:"entity_count"`
}
PivotIndex stores pivot-based distance vectors for O(1) distance estimation.
The index pre-computes shortest path distances from every node to a small set of "pivot" nodes (selected by PageRank centrality). Distance between any two nodes can then be bounded using the triangle inequality:
lower_bound = max |d(A,pivot) - d(B,pivot)| over all pivots
upper_bound = min (d(A,pivot) + d(B,pivot)) over all pivots
Use cases:
- Multi-hop filtering: quickly determine if two entities are within N hops
- PathRAG optimization: prune candidates before expensive traversal
- Semantic-structural gap detection: find semantically similar but structurally distant pairs
func (*PivotIndex) EstimateDistance ¶
func (idx *PivotIndex) EstimateDistance(entityA, entityB string) (lower, upper int)
EstimateDistance returns lower and upper bounds for the shortest path distance between two entities using the triangle inequality.
Returns (MaxHopDistance, MaxHopDistance) if either entity is not in the index or if the entities are in disconnected components (no shared reachable pivots).
func (*PivotIndex) GetReachableCandidates ¶
func (idx *PivotIndex) GetReachableCandidates(source string, maxHops int) []string
GetReachableCandidates returns entity IDs that might be within maxHops of source. Uses lower bound filtering - excludes entities definitely too far away. May include some entities that are actually farther (false positives are acceptable, false negatives are not). Entities in disconnected components are excluded.
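The lower-bound filter described here might look like the following. The function reachableCandidates is a hypothetical stand-in that works on a bare map of distance vectors; excluding the source itself and returning the IDs sorted are choices made for the example, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// unreachable mirrors the documented MaxHopDistance sentinel (255).
const unreachable = 255

// reachableCandidates keeps every entity whose triangle-inequality lower
// bound to source does not exceed maxHops.
func reachableCandidates(vectors map[string][]int, source string, maxHops int) []string {
	src, ok := vectors[source]
	if !ok {
		return nil
	}
	var out []string
	for id, vec := range vectors {
		if id == source {
			continue // assumption: the source is not its own candidate
		}
		lower, shared := 0, false
		for i := 0; i < len(src) && i < len(vec); i++ {
			if src[i] >= unreachable || vec[i] >= unreachable {
				continue
			}
			shared = true
			d := src[i] - vec[i]
			if d < 0 {
				d = -d
			}
			if d > lower {
				lower = d
			}
		}
		// No shared pivot means a disconnected component: exclude.
		if shared && lower <= maxHops {
			out = append(out, id)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	// Path a-b-c-d-e with the single pivot c, plus a disconnected z.
	vectors := map[string][]int{
		"a": {2}, "b": {1}, "c": {0}, "d": {1}, "e": {2},
		"z": {unreachable},
	}
	fmt.Println(reachableCandidates(vectors, "a", 1)) // prints: [b d e]
}
```

With only one pivot, d and e pass the filter even though they are 3 and 4 hops from a. These are the acceptable false positives, while c is correctly ruled out because its lower bound of 2 exceeds maxHops.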
func (*PivotIndex) IsWithinHops ¶
func (idx *PivotIndex) IsWithinHops(entityA, entityB string, maxHops int) bool
IsWithinHops returns true if the two entities are estimated to be within maxHops. Uses the lower bound from triangle inequality - if lower > maxHops, definitely not within range. If lower <= maxHops but upper > maxHops, result is uncertain (returns true to be conservative).
type Provider ¶
Provider is an alias for the shared Provider interface in the graph package. It abstracts the graph data source used for structural index computation.
type Storage ¶
type Storage interface {
// SaveKCoreIndex persists the k-core index.
SaveKCoreIndex(ctx context.Context, index *KCoreIndex) error
// GetKCoreIndex retrieves the k-core index.
GetKCoreIndex(ctx context.Context) (*KCoreIndex, error)
// SavePivotIndex persists the pivot index.
SavePivotIndex(ctx context.Context, index *PivotIndex) error
// GetPivotIndex retrieves the pivot index.
GetPivotIndex(ctx context.Context) (*PivotIndex, error)
// Clear removes all structural index data.
Clear(ctx context.Context) error
}
Storage defines the interface for persisting structural indices.
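Because Storage is an interface, callers can substitute a backend that does not need NATS, for instance in unit tests. The in-memory variant below is a sketch only: memoryStorage is a hypothetical type, KCoreIndex and PivotIndex are cut-down stand-ins so the example compiles on its own, and returning (nil, nil) when nothing has been saved is an assumption rather than the documented behavior of the NATS implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Stand-in types; the real KCoreIndex and PivotIndex carry more fields.
type KCoreIndex struct{ MaxCore int }
type PivotIndex struct{ Pivots []string }

// Storage repeats the documented interface.
type Storage interface {
	SaveKCoreIndex(ctx context.Context, index *KCoreIndex) error
	GetKCoreIndex(ctx context.Context) (*KCoreIndex, error)
	SavePivotIndex(ctx context.Context, index *PivotIndex) error
	GetPivotIndex(ctx context.Context) (*PivotIndex, error)
	Clear(ctx context.Context) error
}

// memoryStorage is a hypothetical in-memory Storage for tests.
type memoryStorage struct {
	mu    sync.RWMutex
	kcore *KCoreIndex
	pivot *PivotIndex
}

// Compile-time check that memoryStorage satisfies Storage.
var _ Storage = (*memoryStorage)(nil)

func (m *memoryStorage) SaveKCoreIndex(_ context.Context, index *KCoreIndex) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.kcore = index
	return nil
}

func (m *memoryStorage) GetKCoreIndex(_ context.Context) (*KCoreIndex, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.kcore, nil // nil index when nothing is stored (assumption)
}

func (m *memoryStorage) SavePivotIndex(_ context.Context, index *PivotIndex) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pivot = index
	return nil
}

func (m *memoryStorage) GetPivotIndex(_ context.Context) (*PivotIndex, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.pivot, nil
}

func (m *memoryStorage) Clear(_ context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.kcore, m.pivot = nil, nil
	return nil
}

func main() {
	ctx := context.Background()
	var store Storage = &memoryStorage{}
	if err := store.SaveKCoreIndex(ctx, &KCoreIndex{MaxCore: 4}); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	idx, _ := store.GetKCoreIndex(ctx)
	fmt.Println(idx.MaxCore) // prints: 4
}
```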