Documentation ¶
Overview ¶
Package inference provides structural anomaly detection and inference for identifying potential missing relationships in the knowledge graph.
The inference system analyzes the gap between semantic similarity and structural distance to identify entities that should probably be related but aren't explicitly connected. It supports four anomaly detection methods:
- Semantic-Structural Gap: High embedding similarity with high graph distance
- Core Isolation: Hub entities with few same-core peer connections
- Core Demotion: Entities that dropped k-core level between computations
- Transitivity Gap: Missing transitive relationships (A→B and B→C exist, but A and C are structurally distant)
Detected anomalies flow through an approval pipeline:
Detectors → Pending → LLM Review → Human Review (optional) → Applied/Rejected
Architecture ¶
Community Detection Trigger
↓
┌──────────────────────────────────────────────┐
│ Anomaly Detectors │
├──────────┬──────────┬──────────┬─────────────┤
│ Semantic │ Core │ Core │Transitivity │
│ Gap │ Isolation│ Demotion │ Gap │
└──────────┴──────────┴──────────┴─────────────┘
↓
ANOMALY_INDEX KV
↓
┌─────────────┐
│ Review Worker│ ←── LLM API
└─────────────┘
↓
┌─────────────┐
│Human Review │ ←── HTTP Handlers
│ Queue │
└─────────────┘
↓
Edge Applier → Graph
Usage ¶
Configure and run the anomaly detector:
cfg := inference.DefaultConfig()
cfg.Enabled = true
cfg.SemanticGap.Enabled = true
cfg.Review.Enabled = true
cfg.Review.LLM = llm.Config{
	Provider: "openai",
	BaseURL:  "https://api.openai.com/v1",
}

detector, err := inference.NewDetector(cfg, deps)
if err != nil {
	return err
}

// Run detection (typically triggered after community detection)
anomalies, err := detector.DetectAnomalies(ctx)
if err != nil {
	return err
}
Query and manage anomalies via HTTP handlers:
// List pending anomalies
GET /api/v1/anomalies?status=pending
// Approve an anomaly
POST /api/v1/anomalies/{id}/approve
// Reject an anomaly
POST /api/v1/anomalies/{id}/reject
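The approve and reject endpoints above can be exercised from Go with the standard net/http client. The following is a minimal sketch, not the package's own client: the helper names anomalyActionURL and postAction are hypothetical, the request is assumed to need no body, and a stand-in httptest server replaces a real deployment. The actual path also depends on the prefix passed to RegisterHTTPHandlers.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// anomalyActionURL builds the approve/reject URL for an anomaly ID.
// The path layout follows the endpoints listed in the usage section.
func anomalyActionURL(baseURL, id, action string) string {
	return fmt.Sprintf("%s/api/v1/anomalies/%s/%s", baseURL, url.PathEscape(id), action)
}

// postAction sends a POST without a body (an assumption) and returns the status code.
func postAction(client *http.Client, target string) (int, error) {
	resp, err := client.Post(target, "application/json", nil)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	// Stand-in server so the sketch runs without a real inference service.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	target := anomalyActionURL(srv.URL, "anomaly-123", "approve")
	status, err := postAction(srv.Client(), target)
	if err != nil {
		panic(err)
	}
	fmt.Println(status)
}
```

Escaping the ID with url.PathEscape keeps IDs containing spaces or slashes from changing the route.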
Anomaly Types ¶
Semantic-Structural Gap (AnomalySemanticStructuralGap):
Detected when entities have high embedding similarity (≥0.7) but high structural distance (≥3 hops). Evidence includes similarity score and distance bounds from triangle inequality.
Core Isolation (AnomalyCoreIsolation):
Detected when a high k-core entity has fewer same-core connections than expected. Evidence includes core level, peer count, and connectivity ratio.
Core Demotion (AnomalyCoreDemotion):
Detected when an entity drops k-core level between computations, signaling lost relationships. Evidence includes previous/current levels.
Transitivity Gap (AnomalyTransitivityGap):
Detected when A→B and B→C exist for a configured transitive predicate but the A-C distance exceeds the expected maximum. Evidence includes the chain path.
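The "distance bounds from triangle inequality" mentioned for the semantic-structural gap can be illustrated with pivot distances. This is a sketch under assumptions, not the package's algorithm: it assumes an undirected, unweighted graph, precomputed hop distances from each entity to a shared set of pivots, and that the lower bound is what gets compared with the minimum structural distance. The function names are hypothetical.

```go
package main

import "fmt"

// distanceBounds estimates the hop distance between entities a and b from
// their distances to the same pivots: distA[i] and distB[i] are the
// distances from a and b to pivot i.
func distanceBounds(distA, distB []int) (lower, upper int, ok bool) {
	if len(distA) == 0 || len(distA) != len(distB) {
		return 0, 0, false
	}
	lower, upper = 0, distA[0]+distB[0]
	for i := range distA {
		diff := distA[i] - distB[i]
		if diff < 0 {
			diff = -diff
		}
		if diff > lower {
			lower = diff // d(a,b) >= |d(a,p) - d(b,p)|
		}
		if sum := distA[i] + distB[i]; sum < upper {
			upper = sum // d(a,b) <= d(a,p) + d(b,p)
		}
	}
	return lower, upper, true
}

// isSemanticGap flags a pair whose similarity and lower distance bound
// both meet their thresholds (for example 0.7 and 3 hops).
func isSemanticGap(similarity float64, lowerBound int, minSim float64, minDist int) bool {
	return similarity >= minSim && lowerBound >= minDist
}

func main() {
	lo, hi, _ := distanceBounds([]int{2, 5}, []int{4, 1})
	fmt.Println(lo, hi, isSemanticGap(0.82, lo, 0.7, 3))
}
```

Using the lower bound is the conservative choice: the true distance can never be smaller, so a pair flagged this way really is at least that far apart.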
Configuration ¶
Key configuration options:
Enabled: false                    # Opt-in feature
RunWithCommunityDetection: true   # Trigger after community detection
MaxAnomaliesPerRun: 100           # Prevent runaway detection
SemanticGap:
  Enabled: true
  MinSemanticSimilarity: 0.7      # Minimum embedding similarity
  MinStructuralDistance: 3        # Minimum graph distance (hops)
  MaxGapsPerEntity: 5             # Limit per entity
CoreAnomaly:
  Enabled: true
  MinCoreForHubAnalysis: 3        # Minimum k-core for hub analysis
  HubIsolationThreshold: 0.5      # Peer connectivity ratio threshold
  TrackCoreDemotions: true
  MinDemotionDelta: 2             # Core level drop to flag
Transitivity:
  Enabled: true
  TransitivePredicates: ["member_of", "part_of", "located_in"]
Review:
  Enabled: false                  # Requires LLM setup
  AutoApproveThreshold: 0.9       # Auto-approve confidence
  AutoRejectThreshold: 0.3        # Auto-reject confidence
  FallbackToHuman: true           # Escalate uncertain cases
VirtualEdges:
  AutoApply:
    Enabled: false                # Auto-create edges
    MinConfidence: 0.95           # Confidence threshold for auto-apply
    PredicateTemplate: "inferred.semantic.{band}"
  ReviewQueue:
    Enabled: false                # Queue uncertain gaps for review
    MinConfidence: 0.7            # Lower bound for review queue
    MaxConfidence: 0.95           # Upper bound (below auto-apply)
Anomaly Lifecycle ¶
Each anomaly progresses through states defined by AnomalyStatus:
StatusPending → StatusLLMReviewing → StatusLLMApproved/StatusLLMRejected
↓
StatusHumanReview (if uncertain)
↓
StatusApproved/StatusRejected
↓
StatusApplied (edge created)
High-confidence anomalies may skip review with StatusAutoApplied.
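The LLM review step of the lifecycle can be pictured as threshold routing on a confidence score. The sketch below is illustrative only: the type reviewThresholds and function routeReview are hypothetical, the inclusive comparisons are an assumption, and returning "pending" when FallbackToHuman is off is a guess rather than documented behavior. The status strings are the AnomalyStatus values.

```go
package main

import "fmt"

// reviewThresholds mirrors the three Review settings from the configuration
// section (AutoApproveThreshold, AutoRejectThreshold, FallbackToHuman).
type reviewThresholds struct {
	AutoApprove     float64
	AutoReject      float64
	FallbackToHuman bool
}

// routeReview maps an LLM confidence score to the next lifecycle status.
func routeReview(t reviewThresholds, confidence float64) string {
	switch {
	case confidence >= t.AutoApprove:
		return "llm_approved"
	case confidence <= t.AutoReject:
		return "llm_rejected"
	case t.FallbackToHuman:
		return "human_review"
	default:
		return "pending" // assumption: left pending when escalation is disabled
	}
}

func main() {
	t := reviewThresholds{AutoApprove: 0.9, AutoReject: 0.3, FallbackToHuman: true}
	for _, c := range []float64{0.95, 0.6, 0.2} {
		fmt.Println(c, routeReview(t, c))
	}
}
```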
Thread Safety ¶
The Detector and Storage types are safe for concurrent use. HTTP handlers use optimistic locking via revision numbers when updating anomaly status.
Metrics ¶
The inference package exports Prometheus metrics:
- anomalies_detected_total: Anomalies detected by type
- anomalies_reviewed_total: Anomalies reviewed by decision
- anomalies_applied_total: Anomalies applied to graph
- detection_duration_seconds: Detection run duration
- review_duration_seconds: LLM review duration
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph/clustering: Community detection that triggers inference
- github.com/c360studio/semstreams/graph/embedding: Embedding similarity for semantic gaps
- github.com/c360studio/semstreams/graph/llm: LLM integration for review pipeline
Index ¶
- Constants
- Variables
- type AnomalyStatus
- type AnomalyType
- type AutoApplyConfig
- type CommunityInfo
- type Config
- type CoreAnomalyConfig
- type CoreAnomalyDetector
- type Decision
- type Detector
- type DetectorDependencies
- type DirectRelationshipApplier
- type EntityManager
- type Evidence
- type HTTPHandler
- type HierarchyConfig
- type HierarchyInference
- func (h *HierarchyInference) ClearCache()
- func (h *HierarchyInference) GetCacheStats() int
- func (h *HierarchyInference) GetHierarchyTriples(ctx context.Context, entityID string) ([]message.Triple, error)
- func (h *HierarchyInference) GetMetrics() (containersCreated, edgesCreated, edgesFailed int64)
- func (h *HierarchyInference) OnEntityCreated(ctx context.Context, entityID string) error
- type MutationRelationshipApplier
- type NATSAnomalyStorage
- func (s *NATSAnomalyStorage) Cleanup(ctx context.Context, retention time.Duration) (int, error)
- func (s *NATSAnomalyStorage) Count(ctx context.Context) (map[AnomalyStatus]int, error)
- func (s *NATSAnomalyStorage) Delete(ctx context.Context, id string) error
- func (s *NATSAnomalyStorage) Get(ctx context.Context, id string) (*StructuralAnomaly, error)
- func (s *NATSAnomalyStorage) GetByStatus(ctx context.Context, status AnomalyStatus) ([]*StructuralAnomaly, error)
- func (s *NATSAnomalyStorage) GetByType(ctx context.Context, anomalyType AnomalyType) ([]*StructuralAnomaly, error)
- func (s *NATSAnomalyStorage) GetWithRevision(ctx context.Context, id string) (*StructuralAnomaly, uint64, error)
- func (s *NATSAnomalyStorage) HasEntityAnomaly(ctx context.Context, entityID string, anomalyType AnomalyType) (bool, error)
- func (s *NATSAnomalyStorage) IsDismissedPair(ctx context.Context, entityA, entityB string) (bool, error)
- func (s *NATSAnomalyStorage) MarkPairDismissed(ctx context.Context, entityA, entityB string) error
- func (s *NATSAnomalyStorage) Save(ctx context.Context, anomaly *StructuralAnomaly) error
- func (s *NATSAnomalyStorage) SaveWithRevision(ctx context.Context, anomaly *StructuralAnomaly, revision uint64) error
- func (s *NATSAnomalyStorage) UpdateStatus(ctx context.Context, id string, status AnomalyStatus, reviewedBy, notes string) error
- func (s *NATSAnomalyStorage) Watch(ctx context.Context) (<-chan *StructuralAnomaly, error)
- type NATSRelationshipApplier
- type NoOpApplier
- type Orchestrator
- func (o *Orchestrator) GetRegisteredDetectors() []string
- func (o *Orchestrator) RegisterDetector(detector Detector)
- func (o *Orchestrator) RunDetection(ctx context.Context) (*Result, error)
- func (o *Orchestrator) SetApplier(applier RelationshipApplier)
- func (o *Orchestrator) SetDependencies(deps *DetectorDependencies)
- func (o *Orchestrator) UpdateConfig(config Config) error
- type OrchestratorConfig
- type RelationshipApplier
- type RelationshipInfo
- type RelationshipQuerier
- type RelationshipSuggestion
- type Result
- type ReviewConfig
- type ReviewMetrics
- func (m *ReviewMetrics) DecWorkersActive()
- func (m *ReviewMetrics) IncWorkersActive()
- func (m *ReviewMetrics) RecordApproved(latencySeconds float64)
- func (m *ReviewMetrics) RecordDeferred(latencySeconds float64)
- func (m *ReviewMetrics) RecordFailed(latencySeconds float64)
- func (m *ReviewMetrics) RecordRejected(latencySeconds float64)
- func (m *ReviewMetrics) SetPendingCount(count int)
- type ReviewQueueConfig
- type ReviewRequest
- type ReviewWorker
- type ReviewWorkerConfig
- type SemanticGapConfig
- type SemanticGapDetector
- type SimilarityFinder
- type SimilarityResult
- type StatsResponse
- type Storage
- type StorageConfig
- type StructuralAnomaly
- type TransitivityConfig
- type TransitivityDetector
- type TripleAdder
- type VirtualEdgeConfig
Constants ¶
const (
// DefaultAnomalyBucket is the default NATS KV bucket for storing anomalies.
DefaultAnomalyBucket = "ANOMALY_INDEX"
)
Variables ¶
var ErrConcurrentModification = stderrors.New("concurrent modification detected")
ErrConcurrentModification is returned when an optimistic lock check fails.
Functions ¶
This section is empty.
Types ¶
type AnomalyStatus ¶
type AnomalyStatus string
AnomalyStatus represents the lifecycle state of an anomaly.
const (
	// StatusPending indicates the anomaly is awaiting review.
	StatusPending AnomalyStatus = "pending"
	// StatusLLMReviewing indicates the anomaly is currently being processed by LLM.
	StatusLLMReviewing AnomalyStatus = "llm_reviewing"
	// StatusLLMApproved indicates the LLM approved this anomaly (high confidence).
	StatusLLMApproved AnomalyStatus = "llm_approved"
	// StatusLLMRejected indicates the LLM rejected this anomaly (low confidence).
	StatusLLMRejected AnomalyStatus = "llm_rejected"
	// StatusHumanReview indicates the anomaly has been escalated for human decision.
	StatusHumanReview AnomalyStatus = "human_review"
	// StatusApproved indicates a human approved this anomaly.
	StatusApproved AnomalyStatus = "approved"
	// StatusRejected indicates a human rejected this anomaly.
	StatusRejected AnomalyStatus = "rejected"
	// StatusApplied indicates the suggested relationship was created in the graph.
	StatusApplied AnomalyStatus = "applied"
	// StatusDismissed indicates the anomaly was dismissed and should not be re-detected.
	// This prevents the same entity pair from being flagged repeatedly.
	StatusDismissed AnomalyStatus = "dismissed"
	// StatusAutoApplied indicates the relationship was automatically applied
	// because it met the auto-apply threshold (high confidence).
	StatusAutoApplied AnomalyStatus = "auto_applied"
)
type AnomalyType ¶
type AnomalyType string
AnomalyType identifies the category of structural anomaly detected.
const (
	// AnomalySemanticStructuralGap indicates high semantic similarity with high structural distance.
	// This suggests entities should probably be related but aren't explicitly connected.
	AnomalySemanticStructuralGap AnomalyType = "semantic_structural_gap"
	// AnomalyCoreIsolation indicates a high k-core entity with few same-core peer connections.
	// This suggests missing hub relationships within the core structure.
	AnomalyCoreIsolation AnomalyType = "core_isolation"
	// AnomalyCoreDemotion indicates an entity dropped from its k-core level between computations.
	// This signals lost relationships or potential data quality issues.
	AnomalyCoreDemotion AnomalyType = "core_demotion"
	// AnomalyTransitivityGap indicates A->B and B->C exist but A-C distance exceeds expected.
	// This suggests a missing transitive relationship for configured predicates.
	AnomalyTransitivityGap AnomalyType = "transitivity_gap"
)
type AutoApplyConfig ¶
type AutoApplyConfig struct {
// Enabled activates automatic edge creation
Enabled bool `json:"enabled"`
// MinConfidence is the minimum confidence score to auto-apply (0.0-1.0)
// Confidence is a composite score: similarity + distance boost + core boost
// Gaps at or above this threshold are automatically converted to edges
MinConfidence float64 `json:"min_confidence"`
// PredicateTemplate is the predicate format for created edges
// Supports {band} placeholder: "inferred.semantic.{band}"
// Band values: "high" (>=0.95), "medium" (>=0.85), "related" (else)
PredicateTemplate string `json:"predicate_template"`
}
AutoApplyConfig configures automatic edge creation for high-confidence semantic gaps
func (*AutoApplyConfig) BuildPredicate ¶
func (c *AutoApplyConfig) BuildPredicate(confidence float64) string
BuildPredicate generates the predicate string from the template based on confidence. Template supports {band} placeholder which is replaced with:
- "high" for confidence >= 0.95
- "medium" for confidence >= 0.85
- "related" for lower confidence scores
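The band rules above translate directly into a small function. This is a sketch of the documented behavior, not the package source: bandFor and buildPredicate are hypothetical names, and the use of strings.ReplaceAll for the placeholder is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// bandFor maps a confidence score to the band names listed above.
func bandFor(confidence float64) string {
	switch {
	case confidence >= 0.95:
		return "high"
	case confidence >= 0.85:
		return "medium"
	default:
		return "related"
	}
}

// buildPredicate substitutes the {band} placeholder in the template.
func buildPredicate(template string, confidence float64) string {
	return strings.ReplaceAll(template, "{band}", bandFor(confidence))
}

func main() {
	for _, c := range []float64{0.97, 0.9, 0.6} {
		fmt.Println(buildPredicate("inferred.semantic.{band}", c))
	}
}
```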
func (*AutoApplyConfig) ShouldAutoApply ¶
func (c *AutoApplyConfig) ShouldAutoApply(confidence float64) bool
ShouldAutoApply returns true if the anomaly meets auto-apply criteria based on confidence. Confidence is a composite score (similarity + distance boost + core boost) capped at 1.0.
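As a rough illustration of the composite score, the sketch below sums the three components and caps the result at 1.0. How the distance and core boosts are actually computed is not documented here, so the boost values are placeholders, and checking the Enabled flag inside shouldAutoApply is an assumption. Both function names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// compositeConfidence adds similarity and the two boosts, capped at 1.0.
func compositeConfidence(similarity, distanceBoost, coreBoost float64) float64 {
	return math.Min(1.0, similarity+distanceBoost+coreBoost)
}

// shouldAutoApply reports whether a gap qualifies for automatic edge creation.
func shouldAutoApply(enabled bool, minConfidence, confidence float64) bool {
	return enabled && confidence >= minConfidence
}

func main() {
	strong := compositeConfidence(0.9, 0.1, 0.05) // sum exceeds 1.0, so it is capped
	weak := compositeConfidence(0.7, 0.1, 0)
	fmt.Println(shouldAutoApply(true, 0.95, strong), shouldAutoApply(true, 0.95, weak))
}
```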
type CommunityInfo ¶
type CommunityInfo struct {
// ID is the community identifier
ID string
// Members is the list of entity IDs in this community
Members []string
// Level is the hierarchy level (0 = base communities)
Level int
}
CommunityInfo provides community membership information for detectors. It is a simple struct defined here to avoid circular imports with the clustering package.
type Config ¶
type Config struct {
// Enabled activates inference detection
Enabled bool `json:"enabled"`
// RunWithCommunityDetection triggers inference after community detection completes
RunWithCommunityDetection bool `json:"run_with_community_detection"`
// MaxAnomaliesPerRun limits total anomalies detected per run to prevent runaway detection
MaxAnomaliesPerRun int `json:"max_anomalies_per_run"`
// Detector-specific configurations
SemanticGap SemanticGapConfig `json:"semantic_gap"`
CoreAnomaly CoreAnomalyConfig `json:"core_anomaly"`
Transitivity TransitivityConfig `json:"transitivity"`
// Review configuration for LLM-assisted and human review
Review ReviewConfig `json:"review"`
// VirtualEdges configuration for auto-applying high-confidence gaps as edges
VirtualEdges VirtualEdgeConfig `json:"virtual_edges"`
// Storage configuration
Storage StorageConfig `json:"storage"`
// Operation timeouts
DetectionTimeout time.Duration `json:"detection_timeout"`
}
Config configures the structural inference system
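Because Config carries JSON tags, its top-level options can be supplied as a JSON document. The sketch below decodes into a local mirror struct (topLevelConfig, a hypothetical name) that copies only three fields and their tags from the declaration above. The nested detector sections are omitted because their tags are not shown in this part of the documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// topLevelConfig mirrors three fields of Config and their JSON tags.
type topLevelConfig struct {
	Enabled                   bool `json:"enabled"`
	RunWithCommunityDetection bool `json:"run_with_community_detection"`
	MaxAnomaliesPerRun        int  `json:"max_anomalies_per_run"`
}

// parseConfig decodes a JSON document into the mirror struct.
func parseConfig(raw []byte) (topLevelConfig, error) {
	var cfg topLevelConfig
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	raw := []byte(`{
		"enabled": true,
		"run_with_community_detection": true,
		"max_anomalies_per_run": 100
	}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```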
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults fills in any zero values with defaults
func (*Config) GetEnabledDetectors ¶
GetEnabledDetectors returns a list of enabled detector names
func (*Config) IsDetectorEnabled ¶
IsDetectorEnabled checks if a specific detector is enabled
type CoreAnomalyConfig ¶
type CoreAnomalyConfig struct {
// Enabled activates core-based anomaly detection (isolation + demotion)
Enabled bool `json:"enabled"`
// MinCoreForHubAnalysis sets minimum k-core level for hub isolation analysis
MinCoreForHubAnalysis int `json:"min_core_for_hub_analysis"`
// HubIsolationThreshold is the peer connectivity ratio below which hub is "isolated"
// Value between 0.0-1.0: ratio of actual same-core connections to expected
HubIsolationThreshold float64 `json:"hub_isolation_threshold"`
// TrackCoreDemotions enables detection of entities that dropped k-core level
TrackCoreDemotions bool `json:"track_core_demotions"`
// MinDemotionDelta is the minimum core level drop to flag (e.g., 2 means core 5->3 flagged)
MinDemotionDelta int `json:"min_demotion_delta"`
}
CoreAnomalyConfig configures the k-core based anomaly detectors
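The two core checks reduce to a ratio test and a difference test. The sketch below follows the field comments above (ratio of actual to expected same-core connections, and a minimum level drop), but the function names are hypothetical, and treating a non-positive expected count as fully connected is an assumption.

```go
package main

import "fmt"

// peerConnectivity is the ratio of actual to expected same-core connections.
func peerConnectivity(peerCount, expectedPeerCount int) float64 {
	if expectedPeerCount <= 0 {
		return 1.0 // assumption: nothing expected means nothing missing
	}
	return float64(peerCount) / float64(expectedPeerCount)
}

// isIsolatedHub flags entities at or above minCore whose connectivity
// falls below the isolation threshold.
func isIsolatedHub(coreLevel, minCore int, connectivity, threshold float64) bool {
	return coreLevel >= minCore && connectivity < threshold
}

// isDemotion flags a drop of at least minDelta core levels.
func isDemotion(previous, current, minDelta int) bool {
	return previous-current >= minDelta
}

func main() {
	ratio := peerConnectivity(2, 8)
	fmt.Println(ratio, isIsolatedHub(4, 3, ratio, 0.5))
	fmt.Println(isDemotion(5, 3, 2), isDemotion(5, 4, 2))
}
```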
type CoreAnomalyDetector ¶
type CoreAnomalyDetector struct {
// contains filtered or unexported fields
}
CoreAnomalyDetector detects k-core based structural anomalies:
- Core isolation: high k-core entities with few same-core peers
- Core demotion: entities that dropped k-core level between computations
func NewCoreAnomalyDetector ¶
func NewCoreAnomalyDetector(deps *DetectorDependencies) *CoreAnomalyDetector
NewCoreAnomalyDetector creates a new core anomaly detector.
func (*CoreAnomalyDetector) Configure ¶
func (d *CoreAnomalyDetector) Configure(config interface{}) error
Configure updates the detector configuration.
func (*CoreAnomalyDetector) Detect ¶
func (d *CoreAnomalyDetector) Detect(ctx context.Context) ([]*StructuralAnomaly, error)
Detect finds core isolation and demotion anomalies.
func (*CoreAnomalyDetector) Name ¶
func (d *CoreAnomalyDetector) Name() string
Name returns the detector identifier.
func (*CoreAnomalyDetector) SetDependencies ¶
func (d *CoreAnomalyDetector) SetDependencies(deps *DetectorDependencies)
SetDependencies updates the detector's dependencies.
type Detector ¶
type Detector interface {
// Name returns the detector identifier for logging and metrics.
Name() string
// Detect runs the detection algorithm and returns discovered anomalies.
// The context should be used for cancellation and timeouts.
Detect(ctx context.Context) ([]*StructuralAnomaly, error)
// Configure updates the detector configuration.
// Called during orchestrator initialization and on config updates.
Configure(config interface{}) error
// SetDependencies updates the detector's shared dependencies.
// Called by the orchestrator before running detection.
SetDependencies(deps *DetectorDependencies)
}
Detector defines the interface for anomaly detection algorithms.
type DetectorDependencies ¶
type DetectorDependencies struct {
// StructuralIndices provides access to k-core and pivot indices.
StructuralIndices *structural.Indices
// PreviousKCore is the k-core index from the previous computation (for demotion detection).
PreviousKCore *structural.KCoreIndex
// Communities provides community membership for scoped detection.
// Core isolation is analyzed within each community rather than globally.
Communities []CommunityInfo
// SimilarityFinder provides semantic similarity queries.
SimilarityFinder SimilarityFinder
// RelationshipQuerier provides relationship queries for transitivity detection.
RelationshipQuerier RelationshipQuerier
// AnomalyStorage provides access to persisted anomalies (for dismissed pair checks).
AnomalyStorage Storage
// Logger for detector logging.
Logger *slog.Logger
}
DetectorDependencies provides access to shared resources needed by detectors.
type DirectRelationshipApplier ¶
type DirectRelationshipApplier struct {
// contains filtered or unexported fields
}
DirectRelationshipApplier adds triples directly via DataManager. Used for intra-processor mutations, matching the community inference pattern. This avoids NATS roundtrips for operations within the same processor.
func NewDirectRelationshipApplier ¶
func NewDirectRelationshipApplier(adder TripleAdder, logger *slog.Logger) *DirectRelationshipApplier
NewDirectRelationshipApplier creates an applier that uses DataManager directly.
func (*DirectRelationshipApplier) ApplyRelationship ¶
func (a *DirectRelationshipApplier) ApplyRelationship(ctx context.Context, suggestion *RelationshipSuggestion) error
ApplyRelationship adds a triple directly via DataManager.
type EntityManager ¶
type EntityManager interface {
ExistsEntity(ctx context.Context, id string) (bool, error)
CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
// ListWithPrefix returns entity IDs matching a prefix (for sibling discovery)
ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
}
EntityManager provides entity existence checks and creation. Typically implemented by datamanager.Manager.
type Evidence ¶
type Evidence struct {
// Semantic-Structural Gap evidence
Similarity float64 `json:"similarity,omitempty"` // Embedding similarity score
StructuralDistance int `json:"structural_distance,omitempty"` // Actual or estimated hop count
DistanceLowerBound int `json:"distance_lower_bound,omitempty"` // Triangle inequality lower bound
DistanceUpperBound int `json:"distance_upper_bound,omitempty"` // Triangle inequality upper bound
// Core Isolation evidence
CoreLevel int `json:"core_level,omitempty"` // Entity's k-core number
PeerCount int `json:"peer_count,omitempty"` // Number of same-core neighbors
ExpectedPeerCount int `json:"expected_peer_count,omitempty"` // Expected based on core level
PeerConnectivity float64 `json:"peer_connectivity,omitempty"` // Ratio of actual/expected peers
CommunityID string `json:"community_id,omitempty"` // Community where isolation detected
// Core Demotion evidence
PreviousCoreLevel int `json:"previous_core_level,omitempty"` // Core level before demotion
CurrentCoreLevel int `json:"current_core_level,omitempty"` // Core level after demotion
LostConnections int `json:"lost_connections,omitempty"` // Number of connections lost
// Transitivity Gap evidence
Predicate string `json:"predicate,omitempty"` // The transitive predicate
ChainPath []string `json:"chain_path,omitempty"` // The A->B->C path entities
ActualDistance int `json:"actual_distance,omitempty"` // Measured A-C distance
ExpectedMaxHops int `json:"expected_max_hops,omitempty"` // Config max for transitivity
}
Evidence contains type-specific proof of an anomaly. Different anomaly types populate different fields.
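Since every Evidence field is tagged omitempty, a serialized anomaly carries only the fields its type populated. The sketch below shows this with a local mirror struct (evidence, a hypothetical name) that copies six fields and their tags from the declaration above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// evidence mirrors a subset of the Evidence fields and JSON tags.
type evidence struct {
	Similarity         float64 `json:"similarity,omitempty"`
	StructuralDistance int     `json:"structural_distance,omitempty"`
	DistanceLowerBound int     `json:"distance_lower_bound,omitempty"`
	DistanceUpperBound int     `json:"distance_upper_bound,omitempty"`
	CoreLevel          int     `json:"core_level,omitempty"`
	PeerCount          int     `json:"peer_count,omitempty"`
}

// encodeEvidence returns the JSON form of the evidence.
func encodeEvidence(e evidence) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	gap := evidence{Similarity: 0.82, StructuralDistance: 4, DistanceLowerBound: 3, DistanceUpperBound: 5}
	out, err := encodeEvidence(gap)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

One consequence of omitempty is that a genuine zero, such as a peer count of 0, is dropped from the output and cannot be told apart from an unset field.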
type HTTPHandler ¶
type HTTPHandler struct {
// contains filtered or unexported fields
}
HTTPHandler provides HTTP endpoints for human review of structural anomalies.
func NewHTTPHandler ¶
func NewHTTPHandler(storage Storage, applier RelationshipApplier, logger *slog.Logger) *HTTPHandler
NewHTTPHandler creates a new HTTP handler for inference endpoints.
func (*HTTPHandler) RegisterHTTPHandlers ¶
func (h *HTTPHandler) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
RegisterHTTPHandlers registers inference endpoints with the given mux.
type HierarchyConfig ¶
type HierarchyConfig struct {
// Enabled activates hierarchy inference on entity creation
Enabled bool `json:"enabled"`
// CreateTypeEdges enables type membership edges (5-part prefix → type container)
// StandardIRI: skos:broader
CreateTypeEdges bool `json:"create_type_edges"`
// CreateSystemEdges enables system membership edges (4-part prefix → system container)
// StandardIRI: skos:broader
CreateSystemEdges bool `json:"create_system_edges"`
// CreateDomainEdges enables domain membership edges (3-part prefix → domain container)
// StandardIRI: skos:broader
CreateDomainEdges bool `json:"create_domain_edges"`
// CreateTypeSiblings enables sibling edges between entities with the same type (5-part prefix)
// When enabled, creates bidirectional hierarchy.type.sibling edges
// Cost: O(N) per new entity where N is existing sibling count
CreateTypeSiblings bool `json:"create_type_siblings"`
}
HierarchyConfig configures hierarchy container inference.
func DefaultHierarchyConfig ¶
func DefaultHierarchyConfig() HierarchyConfig
DefaultHierarchyConfig returns sensible defaults for hierarchy inference.
type HierarchyInference ¶
type HierarchyInference struct {
// contains filtered or unexported fields
}
HierarchyInference creates membership edges to container entities based on the 6-part entity ID structure. It operates synchronously - hierarchy triples are computed and returned before the entity is written to storage.
Entity ID format: org.platform.domain.system.type.instance
Example: acme.iot.sensors.hvac.temperature.001
Container entities are auto-created with the following pattern:
- Type container: org.platform.domain.system.type.group (6-part)
- System container: org.platform.domain.system.group.container (6-part)
- Domain container: org.platform.domain.group.container.level (6-part)
Graph distances via containers:
- Same type siblings: 2 hops (entity → type.group ← entity)
- Same system, different type: 4 hops
- Same domain, different system: 6 hops
This is a stateless utility - no lifecycle methods (Start/Stop).
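The container naming patterns above can be derived from an entity ID by splitting on dots. This is a sketch of the documented pattern only: containerIDs is a hypothetical helper, and rejecting IDs with empty segments is an assumption about validation.

```go
package main

import (
	"fmt"
	"strings"
)

// containerIDs derives the type, system, and domain container IDs from a
// 6-part entity ID of the form org.platform.domain.system.type.instance.
func containerIDs(entityID string) (typeC, systemC, domainC string, err error) {
	p := strings.Split(entityID, ".")
	if len(p) != 6 {
		return "", "", "", fmt.Errorf("entity ID %q has %d parts, want 6", entityID, len(p))
	}
	for _, part := range p {
		if part == "" {
			return "", "", "", fmt.Errorf("entity ID %q has an empty part", entityID)
		}
	}
	// Type container keeps the 5-part prefix and appends "group".
	typeC = strings.Join([]string{p[0], p[1], p[2], p[3], p[4], "group"}, ".")
	// System container keeps the 4-part prefix.
	systemC = strings.Join([]string{p[0], p[1], p[2], p[3], "group", "container"}, ".")
	// Domain container keeps the 3-part prefix.
	domainC = strings.Join([]string{p[0], p[1], p[2], "group", "container", "level"}, ".")
	return typeC, systemC, domainC, nil
}

func main() {
	t, s, d, err := containerIDs("acme.iot.sensors.hvac.temperature.001")
	if err != nil {
		panic(err)
	}
	fmt.Println(t)
	fmt.Println(s)
	fmt.Println(d)
}
```

Every container ID is itself a 6-part ID, which is what lets containers be stored as ordinary entities.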
func NewHierarchyInference ¶
func NewHierarchyInference(entityManager EntityManager, tripleAdder TripleAdder, config HierarchyConfig, logger *slog.Logger) *HierarchyInference
NewHierarchyInference creates a new hierarchy inference component.
Parameters:
- entityManager: Component for entity existence checks and creation
- tripleAdder: Component that can add triples (for inverse edges on containers)
- config: Configuration for edge creation
- logger: Logger for observability (can be nil)
func (*HierarchyInference) ClearCache ¶
func (h *HierarchyInference) ClearCache()
ClearCache resets the container cache. Call when containers might be deleted.
func (*HierarchyInference) GetCacheStats ¶
func (h *HierarchyInference) GetCacheStats() int
GetCacheStats returns statistics about the container cache.
func (*HierarchyInference) GetHierarchyTriples ¶
func (h *HierarchyInference) GetHierarchyTriples(ctx context.Context, entityID string) ([]message.Triple, error)
GetHierarchyTriples returns hierarchy membership triples for the given entity ID. It does not write the returned triples to the entity; it only computes them, and the caller must include them in the entity before writing to storage. Its side effects are limited to containers.
For each enabled level (type, system, domain), it:
1. Computes the container entity ID
2. Auto-creates the container if it doesn't exist (side effect on containers only)
3. Returns a membership triple from entity to container
4. Adds inverse edge to container (container → contains → entity)
Returns empty slice if hierarchy is disabled or entity ID is invalid.
func (*HierarchyInference) GetMetrics ¶
func (h *HierarchyInference) GetMetrics() (containersCreated, edgesCreated, edgesFailed int64)
GetMetrics returns metrics for hierarchy inference operations.
func (*HierarchyInference) OnEntityCreated ¶
func (h *HierarchyInference) OnEntityCreated(ctx context.Context, entityID string) error
OnEntityCreated is the legacy method kept for backwards compatibility. It calls GetHierarchyTriples and adds them using tripleAdder. New code should use GetHierarchyTriples directly to avoid cascading writes.
type MutationRelationshipApplier ¶
type MutationRelationshipApplier struct {
// contains filtered or unexported fields
}
MutationRelationshipApplier sends triple add requests to graph-ingest via NATS request/reply. This is the preferred applier for cross-processor mutations as it goes through the proper ingestion pipeline (graph.mutation.triple.add -> graph-ingest -> indexing).
func NewMutationRelationshipApplier ¶
func NewMutationRelationshipApplier(natsClient *natsclient.Client, logger *slog.Logger) *MutationRelationshipApplier
NewMutationRelationshipApplier creates an applier that uses the graph-ingest mutation API.
func (*MutationRelationshipApplier) ApplyRelationship ¶
func (a *MutationRelationshipApplier) ApplyRelationship(ctx context.Context, suggestion *RelationshipSuggestion) error
ApplyRelationship sends an add triple request to graph-ingest.
type NATSAnomalyStorage ¶
type NATSAnomalyStorage struct {
// contains filtered or unexported fields
}
NATSAnomalyStorage implements Storage using NATS KV.
func NewNATSAnomalyStorage ¶
func NewNATSAnomalyStorage(kv jetstream.KeyValue, logger *slog.Logger) *NATSAnomalyStorage
NewNATSAnomalyStorage creates a new NATS-backed anomaly storage.
func (*NATSAnomalyStorage) Count ¶
func (s *NATSAnomalyStorage) Count(ctx context.Context) (map[AnomalyStatus]int, error)
Count returns the total number of anomalies by status.
func (*NATSAnomalyStorage) Delete ¶
func (s *NATSAnomalyStorage) Delete(ctx context.Context, id string) error
Delete removes an anomaly.
func (*NATSAnomalyStorage) Get ¶
func (s *NATSAnomalyStorage) Get(ctx context.Context, id string) (*StructuralAnomaly, error)
Get retrieves an anomaly by ID.
func (*NATSAnomalyStorage) GetByStatus ¶
func (s *NATSAnomalyStorage) GetByStatus(ctx context.Context, status AnomalyStatus) ([]*StructuralAnomaly, error)
GetByStatus retrieves all anomalies with the given status.
func (*NATSAnomalyStorage) GetByType ¶
func (s *NATSAnomalyStorage) GetByType(ctx context.Context, anomalyType AnomalyType) ([]*StructuralAnomaly, error)
GetByType retrieves all anomalies of the given type.
func (*NATSAnomalyStorage) GetWithRevision ¶
func (s *NATSAnomalyStorage) GetWithRevision(ctx context.Context, id string) (*StructuralAnomaly, uint64, error)
GetWithRevision retrieves an anomaly by ID along with its KV revision.
func (*NATSAnomalyStorage) HasEntityAnomaly ¶
func (s *NATSAnomalyStorage) HasEntityAnomaly(ctx context.Context, entityID string, anomalyType AnomalyType) (bool, error)
HasEntityAnomaly checks if an anomaly already exists for an entity+type combination. This prevents re-detecting the same core isolation/demotion across detection runs.
func (*NATSAnomalyStorage) IsDismissedPair ¶
func (s *NATSAnomalyStorage) IsDismissedPair(ctx context.Context, entityA, entityB string) (bool, error)
IsDismissedPair checks if an entity pair is already tracked (any status including pending). This prevents re-detecting the same semantic gap repeatedly across detection runs.
func (*NATSAnomalyStorage) MarkPairDismissed ¶
func (s *NATSAnomalyStorage) MarkPairDismissed(ctx context.Context, entityA, entityB string) error
MarkPairDismissed creates an index entry to prevent future re-detection. Called when an anomaly is dismissed, rejected, or auto-applied.
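One way to picture a pair index is a set keyed by an order-independent pair key. The sketch below is an in-memory illustration under assumptions: the documentation does not say how keys are built or whether the pair is treated symmetrically, so pairKey, its "|" separator, and the pairIndex type are all hypothetical.

```go
package main

import "fmt"

// pairKey builds the same key for (a, b) and (b, a) by ordering the IDs.
// The "|" separator is an arbitrary choice for this sketch.
func pairKey(a, b string) string {
	if a > b {
		a, b = b, a
	}
	return a + "|" + b
}

// pairIndex is a hypothetical in-memory set of tracked entity pairs.
type pairIndex struct {
	seen map[string]struct{}
}

func newPairIndex() *pairIndex { return &pairIndex{seen: map[string]struct{}{}} }

// mark records the pair so later detection runs can skip it.
func (p *pairIndex) mark(a, b string) { p.seen[pairKey(a, b)] = struct{}{} }

// isTracked reports whether the pair was recorded, in either order.
func (p *pairIndex) isTracked(a, b string) bool {
	_, ok := p.seen[pairKey(a, b)]
	return ok
}

func main() {
	idx := newPairIndex()
	idx.mark("acme.iot.sensors.hvac.temperature.001", "acme.iot.sensors.hvac.humidity.007")
	fmt.Println(idx.isTracked("acme.iot.sensors.hvac.humidity.007", "acme.iot.sensors.hvac.temperature.001"))
}
```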
func (*NATSAnomalyStorage) Save ¶
func (s *NATSAnomalyStorage) Save(ctx context.Context, anomaly *StructuralAnomaly) error
Save persists an anomaly to NATS KV.
func (*NATSAnomalyStorage) SaveWithRevision ¶
func (s *NATSAnomalyStorage) SaveWithRevision(ctx context.Context, anomaly *StructuralAnomaly, revision uint64) error
SaveWithRevision persists an anomaly with optimistic locking. If revision > 0, the save will fail if the KV entry has been modified.
func (*NATSAnomalyStorage) UpdateStatus ¶
func (s *NATSAnomalyStorage) UpdateStatus(ctx context.Context, id string, status AnomalyStatus, reviewedBy, notes string) error
UpdateStatus updates an anomaly's status and optional review info.
func (*NATSAnomalyStorage) Watch ¶
func (s *NATSAnomalyStorage) Watch(ctx context.Context) (<-chan *StructuralAnomaly, error)
Watch returns a channel of anomalies as they're created/updated.
type NATSRelationshipApplier ¶
type NATSRelationshipApplier struct {
// contains filtered or unexported fields
}
NATSRelationshipApplier publishes relationship triples to the entity stream. The normal ingestion pipeline then indexes the new relationships.
func NewNATSRelationshipApplier ¶
func NewNATSRelationshipApplier(js jetstream.JetStream, subject string, logger *slog.Logger) *NATSRelationshipApplier
NewNATSRelationshipApplier creates a new applier that publishes to NATS.
func (*NATSRelationshipApplier) ApplyRelationship ¶
func (a *NATSRelationshipApplier) ApplyRelationship(ctx context.Context, suggestion *RelationshipSuggestion) error
ApplyRelationship publishes a new relationship triple to the entity stream.
type NoOpApplier ¶
type NoOpApplier struct {
// contains filtered or unexported fields
}
NoOpApplier is a no-op implementation for testing or disabled mode.
func NewNoOpApplier ¶
func NewNoOpApplier(logger *slog.Logger) *NoOpApplier
NewNoOpApplier creates an applier that logs but doesn't persist.
func (*NoOpApplier) ApplyRelationship ¶
func (a *NoOpApplier) ApplyRelationship( _ context.Context, suggestion *RelationshipSuggestion, ) error
ApplyRelationship logs the suggestion but doesn't persist it.
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator coordinates running all enabled detectors.
func NewOrchestrator ¶
func NewOrchestrator(cfg OrchestratorConfig) (*Orchestrator, error)
NewOrchestrator creates a new detector orchestrator.
func (*Orchestrator) GetRegisteredDetectors ¶
func (o *Orchestrator) GetRegisteredDetectors() []string
GetRegisteredDetectors returns the names of all registered detectors.
func (*Orchestrator) RegisterDetector ¶
func (o *Orchestrator) RegisterDetector(detector Detector)
RegisterDetector adds a detector to the orchestrator.
func (*Orchestrator) RunDetection ¶
func (o *Orchestrator) RunDetection(ctx context.Context) (*Result, error)
RunDetection executes all registered detectors and persists results.
func (*Orchestrator) SetApplier ¶
func (o *Orchestrator) SetApplier(applier RelationshipApplier)
SetApplier sets the relationship applier for virtual edge creation. This allows late binding when the applier depends on other components.
func (*Orchestrator) SetDependencies ¶
func (o *Orchestrator) SetDependencies(deps *DetectorDependencies)
SetDependencies sets the shared dependencies for all detectors. Must be called before RunDetection. Propagates dependencies to all registered detectors.
func (*Orchestrator) UpdateConfig ¶
func (o *Orchestrator) UpdateConfig(config Config) error
UpdateConfig updates the orchestrator configuration.
type OrchestratorConfig ¶
type OrchestratorConfig struct {
Config Config
Storage Storage
Applier RelationshipApplier
Logger *slog.Logger
}
OrchestratorConfig holds configuration for the orchestrator.
type RelationshipApplier ¶
type RelationshipApplier interface {
// ApplyRelationship publishes a new relationship triple to the entity stream.
ApplyRelationship(ctx context.Context, suggestion *RelationshipSuggestion) error
}
RelationshipApplier creates new relationships from approved anomaly suggestions.
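Because RelationshipApplier is a single-method interface, a custom implementation is small. The sketch below shows a hypothetical in-memory test double, recordingApplier, alongside local mirrors of the documented interface and suggestion type. The validation rule (both endpoints required) and the applyAll helper are assumptions made for illustration, not behavior of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// relationshipSuggestion mirrors the documented RelationshipSuggestion fields.
type relationshipSuggestion struct {
	FromEntity string
	ToEntity   string
	Predicate  string
	Confidence float64
	Reasoning  string
}

// relationshipApplier mirrors the documented RelationshipApplier interface.
type relationshipApplier interface {
	ApplyRelationship(ctx context.Context, suggestion *relationshipSuggestion) error
}

// recordingApplier is a hypothetical test double that keeps applied
// suggestions in memory instead of publishing them.
type recordingApplier struct {
	mu      sync.Mutex
	applied []relationshipSuggestion
}

// ApplyRelationship records the suggestion after basic validation.
func (r *recordingApplier) ApplyRelationship(ctx context.Context, s *relationshipSuggestion) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if s == nil || s.FromEntity == "" || s.ToEntity == "" {
		return errors.New("suggestion must name both entities")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.applied = append(r.applied, *s)
	return nil
}

// applyAll feeds suggestions to any applier and counts successes and failures.
func applyAll(a relationshipApplier, suggestions []*relationshipSuggestion) (applied, failed int) {
	ctx := context.Background()
	for _, s := range suggestions {
		if err := a.ApplyRelationship(ctx, s); err != nil {
			failed++
			continue
		}
		applied++
	}
	return applied, failed
}

func main() {
	rec := &recordingApplier{}
	applied, failed := applyAll(rec, []*relationshipSuggestion{
		{FromEntity: "sensor-1", ToEntity: "room-7", Predicate: "inferred.related_to", Confidence: 0.93},
		{FromEntity: "sensor-2"}, // empty ToEntity: rejected by this sketch
	})
	fmt.Println(applied, failed, len(rec.applied))
}
```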
type RelationshipInfo ¶
type RelationshipInfo struct {
FromEntityID string `json:"from_entity_id"`
ToEntityID string `json:"to_entity_id"`
Predicate string `json:"predicate"`
}
RelationshipInfo represents a relationship for detector use.
type RelationshipQuerier ¶
type RelationshipQuerier interface {
// GetOutgoingRelationships returns all outgoing relationships from an entity.
GetOutgoingRelationships(ctx context.Context, entityID string) ([]RelationshipInfo, error)
// GetIncomingRelationships returns all incoming relationships to an entity.
GetIncomingRelationships(ctx context.Context, entityID string) ([]RelationshipInfo, error)
}
RelationshipQuerier provides relationship queries for detectors. This interface is satisfied by QueryManager.
type RelationshipSuggestion ¶
type RelationshipSuggestion struct {
// FromEntity is the source entity ID for the suggested relationship.
FromEntity string `json:"from_entity"`
// ToEntity is the target entity ID for the suggested relationship.
ToEntity string `json:"to_entity"`
// Predicate is the suggested relationship type (e.g., "inferred.related_to").
Predicate string `json:"predicate"`
// Confidence is the confidence in this specific suggestion.
Confidence float64 `json:"confidence"`
// Reasoning explains why this relationship is suggested.
Reasoning string `json:"reasoning"`
}
RelationshipSuggestion proposes a relationship to address an anomaly.
type Result ¶
type Result struct {
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
Anomalies []*StructuralAnomaly `json:"anomalies"`
Truncated bool `json:"truncated"` // Hit max anomalies limit
AutoApplied int `json:"auto_applied"` // Virtual edges auto-applied
QueuedForReview int `json:"queued_for_review"` // Anomalies sent to review queue
}
Result summarizes an inference detection run.
func (*Result) AnomalyCount ¶
AnomalyCount returns the total number of anomalies detected.
func (*Result) CountByType ¶
func (r *Result) CountByType() map[AnomalyType]int
CountByType returns anomaly counts grouped by type.
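As an illustration of the summary helpers, the sketch below groups anomalies by type with a single pass over the slice. The lowercase types are local stand-ins for Result, StructuralAnomaly, and AnomalyType; the type strings are hypothetical labels, and treating the total count as the slice length is an assumption.

```go
package main

import "fmt"

// anomalyType is a local stand-in for AnomalyType.
type anomalyType string

// structuralAnomaly is a trimmed stand-in for StructuralAnomaly.
type structuralAnomaly struct {
	ID   string
	Type anomalyType
}

// result is a trimmed stand-in for Result.
type result struct {
	Anomalies []*structuralAnomaly
}

// anomalyCount returns the number of anomalies in the run (assumed to be
// the length of the Anomalies slice).
func (r *result) anomalyCount() int {
	return len(r.Anomalies)
}

// countByType tallies anomalies per type, skipping nil entries defensively.
func (r *result) countByType() map[anomalyType]int {
	counts := make(map[anomalyType]int)
	for _, a := range r.Anomalies {
		if a == nil {
			continue
		}
		counts[a.Type]++
	}
	return counts
}

func main() {
	r := &result{Anomalies: []*structuralAnomaly{
		{ID: "1", Type: "semantic_gap"},
		{ID: "2", Type: "semantic_gap"},
		{ID: "3", Type: "transitivity_gap"},
	}}
	fmt.Println(r.anomalyCount(), r.countByType())
}
```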
type ReviewConfig ¶
type ReviewConfig struct {
// Enabled activates the review worker
Enabled bool `json:"enabled"`
// Workers is the number of concurrent review workers
Workers int `json:"workers"`
// AutoApproveThreshold: LLM can auto-approve anomalies at or above this confidence
AutoApproveThreshold float64 `json:"auto_approve_threshold"`
// AutoRejectThreshold: LLM can auto-reject anomalies at or below this confidence
AutoRejectThreshold float64 `json:"auto_reject_threshold"`
// FallbackToHuman escalates uncertain cases (between thresholds) to human review
FallbackToHuman bool `json:"fallback_to_human"`
// BatchSize is the number of anomalies to process in each review batch
BatchSize int `json:"batch_size"`
// ReviewTimeout is the timeout for individual LLM review calls
ReviewTimeout time.Duration `json:"review_timeout"`
// LLM configuration for the review model
LLM llm.Config `json:"llm"`
}
ReviewConfig configures the LLM-assisted review pipeline.
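The two thresholds and FallbackToHuman together partition review outcomes into bands. The sketch below shows one way that routing could look, following the field comments (approve at or above AutoApproveThreshold, reject at or below AutoRejectThreshold, escalate the band in between). The decision names and the "hold" outcome for a disabled fallback are assumptions; the package does not document what happens in that case.

```go
package main

import "fmt"

// decision is a hypothetical label for the routing outcome.
type decision string

const (
	approve decision = "approve"
	reject  decision = "reject"
	human   decision = "human_review"
	hold    decision = "hold" // assumption: uncertain and no human fallback
)

// route maps a confidence score onto a decision using the documented
// threshold semantics: at or above approves, at or below rejects.
func route(confidence, autoApprove, autoReject float64, fallbackToHuman bool) decision {
	switch {
	case confidence >= autoApprove:
		return approve
	case confidence <= autoReject:
		return reject
	case fallbackToHuman:
		return human
	default:
		return hold
	}
}

func main() {
	for _, c := range []float64{0.95, 0.60, 0.20} {
		fmt.Printf("%.2f -> %s\n", c, route(c, 0.9, 0.3, true))
	}
}
```

The example values 0.9 and 0.3 are illustrative only and are not the package defaults.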
type ReviewMetrics ¶
type ReviewMetrics struct {
// contains filtered or unexported fields
}
ReviewMetrics provides Prometheus metrics for anomaly review processing.
func NewReviewMetrics ¶
func NewReviewMetrics(component string, registry *metric.MetricsRegistry) *ReviewMetrics
NewReviewMetrics creates a new ReviewMetrics instance using MetricsRegistry.
func (*ReviewMetrics) DecWorkersActive ¶
func (m *ReviewMetrics) DecWorkersActive()
DecWorkersActive decrements the active workers count.
func (*ReviewMetrics) IncWorkersActive ¶
func (m *ReviewMetrics) IncWorkersActive()
IncWorkersActive increments the active workers count.
func (*ReviewMetrics) RecordApproved ¶
func (m *ReviewMetrics) RecordApproved(latencySeconds float64)
RecordApproved records an approved anomaly with latency.
func (*ReviewMetrics) RecordDeferred ¶
func (m *ReviewMetrics) RecordDeferred(latencySeconds float64)
RecordDeferred records an anomaly deferred to human review with latency.
func (*ReviewMetrics) RecordFailed ¶
func (m *ReviewMetrics) RecordFailed(latencySeconds float64)
RecordFailed records a failed processing attempt with latency.
func (*ReviewMetrics) RecordRejected ¶
func (m *ReviewMetrics) RecordRejected(latencySeconds float64)
RecordRejected records a rejected anomaly with latency.
func (*ReviewMetrics) SetPendingCount ¶
func (m *ReviewMetrics) SetPendingCount(count int)
SetPendingCount sets the current pending anomaly count.
type ReviewQueueConfig ¶
type ReviewQueueConfig struct {
// Enabled activates the review queue for lower-confidence gaps
Enabled bool `json:"enabled"`
// MinConfidence is the minimum confidence score for review queue (0.0-1.0)
// Gaps at or above this but below AutoApply.MinConfidence go to review
MinConfidence float64 `json:"min_confidence"`
// MaxConfidence is the maximum confidence score for review queue (0.0-1.0)
// Should match AutoApply.MinConfidence for seamless handoff
MaxConfidence float64 `json:"max_confidence"`
// RequireLLMClassification requires LLM to suggest relationship type
RequireLLMClassification bool `json:"require_llm_classification"`
}
ReviewQueueConfig configures the review queue for gaps that need LLM or human review before edge creation.
func (*ReviewQueueConfig) ShouldQueue ¶
func (c *ReviewQueueConfig) ShouldQueue(confidence float64) bool
ShouldQueue returns true if the anomaly should go to the review queue based on confidence. Anomalies with confidence >= MinConfidence but < MaxConfidence go to review.
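The documented range is half-open: MinConfidence is inclusive and MaxConfidence is exclusive, so a gap sitting exactly at MaxConfidence belongs to auto-apply rather than review. A minimal sketch of that check follows, using a local mirror of the config type. Returning false when the queue is disabled or the receiver is nil is an assumption of this sketch.

```go
package main

import "fmt"

// reviewQueueConfig mirrors the documented confidence fields of
// ReviewQueueConfig.
type reviewQueueConfig struct {
	Enabled       bool
	MinConfidence float64
	MaxConfidence float64
}

// shouldQueue reports whether confidence falls in
// [MinConfidence, MaxConfidence).
func (c *reviewQueueConfig) shouldQueue(confidence float64) bool {
	if c == nil || !c.Enabled {
		return false // assumption: a disabled queue accepts nothing
	}
	return confidence >= c.MinConfidence && confidence < c.MaxConfidence
}

func main() {
	cfg := &reviewQueueConfig{Enabled: true, MinConfidence: 0.7, MaxConfidence: 0.95}
	for _, conf := range []float64{0.5, 0.7, 0.9, 0.95} {
		fmt.Printf("%.2f queued=%v\n", conf, cfg.shouldQueue(conf))
	}
}
```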
type ReviewRequest ¶
type ReviewRequest struct {
Decision string `json:"decision"` // "approved" or "rejected"
Notes string `json:"notes,omitempty"` // Optional notes about the decision
OverridePredicate string `json:"override_predicate,omitempty"` // Override the suggested predicate
TargetEntity string `json:"target_entity,omitempty"` // Target entity for approval (required if suggestion has empty ToEntity)
ReviewedBy string `json:"reviewed_by,omitempty"` // Who reviewed this anomaly
}
ReviewRequest represents a human review decision.
type ReviewWorker ¶
type ReviewWorker struct {
// contains filtered or unexported fields
}
ReviewWorker watches ANOMALY_INDEX and processes pending anomalies. It follows the lifecycle-management patterns of enhancement_worker.go.
func NewReviewWorker ¶
func NewReviewWorker(cfg *ReviewWorkerConfig) (*ReviewWorker, error)
NewReviewWorker creates a new review worker.
func (*ReviewWorker) IsPaused ¶
func (w *ReviewWorker) IsPaused() bool
IsPaused returns whether the worker is currently paused.
func (*ReviewWorker) Pause ¶
func (w *ReviewWorker) Pause()
Pause stops processing new anomalies while allowing in-flight work to complete.
func (*ReviewWorker) Resume ¶
func (w *ReviewWorker) Resume()
Resume allows processing to continue after a Pause.
func (*ReviewWorker) Start ¶
func (w *ReviewWorker) Start(ctx context.Context) error
Start begins watching ANOMALY_INDEX for pending anomalies.
func (*ReviewWorker) Stop ¶
func (w *ReviewWorker) Stop() error
Stop gracefully shuts down the worker.
type ReviewWorkerConfig ¶
type ReviewWorkerConfig struct {
AnomalyBucket jetstream.KeyValue
Storage Storage
LLMClient llm.Client // optional
Applier RelationshipApplier
Config ReviewConfig
Metrics *ReviewMetrics // optional - nil disables metrics
Logger *slog.Logger
}
ReviewWorkerConfig holds configuration for the review worker.
type SemanticGapConfig ¶
type SemanticGapConfig struct {
// Enabled activates semantic-structural gap detection
Enabled bool `json:"enabled"`
// MinSemanticSimilarity is the minimum embedding similarity to consider (0.0-1.0)
MinSemanticSimilarity float64 `json:"min_semantic_similarity"`
// MinStructuralDistance is the minimum graph distance for flagging (hops)
MinStructuralDistance int `json:"min_structural_distance"`
// MaxGapsPerEntity limits gaps detected per entity to prevent noise
MaxGapsPerEntity int `json:"max_gaps_per_entity"`
// MaxCandidatesPerEntity limits semantic search candidates per entity
MaxCandidatesPerEntity int `json:"max_candidates_per_entity"`
}
SemanticGapConfig configures the semantic-structural gap detector.
type SemanticGapDetector ¶
type SemanticGapDetector struct {
// contains filtered or unexported fields
}
SemanticGapDetector detects entities that are semantically similar but structurally distant in the graph.
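The core check can be sketched as follows: for each sufficiently similar candidate, run a bounded breadth-first search and flag the pair if the candidate is not reachable in fewer than MinStructuralDistance hops. This is an illustrative approximation, not the detector's actual code. The adjacency-map graph, the function names, and treating unreachable entities as distant are all assumptions.

```go
package main

import "fmt"

// similarityResult mirrors the documented SimilarityResult type.
type similarityResult struct {
	EntityID   string
	Similarity float64
}

// hopDistance runs a breadth-first search over an adjacency map and returns
// the hop count from one entity to another, or -1 if the target is not
// reachable within maxDepth hops.
func hopDistance(adj map[string][]string, from, to string, maxDepth int) int {
	if from == to {
		return 0
	}
	visited := map[string]bool{from: true}
	frontier := []string{from}
	for depth := 1; depth <= maxDepth && len(frontier) > 0; depth++ {
		var next []string
		for _, n := range frontier {
			for _, nb := range adj[n] {
				if nb == to {
					return depth
				}
				if !visited[nb] {
					visited[nb] = true
					next = append(next, nb)
				}
			}
		}
		frontier = next
	}
	return -1
}

// semanticGaps returns candidates that are similar enough but at least
// minDist hops away (or unreachable), capped at maxGaps results.
func semanticGaps(entity string, candidates []similarityResult,
	adj map[string][]string, minSim float64, minDist, maxGaps int) []string {
	var gaps []string
	for _, c := range candidates {
		if len(gaps) >= maxGaps {
			break
		}
		if c.EntityID == entity || c.Similarity < minSim {
			continue
		}
		// Not reachable within minDist-1 hops means distance >= minDist.
		if hopDistance(adj, entity, c.EntityID, minDist-1) == -1 {
			gaps = append(gaps, c.EntityID)
		}
	}
	return gaps
}

func main() {
	adj := map[string][]string{
		"a": {"b"}, "b": {"a", "c"}, "c": {"b", "d"}, "d": {"c"},
	}
	candidates := []similarityResult{{"b", 0.95}, {"c", 0.90}, {"d", 0.92}}
	fmt.Println(semanticGaps("a", candidates, adj, 0.8, 3, 5))
}
```

Bounding the search at minDist-1 hops keeps the traversal cheap: the exact distance is irrelevant once the candidate is known to be at least minDist hops away.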
func NewSemanticGapDetector ¶
func NewSemanticGapDetector(deps *DetectorDependencies) *SemanticGapDetector
NewSemanticGapDetector creates a new semantic gap detector.
func (*SemanticGapDetector) Configure ¶
func (d *SemanticGapDetector) Configure(config interface{}) error
Configure updates the detector configuration.
func (*SemanticGapDetector) Detect ¶
func (d *SemanticGapDetector) Detect(ctx context.Context) ([]*StructuralAnomaly, error)
Detect finds semantic-structural gaps in the graph.
func (*SemanticGapDetector) Name ¶
func (d *SemanticGapDetector) Name() string
Name returns the detector identifier.
func (*SemanticGapDetector) SetDependencies ¶
func (d *SemanticGapDetector) SetDependencies(deps *DetectorDependencies)
SetDependencies updates the detector's dependencies.
type SimilarityFinder ¶
type SimilarityFinder interface {
// FindSimilar returns entity IDs semantically similar to the given entity.
// threshold is the minimum similarity score (0.0-1.0).
// limit is the maximum number of results.
FindSimilar(ctx context.Context, entityID string, threshold float64, limit int) ([]SimilarityResult, error)
}
SimilarityFinder provides semantic similarity search functionality. This interface is satisfied by IndexManager.
type SimilarityResult ¶
type SimilarityResult struct {
EntityID string `json:"entity_id"`
Similarity float64 `json:"similarity"`
}
SimilarityResult represents a similarity search result.
type StatsResponse ¶
type StatsResponse struct {
TotalDetected int `json:"total_detected"`
PendingReview int `json:"pending_review"`
LLMApproved int `json:"llm_approved"`
LLMRejected int `json:"llm_rejected"`
HumanReview int `json:"human_review"`
HumanApproved int `json:"human_approved"`
HumanRejected int `json:"human_rejected"`
Applied int `json:"applied"`
}
StatsResponse contains inference statistics.
type Storage ¶
type Storage interface {
// Save persists an anomaly (creates or updates).
Save(ctx context.Context, anomaly *StructuralAnomaly) error
// SaveWithRevision persists an anomaly with optimistic locking.
// Returns ErrConcurrentModification if the revision has changed.
// Pass revision=0 for new anomalies.
SaveWithRevision(ctx context.Context, anomaly *StructuralAnomaly, revision uint64) error
// GetWithRevision retrieves an anomaly by ID along with its KV revision.
// The revision can be used with SaveWithRevision for optimistic locking.
GetWithRevision(ctx context.Context, id string) (*StructuralAnomaly, uint64, error)
// Get retrieves an anomaly by ID.
Get(ctx context.Context, id string) (*StructuralAnomaly, error)
// GetByStatus retrieves all anomalies with the given status.
GetByStatus(ctx context.Context, status AnomalyStatus) ([]*StructuralAnomaly, error)
// GetByType retrieves all anomalies of the given type.
GetByType(ctx context.Context, anomalyType AnomalyType) ([]*StructuralAnomaly, error)
// UpdateStatus updates an anomaly's status and optional review info.
UpdateStatus(ctx context.Context, id string, status AnomalyStatus, reviewedBy, notes string) error
// Delete removes an anomaly.
Delete(ctx context.Context, id string) error
// Watch returns a channel of anomalies as they're created/updated.
// Used by ReviewWorker to process new pending anomalies.
Watch(ctx context.Context) (<-chan *StructuralAnomaly, error)
// Cleanup removes old resolved anomalies (applied/rejected) older than retention.
// Returns the count of deleted anomalies.
Cleanup(ctx context.Context, retention time.Duration) (int, error)
// Count returns the total number of anomalies by status.
Count(ctx context.Context) (map[AnomalyStatus]int, error)
// IsDismissedPair checks if an entity pair is already tracked (any status including pending).
// This prevents re-detecting the same semantic gap repeatedly across detection runs.
IsDismissedPair(ctx context.Context, entityA, entityB string) (bool, error)
// HasEntityAnomaly checks if an anomaly already exists for an entity+type combination.
// This prevents re-detecting the same core isolation/demotion across detection runs.
HasEntityAnomaly(ctx context.Context, entityID string, anomalyType AnomalyType) (bool, error)
}
Storage defines the interface for persisting structural anomalies.
type StorageConfig ¶
type StorageConfig struct {
// BucketName is the NATS KV bucket for storing anomalies
BucketName string `json:"bucket_name"`
// RetentionDays is how long to keep resolved anomalies (applied/rejected)
RetentionDays int `json:"retention_days"`
// CleanupInterval is how often to run cleanup of old anomalies
CleanupInterval time.Duration `json:"cleanup_interval"`
}
StorageConfig configures anomaly storage.
type StructuralAnomaly ¶
type StructuralAnomaly struct {
// ID is a unique identifier for this anomaly (UUID).
ID string `json:"id"`
// Type categorizes the anomaly detection method.
Type AnomalyType `json:"type"`
// EntityA is the primary entity involved in the anomaly.
EntityA string `json:"entity_a"`
// EntityB is the secondary entity (empty for single-entity anomalies like CoreDemotion).
EntityB string `json:"entity_b,omitempty"`
// Confidence is a score from 0.0 to 1.0 indicating detection certainty.
Confidence float64 `json:"confidence"`
// Evidence contains type-specific proof of the anomaly.
Evidence Evidence `json:"evidence"`
// Suggestion is the proposed relationship to address the anomaly.
Suggestion *RelationshipSuggestion `json:"suggestion,omitempty"`
// Status is the current lifecycle state.
Status AnomalyStatus `json:"status"`
// DetectedAt is when the anomaly was first identified.
DetectedAt time.Time `json:"detected_at"`
// ReviewedAt is when the anomaly was reviewed (nil if not yet reviewed).
ReviewedAt *time.Time `json:"reviewed_at,omitempty"`
// ReviewedBy identifies who reviewed (e.g., "llm", "user@example.com").
ReviewedBy string `json:"reviewed_by,omitempty"`
// LLMReasoning is the explanation provided by the LLM for its decision.
LLMReasoning string `json:"llm_reasoning,omitempty"`
// ReviewNotes are additional notes from human review.
ReviewNotes string `json:"review_notes,omitempty"`
// EntityAContext is cached context for human review display.
EntityAContext string `json:"entity_a_context,omitempty"`
// EntityBContext is cached context for human review display.
EntityBContext string `json:"entity_b_context,omitempty"`
}
StructuralAnomaly represents a detected potential issue in the graph structure.
func (*StructuralAnomaly) CanAutoApprove ¶
func (a *StructuralAnomaly) CanAutoApprove(threshold float64) bool
CanAutoApprove returns true if confidence is above the given threshold.
func (*StructuralAnomaly) CanAutoReject ¶
func (a *StructuralAnomaly) CanAutoReject(threshold float64) bool
CanAutoReject returns true if confidence is below the given threshold.
func (*StructuralAnomaly) IsResolved ¶
func (a *StructuralAnomaly) IsResolved() bool
IsResolved returns true if the anomaly has reached a terminal state.
func (*StructuralAnomaly) NeedsHumanReview ¶
func (a *StructuralAnomaly) NeedsHumanReview() bool
NeedsHumanReview returns true if the anomaly requires human attention.
type TransitivityConfig ¶
type TransitivityConfig struct {
// Enabled activates transitivity gap detection
Enabled bool `json:"enabled"`
// MaxIntermediateHops is the maximum hops in A->...->B->...->C chains to analyze
MaxIntermediateHops int `json:"max_intermediate_hops"`
// MinExpectedTransitivity is the maximum expected A-C distance when A->B->C exists
// Gaps where actual distance > this are flagged
MinExpectedTransitivity int `json:"min_expected_transitivity"`
// TransitivePredicates lists predicates that should be transitive
// e.g., ["member_of", "part_of", "located_in"]
TransitivePredicates []string `json:"transitive_predicates"`
}
TransitivityConfig configures the transitivity gap detector.
type TransitivityDetector ¶
type TransitivityDetector struct {
// contains filtered or unexported fields
}
TransitivityDetector detects missing transitive relationships. When A->B and B->C exist for transitive predicates, checks if A-C distance is greater than expected, suggesting a missing A->C relationship.
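A simplified version of this check is sketched below. It only considers two-hop chains that use the same transitive predicate on both hops, and it flags a gap when no direct A->C edge with that predicate exists. The real detector compares graph distance against a configurable expectation, so this is an approximation; the relationship and gap types and the function name are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// relationship is a trimmed stand-in for RelationshipInfo.
type relationship struct {
	From, To, Predicate string
}

// gap describes a chain A->B->C that lacks a direct A->C edge.
type gap struct {
	A, B, C, Predicate string
}

// findTransitivityGaps returns two-hop chains over transitive predicates
// that have no direct edge between their endpoints.
func findTransitivityGaps(rels []relationship, transitive map[string]bool) []gap {
	out := map[string]map[string][]string{} // predicate -> from -> targets
	direct := map[relationship]bool{}
	for _, r := range rels {
		if !transitive[r.Predicate] || direct[r] {
			continue // skip non-transitive predicates and duplicate edges
		}
		if out[r.Predicate] == nil {
			out[r.Predicate] = map[string][]string{}
		}
		out[r.Predicate][r.From] = append(out[r.Predicate][r.From], r.To)
		direct[r] = true
	}

	var gaps []gap
	for pred, byFrom := range out {
		for a, mids := range byFrom {
			for _, b := range mids {
				for _, c := range byFrom[b] {
					if c == a || direct[relationship{From: a, To: c, Predicate: pred}] {
						continue
					}
					gaps = append(gaps, gap{A: a, B: b, C: c, Predicate: pred})
				}
			}
		}
	}
	// Sort for deterministic output, since map iteration order is random.
	sort.Slice(gaps, func(i, j int) bool {
		if gaps[i].A != gaps[j].A {
			return gaps[i].A < gaps[j].A
		}
		if gaps[i].B != gaps[j].B {
			return gaps[i].B < gaps[j].B
		}
		return gaps[i].C < gaps[j].C
	})
	return gaps
}

func main() {
	rels := []relationship{
		{From: "alice", To: "team", Predicate: "member_of"},
		{From: "team", To: "org", Predicate: "member_of"},
	}
	fmt.Println(findTransitivityGaps(rels, map[string]bool{"member_of": true}))
}
```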
func NewTransitivityDetector ¶
func NewTransitivityDetector(deps *DetectorDependencies) *TransitivityDetector
NewTransitivityDetector creates a new transitivity gap detector.
func (*TransitivityDetector) Configure ¶
func (d *TransitivityDetector) Configure(config interface{}) error
Configure updates the detector configuration.
func (*TransitivityDetector) Detect ¶
func (d *TransitivityDetector) Detect(ctx context.Context) ([]*StructuralAnomaly, error)
Detect finds transitivity gaps in the graph.
func (*TransitivityDetector) Name ¶
func (d *TransitivityDetector) Name() string
Name returns the detector identifier.
func (*TransitivityDetector) SetDependencies ¶
func (d *TransitivityDetector) SetDependencies(deps *DetectorDependencies)
SetDependencies updates the detector's dependencies.
type TripleAdder ¶
TripleAdder defines the interface for adding triples directly to the graph. This interface is satisfied by DataManager.
type VirtualEdgeConfig ¶
type VirtualEdgeConfig struct {
// AutoApply configures automatic edge creation for high-confidence gaps
AutoApply AutoApplyConfig `json:"auto_apply"`
// ReviewQueue configures lower-confidence gaps that need LLM/human review
ReviewQueue ReviewQueueConfig `json:"review_queue"`
}
VirtualEdgeConfig configures virtual edge creation: automatic application for high-confidence gaps and a review queue for lower-confidence ones.