inference

package
v1.0.0-alpha.14
Published: Mar 8, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package inference provides structural anomaly detection and inference for identifying potential missing relationships in the knowledge graph.

Overview

The inference system analyzes the gap between semantic similarity and structural distance to identify entities that should probably be related but aren't explicitly connected. It supports four anomaly detection methods:

  • Semantic-Structural Gap: High embedding similarity with high graph distance
  • Core Isolation: Hub entities with few same-core peer connections
  • Core Demotion: Entities that dropped k-core level between computations
  • Transitivity Gap: Missing transitive relationships (A→B and B→C exist, but A and C are distant)

Detected anomalies flow through an approval pipeline:

Detectors → Pending → LLM Review → Human Review (optional) → Applied/Rejected

Architecture

Community Detection Trigger
           ↓
┌──────────────────────────────────────────────┐
│              Anomaly Detectors               │
├──────────┬──────────┬──────────┬─────────────┤
│ Semantic │   Core   │   Core   │Transitivity │
│   Gap    │ Isolation│ Demotion │    Gap      │
└──────────┴──────────┴──────────┴─────────────┘
           ↓
    ANOMALY_INDEX KV
           ↓
    ┌─────────────┐
    │ Review Worker│ ←── LLM API
    └─────────────┘
           ↓
    ┌─────────────┐
    │Human Review │ ←── HTTP Handlers
    │   Queue     │
    └─────────────┘
           ↓
    Edge Applier → Graph

Usage

Configure and run the anomaly detector:

cfg := inference.DefaultConfig()
cfg.Enabled = true
cfg.SemanticGap.Enabled = true
cfg.Review.Enabled = true
cfg.Review.LLM = llm.Config{
    Provider: "openai",
    BaseURL:  "https://api.openai.com/v1",
}

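// storage, applier, logger and deps are placeholders supplied by the caller.
orch, err := inference.NewOrchestrator(inference.OrchestratorConfig{
    Config:  cfg,
    Storage: storage,
    Applier: applier,
    Logger:  logger,
})
if err != nil {
    return err
}
orch.SetDependencies(deps) // must be called before RunDetection

// Run detection (typically triggered after community detection)
result, err := orch.RunDetection(ctx)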

Query and manage anomalies via HTTP handlers:

// List pending anomalies
GET /api/v1/anomalies?status=pending

// Approve an anomaly
POST /api/v1/anomalies/{id}/approve

// Reject an anomaly
POST /api/v1/anomalies/{id}/reject
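
The approve and reject endpoints above can be pictured with a small in-process handler. This is only a sketch built on net/http: the reviewMux type, its in-memory status map, and the 204 response code are hypothetical stand-ins and do not describe how HTTPHandler is implemented.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// reviewMux is a hypothetical stand-in for the package's HTTPHandler.
// It keeps anomaly statuses in memory instead of NATS KV.
type reviewMux struct {
	mu     sync.Mutex
	status map[string]string // anomaly ID -> status string
}

// ServeHTTP handles POST /api/v1/anomalies/{id}/approve and .../reject.
func (m *reviewMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	const prefix = "/api/v1/anomalies/"
	if r.Method != http.MethodPost || !strings.HasPrefix(r.URL.Path, prefix) {
		http.NotFound(w, r)
		return
	}
	parts := strings.Split(strings.TrimPrefix(r.URL.Path, prefix), "/")
	if len(parts) != 2 {
		http.NotFound(w, r)
		return
	}
	id, action := parts[0], parts[1]

	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.status[id]; !ok {
		http.NotFound(w, r)
		return
	}
	switch action {
	case "approve":
		m.status[id] = "approved"
	case "reject":
		m.status[id] = "rejected"
	default:
		http.NotFound(w, r)
		return
	}
	w.WriteHeader(http.StatusNoContent) // assumed response code
}

// post issues an in-process POST and returns the response status code.
func post(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, path, nil))
	return rec.Code
}

func main() {
	m := &reviewMux{status: map[string]string{"a1": "pending"}}
	code := post(m, "/api/v1/anomalies/a1/approve")
	fmt.Println(code, m.status["a1"])
}
```

The real handlers are registered through RegisterHTTPHandlers with a caller-supplied prefix, so the path prefix used here is taken from the examples above rather than from the implementation.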

Anomaly Types

Semantic-Structural Gap (AnomalySemanticStructuralGap):

Detected when entities have high embedding similarity (≥0.7) but high
structural distance (≥3 hops). Evidence includes similarity score and
distance bounds from triangle inequality.
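
The triangle-inequality bounds mentioned above can be sketched as follows. The sketch assumes hop distances from each entity to a shared set of pivot nodes are available; distanceBounds and isSemanticGap are hypothetical helpers, and testing the lower bound against the 3-hop threshold is an assumption about how the detector combines the two signals.

```go
package main

import "fmt"

// distanceBounds estimates the hop distance between entities A and B from
// their distances to shared pivot nodes, using the triangle inequality:
//
//	|d(A,p) - d(B,p)| <= d(A,B) <= d(A,p) + d(B,p)
//
// toA[i] and toB[i] are hop counts from pivot i to A and B.
// upper is -1 when no pivot is available.
func distanceBounds(toA, toB []int) (lower, upper int) {
	upper = -1
	for i := range toA {
		if i >= len(toB) {
			break
		}
		diff := toA[i] - toB[i]
		if diff < 0 {
			diff = -diff
		}
		if diff > lower {
			lower = diff // tightest lower bound is the largest difference
		}
		if sum := toA[i] + toB[i]; upper < 0 || sum < upper {
			upper = sum // tightest upper bound is the smallest sum
		}
	}
	return lower, upper
}

// isSemanticGap applies the documented default thresholds:
// similarity >= 0.7 and structural distance >= 3 hops.
func isSemanticGap(similarity float64, distanceLowerBound int) bool {
	return similarity >= 0.7 && distanceLowerBound >= 3
}

func main() {
	lower, upper := distanceBounds([]int{1, 5, 2}, []int{4, 1, 2})
	fmt.Println(lower, upper, isSemanticGap(0.82, lower))
}
```

When the lower and upper bounds coincide, as in this input, the distance is known exactly without a graph traversal.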

Core Isolation (AnomalyCoreIsolation):

Detected when a high k-core entity has fewer same-core connections than
expected. Evidence includes core level, peer count, and connectivity ratio.
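
A minimal sketch of the isolation check, assuming the expected peer count is already known (how the package derives it is not documented here). The helper names are hypothetical; the thresholds are the documented defaults for MinCoreForHubAnalysis and HubIsolationThreshold.

```go
package main

import "fmt"

// peerConnectivity returns the ratio of actual to expected same-core peers.
// A non-positive expectation is treated as fully connected.
func peerConnectivity(peerCount, expectedPeerCount int) float64 {
	if expectedPeerCount <= 0 {
		return 1.0
	}
	return float64(peerCount) / float64(expectedPeerCount)
}

// isIsolatedHub flags an entity whose core level qualifies it as a hub
// but whose peer connectivity falls below the isolation threshold.
func isIsolatedHub(coreLevel, peerCount, expectedPeerCount int) bool {
	const (
		minCoreForHubAnalysis = 3   // documented default
		hubIsolationThreshold = 0.5 // documented default
	)
	if coreLevel < minCoreForHubAnalysis {
		return false
	}
	return peerConnectivity(peerCount, expectedPeerCount) < hubIsolationThreshold
}

func main() {
	fmt.Println(isIsolatedHub(4, 1, 4)) // ratio 0.25
	fmt.Println(isIsolatedHub(4, 2, 4)) // ratio 0.5, not below the threshold
}
```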

Core Demotion (AnomalyCoreDemotion):

Detected when an entity drops k-core level between computations,
signaling lost relationships. Evidence includes previous/current levels.
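
Demotion detection amounts to comparing two k-core snapshots. The sketch below assumes each snapshot is a plain map from entity ID to core number; findDemotions and the demotion struct are hypothetical, and entities missing from the current snapshot are simply skipped.

```go
package main

import (
	"fmt"
	"sort"
)

// demotion records an entity whose k-core level dropped between runs.
type demotion struct {
	EntityID string
	Previous int
	Current  int
}

// findDemotions returns entities whose core level dropped by at least
// minDelta, sorted by entity ID for stable output.
func findDemotions(previous, current map[string]int, minDelta int) []demotion {
	var out []demotion
	for id, prev := range previous {
		cur, ok := current[id]
		if !ok {
			continue // assumption: removed entities are not reported
		}
		if prev-cur >= minDelta {
			out = append(out, demotion{EntityID: id, Previous: prev, Current: cur})
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].EntityID < out[j].EntityID })
	return out
}

func main() {
	previous := map[string]int{"a": 5, "b": 4, "c": 3}
	current := map[string]int{"a": 3, "b": 3, "c": 4}
	// With MinDemotionDelta = 2 only "a" (5 -> 3) is flagged.
	fmt.Println(findDemotions(previous, current, 2))
}
```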

Transitivity Gap (AnomalyTransitivityGap):

Detected when A→B and B→C exist but A-C distance exceeds expected
for configured transitive predicates. Evidence includes the chain path.
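
The chain search can be illustrated on a small edge list. This sketch simplifies "A-C distance exceeds expected" to "no direct A→C edge with the same predicate exists", which is an assumption and not the package's actual distance check; the edge type and transitivityGaps function are hypothetical.

```go
package main

import "fmt"

// edge is a directed, labeled relationship.
type edge struct{ From, To, Predicate string }

// transitivityGaps returns every chain A→B→C over a transitive predicate
// for which no direct A→C edge with that predicate exists.
func transitivityGaps(edges []edge, transitive map[string]bool) [][3]string {
	direct := map[edge]bool{}
	outgoing := map[string][]edge{}
	for _, e := range edges {
		if transitive[e.Predicate] {
			direct[e] = true
			outgoing[e.From] = append(outgoing[e.From], e)
		}
	}

	var gaps [][3]string
	for _, ab := range edges { // input order keeps the result deterministic
		if !transitive[ab.Predicate] {
			continue
		}
		for _, bc := range outgoing[ab.To] {
			if bc.Predicate != ab.Predicate || bc.To == ab.From {
				continue // different predicate, or a trivial A→B→A cycle
			}
			if !direct[edge{ab.From, bc.To, ab.Predicate}] {
				gaps = append(gaps, [3]string{ab.From, ab.To, bc.To})
			}
		}
	}
	return gaps
}

func main() {
	edges := []edge{
		{"room1", "floor2", "located_in"},
		{"floor2", "buildingA", "located_in"},
		{"sensor", "room1", "located_in"},
		{"sensor", "floor2", "located_in"},
	}
	for _, g := range transitivityGaps(edges, map[string]bool{"located_in": true}) {
		fmt.Println(g[0], "->", g[1], "->", g[2])
	}
}
```

In this input the chain sensor→room1→floor2 is not reported because sensor→floor2 already exists.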

Configuration

Key configuration options:

Enabled:                   false      # Opt-in feature
RunWithCommunityDetection: true       # Trigger after community detection
MaxAnomaliesPerRun:        100        # Prevent runaway detection

SemanticGap:
  Enabled:               true
  MinSemanticSimilarity: 0.7          # Minimum embedding similarity
  MinStructuralDistance: 3            # Minimum graph distance (hops)
  MaxGapsPerEntity:      5            # Limit per entity

CoreAnomaly:
  Enabled:               true
  MinCoreForHubAnalysis: 3            # Minimum k-core for hub analysis
  HubIsolationThreshold: 0.5          # Peer connectivity ratio threshold
  TrackCoreDemotions:    true
  MinDemotionDelta:      2            # Core level drop to flag

Transitivity:
  Enabled:                 true
  TransitivePredicates:    ["member_of", "part_of", "located_in"]

Review:
  Enabled:              false         # Requires LLM setup
  AutoApproveThreshold: 0.9           # Auto-approve confidence
  AutoRejectThreshold:  0.3           # Auto-reject confidence
  FallbackToHuman:      true          # Escalate uncertain cases

VirtualEdges:
  AutoApply:
    Enabled:           false          # Auto-create edges
    MinConfidence:     0.95           # Confidence threshold for auto-apply
    PredicateTemplate: "inferred.semantic.{band}"
  ReviewQueue:
    Enabled:           false          # Queue uncertain gaps for review
    MinConfidence:     0.7            # Lower bound for review queue
    MaxConfidence:     0.95           # Upper bound (below auto-apply)
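
The two VirtualEdges confidence ranges partition gaps into three outcomes. The sketch below hard-codes the default thresholds listed above and assumes both AutoApply and ReviewQueue are enabled (both default to false); routeGap and its return strings are hypothetical, and what the package does with gaps below the review range is not documented here.

```go
package main

import "fmt"

// routeGap decides what happens to a semantic gap with the given confidence,
// using the default VirtualEdges thresholds.
func routeGap(confidence float64) string {
	const (
		autoApplyMin = 0.95 // AutoApply.MinConfidence
		reviewMin    = 0.7  // ReviewQueue.MinConfidence
		reviewMax    = 0.95 // ReviewQueue.MaxConfidence (exclusive here)
	)
	switch {
	case confidence >= autoApplyMin:
		return "auto_apply"
	case confidence >= reviewMin && confidence < reviewMax:
		return "review_queue"
	default:
		return "ignored" // assumption: lower-confidence gaps are left alone
	}
}

func main() {
	for _, c := range []float64{0.97, 0.8, 0.5} {
		fmt.Println(c, routeGap(c))
	}
}
```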

Anomaly Lifecycle

Each anomaly progresses through states defined by AnomalyStatus:

StatusPending → StatusLLMReviewing → StatusLLMApproved/StatusLLMRejected
                                            ↓
                                   StatusHumanReview (if uncertain)
                                            ↓
                                   StatusApproved/StatusRejected
                                            ↓
                                   StatusApplied (edge created)

High-confidence anomalies may skip review with StatusAutoApplied.
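
One way to read the diagram above is as a transition table. The table below is an interpretation of the diagram, not something taken from the package source, and canTransition is a hypothetical helper; the status strings match the AnomalyStatus constant values.

```go
package main

import "fmt"

// transitions lists the allowed next states for each status, following one
// reading of the lifecycle diagram.
var transitions = map[string][]string{
	"pending":       {"llm_reviewing"},
	"llm_reviewing": {"llm_approved", "llm_rejected", "human_review"},
	"llm_approved":  {"applied"},
	"human_review":  {"approved", "rejected"},
	"approved":      {"applied"},
}

// canTransition reports whether moving from one status to another is
// allowed by the table.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("pending", "llm_reviewing"))
	fmt.Println(canTransition("pending", "applied"))
}
```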

Thread Safety

The Detector and Storage types are safe for concurrent use. HTTP handlers use optimistic locking via revision numbers when updating anomaly status.

Metrics

The inference package exports Prometheus metrics:

  • anomalies_detected_total: Anomalies detected by type
  • anomalies_reviewed_total: Anomalies reviewed by decision
  • anomalies_applied_total: Anomalies applied to graph
  • detection_duration_seconds: Detection run duration
  • review_duration_seconds: LLM review duration

See Also

Related packages:


Index

Constants

const (
	// DefaultAnomalyBucket is the default NATS KV bucket for storing anomalies.
	DefaultAnomalyBucket = "ANOMALY_INDEX"
)

Variables

var ErrConcurrentModification = stderrors.New("concurrent modification detected")

ErrConcurrentModification is returned when an optimistic lock check fails.

Functions

This section is empty.

Types

type AnomalyStatus

type AnomalyStatus string

AnomalyStatus represents the lifecycle state of an anomaly.

const (
	// StatusPending indicates the anomaly is awaiting review.
	StatusPending AnomalyStatus = "pending"

	// StatusLLMReviewing indicates the anomaly is currently being processed by LLM.
	StatusLLMReviewing AnomalyStatus = "llm_reviewing"

	// StatusLLMApproved indicates the LLM approved this anomaly (high confidence).
	StatusLLMApproved AnomalyStatus = "llm_approved"

	// StatusLLMRejected indicates the LLM rejected this anomaly (low confidence).
	StatusLLMRejected AnomalyStatus = "llm_rejected"

	// StatusHumanReview indicates the anomaly has been escalated for human decision.
	StatusHumanReview AnomalyStatus = "human_review"

	// StatusApproved indicates a human approved this anomaly.
	StatusApproved AnomalyStatus = "approved"

	// StatusRejected indicates a human rejected this anomaly.
	StatusRejected AnomalyStatus = "rejected"

	// StatusApplied indicates the suggested relationship was created in the graph.
	StatusApplied AnomalyStatus = "applied"

	// StatusDismissed indicates the anomaly was dismissed and should not be re-detected.
	// This prevents the same entity pair from being flagged repeatedly.
	StatusDismissed AnomalyStatus = "dismissed"

	// StatusAutoApplied indicates the relationship was automatically applied
	// because it met the auto-apply threshold (high confidence).
	StatusAutoApplied AnomalyStatus = "auto_applied"
)

type AnomalyType

type AnomalyType string

AnomalyType identifies the category of structural anomaly detected.

const (
	// AnomalySemanticStructuralGap indicates high semantic similarity with high structural distance.
	// This suggests entities should probably be related but aren't explicitly connected.
	AnomalySemanticStructuralGap AnomalyType = "semantic_structural_gap"

	// AnomalyCoreIsolation indicates a high k-core entity with few same-core peer connections.
	// This suggests missing hub relationships within the core structure.
	AnomalyCoreIsolation AnomalyType = "core_isolation"

	// AnomalyCoreDemotion indicates an entity dropped from its k-core level between computations.
	// This signals lost relationships or potential data quality issues.
	AnomalyCoreDemotion AnomalyType = "core_demotion"

	// AnomalyTransitivityGap indicates A->B and B->C exist but A-C distance exceeds expected.
	// This suggests a missing transitive relationship for configured predicates.
	AnomalyTransitivityGap AnomalyType = "transitivity_gap"
)

type AutoApplyConfig

type AutoApplyConfig struct {
	// Enabled activates automatic edge creation
	Enabled bool `json:"enabled"`

	// MinConfidence is the minimum confidence score to auto-apply (0.0-1.0)
	// Confidence is a composite score: similarity + distance boost + core boost
	// Gaps at or above this threshold are automatically converted to edges
	MinConfidence float64 `json:"min_confidence"`

	// PredicateTemplate is the predicate format for created edges
	// Supports {band} placeholder: "inferred.semantic.{band}"
	// Band values: "high" (>=0.95), "medium" (>=0.85), "related" (else)
	PredicateTemplate string `json:"predicate_template"`
}

AutoApplyConfig configures automatic edge creation for high-confidence semantic gaps.

func (*AutoApplyConfig) BuildPredicate

func (c *AutoApplyConfig) BuildPredicate(confidence float64) string

BuildPredicate generates the predicate string from the template based on confidence. Template supports {band} placeholder which is replaced with:

  • "high" for confidence >= 0.95
  • "medium" for confidence >= 0.85
  • "related" for lower confidence scores
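
The band substitution described above can be sketched with strings.ReplaceAll. The band and buildPredicate functions are hypothetical re-implementations written from the documented thresholds, not the package's code.

```go
package main

import (
	"fmt"
	"strings"
)

// band maps a confidence score to the documented band names.
func band(confidence float64) string {
	switch {
	case confidence >= 0.95:
		return "high"
	case confidence >= 0.85:
		return "medium"
	default:
		return "related"
	}
}

// buildPredicate replaces the {band} placeholder in the template.
func buildPredicate(template string, confidence float64) string {
	return strings.ReplaceAll(template, "{band}", band(confidence))
}

func main() {
	fmt.Println(buildPredicate("inferred.semantic.{band}", 0.96))
	fmt.Println(buildPredicate("inferred.semantic.{band}", 0.90))
	fmt.Println(buildPredicate("inferred.semantic.{band}", 0.70))
}
```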

func (*AutoApplyConfig) ShouldAutoApply

func (c *AutoApplyConfig) ShouldAutoApply(confidence float64) bool

ShouldAutoApply returns true if the anomaly meets auto-apply criteria based on confidence. Confidence is a composite score (similarity + distance boost + core boost) capped at 1.0.
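
A sketch of the composite score and the threshold check. The boost values passed in are made up for illustration, since the package's actual boost formulas are not documented here; compositeConfidence and shouldAutoApply are hypothetical helpers.

```go
package main

import "fmt"

// compositeConfidence sums similarity and boosts, capped at 1.0.
func compositeConfidence(similarity, distanceBoost, coreBoost float64) float64 {
	score := similarity + distanceBoost + coreBoost
	if score > 1.0 {
		return 1.0
	}
	return score
}

// shouldAutoApply mirrors the documented rule: auto-apply must be enabled
// and the confidence must be at or above the minimum.
func shouldAutoApply(enabled bool, minConfidence, confidence float64) bool {
	return enabled && confidence >= minConfidence
}

func main() {
	c := compositeConfidence(0.9, 0.05, 0.1) // illustrative boosts
	fmt.Println(c, shouldAutoApply(true, 0.95, c))
}
```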

type CommunityInfo

type CommunityInfo struct {
	// ID is the community identifier
	ID string
	// Members is the list of entity IDs in this community
	Members []string
	// Level is the hierarchy level (0 = base communities)
	Level int
}

CommunityInfo provides community membership information for detectors. This is a simple struct that avoids circular imports with the clustering package.

type Config

type Config struct {
	// Enabled activates inference detection
	Enabled bool `json:"enabled"`

	// RunWithCommunityDetection triggers inference after community detection completes
	RunWithCommunityDetection bool `json:"run_with_community_detection"`

	// MaxAnomaliesPerRun limits total anomalies detected per run to prevent runaway detection
	MaxAnomaliesPerRun int `json:"max_anomalies_per_run"`

	// Detector-specific configurations
	SemanticGap  SemanticGapConfig  `json:"semantic_gap"`
	CoreAnomaly  CoreAnomalyConfig  `json:"core_anomaly"`
	Transitivity TransitivityConfig `json:"transitivity"`

	// Review configuration for LLM-assisted and human review
	Review ReviewConfig `json:"review"`

	// VirtualEdges configuration for auto-applying high-confidence gaps as edges
	VirtualEdges VirtualEdgeConfig `json:"virtual_edges"`

	// Storage configuration
	Storage StorageConfig `json:"storage"`

	// Operation timeouts
	DetectionTimeout time.Duration `json:"detection_timeout"`
}

Config configures the structural inference system.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults fills in any zero values with defaults.

func (*Config) GetEnabledDetectors

func (c *Config) GetEnabledDetectors() []string

GetEnabledDetectors returns a list of enabled detector names.

func (*Config) IsDetectorEnabled

func (c *Config) IsDetectorEnabled(detector string) bool

IsDetectorEnabled checks if a specific detector is enabled.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid.

type CoreAnomalyConfig

type CoreAnomalyConfig struct {
	// Enabled activates core-based anomaly detection (isolation + demotion)
	Enabled bool `json:"enabled"`

	// MinCoreForHubAnalysis sets minimum k-core level for hub isolation analysis
	MinCoreForHubAnalysis int `json:"min_core_for_hub_analysis"`

	// HubIsolationThreshold is the peer connectivity ratio below which hub is "isolated"
	// Value between 0.0-1.0: ratio of actual same-core connections to expected
	HubIsolationThreshold float64 `json:"hub_isolation_threshold"`

	// TrackCoreDemotions enables detection of entities that dropped k-core level
	TrackCoreDemotions bool `json:"track_core_demotions"`

	// MinDemotionDelta is the minimum core level drop to flag (e.g., 2 means core 5->3 flagged)
	MinDemotionDelta int `json:"min_demotion_delta"`
}

CoreAnomalyConfig configures the k-core based anomaly detectors.

type CoreAnomalyDetector

type CoreAnomalyDetector struct {
	// contains filtered or unexported fields
}

CoreAnomalyDetector detects k-core based structural anomalies:

  • Core isolation: high k-core entities with few same-core peers
  • Core demotion: entities that dropped k-core level between computations

func NewCoreAnomalyDetector

func NewCoreAnomalyDetector(deps *DetectorDependencies) *CoreAnomalyDetector

NewCoreAnomalyDetector creates a new core anomaly detector.

func (*CoreAnomalyDetector) Configure

func (d *CoreAnomalyDetector) Configure(config interface{}) error

Configure updates the detector configuration.

func (*CoreAnomalyDetector) Detect

func (d *CoreAnomalyDetector) Detect(ctx context.Context) ([]*StructuralAnomaly, error)

Detect finds core isolation and demotion anomalies.

func (*CoreAnomalyDetector) Name

func (d *CoreAnomalyDetector) Name() string

Name returns the detector identifier.

func (*CoreAnomalyDetector) SetDependencies

func (d *CoreAnomalyDetector) SetDependencies(deps *DetectorDependencies)

SetDependencies updates the detector's dependencies.

type Decision

type Decision int

Decision represents the outcome of anomaly review.

const (
	// DecisionApprove indicates the anomaly should be applied to the graph.
	DecisionApprove Decision = iota
	// DecisionReject indicates the anomaly should be dismissed.
	DecisionReject
	// DecisionHumanReview indicates the anomaly needs human attention.
	DecisionHumanReview
)

func (Decision) String

func (d Decision) String() string

String returns a human-readable representation of the decision.

type Detector

type Detector interface {
	// Name returns the detector identifier for logging and metrics.
	Name() string

	// Detect runs the detection algorithm and returns discovered anomalies.
	// The context should be used for cancellation and timeouts.
	Detect(ctx context.Context) ([]*StructuralAnomaly, error)

	// Configure updates the detector configuration.
	// Called during orchestrator initialization and on config updates.
	Configure(config interface{}) error

	// SetDependencies updates the detector's shared dependencies.
	// Called by the orchestrator before running detection.
	SetDependencies(deps *DetectorDependencies)
}

Detector defines the interface for anomaly detection algorithms.

type DetectorDependencies

type DetectorDependencies struct {
	// StructuralIndices provides access to k-core and pivot indices.
	StructuralIndices *structural.Indices

	// PreviousKCore is the k-core index from the previous computation (for demotion detection).
	PreviousKCore *structural.KCoreIndex

	// Communities provides community membership for scoped detection.
	// Core isolation is analyzed within each community rather than globally.
	Communities []CommunityInfo

	// SimilarityFinder provides semantic similarity queries.
	SimilarityFinder SimilarityFinder

	// RelationshipQuerier provides relationship queries for transitivity detection.
	RelationshipQuerier RelationshipQuerier

	// AnomalyStorage provides access to persisted anomalies (for dismissed pair checks).
	AnomalyStorage Storage

	// Logger for detector logging.
	Logger *slog.Logger
}

DetectorDependencies provides access to shared resources needed by detectors.

type DirectRelationshipApplier

type DirectRelationshipApplier struct {
	// contains filtered or unexported fields
}

DirectRelationshipApplier adds triples directly via DataManager. Used for intra-processor mutations, matching the community inference pattern. This avoids NATS roundtrips for operations within the same processor.

func NewDirectRelationshipApplier

func NewDirectRelationshipApplier(adder TripleAdder, logger *slog.Logger) *DirectRelationshipApplier

NewDirectRelationshipApplier creates an applier that uses DataManager directly.

func (*DirectRelationshipApplier) ApplyRelationship

func (a *DirectRelationshipApplier) ApplyRelationship(
	ctx context.Context,
	suggestion *RelationshipSuggestion,
) error

ApplyRelationship adds a triple directly via DataManager.

type EntityManager

type EntityManager interface {
	ExistsEntity(ctx context.Context, id string) (bool, error)
	CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
	// ListWithPrefix returns entity IDs matching a prefix (for sibling discovery)
	ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
}

EntityManager provides entity existence checks and creation. Typically implemented by datamanager.Manager.

type Evidence

type Evidence struct {
	// Semantic-Structural Gap evidence
	Similarity         float64 `json:"similarity,omitempty"`           // Embedding similarity score
	StructuralDistance int     `json:"structural_distance,omitempty"`  // Actual or estimated hop count
	DistanceLowerBound int     `json:"distance_lower_bound,omitempty"` // Triangle inequality lower bound
	DistanceUpperBound int     `json:"distance_upper_bound,omitempty"` // Triangle inequality upper bound

	// Core Isolation evidence
	CoreLevel         int     `json:"core_level,omitempty"`          // Entity's k-core number
	PeerCount         int     `json:"peer_count,omitempty"`          // Number of same-core neighbors
	ExpectedPeerCount int     `json:"expected_peer_count,omitempty"` // Expected based on core level
	PeerConnectivity  float64 `json:"peer_connectivity,omitempty"`   // Ratio of actual/expected peers
	CommunityID       string  `json:"community_id,omitempty"`        // Community where isolation detected

	// Core Demotion evidence
	PreviousCoreLevel int `json:"previous_core_level,omitempty"` // Core level before demotion
	CurrentCoreLevel  int `json:"current_core_level,omitempty"`  // Core level after demotion
	LostConnections   int `json:"lost_connections,omitempty"`    // Number of connections lost

	// Transitivity Gap evidence
	Predicate       string   `json:"predicate,omitempty"`         // The transitive predicate
	ChainPath       []string `json:"chain_path,omitempty"`        // The A->B->C path entities
	ActualDistance  int      `json:"actual_distance,omitempty"`   // Measured A-C distance
	ExpectedMaxHops int      `json:"expected_max_hops,omitempty"` // Config max for transitivity
}

Evidence contains type-specific proof of an anomaly. Different anomaly types populate different fields.

type HTTPHandler

type HTTPHandler struct {
	// contains filtered or unexported fields
}

HTTPHandler provides HTTP endpoints for human review of structural anomalies.

func NewHTTPHandler

func NewHTTPHandler(storage Storage, applier RelationshipApplier, logger *slog.Logger) *HTTPHandler

NewHTTPHandler creates a new HTTP handler for inference endpoints.

func (*HTTPHandler) RegisterHTTPHandlers

func (h *HTTPHandler) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)

RegisterHTTPHandlers registers inference endpoints with the given mux.

type HierarchyConfig

type HierarchyConfig struct {
	// Enabled activates hierarchy inference on entity creation
	Enabled bool `json:"enabled"`

	// CreateTypeEdges enables type membership edges (5-part prefix → type container)
	// StandardIRI: skos:broader
	CreateTypeEdges bool `json:"create_type_edges"`

	// CreateSystemEdges enables system membership edges (4-part prefix → system container)
	// StandardIRI: skos:broader
	CreateSystemEdges bool `json:"create_system_edges"`

	// CreateDomainEdges enables domain membership edges (3-part prefix → domain container)
	// StandardIRI: skos:broader
	CreateDomainEdges bool `json:"create_domain_edges"`

	// CreateTypeSiblings enables sibling edges between entities with the same type (5-part prefix)
	// When enabled, creates bidirectional hierarchy.type.sibling edges
	// Cost: O(N) per new entity where N is existing sibling count
	CreateTypeSiblings bool `json:"create_type_siblings"`
}

HierarchyConfig configures hierarchy container inference.

func DefaultHierarchyConfig

func DefaultHierarchyConfig() HierarchyConfig

DefaultHierarchyConfig returns sensible defaults for hierarchy inference.

type HierarchyInference

type HierarchyInference struct {
	// contains filtered or unexported fields
}

HierarchyInference creates membership edges to container entities based on the 6-part entity ID structure. It operates synchronously - hierarchy triples are computed and returned before the entity is written to storage.

Entity ID format: org.platform.domain.system.type.instance

Example: acme.iot.sensors.hvac.temperature.001

Container entities are auto-created with the following pattern:

  • Type container: org.platform.domain.system.type.group (6-part)
  • System container: org.platform.domain.system.group.container (6-part)
  • Domain container: org.platform.domain.group.container.level (6-part)

Graph distances via containers:

  • Same type siblings: 2 hops (entity → type.group ← entity)
  • Same system, different type: 4 hops
  • Same domain, different system: 6 hops

This is a stateless utility - no lifecycle methods (Start/Stop).
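
The container patterns above can be derived from an entity ID by keeping a prefix and appending fixed segments. The sketch reads "group", "group.container", and "group.container.level" as literal suffixes, which is an assumption based on the listed patterns; containerIDs is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// containerIDs derives the type, system, and domain container IDs from a
// 6-part entity ID (org.platform.domain.system.type.instance).
func containerIDs(entityID string) (typeC, systemC, domainC string, err error) {
	parts := strings.Split(entityID, ".")
	if len(parts) != 6 {
		return "", "", "", errors.New("entity ID must have exactly 6 parts")
	}
	for _, p := range parts {
		if p == "" {
			return "", "", "", errors.New("entity ID contains an empty part")
		}
	}
	typeC = strings.Join(parts[:5], ".") + ".group"
	systemC = strings.Join(parts[:4], ".") + ".group.container"
	domainC = strings.Join(parts[:3], ".") + ".group.container.level"
	return typeC, systemC, domainC, nil
}

func main() {
	t, s, d, err := containerIDs("acme.iot.sensors.hvac.temperature.001")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(t)
	fmt.Println(s)
	fmt.Println(d)
}
```

Every container ID produced this way still has six parts, which matches the "(6-part)" note on each pattern.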

func NewHierarchyInference

func NewHierarchyInference(
	entityManager EntityManager,
	tripleAdder TripleAdder,
	config HierarchyConfig,
	logger *slog.Logger,
) *HierarchyInference

NewHierarchyInference creates a new hierarchy inference component.

Parameters:

  • entityManager: Component for entity existence checks and creation
  • tripleAdder: Component that can add triples (for inverse edges on containers)
  • config: Configuration for edge creation
  • logger: Logger for observability (can be nil)

func (*HierarchyInference) ClearCache

func (h *HierarchyInference) ClearCache()

ClearCache resets the container cache. Call when containers might be deleted.

func (*HierarchyInference) GetCacheStats

func (h *HierarchyInference) GetCacheStats() int

GetCacheStats returns statistics about the container cache.

func (*HierarchyInference) GetHierarchyTriples

func (h *HierarchyInference) GetHierarchyTriples(ctx context.Context, entityID string) ([]message.Triple, error)

GetHierarchyTriples returns hierarchy membership triples for the given entity ID. It does not write the entity or its triples; the caller must include the returned triples in the entity before writing it to storage. The only side effects are on containers (steps 2 and 4 below).

For each enabled level (type, system, domain), it:

  1. Computes the container entity ID
  2. Auto-creates the container if it doesn't exist
  3. Returns a membership triple from entity to container
  4. Adds the inverse edge to the container (container → contains → entity)

Returns empty slice if hierarchy is disabled or entity ID is invalid.

func (*HierarchyInference) GetMetrics

func (h *HierarchyInference) GetMetrics() (containersCreated, edgesCreated, edgesFailed int64)

GetMetrics returns metrics for hierarchy inference operations.

func (*HierarchyInference) OnEntityCreated

func (h *HierarchyInference) OnEntityCreated(ctx context.Context, entityID string) error

OnEntityCreated is the legacy method kept for backwards compatibility. It calls GetHierarchyTriples and adds them using tripleAdder. New code should use GetHierarchyTriples directly to avoid cascading writes.

type MutationRelationshipApplier

type MutationRelationshipApplier struct {
	// contains filtered or unexported fields
}

MutationRelationshipApplier sends triple add requests to graph-ingest via NATS request/reply. This is the preferred applier for cross-processor mutations as it goes through the proper ingestion pipeline (graph.mutation.triple.add -> graph-ingest -> indexing).

func NewMutationRelationshipApplier

func NewMutationRelationshipApplier(natsClient *natsclient.Client, logger *slog.Logger) *MutationRelationshipApplier

NewMutationRelationshipApplier creates an applier that uses the graph-ingest mutation API.

func (*MutationRelationshipApplier) ApplyRelationship

func (a *MutationRelationshipApplier) ApplyRelationship(
	ctx context.Context,
	suggestion *RelationshipSuggestion,
) error

ApplyRelationship sends an add triple request to graph-ingest.

type NATSAnomalyStorage

type NATSAnomalyStorage struct {
	// contains filtered or unexported fields
}

NATSAnomalyStorage implements Storage using NATS KV.

func NewNATSAnomalyStorage

func NewNATSAnomalyStorage(kv jetstream.KeyValue, logger *slog.Logger) *NATSAnomalyStorage

NewNATSAnomalyStorage creates a new NATS-backed anomaly storage.

func (*NATSAnomalyStorage) Cleanup

func (s *NATSAnomalyStorage) Cleanup(ctx context.Context, retention time.Duration) (int, error)

Cleanup removes resolved anomalies older than the retention period.

func (*NATSAnomalyStorage) Count

Count returns the total number of anomalies by status.

func (*NATSAnomalyStorage) Delete

func (s *NATSAnomalyStorage) Delete(ctx context.Context, id string) error

Delete removes an anomaly.

func (*NATSAnomalyStorage) Get

Get retrieves an anomaly by ID.

func (*NATSAnomalyStorage) GetByStatus

func (s *NATSAnomalyStorage) GetByStatus(ctx context.Context, status AnomalyStatus) ([]*StructuralAnomaly, error)

GetByStatus retrieves all anomalies with the given status.

func (*NATSAnomalyStorage) GetByType

func (s *NATSAnomalyStorage) GetByType(ctx context.Context, anomalyType AnomalyType) ([]*StructuralAnomaly, error)

GetByType retrieves all anomalies of the given type.

func (*NATSAnomalyStorage) GetWithRevision

func (s *NATSAnomalyStorage) GetWithRevision(ctx context.Context, id string) (*StructuralAnomaly, uint64, error)

GetWithRevision retrieves an anomaly by ID along with its KV revision.

func (*NATSAnomalyStorage) HasEntityAnomaly

func (s *NATSAnomalyStorage) HasEntityAnomaly(ctx context.Context, entityID string, anomalyType AnomalyType) (bool, error)

HasEntityAnomaly checks if an anomaly already exists for an entity+type combination. This prevents re-detecting the same core isolation/demotion across detection runs.

func (*NATSAnomalyStorage) IsDismissedPair

func (s *NATSAnomalyStorage) IsDismissedPair(ctx context.Context, entityA, entityB string) (bool, error)

IsDismissedPair checks if an entity pair is already tracked (any status including pending). This prevents re-detecting the same semantic gap repeatedly across detection runs.

func (*NATSAnomalyStorage) MarkPairDismissed

func (s *NATSAnomalyStorage) MarkPairDismissed(ctx context.Context, entityA, entityB string) error

MarkPairDismissed creates an index entry to prevent future re-detection. Called when an anomaly is dismissed, rejected, or auto-applied.
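
A pair index like this needs a key that does not depend on argument order, so that (A, B) and (B, A) hit the same entry. The sketch below uses an in-memory set; the pairKey format and the pairIndex type are hypothetical and say nothing about the actual NATS KV key layout.

```go
package main

import "fmt"

// pairKey builds an order-independent key for an entity pair.
func pairKey(entityA, entityB string) string {
	if entityB < entityA {
		entityA, entityB = entityB, entityA
	}
	return entityA + "|" + entityB // separator is an arbitrary choice
}

// pairIndex is an in-memory stand-in for the dismissed-pair index.
type pairIndex map[string]struct{}

// markDismissed records the pair so it is not flagged again.
func (p pairIndex) markDismissed(entityA, entityB string) {
	p[pairKey(entityA, entityB)] = struct{}{}
}

// isDismissed reports whether the pair is already tracked.
func (p pairIndex) isDismissed(entityA, entityB string) bool {
	_, ok := p[pairKey(entityA, entityB)]
	return ok
}

func main() {
	idx := pairIndex{}
	idx.markDismissed("sensor.b", "sensor.a")
	fmt.Println(idx.isDismissed("sensor.a", "sensor.b"))
}
```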

func (*NATSAnomalyStorage) Save

func (s *NATSAnomalyStorage) Save(ctx context.Context, anomaly *StructuralAnomaly) error

Save persists an anomaly to NATS KV.

func (*NATSAnomalyStorage) SaveWithRevision

func (s *NATSAnomalyStorage) SaveWithRevision(ctx context.Context, anomaly *StructuralAnomaly, revision uint64) error

SaveWithRevision persists an anomaly with optimistic locking. If revision > 0, the save will fail if the KV entry has been modified.
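
The optimistic-locking contract can be sketched with an in-memory store that bumps a revision counter on every write. The memStore type is a hypothetical stand-in for the NATS KV bucket; only the "revision > 0 means check before write" rule and the error message come from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConcurrentModification mirrors the package's ErrConcurrentModification.
var errConcurrentModification = errors.New("concurrent modification detected")

type entry struct {
	value    string
	revision uint64
}

// memStore is an in-memory stand-in for a revisioned KV bucket.
type memStore struct {
	mu   sync.Mutex
	data map[string]entry
}

func newMemStore() *memStore { return &memStore{data: map[string]entry{}} }

// getWithRevision returns the stored value and its current revision.
func (s *memStore) getWithRevision(id string) (string, uint64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.data[id]
	return e.value, e.revision, ok
}

// saveWithRevision writes the value. If revision > 0 the write fails when
// the stored revision differs; revision 0 writes unconditionally.
func (s *memStore) saveWithRevision(id, value string, revision uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur := s.data[id]
	if revision > 0 && cur.revision != revision {
		return errConcurrentModification
	}
	s.data[id] = entry{value: value, revision: cur.revision + 1}
	return nil
}

func main() {
	s := newMemStore()
	_ = s.saveWithRevision("a1", "pending", 0)
	_, rev, _ := s.getWithRevision("a1")

	fmt.Println(s.saveWithRevision("a1", "approved", rev)) // succeeds
	err := s.saveWithRevision("a1", "rejected", rev)       // stale revision
	fmt.Println(errors.Is(err, errConcurrentModification))
}
```

A caller that receives the error would typically re-read with GetWithRevision and retry the update against the fresh revision.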

func (*NATSAnomalyStorage) UpdateStatus

func (s *NATSAnomalyStorage) UpdateStatus(
	ctx context.Context,
	id string,
	status AnomalyStatus,
	reviewedBy, notes string,
) error

UpdateStatus updates an anomaly's status and optional review info.

func (*NATSAnomalyStorage) Watch

func (s *NATSAnomalyStorage) Watch(ctx context.Context) (<-chan *StructuralAnomaly, error)

Watch returns a channel of anomalies as they're created/updated.

type NATSRelationshipApplier

type NATSRelationshipApplier struct {
	// contains filtered or unexported fields
}

NATSRelationshipApplier publishes relationship triples to the entity stream. The normal ingestion pipeline then indexes the new relationships.

func NewNATSRelationshipApplier

func NewNATSRelationshipApplier(
	js jetstream.JetStream,
	subject string,
	logger *slog.Logger,
) *NATSRelationshipApplier

NewNATSRelationshipApplier creates a new applier that publishes to NATS.

func (*NATSRelationshipApplier) ApplyRelationship

func (a *NATSRelationshipApplier) ApplyRelationship(
	ctx context.Context,
	suggestion *RelationshipSuggestion,
) error

ApplyRelationship publishes a new relationship triple to the entity stream.

type NoOpApplier

type NoOpApplier struct {
	// contains filtered or unexported fields
}

NoOpApplier is a no-op implementation for testing or disabled mode.

func NewNoOpApplier

func NewNoOpApplier(logger *slog.Logger) *NoOpApplier

NewNoOpApplier creates an applier that logs but doesn't persist.

func (*NoOpApplier) ApplyRelationship

func (a *NoOpApplier) ApplyRelationship(
	_ context.Context,
	suggestion *RelationshipSuggestion,
) error

ApplyRelationship logs the suggestion but doesn't persist it.

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator coordinates running all enabled detectors.

func NewOrchestrator

func NewOrchestrator(cfg OrchestratorConfig) (*Orchestrator, error)

NewOrchestrator creates a new detector orchestrator.

func (*Orchestrator) GetRegisteredDetectors

func (o *Orchestrator) GetRegisteredDetectors() []string

GetRegisteredDetectors returns the names of all registered detectors.

func (*Orchestrator) RegisterDetector

func (o *Orchestrator) RegisterDetector(detector Detector)

RegisterDetector adds a detector to the orchestrator.

func (*Orchestrator) RunDetection

func (o *Orchestrator) RunDetection(ctx context.Context) (*Result, error)

RunDetection executes all registered detectors and persists results.

func (*Orchestrator) SetApplier

func (o *Orchestrator) SetApplier(applier RelationshipApplier)

SetApplier sets the relationship applier for virtual edge creation. This allows late binding when the applier depends on other components.

func (*Orchestrator) SetDependencies

func (o *Orchestrator) SetDependencies(deps *DetectorDependencies)

SetDependencies sets the shared dependencies for all detectors. Must be called before RunDetection. Propagates dependencies to all registered detectors.

func (*Orchestrator) UpdateConfig

func (o *Orchestrator) UpdateConfig(config Config) error

UpdateConfig updates the orchestrator configuration.

type OrchestratorConfig

type OrchestratorConfig struct {
	Config  Config
	Storage Storage
	Applier RelationshipApplier
	Logger  *slog.Logger
}

OrchestratorConfig holds configuration for the orchestrator.

type RelationshipApplier

type RelationshipApplier interface {
	// ApplyRelationship publishes a new relationship triple to the entity stream.
	ApplyRelationship(ctx context.Context, suggestion *RelationshipSuggestion) error
}

RelationshipApplier creates new relationships from approved anomaly suggestions.
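As an illustration of implementing this interface, the following is a minimal sketch of an in-memory applier, which may be useful in tests. It does not import the package: `relationshipSuggestion` and `relationshipApplier` are local mirrors of the documented types, while `recordingApplier`, `applyAll`, and the validation rule are hypothetical and not part of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// relationshipSuggestion is a local mirror of the documented RelationshipSuggestion fields.
type relationshipSuggestion struct {
	FromEntity string
	ToEntity   string
	Predicate  string
	Confidence float64
	Reasoning  string
}

// relationshipApplier is a local mirror of the documented RelationshipApplier interface.
type relationshipApplier interface {
	ApplyRelationship(ctx context.Context, s *relationshipSuggestion) error
}

// recordingApplier is a hypothetical applier that keeps applied suggestions in memory.
type recordingApplier struct {
	applied []relationshipSuggestion
}

// ApplyRelationship records the suggestion. Rejecting incomplete suggestions
// is an assumption of this sketch, not documented package behavior.
func (r *recordingApplier) ApplyRelationship(ctx context.Context, s *relationshipSuggestion) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if s == nil || s.FromEntity == "" || s.ToEntity == "" || s.Predicate == "" {
		return errors.New("incomplete suggestion")
	}
	r.applied = append(r.applied, *s)
	return nil
}

// applyAll applies each suggestion in turn and returns the success count
// together with any errors encountered.
func applyAll(a relationshipApplier, suggestions []*relationshipSuggestion) (int, []error) {
	ctx := context.Background()
	applied := 0
	var errs []error
	for _, s := range suggestions {
		if err := a.ApplyRelationship(ctx, s); err != nil {
			errs = append(errs, err)
			continue
		}
		applied++
	}
	return applied, errs
}

func main() {
	rec := &recordingApplier{}
	n, errs := applyAll(rec, []*relationshipSuggestion{
		{FromEntity: "a", ToEntity: "b", Predicate: "inferred.related_to", Confidence: 0.9},
		{FromEntity: "a", Predicate: "inferred.related_to"}, // missing ToEntity
	})
	fmt.Println(n, len(errs), len(rec.applied))
}
```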

type RelationshipInfo

type RelationshipInfo struct {
	FromEntityID string `json:"from_entity_id"`
	ToEntityID   string `json:"to_entity_id"`
	Predicate    string `json:"predicate"`
}

RelationshipInfo represents a relationship for detector use.

type RelationshipQuerier

type RelationshipQuerier interface {
	// GetOutgoingRelationships returns all outgoing relationships from an entity.
	GetOutgoingRelationships(ctx context.Context, entityID string) ([]RelationshipInfo, error)

	// GetIncomingRelationships returns all incoming relationships to an entity.
	GetIncomingRelationships(ctx context.Context, entityID string) ([]RelationshipInfo, error)
}

RelationshipQuerier provides relationship queries for detectors. This interface is satisfied by QueryManager.

type RelationshipSuggestion

type RelationshipSuggestion struct {
	// FromEntity is the source entity ID for the suggested relationship.
	FromEntity string `json:"from_entity"`

	// ToEntity is the target entity ID for the suggested relationship.
	ToEntity string `json:"to_entity"`

	// Predicate is the suggested relationship type (e.g., "inferred.related_to").
	Predicate string `json:"predicate"`

	// Confidence is the confidence in this specific suggestion.
	Confidence float64 `json:"confidence"`

	// Reasoning explains why this relationship is suggested.
	Reasoning string `json:"reasoning"`
}

RelationshipSuggestion proposes a relationship to address an anomaly.

type Result

type Result struct {
	StartedAt       time.Time            `json:"started_at"`
	CompletedAt     time.Time            `json:"completed_at"`
	Anomalies       []*StructuralAnomaly `json:"anomalies"`
	Truncated       bool                 `json:"truncated"`         // Hit max anomalies limit
	AutoApplied     int                  `json:"auto_applied"`      // Virtual edges auto-applied
	QueuedForReview int                  `json:"queued_for_review"` // Anomalies sent to review queue
}

Result summarizes an inference detection run.

func (*Result) AnomalyCount

func (r *Result) AnomalyCount() int

AnomalyCount returns the total number of anomalies detected.

func (*Result) CountByType

func (r *Result) CountByType() map[AnomalyType]int

CountByType returns anomaly counts grouped by type.

func (*Result) Duration

func (r *Result) Duration() time.Duration

Duration returns how long the detection run took.

type ReviewConfig

type ReviewConfig struct {
	// Enabled activates the review worker
	Enabled bool `json:"enabled"`

	// Workers is the number of concurrent review workers
	Workers int `json:"workers"`

	// AutoApproveThreshold: LLM can auto-approve anomalies at or above this confidence
	AutoApproveThreshold float64 `json:"auto_approve_threshold"`

	// AutoRejectThreshold: LLM can auto-reject anomalies at or below this confidence
	AutoRejectThreshold float64 `json:"auto_reject_threshold"`

	// FallbackToHuman escalates uncertain cases (between thresholds) to human review
	FallbackToHuman bool `json:"fallback_to_human"`

	// BatchSize is the number of anomalies to process in each review batch
	BatchSize int `json:"batch_size"`

	// ReviewTimeout is the timeout for individual LLM review calls
	ReviewTimeout time.Duration `json:"review_timeout"`

	// LLM configuration for the review model
	LLM llm.Config `json:"llm"`
}

ReviewConfig configures the LLM-assisted review pipeline.
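The threshold fields can be read as a three-way routing rule. The sketch below follows the field comments (approve at or above AutoApproveThreshold, reject at or below AutoRejectThreshold, escalate the band in between when FallbackToHuman is set). The `route` helper and the returned strings are hypothetical; the package's actual status values and its handling of the in-between band without fallback are not shown on this page.

```go
package main

import "fmt"

// reviewThresholds mirrors only the threshold-related fields of ReviewConfig.
type reviewThresholds struct {
	AutoApproveThreshold float64
	AutoRejectThreshold  float64
	FallbackToHuman      bool
}

// route is a hypothetical helper that maps an LLM confidence to an outcome.
// The outcome labels are placeholders chosen for this sketch.
func route(cfg reviewThresholds, confidence float64) string {
	switch {
	case confidence >= cfg.AutoApproveThreshold:
		return "approved"
	case confidence <= cfg.AutoRejectThreshold:
		return "rejected"
	case cfg.FallbackToHuman:
		return "human_review"
	default:
		// Assumption: without fallback, uncertain cases simply stay pending.
		return "pending"
	}
}

func main() {
	cfg := reviewThresholds{AutoApproveThreshold: 0.9, AutoRejectThreshold: 0.3, FallbackToHuman: true}
	for _, c := range []float64{0.95, 0.6, 0.2} {
		fmt.Printf("%.2f -> %s\n", c, route(cfg, c))
	}
}
```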

type ReviewMetrics

type ReviewMetrics struct {
	// contains filtered or unexported fields
}

ReviewMetrics provides Prometheus metrics for anomaly review processing.

func NewReviewMetrics

func NewReviewMetrics(component string, registry *metric.MetricsRegistry) *ReviewMetrics

NewReviewMetrics creates a new ReviewMetrics instance using MetricsRegistry.

func (*ReviewMetrics) DecWorkersActive

func (m *ReviewMetrics) DecWorkersActive()

DecWorkersActive decrements the active workers count.

func (*ReviewMetrics) IncWorkersActive

func (m *ReviewMetrics) IncWorkersActive()

IncWorkersActive increments the active workers count.

func (*ReviewMetrics) RecordApproved

func (m *ReviewMetrics) RecordApproved(latencySeconds float64)

RecordApproved records an approved anomaly with latency.

func (*ReviewMetrics) RecordDeferred

func (m *ReviewMetrics) RecordDeferred(latencySeconds float64)

RecordDeferred records an anomaly deferred to human review with latency.

func (*ReviewMetrics) RecordFailed

func (m *ReviewMetrics) RecordFailed(latencySeconds float64)

RecordFailed records a failed processing attempt with latency.

func (*ReviewMetrics) RecordRejected

func (m *ReviewMetrics) RecordRejected(latencySeconds float64)

RecordRejected records a rejected anomaly with latency.

func (*ReviewMetrics) SetPendingCount

func (m *ReviewMetrics) SetPendingCount(count int)

SetPendingCount sets the current pending anomaly count.

type ReviewQueueConfig

type ReviewQueueConfig struct {
	// Enabled activates the review queue for lower-confidence gaps
	Enabled bool `json:"enabled"`

	// MinConfidence is the minimum confidence score for review queue (0.0-1.0)
	// Gaps at or above this but below AutoApply.MinConfidence go to review
	MinConfidence float64 `json:"min_confidence"`

	// MaxConfidence is the maximum confidence score for review queue (0.0-1.0)
	// Should match AutoApply.MinConfidence for seamless handoff
	MaxConfidence float64 `json:"max_confidence"`

	// RequireLLMClassification requires LLM to suggest relationship type
	RequireLLMClassification bool `json:"require_llm_classification"`
}

ReviewQueueConfig configures gaps that need LLM or human review before edge creation.

func (*ReviewQueueConfig) ShouldQueue

func (c *ReviewQueueConfig) ShouldQueue(confidence float64) bool

ShouldQueue returns true if the anomaly should go to the review queue based on confidence. Anomalies with confidence >= MinConfidence but < MaxConfidence go to review.
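The half-open band described above can be sketched as follows. `reviewQueueBand` is a local stand-in for ReviewQueueConfig, and returning false when the queue is disabled is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// reviewQueueBand mirrors the confidence-related fields of ReviewQueueConfig.
type reviewQueueBand struct {
	Enabled       bool
	MinConfidence float64
	MaxConfidence float64
}

// shouldQueue reports whether confidence falls in [MinConfidence, MaxConfidence).
// Checking Enabled first is an assumption of this sketch.
func (b reviewQueueBand) shouldQueue(confidence float64) bool {
	if !b.Enabled {
		return false
	}
	return confidence >= b.MinConfidence && confidence < b.MaxConfidence
}

func main() {
	band := reviewQueueBand{Enabled: true, MinConfidence: 0.5, MaxConfidence: 0.9}
	for _, c := range []float64{0.4, 0.5, 0.7, 0.9} {
		fmt.Printf("%.1f queued=%v\n", c, band.shouldQueue(c))
	}
}
```

Because the upper bound is exclusive, setting MaxConfidence equal to AutoApply.MinConfidence leaves no confidence value that is both queued and auto-applied.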

type ReviewRequest

type ReviewRequest struct {
	Decision          string `json:"decision"`                     // "approved" or "rejected"
	Notes             string `json:"notes,omitempty"`              // Optional notes about the decision
	OverridePredicate string `json:"override_predicate,omitempty"` // Override the suggested predicate
	TargetEntity      string `json:"target_entity,omitempty"`      // Target entity for approval (required if suggestion has empty ToEntity)
	ReviewedBy        string `json:"reviewed_by,omitempty"`        // Who reviewed this anomaly
}

ReviewRequest represents a human review decision.

type ReviewWorker

type ReviewWorker struct {
	// contains filtered or unexported fields
}

ReviewWorker watches ANOMALY_INDEX and processes pending anomalies. Follows enhancement_worker.go patterns for lifecycle management.

func NewReviewWorker

func NewReviewWorker(cfg *ReviewWorkerConfig) (*ReviewWorker, error)

NewReviewWorker creates a new review worker.

func (*ReviewWorker) IsPaused

func (w *ReviewWorker) IsPaused() bool

IsPaused returns whether the worker is currently paused.

func (*ReviewWorker) Pause

func (w *ReviewWorker) Pause()

Pause stops processing new anomalies while allowing in-flight work to complete.

func (*ReviewWorker) Resume

func (w *ReviewWorker) Resume()

Resume allows processing to continue after a Pause.

func (*ReviewWorker) Start

func (w *ReviewWorker) Start(ctx context.Context) error

Start begins watching ANOMALY_INDEX for pending anomalies.

func (*ReviewWorker) Stop

func (w *ReviewWorker) Stop() error

Stop gracefully shuts down the worker.

type ReviewWorkerConfig

type ReviewWorkerConfig struct {
	AnomalyBucket jetstream.KeyValue
	Storage       Storage
	LLMClient     llm.Client // optional
	Applier       RelationshipApplier
	Config        ReviewConfig
	Metrics       *ReviewMetrics // optional - nil disables metrics
	Logger        *slog.Logger
}

ReviewWorkerConfig holds configuration for the review worker.

type SemanticGapConfig

type SemanticGapConfig struct {
	// Enabled activates semantic-structural gap detection
	Enabled bool `json:"enabled"`

	// MinSemanticSimilarity is the minimum embedding similarity to consider (0.0-1.0)
	MinSemanticSimilarity float64 `json:"min_semantic_similarity"`

	// MinStructuralDistance is the minimum graph distance for flagging (hops)
	MinStructuralDistance int `json:"min_structural_distance"`

	// MaxGapsPerEntity limits gaps detected per entity to prevent noise
	MaxGapsPerEntity int `json:"max_gaps_per_entity"`

	// MaxCandidatesPerEntity limits semantic search candidates per entity
	MaxCandidatesPerEntity int `json:"max_candidates_per_entity"`
}

SemanticGapConfig configures the semantic-structural gap detector.

type SemanticGapDetector

type SemanticGapDetector struct {
	// contains filtered or unexported fields
}

SemanticGapDetector detects entities that are semantically similar but structurally distant in the graph.
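To make the idea concrete, here is a self-contained sketch of the gap test: a candidate counts as a gap when its similarity meets a minimum and its hop distance from the source entity meets a minimum. The breadth-first search, the `candidate` type, the helper names, and the choice to treat unreachable entities as gaps are all assumptions of this sketch; the detector's actual algorithm is not shown on this page.

```go
package main

import "fmt"

// candidate mirrors the documented SimilarityResult fields.
type candidate struct {
	EntityID   string
	Similarity float64
}

// hopDistance returns the shortest hop count between from and to using
// breadth-first search, or -1 if to is unreachable.
func hopDistance(adj map[string][]string, from, to string) int {
	if from == to {
		return 0
	}
	dist := map[string]int{from: 0}
	queue := []string{from}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		for _, next := range adj[cur] {
			if _, seen := dist[next]; seen {
				continue
			}
			dist[next] = dist[cur] + 1
			if next == to {
				return dist[next]
			}
			queue = append(queue, next)
		}
	}
	return -1
}

// findSemanticGaps keeps candidates that are similar enough yet at least
// minDist hops away (or unreachable), stopping after maxGaps results.
func findSemanticGaps(adj map[string][]string, entity string, cands []candidate,
	minSim float64, minDist, maxGaps int) []candidate {
	var gaps []candidate
	for _, c := range cands {
		if len(gaps) >= maxGaps {
			break
		}
		if c.Similarity < minSim {
			continue
		}
		if d := hopDistance(adj, entity, c.EntityID); d == -1 || d >= minDist {
			gaps = append(gaps, c)
		}
	}
	return gaps
}

func main() {
	// Undirected chain a - b - c - d, stored with both directions.
	adj := map[string][]string{"a": {"b"}, "b": {"a", "c"}, "c": {"b", "d"}, "d": {"c"}}
	cands := []candidate{{"b", 0.95}, {"d", 0.90}, {"c", 0.50}}
	fmt.Println(findSemanticGaps(adj, "a", cands, 0.8, 3, 5))
}
```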

func NewSemanticGapDetector

func NewSemanticGapDetector(deps *DetectorDependencies) *SemanticGapDetector

NewSemanticGapDetector creates a new semantic gap detector.

func (*SemanticGapDetector) Configure

func (d *SemanticGapDetector) Configure(config interface{}) error

Configure updates the detector configuration.

func (*SemanticGapDetector) Detect

Detect finds semantic-structural gaps in the graph.

func (*SemanticGapDetector) Name

func (d *SemanticGapDetector) Name() string

Name returns the detector identifier.

func (*SemanticGapDetector) SetDependencies

func (d *SemanticGapDetector) SetDependencies(deps *DetectorDependencies)

SetDependencies updates the detector's dependencies.

type SimilarityFinder

type SimilarityFinder interface {
	// FindSimilar returns entity IDs semantically similar to the given entity.
	// threshold is the minimum similarity score (0.0-1.0).
	// limit is the maximum number of results.
	FindSimilar(ctx context.Context, entityID string, threshold float64, limit int) ([]SimilarityResult, error)
}

SimilarityFinder provides semantic similarity search functionality. This interface is satisfied by IndexManager.

type SimilarityResult

type SimilarityResult struct {
	EntityID   string  `json:"entity_id"`
	Similarity float64 `json:"similarity"`
}

SimilarityResult represents a similarity search result.

type StatsResponse

type StatsResponse struct {
	TotalDetected int `json:"total_detected"`
	PendingReview int `json:"pending_review"`
	LLMApproved   int `json:"llm_approved"`
	LLMRejected   int `json:"llm_rejected"`
	HumanReview   int `json:"human_review"`
	HumanApproved int `json:"human_approved"`
	HumanRejected int `json:"human_rejected"`
	Applied       int `json:"applied"`
}

StatsResponse contains inference statistics.

type Storage

type Storage interface {
	// Save persists an anomaly (creates or updates).
	Save(ctx context.Context, anomaly *StructuralAnomaly) error

	// SaveWithRevision persists an anomaly with optimistic locking.
	// Returns ErrConcurrentModification if the revision has changed.
	// Pass revision=0 for new anomalies.
	SaveWithRevision(ctx context.Context, anomaly *StructuralAnomaly, revision uint64) error

	// GetWithRevision retrieves an anomaly by ID along with its KV revision.
	// The revision can be used with SaveWithRevision for optimistic locking.
	GetWithRevision(ctx context.Context, id string) (*StructuralAnomaly, uint64, error)

	// Get retrieves an anomaly by ID.
	Get(ctx context.Context, id string) (*StructuralAnomaly, error)

	// GetByStatus retrieves all anomalies with the given status.
	GetByStatus(ctx context.Context, status AnomalyStatus) ([]*StructuralAnomaly, error)

	// GetByType retrieves all anomalies of the given type.
	GetByType(ctx context.Context, anomalyType AnomalyType) ([]*StructuralAnomaly, error)

	// UpdateStatus updates an anomaly's status and optional review info.
	UpdateStatus(ctx context.Context, id string, status AnomalyStatus, reviewedBy, notes string) error

	// Delete removes an anomaly.
	Delete(ctx context.Context, id string) error

	// Watch returns a channel of anomalies as they're created/updated.
	// Used by ReviewWorker to process new pending anomalies.
	Watch(ctx context.Context) (<-chan *StructuralAnomaly, error)

	// Cleanup removes old resolved anomalies (applied/rejected) older than retention.
	// Returns the count of deleted anomalies.
	Cleanup(ctx context.Context, retention time.Duration) (int, error)

	// Count returns the total number of anomalies by status.
	Count(ctx context.Context) (map[AnomalyStatus]int, error)

	// IsDismissedPair checks if an entity pair is already tracked (any status including pending).
	// This prevents re-detecting the same semantic gap repeatedly across detection runs.
	IsDismissedPair(ctx context.Context, entityA, entityB string) (bool, error)

	// HasEntityAnomaly checks if an anomaly already exists for an entity+type combination.
	// This prevents re-detecting the same core isolation/demotion across detection runs.
	HasEntityAnomaly(ctx context.Context, entityID string, anomalyType AnomalyType) (bool, error)
}

Storage defines the interface for persisting structural anomalies.
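The GetWithRevision and SaveWithRevision pair suggests a read-modify-write loop with optimistic locking. The sketch below shows that pattern against a toy in-memory store. Everything here is hypothetical: `memStore`, `anomaly`, `updateStatus`, and the local `errConcurrentModification` stand in for the real Storage implementation and ErrConcurrentModification, and context parameters are omitted for brevity. The store is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrentModification stands in for the package's ErrConcurrentModification.
var errConcurrentModification = errors.New("concurrent modification")

var errNotFound = errors.New("anomaly not found")

// anomaly is a trimmed stand-in for StructuralAnomaly.
type anomaly struct {
	ID     string
	Status string
}

// memStore is a toy revisioned key-value store.
type memStore struct {
	items map[string]anomaly
	revs  map[string]uint64
}

func newMemStore() *memStore {
	return &memStore{items: map[string]anomaly{}, revs: map[string]uint64{}}
}

func (s *memStore) getWithRevision(id string) (anomaly, uint64, error) {
	a, ok := s.items[id]
	if !ok {
		return anomaly{}, 0, errNotFound
	}
	return a, s.revs[id], nil
}

// saveWithRevision writes only if the caller saw the current revision.
// Revision 0 means "create", matching the documented convention.
func (s *memStore) saveWithRevision(a anomaly, revision uint64) error {
	if s.revs[a.ID] != revision {
		return errConcurrentModification
	}
	s.items[a.ID] = a
	s.revs[a.ID] = revision + 1
	return nil
}

// updateStatus retries the read-modify-write cycle on revision conflicts.
func updateStatus(s *memStore, id, status string, maxRetries int) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		a, rev, err := s.getWithRevision(id)
		if err != nil {
			return err
		}
		a.Status = status
		err = s.saveWithRevision(a, rev)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConcurrentModification) {
			return err
		}
	}
	return errConcurrentModification
}

func main() {
	s := newMemStore()
	_ = s.saveWithRevision(anomaly{ID: "a1", Status: "pending"}, 0)
	fmt.Println(updateStatus(s, "a1", "approved", 3))
	fmt.Println(s.getWithRevision("a1"))
}
```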

type StorageConfig

type StorageConfig struct {
	// BucketName is the NATS KV bucket for storing anomalies
	BucketName string `json:"bucket_name"`

	// RetentionDays is how long to keep resolved anomalies (applied/rejected)
	RetentionDays int `json:"retention_days"`

	// CleanupInterval is how often to run cleanup of old anomalies
	CleanupInterval time.Duration `json:"cleanup_interval"`
}

StorageConfig configures anomaly storage.
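RetentionDays is expressed in days, whereas Storage.Cleanup takes a time.Duration. A conversion along these lines is presumably needed somewhere; the `storageConfig` mirror and the `retention` helper below are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// storageConfig mirrors the retention-related fields of StorageConfig.
type storageConfig struct {
	RetentionDays   int
	CleanupInterval time.Duration
}

// retention converts RetentionDays into a time.Duration suitable for
// passing to a Cleanup-style call.
func (c storageConfig) retention() time.Duration {
	return time.Duration(c.RetentionDays) * 24 * time.Hour
}

func main() {
	cfg := storageConfig{RetentionDays: 7, CleanupInterval: time.Hour}
	fmt.Println(cfg.retention()) // prints 168h0m0s
}
```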

type StructuralAnomaly

type StructuralAnomaly struct {
	// ID is a unique identifier for this anomaly (UUID).
	ID string `json:"id"`

	// Type categorizes the anomaly detection method.
	Type AnomalyType `json:"type"`

	// EntityA is the primary entity involved in the anomaly.
	EntityA string `json:"entity_a"`

	// EntityB is the secondary entity (empty for single-entity anomalies like CoreDemotion).
	EntityB string `json:"entity_b,omitempty"`

	// Confidence is a score from 0.0 to 1.0 indicating detection certainty.
	Confidence float64 `json:"confidence"`

	// Evidence contains type-specific proof of the anomaly.
	Evidence Evidence `json:"evidence"`

	// Suggestion is the proposed relationship to address the anomaly.
	Suggestion *RelationshipSuggestion `json:"suggestion,omitempty"`

	// Status is the current lifecycle state.
	Status AnomalyStatus `json:"status"`

	// DetectedAt is when the anomaly was first identified.
	DetectedAt time.Time `json:"detected_at"`

	// ReviewedAt is when the anomaly was reviewed (nil if not yet reviewed).
	ReviewedAt *time.Time `json:"reviewed_at,omitempty"`

	// ReviewedBy identifies who reviewed (e.g., "llm", "user@example.com").
	ReviewedBy string `json:"reviewed_by,omitempty"`

	// LLMReasoning is the explanation provided by the LLM for its decision.
	LLMReasoning string `json:"llm_reasoning,omitempty"`

	// ReviewNotes are additional notes from human review.
	ReviewNotes string `json:"review_notes,omitempty"`

	// EntityAContext is cached context for human review display.
	EntityAContext string `json:"entity_a_context,omitempty"`

	// EntityBContext is cached context for human review display.
	EntityBContext string `json:"entity_b_context,omitempty"`
}

StructuralAnomaly represents a detected potential issue in the graph structure.

func (*StructuralAnomaly) CanAutoApprove

func (a *StructuralAnomaly) CanAutoApprove(threshold float64) bool

CanAutoApprove returns true if confidence is above the given threshold.

func (*StructuralAnomaly) CanAutoReject

func (a *StructuralAnomaly) CanAutoReject(threshold float64) bool

CanAutoReject returns true if confidence is below the given threshold.

func (*StructuralAnomaly) IsResolved

func (a *StructuralAnomaly) IsResolved() bool

IsResolved returns true if the anomaly has reached a terminal state.

func (*StructuralAnomaly) NeedsHumanReview

func (a *StructuralAnomaly) NeedsHumanReview() bool

NeedsHumanReview returns true if the anomaly requires human attention.

type TransitivityConfig

type TransitivityConfig struct {
	// Enabled activates transitivity gap detection
	Enabled bool `json:"enabled"`

	// MaxIntermediateHops is the maximum hops in A->...->B->...->C chains to analyze
	MaxIntermediateHops int `json:"max_intermediate_hops"`

	// MinExpectedTransitivity is the maximum expected A-C distance when A->B->C exists
	// Gaps where actual distance > this are flagged
	MinExpectedTransitivity int `json:"min_expected_transitivity"`

	// TransitivePredicates lists predicates that should be transitive
	// e.g., ["member_of", "part_of", "located_in"]
	TransitivePredicates []string `json:"transitive_predicates"`
}

TransitivityConfig configures the transitivity gap detector.

type TransitivityDetector

type TransitivityDetector struct {
	// contains filtered or unexported fields
}

TransitivityDetector detects missing transitive relationships. When A->B and B->C exist for transitive predicates, checks if A-C distance is greater than expected, suggesting a missing A->C relationship.
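A simplified version of this check can be sketched in a few lines. The sketch flags A->C only when no direct A->C edge with the same predicate exists, which is narrower than the distance-based test described above. It also assumes both hops must share one transitive predicate. The `edge` type and `findTransitivityGaps` are hypothetical names, not part of this package.

```go
package main

import "fmt"

// edge is a local stand-in for the documented RelationshipInfo.
type edge struct {
	From, To, Predicate string
}

// findTransitivityGaps returns suggested A->C edges for every A->B, B->C
// pair that shares a transitive predicate and has no direct A->C edge.
func findTransitivityGaps(rels []edge, transitivePredicates []string) []edge {
	transitive := make(map[string]bool, len(transitivePredicates))
	for _, p := range transitivePredicates {
		transitive[p] = true
	}

	existing := make(map[edge]bool)     // direct edges with transitive predicates
	outgoing := make(map[string][]edge) // the same edges grouped by source entity
	for _, r := range rels {
		if !transitive[r.Predicate] {
			continue
		}
		existing[r] = true
		outgoing[r.From] = append(outgoing[r.From], r)
	}

	seen := make(map[edge]bool)
	var gaps []edge
	for _, ab := range rels {
		if !transitive[ab.Predicate] {
			continue
		}
		for _, bc := range outgoing[ab.To] {
			// Skip mixed predicates and chains that loop back to A.
			if bc.Predicate != ab.Predicate || bc.To == ab.From {
				continue
			}
			ac := edge{From: ab.From, To: bc.To, Predicate: ab.Predicate}
			if existing[ac] || seen[ac] {
				continue
			}
			seen[ac] = true
			gaps = append(gaps, ac)
		}
	}
	return gaps
}

func main() {
	rels := []edge{
		{"alice", "team", "member_of"},
		{"team", "org", "member_of"},
		{"bob", "alice", "knows"},
	}
	fmt.Println(findTransitivityGaps(rels, []string{"member_of", "part_of"}))
}
```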

func NewTransitivityDetector

func NewTransitivityDetector(deps *DetectorDependencies) *TransitivityDetector

NewTransitivityDetector creates a new transitivity gap detector.

func (*TransitivityDetector) Configure

func (d *TransitivityDetector) Configure(config interface{}) error

Configure updates the detector configuration.

func (*TransitivityDetector) Detect

Detect finds transitivity gaps in the graph.

func (*TransitivityDetector) Name

func (d *TransitivityDetector) Name() string

Name returns the detector identifier.

func (*TransitivityDetector) SetDependencies

func (d *TransitivityDetector) SetDependencies(deps *DetectorDependencies)

SetDependencies updates the detector's dependencies.

type TripleAdder

type TripleAdder interface {
	AddTriple(ctx context.Context, triple message.Triple) error
}

TripleAdder defines the interface for adding triples directly to the graph. This interface is satisfied by DataManager.

type VirtualEdgeConfig

type VirtualEdgeConfig struct {
	// AutoApply configures automatic edge creation for high-confidence gaps
	AutoApply AutoApplyConfig `json:"auto_apply"`

	// ReviewQueue configures lower-confidence gaps that need LLM/human review
	ReviewQueue ReviewQueueConfig `json:"review_queue"`
}

VirtualEdgeConfig configures how detected gaps become edges: automatic creation for high-confidence gaps and a review queue for lower-confidence ones.
