agentdrain

package
v0.68.4
Warning

This package is not in the latest version of its module.

Published: Apr 16, 2026 License: MIT Imports: 12 Imported by: 0

README

agentdrain Package

The agentdrain package implements the Drain log template mining algorithm adapted for analyzing structured agent pipeline events. It is used for anomaly detection in agentic workflow runs.

Overview

Drain is an online log parsing algorithm that groups log lines into clusters based on token similarity. Each cluster has a template — a tokenized log pattern where variable tokens are replaced with a wildcard (<*>). When a new log line arrives, Drain finds the most similar existing cluster or creates a new one.

In GitHub Agentic Workflows, agentdrain processes AgentEvent records emitted by pipeline stages (e.g. "plan", "tool_call", "finish") to:

  1. Build a model of normal behavior by training on events from successful runs.
  2. Detect anomalies in new runs by comparing events against the learned model.

Types

Config

Tuning parameters for the Drain miner.

Field Type Default Description
Depth int 4 Parse-tree depth
SimThreshold float64 0.4 Minimum similarity score to match a cluster
MaxChildren int 100 Maximum children per tree node
ParamToken string "<*>" Wildcard inserted at variable positions
RareClusterThreshold int 2 Clusters with Size ≤ this value are flagged as rare
MaskRules []MaskRule (see below) Regex substitutions applied before tokenization
ExcludeFields []string ["session_id", "trace_id", "span_id", "timestamp"] Event fields excluded from flattening

Use DefaultConfig() for production-ready defaults.

MaskRule

A regex-based substitution applied to log lines before tokenization to normalize variable content.

type MaskRule struct {
    Name        string // Human-readable identifier
    Pattern     string // Regular expression
    Replacement string // Substitution string
}

Default mask rules normalize UUIDs, session IDs, numeric values, URLs, quoted strings, and timestamps.

AgentEvent

A structured event from an agent pipeline stage.

type AgentEvent struct {
    Stage  string            // e.g. "plan", "tool_call", "finish"
    Fields map[string]string // Key-value pairs from the log line
}

Cluster

A group of log lines that share the same template.

type Cluster struct {
    ID       int      // Unique identifier
    Template []string // Tokenized template with wildcards
    Size     int      // Number of lines assigned to this cluster
    Stage    string   // Pipeline stage that generated this cluster
}

MatchResult

Returned after processing a log line.

type MatchResult struct {
    ClusterID  int      // Matched or newly created cluster ID
    Template   string   // Space-joined template string
    Params     []string // Actual token values at wildcard positions
    Similarity float64  // Fraction of non-wildcard tokens that matched exactly
    Stage      string   // Pipeline stage of the matched cluster
}

AnomalyReport

Describes anomalies detected for a log line.

type AnomalyReport struct {
    IsNewTemplate     bool    // Line created a new cluster
    LowSimilarity     bool    // Best match score was below SimThreshold
    RareCluster       bool    // Matched cluster has been seen ≤ RareClusterThreshold times
    NewClusterCreated bool    // This event produced a brand-new cluster
    AnomalyScore      float64 // Weighted composite score in [0, 1]
    Reason            string  // Human-readable anomaly description
}

Core Components

Miner

The single-stage Drain miner. Processes one pipeline stage at a time.

cfg := agentdrain.DefaultConfig()
miner, err := agentdrain.NewMiner(cfg)

// Training phase — call for known-good events
result, err := miner.TrainEvent(evt)

// Analysis phase — call for events to check
result, report, err := miner.AnalyzeEvent(evt)

// Inspect clusters
clusters := miner.Clusters()
count := miner.ClusterCount()

Persistence

// Save miner state to JSON
data, err := miner.SaveJSON()

// Restore miner state from JSON
err = miner.LoadJSON(data)

Coordinator

Manages a separate Miner per pipeline stage, routing events to the correct miner.

stages := []string{"plan", "tool_call", "finish"}
coord, err := agentdrain.NewCoordinator(cfg, stages)

// Load default trained weights
err = coord.LoadDefaultWeights()

// Analyze an event
result, report, err := coord.AnalyzeEvent(evt)

// Access all clusters across all stages
allClusters := coord.AllClusters()

// Save/restore snapshots
snapshots, err := coord.SaveSnapshots()
err = coord.LoadSnapshots(snapshots)

// Save/restore coordinator weights as JSON
data, err := coord.SaveWeightsJSON()
err = coord.LoadWeightsJSON(data)

AnomalyDetector

Post-processes MatchResult values to produce an AnomalyReport.

detector := agentdrain.NewAnomalyDetector(cfg.SimThreshold, cfg.RareClusterThreshold)
report := detector.Analyze(result, isNew, cluster)

Masker

Applies MaskRule substitutions to log lines before tokenization.

masker, err := agentdrain.NewMasker(cfg.MaskRules)
masked := masker.Mask(rawLine)

Utility Functions

FlattenEvent(evt AgentEvent, excludeFields []string) string

Converts an AgentEvent to a single string for tokenization, omitting fields listed in excludeFields. Fields are sorted for deterministic output.

Tokenize(line string) []string

Splits a log line into tokens on whitespace boundaries.

StageSequence(events []AgentEvent) string

Returns a space-separated string of the stages from a slice of events. Useful for summarizing pipeline execution paths.

Default Weights

The package embeds a set of default trained weights (in data/) via //go:embed. Call coord.LoadDefaultWeights() to initialize the coordinator with pre-trained cluster weights rather than starting cold.

Design Notes

  • The Drain algorithm is O(n·d) per event, where n is the number of tokens and d is Depth.
  • SimThreshold of 0.4 means at least 40% of tokens must match exactly (excluding wildcards) for a line to join an existing cluster.
  • The Coordinator routes each AgentEvent to its stage-specific Miner so that templates from different stages do not interfere.
  • SaveJSON/LoadJSON serialize the parse tree and cluster list to enable persistence across workflow runs.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FlattenEvent

func FlattenEvent(evt AgentEvent, excludeFields []string) string

FlattenEvent converts an AgentEvent into a deterministic string suitable for template mining. Field keys are sorted alphabetically; fields listed in excludeFields are omitted. The result looks like:

stage=tool_call key1=val1 key2=val2

func StageSequence

func StageSequence(events []AgentEvent) string

StageSequence converts a slice of AgentEvents into a space-separated string of their stage names, e.g. "plan tool_call tool_result finish".

func Tokenize

func Tokenize(line string) []string

Tokenize splits a log line on whitespace and returns the individual tokens.

Types

type AgentEvent

type AgentEvent struct {
	// Stage identifies the pipeline stage (e.g., "plan", "tool_call", "finish").
	Stage string
	// Fields contains the key-value pairs parsed from the log line.
	Fields map[string]string
}

AgentEvent is a structured log event emitted by an agent pipeline stage.

type AnomalyDetector

type AnomalyDetector struct {
	// contains filtered or unexported fields
}

AnomalyDetector evaluates match results and produces AnomalyReports.

func NewAnomalyDetector

func NewAnomalyDetector(simThreshold float64, rareClusterThreshold int) *AnomalyDetector

NewAnomalyDetector creates an AnomalyDetector with the given thresholds.

func (*AnomalyDetector) Analyze

func (d *AnomalyDetector) Analyze(result *MatchResult, isNew bool, cluster *Cluster) *AnomalyReport

Analyze produces an AnomalyReport for a match result.

  • isNew indicates the line created a brand-new cluster.
  • cluster is the cluster that was matched or created.

type AnomalyReport

type AnomalyReport struct {
	// IsNewTemplate is true when the log line created a new cluster.
	IsNewTemplate bool
	// LowSimilarity is true when the best match score was below the configured threshold.
	LowSimilarity bool
	// RareCluster is true when the matched cluster has been seen fewer times than the rare threshold.
	RareCluster bool
	// NewClusterCreated is true when this event produced a brand-new cluster.
	NewClusterCreated bool
	// AnomalyScore is a weighted composite score in the range [0, 1].
	AnomalyScore float64
	// Reason is a human-readable description of all anomalies that were detected.
	Reason string
}

AnomalyReport describes anomalies detected for a log line.

type Cluster

type Cluster struct {
	// ID is the unique cluster identifier.
	ID int
	// Template is the tokenized log template with wildcards at variable positions.
	Template []string
	// Size is the number of log lines that have been assigned to this cluster.
	Size int
	// Stage identifies which agent stage generated this cluster.
	Stage string
}

Cluster represents a group of log lines that share the same template.

type Config

type Config struct {
	// Depth controls how many levels of the parse tree are used.
	Depth int
	// SimThreshold is the minimum similarity score (0–1) required to match an existing cluster.
	SimThreshold float64
	// MaxChildren limits the number of children per internal tree node.
	MaxChildren int
	// ParamToken is the wildcard string inserted where tokens differ across log lines.
	ParamToken string
	// RareClusterThreshold marks clusters with size ≤ this value as rare.
	RareClusterThreshold int
	// MaskRules are applied before tokenization to normalize variable parts of log lines.
	MaskRules []MaskRule
	// ExcludeFields lists AgentEvent field keys that are omitted when flattening events.
	ExcludeFields []string
}

Config holds tuning parameters for the Drain log template miner.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config pre-loaded with sensible production defaults.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages one Miner per agent pipeline stage.

func NewCoordinator

func NewCoordinator(cfg Config, stages []string) (*Coordinator, error)

NewCoordinator creates a Coordinator with one Miner for each provided stage name.

func (*Coordinator) AllClusters

func (c *Coordinator) AllClusters() map[string][]Cluster

AllClusters returns a map from stage name to the list of clusters in that miner.

func (*Coordinator) AnalyzeEvent

func (c *Coordinator) AnalyzeEvent(evt AgentEvent) (*MatchResult, *AnomalyReport, error)

AnalyzeEvent routes the event to the correct stage miner and returns both the match result and an anomaly report.

func (*Coordinator) LoadDefaultWeights

func (c *Coordinator) LoadDefaultWeights() error

LoadDefaultWeights restores all stage miners from the embedded default weights file (pkg/agentdrain/data/default_weights.json). When the file is empty or contains only an empty JSON object, the call is a no-op and returns nil.

Update the default weights by running:

gh aw logs --train --output <dir>

and copying the resulting drain3_weights.json to pkg/agentdrain/data/default_weights.json, then rebuilding the binary.

func (*Coordinator) LoadSnapshots

func (c *Coordinator) LoadSnapshots(snapshots map[string][]byte) error

LoadSnapshots restores each stage miner from the provided JSON bytes map. Stages that are not present in snapshots retain their current state.

func (*Coordinator) LoadWeightsJSON

func (c *Coordinator) LoadWeightsJSON(data []byte) error

LoadWeightsJSON restores all stage miners from a combined JSON blob produced by SaveWeightsJSON.

func (*Coordinator) SaveSnapshots

func (c *Coordinator) SaveSnapshots() (map[string][]byte, error)

SaveSnapshots serializes each stage miner's state and returns a map from stage name to JSON bytes.

func (*Coordinator) SaveWeightsJSON

func (c *Coordinator) SaveWeightsJSON() ([]byte, error)

SaveWeightsJSON serializes all stage snapshots into a single combined JSON blob. The result can be written to pkg/agentdrain/data/default_weights.json and committed to embed it as the default starting weights for future runs.

func (*Coordinator) TrainEvent

func (c *Coordinator) TrainEvent(evt AgentEvent) (*MatchResult, error)

TrainEvent routes the event to the miner responsible for evt.Stage. Returns an error when the stage has no associated miner.

type MaskRule

type MaskRule struct {
	// Name is a human-readable identifier for the rule.
	Name string
	// Pattern is the regular expression to match.
	Pattern string
	// Replacement is the string substituted for each match.
	Replacement string
}

MaskRule describes a regex substitution applied to log lines before processing.

type Masker

type Masker struct {
	// contains filtered or unexported fields
}

Masker applies a sequence of regex substitution rules to normalize log lines.

func NewMasker

func NewMasker(rules []MaskRule) (*Masker, error)

NewMasker compiles the given MaskRules into a Masker ready for use. Returns an error if any pattern fails to compile.

func (*Masker) Mask

func (m *Masker) Mask(line string) string

Mask applies all mask rules in order and returns the transformed line.

type MatchResult

type MatchResult struct {
	// ClusterID is the ID of the matched or newly created cluster.
	ClusterID int
	// Template is the space-joined template string.
	Template string
	// Params holds the actual token values at wildcard positions.
	Params []string
	// Similarity is the fraction of non-wildcard positions that matched exactly.
	Similarity float64
	// Stage is the agent stage associated with the matched cluster.
	Stage string
}

MatchResult is returned after processing a log line through the miner.

type Miner

type Miner struct {
	// contains filtered or unexported fields
}

Miner is a concurrent Drain-style log template miner. Use NewMiner to create an instance.

func NewMiner

func NewMiner(cfg Config) (*Miner, error)

NewMiner creates a Miner from the given Config.

func (*Miner) AnalyzeEvent

func (m *Miner) AnalyzeEvent(evt AgentEvent) (*MatchResult, *AnomalyReport, error)

AnalyzeEvent performs inference on the event, builds an AnomalyReport, and then calls TrainEvent to update the miner. Returns the match result and report.

func (*Miner) ClusterCount

func (m *Miner) ClusterCount() int

ClusterCount returns the number of known clusters.

func (*Miner) Clusters

func (m *Miner) Clusters() []Cluster

Clusters returns a snapshot of all known clusters.

func (*Miner) LoadJSON

func (m *Miner) LoadJSON(data []byte) error

LoadJSON restores miner state from JSON bytes produced by SaveJSON. The existing state is replaced; the parse tree is rebuilt from the snapshot.

func (*Miner) SaveJSON

func (m *Miner) SaveJSON() ([]byte, error)

SaveJSON serializes the miner's current state to JSON bytes.

func (*Miner) Train

func (m *Miner) Train(line string) (*MatchResult, error)

Train processes a raw log line, updates the miner state, and returns the match result. It is safe to call from multiple goroutines.

func (*Miner) TrainEvent

func (m *Miner) TrainEvent(evt AgentEvent) (*MatchResult, error)

TrainEvent flattens the AgentEvent and calls Train.

type Snapshot

type Snapshot struct {
	Config   Config            `json:"config"`
	Clusters []SnapshotCluster `json:"clusters"`
	NextID   int               `json:"next_id"`
}

Snapshot is the serializable representation of a Miner's state.

type SnapshotCluster

type SnapshotCluster struct {
	ID       int      `json:"id"`
	Template []string `json:"template"`
	Size     int      `json:"size"`
	Stage    string   `json:"stage"`
}

SnapshotCluster is the serializable form of a single Cluster.
