Documentation ¶
Overview ¶
Package emit provides event emission and observability for graph execution.
Index ¶
- type BufferedEmitter
- func (b *BufferedEmitter) Clear(runID string)
- func (b *BufferedEmitter) Emit(event Event)
- func (b *BufferedEmitter) EmitBatch(_ context.Context, events []Event) error
- func (b *BufferedEmitter) Flush(_ context.Context) error
- func (b *BufferedEmitter) GetHistory(runID string) []Event
- func (b *BufferedEmitter) GetHistoryWithFilter(runID string, filter HistoryFilter) []Event
- type Emitter
- type Event
- type HistoryFilter
- type LogEmitter
- type NullEmitter
- type OTelEmitter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferedEmitter ¶
type BufferedEmitter struct {
// contains filtered or unexported fields
}
BufferedEmitter implements Emitter by storing events in memory (T169-T172).
This emitter captures all events and provides query capabilities for execution history analysis. Events are organized by runID for efficient retrieval and filtering.
Features:
- Thread-safe concurrent access
- Query by runID with optional filtering
- Filter by nodeID, message, or step range
- Clear events by runID, or clear all events
Use cases:
- Development and debugging
- Testing and validation
- Real-time monitoring dashboards
- Post-execution analysis
Warning: This emitter stores all events in memory. For production deployments with long-running workflows or high event volume, consider using a persistent storage backend or implementing event rotation/cleanup.
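The per-run organization and thread safety described above can be sketched as follows. This is an illustrative outline only, not the package's actual implementation; the field names and the local Event type are assumptions.

```go
package main

import "sync"

// Event mirrors the emit.Event shape for this sketch.
type Event struct {
	RunID  string
	Step   int
	NodeID string
	Msg    string
	Meta   map[string]interface{}
}

// bufferedEmitter sketch: events are grouped by runID, and an
// RWMutex allows concurrent readers of the history.
type bufferedEmitter struct {
	mu     sync.RWMutex
	events map[string][]Event
}

func newBufferedEmitter() *bufferedEmitter {
	return &bufferedEmitter{events: make(map[string][]Event)}
}

// Emit appends the event to its run's slice under the write lock.
func (b *bufferedEmitter) Emit(e Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.events[e.RunID] = append(b.events[e.RunID], e)
}

// GetHistory returns a copy so callers cannot mutate the buffer.
func (b *bufferedEmitter) GetHistory(runID string) []Event {
	b.mu.RLock()
	defer b.mu.RUnlock()
	out := make([]Event, len(b.events[runID]))
	copy(out, b.events[runID])
	return out
}
```

Returning a copy from GetHistory is what makes concurrent reads safe even while other goroutines keep emitting.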
Example usage:
// Create buffered emitter for testing
emitter := emit.NewBufferedEmitter()
engine := graph.New(reducer, store, emitter, opts)

// Run workflow
engine.Run(ctx, "run-001", initialState)

// Query execution history
allEvents := emitter.GetHistory("run-001")
errorEvents := emitter.GetHistoryWithFilter("run-001", emit.HistoryFilter{Msg: "error"})

// Clean up old runs
emitter.Clear("run-001")
func NewBufferedEmitter ¶
func NewBufferedEmitter() *BufferedEmitter
NewBufferedEmitter creates a new BufferedEmitter (T169).
Returns a BufferedEmitter that stores all events in memory and provides query capabilities. Safe for concurrent use.
func (*BufferedEmitter) Clear ¶
func (b *BufferedEmitter) Clear(runID string)
Clear removes stored events (T170).
If runID is non-empty, clears only events for that specific run. If runID is empty, clears all stored events across all runs.
This method is thread-safe and can be called concurrently.
Example:
// Clear specific run
emitter.Clear("run-001")

// Clear all runs
emitter.Clear("")
func (*BufferedEmitter) Emit ¶
func (b *BufferedEmitter) Emit(event Event)
Emit stores an event in the buffer (T169).
Events are organized by runID for efficient retrieval. This method is thread-safe and can be called concurrently from multiple goroutines.
func (*BufferedEmitter) EmitBatch ¶
func (b *BufferedEmitter) EmitBatch(_ context.Context, events []Event) error
EmitBatch stores multiple events in the buffer in a single operation.
func (*BufferedEmitter) Flush ¶
func (b *BufferedEmitter) Flush(_ context.Context) error
Flush is a no-op for buffered emitter (events are already stored in memory).
func (*BufferedEmitter) GetHistory ¶
func (b *BufferedEmitter) GetHistory(runID string) []Event
GetHistory retrieves all events for a specific runID (T170).
Returns events in the order they were emitted. Returns an empty slice if no events exist for the given runID.
This method is thread-safe and returns a copy of the events to prevent concurrent modification issues.
Example:
events := emitter.GetHistory("run-001")
for _, event := range events {
    fmt.Printf("[%s] %s: %s\n", event.RunID, event.NodeID, event.Msg)
}
func (*BufferedEmitter) GetHistoryWithFilter ¶
func (b *BufferedEmitter) GetHistoryWithFilter(runID string, filter HistoryFilter) []Event
GetHistoryWithFilter retrieves filtered events for a specific runID (T171, T172).
Applies the provided filter criteria to select matching events. All filter. conditions must match for an event to be included (AND logic).
Returns events in the order they were emitted. Returns an empty slice if no events match the filter.
This method is thread-safe and returns a copy of the events.
Example:
// Get error events from the "validator" node
filter := emit.HistoryFilter{
    NodeID: "validator",
    Msg:    "error",
}
errors := emitter.GetHistoryWithFilter("run-001", filter)

// Get events from steps 10-20
minStep, maxStep := 10, 20
filter = emit.HistoryFilter{
    MinStep: &minStep,
    MaxStep: &maxStep,
}
stepEvents := emitter.GetHistoryWithFilter("run-001", filter)
type Emitter ¶
type Emitter interface {
// Emit sends an observability event to the configured backend.
//
// Implementations should not block workflow execution.
// If the backend is unavailable or slow, events should be:
// - Buffered for later delivery.
// - Dropped with error logging.
// - Sent asynchronously.
//
// Emit should not panic. Errors should be logged internally.
Emit(event Event)
// EmitBatch sends multiple events in a single operation for improved performance.
//
// Batching reduces overhead when emitting high volumes of events by:
// - Amortizing network round-trips across multiple events.
// - Reducing serialization overhead.
// - Enabling backend bulk insert optimizations.
// - Improving throughput for high-concurrency workflows.
//
// Implementations should:
// - Process events in order (maintain happened-before relationships).
// - Not block workflow execution (buffer or process asynchronously).
// - Handle partial failures gracefully (log and continue).
// - Not panic on errors.
//
// Parameters:
// - ctx: Context for cancellation and timeouts.
// - events: Events to emit, ordered by creation time.
//
// Returns error only on catastrophic failures (e.g., configuration errors).
// Individual event failures should be logged but not returned.
//
// Example usage:
//
// events := []Event{
//     {RunID: "run-001", Msg: "step_start", ...},
//     {RunID: "run-001", Msg: "step_complete", ...},
// }
// if err := emitter.EmitBatch(ctx, events); err != nil {
//     log.Errorf("batch emit failed: %v", err)
// }
EmitBatch(ctx context.Context, events []Event) error
// Flush ensures all buffered events are sent to the backend.
//
// Call this method:
// - Before application shutdown to prevent event loss.
// - At workflow completion to ensure all events are delivered.
// - After critical operations requiring immediate visibility.
// - During testing to verify event emission.
//
// Implementations should:
// - Block until all buffered events are sent or timeout occurs.
// - Respect context cancellation and deadlines.
// - Return error if events cannot be delivered.
// - Be safe to call multiple times (idempotent).
//
// Parameters:
// - ctx: Context for cancellation and timeout.
//
// Returns error if flush fails or times out. Implementations should attempt.
// best-effort delivery even on error (e.g., flush partial buffers).
//
// Example usage:
//
// defer func() {
//     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//     defer cancel()
//     if err := emitter.Flush(ctx); err != nil {
//         log.Errorf("failed to flush events on shutdown: %v", err)
//     }
// }()
Flush(ctx context.Context) error
}
Emitter receives and processes observability events from workflow execution.
Emitters enable pluggable observability backends:
- Logging: stdout, files, syslog
- Distributed tracing: OpenTelemetry, Jaeger, Zipkin
- Metrics: Prometheus, StatsD
- Analytics: DataDog, New Relic
Implementations should be:
- Non-blocking: Avoid slowing down workflow execution
- Thread-safe: May be called concurrently from multiple nodes
- Resilient: Handle failures gracefully (don't crash the workflow)
Common patterns:
- Buffering: Collect events and flush in batches
- Filtering: Only emit events matching criteria (e.g., errors only)
- Multi-emit: Fan out to multiple backends
- Sampling: Emit only a percentage of events for high-volume workflows
type Event ¶
type Event struct {
// RunID identifies the workflow execution that emitted this event.
RunID string
// Step is the sequential step number in the workflow (1-indexed).
// Zero for workflow-level events (start, complete, error).
Step int
// NodeID identifies which node emitted this event.
// Empty string for workflow-level events.
NodeID string
// Msg is a human-readable description of the event.
Msg string
// Meta contains additional structured data specific to this event (T168).
//
// Standard Metadata Conventions:
//
// Performance Metrics:
// - "duration_ms" (int64): Execution duration in milliseconds.
// - "memory_bytes" (int64): Memory usage in bytes.
// - "cpu_percent" (float64): CPU utilization percentage.
//
// Error Context:
// - "error" (string): Error message from error.Error().
// - "error_type" (string): Error type or classification (e.g., "validation", "timeout").
// - "retryable" (bool): Whether the error can be retried.
// - "retry_attempt" (int): Current retry attempt number (1-indexed).
// - "stack_trace" (string): Optional stack trace for debugging.
//
// LLM-Specific:
// - "tokens" (int): Total token count (input + output).
// - "input_tokens" (int): Input token count.
// - "output_tokens" (int): Output token count.
// - "cost" (float64): Estimated cost in USD.
// - "model" (string): Model identifier (e.g., "gpt-4", "claude-3-opus").
// - "temperature" (float64): Temperature parameter used.
// - "max_tokens" (int): Max tokens parameter used.
// - "finish_reason" (string): Completion reason (e.g., "stop", "length", "tool_calls").
//
// Node Classification:
// - "node_type" (string): Node category (e.g., "llm", "tool", "processor", "validator").
// - "node_version" (string): Node implementation version.
//
// Checkpoint Context:
// - "checkpoint_id" (string): Checkpoint identifier.
// - "checkpoint_label" (string): Human-readable checkpoint label.
// - "state_size" (int): Serialized state size in bytes.
//
// Tool Execution:
// - "tool_name" (string): Name of the tool being executed.
// - "tool_input" (any): Tool input parameters (if serializable).
// - "tool_output" (any): Tool output result (if serializable).
//
// State Changes:
// - "delta" (any): State change applied by the node.
// - "state_version" (int): State version number.
//
// Routing Decisions:
// - "next_node" (string): Next node ID for routing decisions.
// - "next_nodes" ([]string): Multiple next nodes for fan-out.
// - "routing_reason" (string): Explanation of routing decision.
// - "condition" (string): Condition that triggered the route.
//
// Custom Application Data:
// Applications can add domain-specific metadata using their own keys.
// Use namespaced keys to avoid conflicts (e.g., "app_user_id", "app_request_id").
//
// Usage with helper methods:
//
//	event := Event{RunID: "run-001", NodeID: "llm-node", Msg: "node_end"}
//	enriched := event.
//	    WithDuration(250 * time.Millisecond).
//	    WithNodeType("llm").
//	    WithMeta("tokens", 150).
//	    WithMeta("cost", 0.003).
//	    WithMeta("model", "gpt-4")
Meta map[string]interface{}
}
Event represents an observability event emitted during workflow execution.
Events provide detailed insight into workflow behavior:
- Node execution start/complete
- State changes and transitions
- Errors and warnings
- Performance metrics
- Checkpoint operations
Events are emitted to an Emitter, which can:
- Log to stdout/stderr
- Send to OpenTelemetry
- Store in time-series databases
- Trigger alerts
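The With* helper methods documented below all return enriched copies rather than mutating the receiver. A minimal sketch of that copy-on-write pattern, using a locally defined Event (an assumption for illustration, not the package's code):

```go
package main

// Event mirrors the emit.Event shape for this sketch.
type Event struct {
	RunID string
	Msg   string
	Meta  map[string]interface{}
}

// WithMeta returns a copy of the event with key set in a fresh
// metadata map, leaving the original event untouched. The value
// receiver means the caller's Event is never modified.
func (e Event) WithMeta(key string, value interface{}) Event {
	meta := make(map[string]interface{}, len(e.Meta)+1)
	for k, v := range e.Meta {
		meta[k] = v
	}
	meta[key] = value
	e.Meta = meta
	return e
}
```

Copying the map on every call is what makes chained enrichment safe to use on shared events.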
func (Event) WithDuration ¶
WithDuration returns a copy of the event with duration_ms metadata (T166).
Sets the "duration_ms" field to the duration in milliseconds as an int64. Preserves all existing metadata fields.
Example:
event := Event{RunID: "run-001", Msg: "node_end"}
enriched := event.WithDuration(250 * time.Millisecond)
// enriched.Meta["duration_ms"] == 250
func (Event) WithError ¶
WithError returns a copy of the event with error metadata (T166).
Sets the "error" field to the error message string. Preserves all existing metadata fields.
Example:
event := Event{RunID: "run-001", Msg: "error"}
enriched := event.WithError(errors.New("validation failed"))
// enriched.Meta["error"] == "validation failed"
func (Event) WithMeta ¶
WithMeta returns a copy of the event with an additional metadata field (T167).
Sets the specified key-value pair in the metadata map. Preserves all existing metadata fields. If the key already exists, it will be overwritten.
This method supports chaining for fluent API usage:
event := Event{RunID: "run-001"}
enriched := event.
    WithMeta("tokens", 150).
    WithMeta("cost", 0.003).
    WithMeta("model", "gpt-4")
Example:
event := Event{RunID: "run-001", Msg: "llm_call"}
enriched := event.WithMeta("tokens", 150)
// enriched.Meta["tokens"] == 150
func (Event) WithNodeType ¶
WithNodeType returns a copy of the event with node_type metadata (T167).
Sets the "node_type" field to the provided node type string. Preserves all existing metadata fields.
Common node types:
- "llm": LLM call nodes
- "tool": Tool execution nodes
- "processor": Data processing nodes
- "validator": Validation nodes
- "aggregator": Result aggregation nodes
Example:
event := Event{RunID: "run-001", NodeID: "llm-node"}
enriched := event.WithNodeType("llm")
// enriched.Meta["node_type"] == "llm"
type HistoryFilter ¶
type HistoryFilter struct {
NodeID string // Filter by node ID (empty = no filter)
Msg string // Filter by message (empty = no filter)
MinStep *int // Minimum step number (nil = no filter)
MaxStep *int // Maximum step number (nil = no filter)
}
HistoryFilter specifies criteria for filtering execution history (T171, T172).
All filter fields are optional. When multiple fields are set, they are combined with AND logic (all conditions must match).
Fields:
- NodeID: Filter by specific node
- Msg: Filter by message type (e.g., "node_start", "error")
- MinStep: Filter events with step >= MinStep (nil = no lower bound)
- MaxStep: Filter events with step <= MaxStep (nil = no upper bound)
Example usage:
// Get all errors from a specific node
filter := emit.HistoryFilter{
    NodeID: "validator",
    Msg:    "error",
}
errors := emitter.GetHistoryWithFilter("run-001", filter)

// Get events from steps 5-10
minStep, maxStep := 5, 10
filter = emit.HistoryFilter{
    MinStep: &minStep,
    MaxStep: &maxStep,
}
stepEvents := emitter.GetHistoryWithFilter("run-001", filter)
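The AND semantics can be sketched as a single predicate: an event is kept only if every set filter field matches. The matches function below is illustrative, not the package's internal code; the local Event and HistoryFilter types mirror the documented shapes.

```go
package main

type Event struct {
	RunID  string
	Step   int
	NodeID string
	Msg    string
}

type HistoryFilter struct {
	NodeID  string // empty = no filter
	Msg     string // empty = no filter
	MinStep *int   // nil = no lower bound
	MaxStep *int   // nil = no upper bound
}

// matches reports whether the event satisfies every set filter field
// (AND logic): any unset field is skipped, any set field must match.
func matches(e Event, f HistoryFilter) bool {
	if f.NodeID != "" && e.NodeID != f.NodeID {
		return false
	}
	if f.Msg != "" && e.Msg != f.Msg {
		return false
	}
	if f.MinStep != nil && e.Step < *f.MinStep {
		return false
	}
	if f.MaxStep != nil && e.Step > *f.MaxStep {
		return false
	}
	return true
}
```

Using *int for the step bounds is what distinguishes "no filter" (nil) from "filter at step 0".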
type LogEmitter ¶
type LogEmitter struct {
// contains filtered or unexported fields
}
LogEmitter implements Emitter by writing structured log output to a writer (T161).
Supports two output modes:
- Text mode (default): Human-readable format with key=value pairs
- JSON mode: Machine-readable JSON, one event per line
Example text output:
[node_start] runID=run-001 step=0 nodeID=nodeA
Example JSON output:
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_start","meta":null}
Usage:
// Text output to stdout
emitter := emit.NewLogEmitter(os.Stdout, false)

// JSON output to a file
f, _ := os.Create("events.jsonl")
defer func() { _ = f.Close() }()
emitter = emit.NewLogEmitter(f, true)
func NewLogEmitter ¶
func NewLogEmitter(writer io.Writer, jsonMode bool) *LogEmitter
NewLogEmitter creates a new LogEmitter (T161, T163).
Parameters:
- writer: Where to write the log output (e.g., os.Stdout, a file)
- jsonMode: If true, emit JSON format; if false, emit text format
Returns a LogEmitter that writes structured event data to the provided writer.
func (*LogEmitter) Emit ¶
func (l *LogEmitter) Emit(event Event)
Emit writes an event to the configured writer (T161).
Format depends on jsonMode:
- JSON mode: Writes the event as a single-line JSON object
- Text mode: Writes a human-readable format with a [msg] prefix
Example text output:
[node_start] runID=run-001 step=0 nodeID=nodeA
[node_end] runID=run-001 step=0 nodeID=nodeA meta={"delta":{"counter":5}}
Example JSON output:
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_start","meta":null}
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_end","meta":{"delta":{"counter":5}}}
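The two output lines above might be produced roughly as follows. The struct tags and format strings are assumptions reverse-engineered from the sample output, not the actual implementation (which also renders meta in text mode):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event with JSON tags matching the documented output field names.
type Event struct {
	RunID  string                 `json:"runID"`
	Step   int                    `json:"step"`
	NodeID string                 `json:"nodeID"`
	Msg    string                 `json:"msg"`
	Meta   map[string]interface{} `json:"meta"`
}

// formatText renders the human-readable key=value form with a
// [msg] prefix, as shown in the text-mode examples.
func formatText(e Event) string {
	return fmt.Sprintf("[%s] runID=%s step=%d nodeID=%s",
		e.Msg, e.RunID, e.Step, e.NodeID)
}

// formatJSON renders one event as a single JSON line (JSONL).
func formatJSON(e Event) string {
	b, _ := json.Marshal(e)
	return string(b)
}
```

A nil Meta map marshals to "meta":null, matching the sample JSON output.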
func (*LogEmitter) EmitBatch ¶
func (l *LogEmitter) EmitBatch(_ context.Context, events []Event) error
EmitBatch sends multiple events in a single operation for improved performance (T107).
For LogEmitter, batching provides efficiency by:
- Reducing write syscalls (one write per batch instead of per event)
- Formatting related events together for easier reading
- Maintaining chronological order within the batch
In text mode, events are written with blank lines between them for readability. In JSON mode, events are written as JSONL (one per line) for easy parsing.
Example text output:
[node_start] runID=run-001 step=0 nodeID=nodeA
[node_end] runID=run-001 step=0 nodeID=nodeA
[node_start] runID=run-001 step=1 nodeID=nodeB
Example JSON output:
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_start","meta":null}
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_end","meta":{"delta":{"counter":5}}}
{"runID":"run-001","step":1,"nodeID":"nodeB","msg":"node_start","meta":null}
This implementation is more efficient than calling Emit repeatedly because:
1. It can batch multiple events into fewer write operations.
2. It can optimize formatting across the entire batch.
3. It reduces locking overhead if the writer is synchronized.
Parameters:
- ctx: Context for cancellation (currently unused but reserved for future enhancements)
- events: Slice of events to emit, in order
Returns error only if writing fails. Always attempts to write all events.
func (*LogEmitter) Flush ¶
func (l *LogEmitter) Flush(_ context.Context) error
Flush ensures all buffered events are sent to the backend (T108).
For LogEmitter, this is a no-op because:
- All writes go directly to the underlying io.Writer
- LogEmitter maintains no internal buffering
- The writer handles its own buffering (e.g., os.Stdout, bufio.Writer)
If you need flush control, wrap the writer with bufio.Writer and call Flush on it directly:
buf := bufio.NewWriter(os.Stdout)
emitter := emit.NewLogEmitter(buf, false)
// ... emit events ...
buf.Flush()        // Flush the underlying buffer
emitter.Flush(ctx) // No-op for LogEmitter
This method is provided to satisfy the Emitter interface and enable polymorphic usage with other emitters (e.g., OTelEmitter) that do require flushing.
Parameters:
- ctx: Context for cancellation (unused; LogEmitter writes are synchronous)
Returns nil (always succeeds).
type NullEmitter ¶
type NullEmitter struct{}
NullEmitter implements Emitter by discarding all events (T165).
This is a no-op emitter for production environments where event logging is not desired. It implements the Emitter interface but does nothing with emitted events.
Use cases:
- Production deployments where observability overhead is unwanted
- Testing scenarios where event capture is not needed
- Disabling event emission without changing code
Example usage:
// Disable all event logging
emitter := emit.NewNullEmitter()
engine := graph.New(reducer, store, emitter, opts)
func NewNullEmitter ¶
func NewNullEmitter() *NullEmitter
NewNullEmitter creates a new NullEmitter (T165).
Returns a NullEmitter that discards all events without any processing. This is safe for concurrent use and has zero overhead.
func (*NullEmitter) Emit ¶
func (n *NullEmitter) Emit(_ Event)
Emit discards the event without any processing (T165).
This method is a no-op that immediately returns. It never errors and performs no I/O or processing.
type OTelEmitter ¶
type OTelEmitter struct {
// contains filtered or unexported fields
}
OTelEmitter implements Emitter by creating OpenTelemetry spans (T109-T111).
Each event becomes a span with:
- Span name: event.Msg (e.g., "node_start", "node_end")
- Attributes: runID, step, nodeID, and all event.Meta fields
- Timestamps: Derived from span creation
- Status: Set to error if event.Meta["error"] exists
Supports distributed tracing by:
- Creating child spans for node execution
- Propagating trace context across service boundaries
- Recording performance metrics as span attributes
- Capturing errors with stack traces
Concurrency attributes (T111):
- step_id: Unique identifier for the execution step
- order_key: Deterministic ordering key for replay
- attempt: Retry attempt number (0 for first attempt)
Usage:
// Create tracer from OpenTelemetry provider
tracer := otel.Tracer("langgraph-go")
emitter := emit.NewOTelEmitter(tracer)
// Emit events that become spans
emitter.Emit(Event{
RunID: "run-001",
Step: 1,
NodeID: "nodeA",
Msg: "node_start",
})
Integration with OpenTelemetry:
// Setup OpenTelemetry provider (application code)
import (
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// Create trace provider with exporter (Jaeger, Zipkin, etc.)
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tp)
// Create emitter
tracer := otel.Tracer("langgraph-go")
emitter := emit.NewOTelEmitter(tracer)
// Use in engine
engine := graph.New[MyState](
graph.WithEmitter(emitter),
)
func NewOTelEmitter ¶
func NewOTelEmitter(tracer trace.Tracer) *OTelEmitter
NewOTelEmitter creates a new OTelEmitter (T109).
Parameters:
- tracer: OpenTelemetry tracer from otel.Tracer("service-name")
Returns an OTelEmitter that creates spans for each event.
Example:
tracer := otel.Tracer("langgraph-go")
emitter := emit.NewOTelEmitter(tracer)
func (*OTelEmitter) Emit ¶
func (o *OTelEmitter) Emit(event Event)
Emit creates an OpenTelemetry span for the event (T109).
The span includes:
- Name: event.Msg (e.g., "node_start", "node_end")
- Attributes: All event fields and metadata
- Status: Error if event contains error metadata
- Timestamps: Start time (now), end time (immediate for instant events)
For performance, the span is immediately ended (not left open). This is appropriate for events representing points in time rather than durations.
If the event contains a "duration_ms" metadata field, the span's end time is adjusted to reflect the actual duration.
func (*OTelEmitter) EmitBatch ¶
func (o *OTelEmitter) EmitBatch(ctx context.Context, events []Event) error
EmitBatch creates multiple spans efficiently (T109).
Batching provides performance benefits by:
- Amortizing tracer overhead across multiple spans
- Enabling span processor batch optimizations
- Reducing context switching overhead
- Maintaining temporal locality for related events
All spans are created and ended immediately. They are recorded in the OpenTelemetry batch span processor for efficient export.
Parameters:
- ctx: Context for cancellation and trace propagation
- events: Events to emit as spans
Returns error if span creation fails (rare, usually indicates misconfiguration).
func (*OTelEmitter) Flush ¶
func (o *OTelEmitter) Flush(ctx context.Context) error
Flush forces export of all pending spans (T110).
This method:
- Calls ForceFlush on the tracer provider if available
- Blocks until all spans are exported or timeout occurs
- Should be called before application shutdown
- Respects context cancellation and deadlines
OpenTelemetry typically buffers spans in a batch span processor for efficiency. Flush ensures these buffered spans are sent to the backend (Jaeger, Zipkin, etc.) before the application exits.
Usage:
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := emitter.Flush(ctx); err != nil {
log.Printf("failed to flush spans: %v", err)
}
}()
Parameters:
- ctx: Context with timeout/cancellation
Returns error if flush fails or times out.