package emit

v0.4.0-beta
Published: Nov 18, 2025 License: MIT Imports: 11 Imported by: 1

README

Event Emission and Observability

The emit package provides event emission and observability for LangGraph-Go workflow execution. It enables pluggable observability backends through a clean interface, supporting logging, distributed tracing, metrics, and analytics.

Table of Contents

  • Overview
  • Available Emitters
  • OpenTelemetry Integration
  • Multi-Emitter Pattern
  • Event Schema
  • Standard Metadata Conventions
  • Performance Considerations
  • Examples
  • References

Overview

The Emitter interface provides a standard way to capture and process workflow execution events:

type Emitter interface {
    Emit(event Event)
    EmitBatch(ctx context.Context, events []Event) error
    Flush(ctx context.Context) error
}

Events are emitted at key points during workflow execution:

  • Node execution start/completion
  • State transitions
  • Errors and warnings
  • Checkpoint operations
  • Routing decisions

Available Emitters

LogEmitter

Writes structured logs to any io.Writer in text or JSON format.

import (
    "log"
    "os"
    "github.com/dshills/langgraph-go/graph/emit"
)

// Human-readable text output to stdout
textEmitter := emit.NewLogEmitter(os.Stdout, false)

// Machine-readable JSON output to file
f, err := os.Create("events.jsonl")
if err != nil {
    log.Fatal(err)
}
defer f.Close()
jsonEmitter := emit.NewLogEmitter(f, true)

Use cases:

  • Development and debugging
  • Simple production deployments
  • File-based audit trails
BufferedEmitter

Stores events in memory with query capabilities for execution history analysis.

import (
    "context"
    "log"
    "github.com/dshills/langgraph-go/graph/emit"
)

// Create buffered emitter
emitter := emit.NewBufferedEmitter()

// Run workflow
_, err := engine.Run(ctx, "run-001", initialState)
if err != nil {
    log.Printf("Workflow failed: %v", err)
}

// Query execution history
allEvents := emitter.GetHistory("run-001")
errorEvents := emitter.GetHistoryWithFilter("run-001", emit.HistoryFilter{
    Msg: "error",
})

// Clean up after analysis to prevent memory leaks
emitter.Clear("run-001")

Use cases:

  • Testing and validation
  • Real-time monitoring dashboards
  • Post-execution analysis

⚠️ Memory Warning:

  • Stores ALL events in memory with no automatic eviction
  • Memory usage grows unbounded for long-running workflows
  • Best practices:
    • Call Clear(runID) after analyzing each workflow
    • Monitor memory usage with len(emitter.GetHistory(runID))
    • For production, use LogEmitter or OTelEmitter with external storage
    • Limit to short-lived workflows or implement periodic cleanup
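One way to follow the last practice is to bound the buffer yourself. This hypothetical, simplified stand-in (local Event type, no locking) keeps only the most recent N events per run; a production version would also need a mutex, since emitters may be called concurrently:

```go
package main

import "fmt"

// Event mirrors the package's event shape, copied so the sketch is
// self-contained.
type Event struct {
	RunID string
	Step  int
	Msg   string
}

// BoundedBuffer is a hypothetical buffered store that caps memory growth by
// evicting the oldest events once a run exceeds max entries. It is an
// illustration of the cleanup idea, not part of the emit package.
type BoundedBuffer struct {
	max    int
	events map[string][]Event
}

func NewBoundedBuffer(max int) *BoundedBuffer {
	return &BoundedBuffer{max: max, events: make(map[string][]Event)}
}

func (b *BoundedBuffer) Emit(e Event) {
	buf := append(b.events[e.RunID], e)
	if len(buf) > b.max {
		buf = buf[len(buf)-b.max:] // drop the oldest events
	}
	b.events[e.RunID] = buf
}

func main() {
	b := NewBoundedBuffer(2)
	for i := 1; i <= 5; i++ {
		b.Emit(Event{RunID: "run-001", Step: i, Msg: "node_start"})
	}
	fmt.Println(len(b.events["run-001"])) // 2
}
```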
NullEmitter

Discards all events for maximum performance.

emitter := emit.NewNullEmitter()

Use cases:

  • Performance-critical production deployments where observability is disabled
  • Benchmarking workflow execution overhead
OTelEmitter

Creates OpenTelemetry spans for distributed tracing. See OpenTelemetry Integration for details.

OpenTelemetry Integration

The OTelEmitter integrates LangGraph-Go workflows with OpenTelemetry distributed tracing, enabling visualization of workflow execution in tools like Jaeger, Zipkin, and cloud APM platforms.

Quick Start
import (
    "context"
    "log"
    "time"

    "github.com/dshills/langgraph-go/graph"
    "github.com/dshills/langgraph-go/graph/emit"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

// Setup OpenTelemetry with Jaeger exporter
func setupTracing() (*sdktrace.TracerProvider, error) {
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint("http://localhost:14268/api/traces"),
    ))
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceName("langgraph-workflow"),
        )),
    )
    otel.SetTracerProvider(tp)
    return tp, nil
}

// Create OTelEmitter and use with engine
func main() {
    tp, err := setupTracing()
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := tp.Shutdown(ctx); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }()

    tracer := otel.Tracer("langgraph-go")
    emitter := emit.NewOTelEmitter(tracer)

    engine := graph.New[MyState](
        graph.WithEmitter(emitter),
    )

    // Run workflow - spans will be created automatically
    ctx := context.Background()
    _, err = engine.Run(ctx, "run-001", initialState)
    if err != nil {
        log.Printf("Workflow failed: %v", err)
    }
}
Exporters

OpenTelemetry supports multiple backends:

Jaeger (Self-Hosted)
import "go.opentelemetry.io/otel/exporters/jaeger"

exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
    jaeger.WithEndpoint("http://localhost:14268/api/traces"),
))
OTLP (OpenTelemetry Protocol)
import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"

exporter, err := otlptracegrpc.New(ctx,
    otlptracegrpc.WithEndpoint("localhost:4317"),
    otlptracegrpc.WithInsecure(),
)
Zipkin
import "go.opentelemetry.io/otel/exporters/zipkin"

exporter, err := zipkin.New("http://localhost:9411/api/v2/spans")
Console (Development)
import "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"

exporter, err := stdouttrace.New(
    stdouttrace.WithPrettyPrint(),
)
Span Schema

Each event creates a span with the following structure:

Span Name

The span name is set to event.Msg (e.g., node_start, node_end, error).

Standard Attributes
Attribute           Type    Description
langgraph.run_id    string  Unique workflow execution identifier
langgraph.step      int     Sequential step number (1-indexed)
langgraph.node_id   string  Node identifier that emitted the event
Metadata Attributes

Event metadata (event.Meta) is automatically mapped to span attributes:

Type Conversions:

  • string → attribute.String
  • int, int64 → attribute.Int64
  • float64 → attribute.Float64
  • bool → attribute.Bool
  • time.Duration → attribute.Int64 (milliseconds)
  • Other types → attribute.String (formatted)

Special Mappings:

Event Meta Key  Span Attribute               Description
input_tokens    langgraph.llm.input_tokens   LLM input token count
output_tokens   langgraph.llm.output_tokens  LLM output token count
cost            langgraph.llm.cost           LLM cost in USD
duration_ms     langgraph.node.duration_ms   Node execution duration
model           langgraph.llm.model          LLM model identifier

Note: Use these canonical metadata keys consistently. The OTel emitter automatically maps these to span attributes.

Concurrency Attributes

For concurrent execution tracking:

Attribute            Type    Description
langgraph.step_id    string  Unique step execution identifier
langgraph.order_key  string  Deterministic ordering key for replay
langgraph.attempt    int     Retry attempt number (0-indexed)
Error Status

When event.Meta["error"] exists:

  • Span status is set to codes.Error
  • Error message is recorded as span status description
  • Error event is added to the span
Flushing Spans

Always flush spans before application shutdown to ensure delivery:

defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := emitter.Flush(ctx); err != nil {
        log.Printf("Failed to flush spans: %v", err)
    }
}()

Multi-Emitter Pattern

Combine multiple emitters to send events to different backends simultaneously:

import (
    "context"
    "errors"
    "os"

    "github.com/dshills/langgraph-go/graph/emit"
    "go.opentelemetry.io/otel"
)

// MultiEmitter wraps multiple emitters
type MultiEmitter struct {
    emitters []emit.Emitter
}

func NewMultiEmitter(emitters ...emit.Emitter) *MultiEmitter {
    return &MultiEmitter{emitters: emitters}
}

func (m *MultiEmitter) Emit(event emit.Event) {
    for _, e := range m.emitters {
        e.Emit(event)
    }
}

func (m *MultiEmitter) EmitBatch(ctx context.Context, events []emit.Event) error {
    var errs []error
    for _, e := range m.emitters {
        if err := e.EmitBatch(ctx, events); err != nil {
            errs = append(errs, err)
        }
    }
    if len(errs) > 0 {
        return errors.Join(errs...) // Preserves all errors with proper unwrapping
    }
    return nil
}

func (m *MultiEmitter) Flush(ctx context.Context) error {
    var errs []error
    for _, e := range m.emitters {
        if err := e.Flush(ctx); err != nil {
            errs = append(errs, err)
        }
    }
    if len(errs) > 0 {
        return errors.Join(errs...) // Preserves all errors with proper unwrapping
    }
    return nil
}

// Note: errors.Join requires Go 1.20+. For earlier versions, use a multierror package
// or concatenate error messages manually.

// Usage: logs + distributed tracing
func main() {
    logEmitter := emit.NewLogEmitter(os.Stdout, true)
    otelEmitter := emit.NewOTelEmitter(otel.Tracer("langgraph-go"))

    multi := NewMultiEmitter(logEmitter, otelEmitter)

    engine := graph.New[MyState](
        graph.WithEmitter(multi),
    )
}

Event Schema

Events follow a standard structure defined in event.go:

type Event struct {
    RunID  string                 // Workflow execution ID
    Step   int                    // Sequential step number
    NodeID string                 // Node that emitted the event
    Msg    string                 // Event type/message
    Meta   map[string]interface{} // Additional metadata
}
⚠️ Security: Sensitive Data Handling

Critical: Event metadata can contain sensitive information. Always sanitize before emission:

Sensitive Fields to Redact:

  • User prompts and LLM inputs (may contain PII)
  • API keys and authentication tokens
  • Personal identifiable information (names, emails, addresses)
  • System credentials and connection strings
  • Proprietary business logic or trade secrets

Best Practices:

  • Implement metadata redaction filters before emission
  • Use hash/truncate for sensitive values (e.g., hash user IDs)
  • Configure allowlist/blocklist for metadata keys
  • Encrypt event storage at rest and in transit
  • Set retention policies and compliance controls
  • Review OpenTelemetry collector configuration for sensitive attribute filtering

Example Redaction:

func sanitizeMetadata(meta map[string]interface{}) map[string]interface{} {
    sanitized := make(map[string]interface{})
    redactKeys := map[string]bool{"prompt": true, "api_key": true, "user_email": true}

    for k, v := range meta {
        if redactKeys[k] {
            sanitized[k] = "[REDACTED]"
        } else if str, ok := v.(string); ok && len(str) > 1000 {
            sanitized[k] = str[:1000] + "... [TRUNCATED]"
        } else {
            sanitized[k] = v
        }
    }
    return sanitized
}
Standard Metadata Conventions
Performance Metrics
  • duration_ms (int64): Execution duration in milliseconds
  • memory_bytes (int64): Memory usage in bytes
  • cpu_percent (float64): CPU utilization percentage
Error Context
  • error (string): Error message
  • error_type (string): Error classification
  • retryable (bool): Whether error can be retried
  • retry_attempt (int): Current retry attempt (1-indexed)
  • stack_trace (string): Stack trace for debugging
LLM-Specific
  • tokens (int): Total token count
  • input_tokens (int): Input token count
  • output_tokens (int): Output token count
  • cost (float64): Estimated cost in USD
  • model (string): Model identifier
  • temperature (float64): Temperature parameter
  • max_tokens (int): Max tokens parameter
  • finish_reason (string): Completion reason
Node Classification
  • node_type (string): Node category (e.g., "llm", "tool", "processor")
  • node_version (string): Node implementation version
Routing Decisions
  • next_node (string): Next node ID for single routing
  • next_nodes ([]string): Multiple next nodes for fan-out
  • routing_reason (string): Explanation of routing decision
  • condition (string): Condition that triggered the route
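For example, a routing event can carry these convention keys via the WithMeta helper. The helper is re-implemented locally below, matching its documented copy-on-write behavior, so the snippet runs standalone; the specific metadata values are hypothetical:

```go
package main

import "fmt"

// Event mirrors the package's event shape, copied for a self-contained sketch.
type Event struct {
	RunID  string
	NodeID string
	Msg    string
	Meta   map[string]interface{}
}

// WithMeta matches the documented helper: it returns a copy of the event
// with the key set, preserving existing metadata.
func (e Event) WithMeta(key string, value interface{}) Event {
	meta := make(map[string]interface{}, len(e.Meta)+1)
	for k, v := range e.Meta {
		meta[k] = v
	}
	meta[key] = value
	e.Meta = meta
	return e
}

func main() {
	// A routing event using the convention keys above.
	event := Event{RunID: "run-001", NodeID: "router", Msg: "routing_decision"}.
		WithMeta("next_node", "summarize").
		WithMeta("routing_reason", "document under 4k tokens").
		WithMeta("condition", "token_count < 4000")
	fmt.Println(event.Meta["next_node"]) // summarize
}
```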
Event Helper Methods
// Add duration metadata
event := Event{RunID: "run-001", Msg: "node_end"}.
    WithDuration(250 * time.Millisecond)

// Add error metadata
event := Event{RunID: "run-001", Msg: "error"}.
    WithError(errors.New("validation failed"))

// Add node type metadata
event := Event{RunID: "run-001", NodeID: "llm-node"}.
    WithNodeType("llm")

// Chain multiple metadata fields (using canonical keys)
event := Event{RunID: "run-001", NodeID: "llm-node", Msg: "node_end"}.
    WithDuration(250 * time.Millisecond).
    WithNodeType("llm").
    WithMeta("input_tokens", 100).
    WithMeta("output_tokens", 50).
    WithMeta("cost", 0.003).
    WithMeta("model", "gpt-4")

Performance Considerations

Batching

Use EmitBatch for high-volume workflows:

// Collect events during execution
events := []emit.Event{
    {RunID: "run-001", Step: 1, NodeID: "nodeA", Msg: "node_start"},
    {RunID: "run-001", Step: 1, NodeID: "nodeA", Msg: "node_end"},
    {RunID: "run-001", Step: 2, NodeID: "nodeB", Msg: "node_start"},
}

// Emit in batch for better performance
if err := emitter.EmitBatch(ctx, events); err != nil {
    log.Printf("Batch emit failed: %v", err)
}

Benefits:

  • Reduces network round-trips
  • Amortizes serialization overhead
  • Enables backend bulk insert optimizations
  • Improves throughput for concurrent workflows
Async Emission

Emitters should not block workflow execution. For high-throughput scenarios, buffer events asynchronously:

import (
    "context"
    "log"
    "sync"

    "github.com/dshills/langgraph-go/graph/emit"
)

type AsyncEmitter struct {
    delegate emit.Emitter
    buffer   chan emit.Event
    wg       sync.WaitGroup
    closed   bool
    mu       sync.Mutex
}

func NewAsyncEmitter(delegate emit.Emitter, bufferSize int) *AsyncEmitter {
    a := &AsyncEmitter{
        delegate: delegate,
        buffer:   make(chan emit.Event, bufferSize),
    }
    a.wg.Add(1)
    go a.worker()
    return a
}

func (a *AsyncEmitter) worker() {
    defer a.wg.Done()
    for event := range a.buffer {
        a.delegate.Emit(event)
    }
}

func (a *AsyncEmitter) Emit(event emit.Event) {
    a.mu.Lock()
    defer a.mu.Unlock()

    if a.closed {
        return
    }

    // Hold mutex during send to prevent race with Flush closing the channel
    select {
    case a.buffer <- event:
        // Buffered successfully
    default:
        // Buffer full - log and drop (or block)
        log.Printf("Event buffer full, dropping event: %v", event.Msg)
    }
}

func (a *AsyncEmitter) EmitBatch(ctx context.Context, events []emit.Event) error {
    for _, event := range events {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            a.Emit(event)
        }
    }
    return nil
}

func (a *AsyncEmitter) Flush(ctx context.Context) error {
    a.mu.Lock()
    if a.closed {
        a.mu.Unlock()
        return nil
    }
    a.closed = true
    close(a.buffer)
    a.mu.Unlock()

    // Wait for worker to drain with timeout
    done := make(chan struct{})
    go func() {
        a.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return a.delegate.Flush(ctx)
    case <-ctx.Done():
        return ctx.Err()
    }
}
Sampling

For extremely high-volume workflows, emit only a percentage of events:

import (
    "context"
    "math/rand"

    "github.com/dshills/langgraph-go/graph/emit"
)

type SamplingEmitter struct {
    delegate   emit.Emitter
    sampleRate float64 // 0.0 to 1.0
}

func NewSamplingEmitter(delegate emit.Emitter, sampleRate float64) *SamplingEmitter {
    return &SamplingEmitter{
        delegate:   delegate,
        sampleRate: sampleRate,
    }
}

func (s *SamplingEmitter) Emit(event emit.Event) {
    if rand.Float64() < s.sampleRate {
        s.delegate.Emit(event)
    }
}

func (s *SamplingEmitter) EmitBatch(ctx context.Context, events []emit.Event) error {
    sampled := make([]emit.Event, 0, len(events))
    for _, event := range events {
        if rand.Float64() < s.sampleRate {
            sampled = append(sampled, event)
        }
    }
    if len(sampled) > 0 {
        return s.delegate.EmitBatch(ctx, sampled)
    }
    return nil
}

func (s *SamplingEmitter) Flush(ctx context.Context) error {
    return s.delegate.Flush(ctx)
}
OpenTelemetry Performance

OpenTelemetry spans are lightweight but not free. Performance tips:

  1. Use Batch Span Processor (default in SDK):

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter), // Batches spans automatically
    )
    
  2. Limit Attribute Size: Keep metadata small (< 1KB per span)

  3. Sample Traces: Use sampling for high-volume production:

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.1)), // 10% sampling
    )
    
  4. Set Batch Timeouts: Control export frequency:

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter,
            sdktrace.WithBatchTimeout(5*time.Second),
            sdktrace.WithMaxExportBatchSize(512),
        ),
    )
    
Memory Usage

LogEmitter: Minimal overhead (writes directly to io.Writer)

BufferedEmitter: Stores all events in memory. Monitor with:

events := emitter.GetHistory("run-001")
fmt.Printf("Stored %d events for run-001\n", len(events))

OTelEmitter: Spans are batched in memory before export. Monitor OpenTelemetry metrics for buffer usage.

Benchmarking

Measure emitter overhead in your workflow:

func BenchmarkEmitter(b *testing.B) {
    emitter := emit.NewOTelEmitter(otel.Tracer("test"))
    event := emit.Event{
        RunID: "run-001",
        Step: 1,
        NodeID: "nodeA",
        Msg: "node_start",
        Meta: map[string]interface{}{
            "node_type": "llm",
        },
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        emitter.Emit(event)
    }
}

Examples

See the examples directory for complete working examples:

  • examples/otel-jaeger/ - Jaeger integration
  • examples/otel-console/ - Console output for development
  • examples/multi-emitter/ - Combining multiple backends

References

Documentation

Overview

Package emit provides event emission and observability for graph execution.


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferedEmitter

type BufferedEmitter struct {
	// contains filtered or unexported fields
}

BufferedEmitter implements Emitter by storing events in memory (T169-T172).

This emitter captures all events and provides query capabilities for execution history analysis. Events are organized by runID for efficient retrieval and filtering.

Features:
  - Thread-safe concurrent access
  - Query by runID with optional filtering
  - Filter by nodeID, message, and step range
  - Clear events by runID or all events

Use cases:
  - Development and debugging
  - Testing and validation
  - Real-time monitoring dashboards
  - Post-execution analysis

Warning: This emitter stores all events in memory. For production deployments with long-running workflows or high event volume, consider using a persistent storage backend or implement event rotation/cleanup.

Example usage:

// Create buffered emitter for testing
emitter := emit.NewBufferedEmitter()
engine := graph.New(reducer, store, emitter, opts)

// Run workflow
engine.Run(ctx, "run-001", initialState)

// Query execution history
allEvents := emitter.GetHistory("run-001")
errorEvents := emitter.GetHistoryWithFilter("run-001", emit.HistoryFilter{Msg: "error"})

// Clean up old runs
emitter.Clear("run-001")

func NewBufferedEmitter

func NewBufferedEmitter() *BufferedEmitter

NewBufferedEmitter creates a new BufferedEmitter (T169).

Returns a BufferedEmitter that stores all events in memory and provides query capabilities. Safe for concurrent use.

func (*BufferedEmitter) Clear

func (b *BufferedEmitter) Clear(runID string)

Clear removes stored events (T170).

If runID is non-empty, clears only events for that specific run. If runID is empty, clears all stored events across all runs.

This method is thread-safe and can be called concurrently.

Example:

// Clear specific run
emitter.Clear("run-001")

// Clear all runs
emitter.Clear("")

func (*BufferedEmitter) Emit

func (b *BufferedEmitter) Emit(event Event)

Emit stores an event in the buffer (T169).

Events are organized by runID for efficient retrieval. This method is thread-safe and can be called concurrently from multiple goroutines.

func (*BufferedEmitter) EmitBatch

func (b *BufferedEmitter) EmitBatch(_ context.Context, events []Event) error

EmitBatch stores multiple events in the buffer in a single operation.

func (*BufferedEmitter) Flush

func (b *BufferedEmitter) Flush(_ context.Context) error

Flush is a no-op for buffered emitter (events are already stored in memory).

func (*BufferedEmitter) GetHistory

func (b *BufferedEmitter) GetHistory(runID string) []Event

GetHistory retrieves all events for a specific runID (T170).

Returns events in the order they were emitted. Returns an empty slice if no events exist for the given runID.

This method is thread-safe and returns a copy of the events to prevent concurrent modification issues.

Example:

events := emitter.GetHistory("run-001")
for _, event := range events {
	fmt.Printf("[%s] %s: %s\n", event.RunID, event.NodeID, event.Msg)
}

func (*BufferedEmitter) GetHistoryWithFilter

func (b *BufferedEmitter) GetHistoryWithFilter(runID string, filter HistoryFilter) []Event

GetHistoryWithFilter retrieves filtered events for a specific runID (T171, T172).

Applies the provided filter criteria to select matching events. All filter conditions must match for an event to be included (AND logic).

Returns events in the order they were emitted. Returns an empty slice if no events match the filter.

This method is thread-safe and returns a copy of the events.

Example:

// Get error events from "validator" node
filter := emit.HistoryFilter{
	NodeID: "validator",
	Msg:    "error",
}
errors := emitter.GetHistoryWithFilter("run-001", filter)

// Get events from steps 10-20
minStep, maxStep := 10, 20
filter = emit.HistoryFilter{
	MinStep: &minStep,
	MaxStep: &maxStep,
}
stepEvents := emitter.GetHistoryWithFilter("run-001", filter)

type Emitter

type Emitter interface {
	// Emit sends an observability event to the configured backend.
	//
	// Implementations should not block workflow execution.
	// If the backend is unavailable or slow, events should be:
	// - Buffered for later delivery.
	// - Dropped with error logging.
	// - Sent asynchronously.
	//
	// Emit should not panic. Errors should be logged internally.
	Emit(event Event)

	// EmitBatch sends multiple events in a single operation for improved performance.
	//
	// Batching reduces overhead when emitting high volumes of events by:
	// - Amortizing network round-trips across multiple events.
	// - Reducing serialization overhead.
	// - Enabling backend bulk insert optimizations.
	// - Improving throughput for high-concurrency workflows.
	//
	// Implementations should:
	// - Process events in order (maintain happened-before relationships).
	// - Not block workflow execution (buffer or process asynchronously).
	// - Handle partial failures gracefully (log and continue).
	// - Not panic on errors.
	//
	// Parameters:
	// - ctx: Context for cancellation and timeouts.
	// - events: Events to emit, ordered by creation time.
	//
	// Returns error only on catastrophic failures (e.g., configuration errors).
	// Individual event failures should be logged but not returned.
	//
	// Example usage:
	//
	// events := []Event{
	//     {RunID: "run-001", Msg: "step_start", ...},
	//     {RunID: "run-001", Msg: "step_complete", ...},
	// }
	// if err := emitter.EmitBatch(ctx, events); err != nil {
	//     log.Errorf("batch emit failed: %v", err)
	// }
	EmitBatch(ctx context.Context, events []Event) error

	// Flush ensures all buffered events are sent to the backend.
	//
	// Call this method:
	// - Before application shutdown to prevent event loss.
	// - At workflow completion to ensure all events are delivered.
	// - After critical operations requiring immediate visibility.
	// - During testing to verify event emission.
	//
	// Implementations should:
	// - Block until all buffered events are sent or timeout occurs.
	// - Respect context cancellation and deadlines.
	// - Return error if events cannot be delivered.
	// - Be safe to call multiple times (idempotent).
	//
	// Parameters:
	// - ctx: Context for cancellation and timeout.
	//
	// Returns error if flush fails or times out. Implementations should attempt
	// best-effort delivery even on error (e.g., flush partial buffers).
	// best-effort delivery even on error (e.g., flush partial buffers).
	//
	// Example usage:
	//
	// defer func() {
	//     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	//     defer cancel()
	//     if err := emitter.Flush(ctx); err != nil {
	//         log.Errorf("failed to flush events on shutdown: %v", err)
	//     }
	// }()
	Flush(ctx context.Context) error
}

Emitter receives and processes observability events from workflow execution.

Emitters enable pluggable observability backends:
  - Logging: stdout, files, syslog
  - Distributed tracing: OpenTelemetry, Jaeger, Zipkin
  - Metrics: Prometheus, StatsD
  - Analytics: DataDog, New Relic

Implementations should be:
  - Non-blocking: Avoid slowing down workflow execution
  - Thread-safe: May be called concurrently from multiple nodes
  - Resilient: Handle failures gracefully (don't crash the workflow)

Common patterns:
  - Buffering: Collect events and flush in batches
  - Filtering: Only emit events matching criteria (e.g., errors only)
  - Multi-emit: Fan out to multiple backends
  - Sampling: Emit only a percentage of events for high-volume workflows

type Event

type Event struct {
	// RunID identifies the workflow execution that emitted this event.
	RunID string

	// Step is the sequential step number in the workflow (1-indexed).
	// Zero for workflow-level events (start, complete, error).
	Step int

	// NodeID identifies which node emitted this event.
	// Empty string for workflow-level events.
	NodeID string

	// Msg is a human-readable description of the event.
	Msg string

	// Meta contains additional structured data specific to this event (T168).
	//
	// Standard Metadata Conventions:
	//
	// Performance Metrics:
	// - "duration_ms" (int64): Execution duration in milliseconds.
	// - "memory_bytes" (int64): Memory usage in bytes.
	// - "cpu_percent" (float64): CPU utilization percentage.
	//
	// Error Context:
	// - "error" (string): Error message from error.Error().
	// - "error_type" (string): Error type or classification (e.g., "validation", "timeout").
	// - "retryable" (bool): Whether the error can be retried.
	// - "retry_attempt" (int): Current retry attempt number (1-indexed).
	// - "stack_trace" (string): Optional stack trace for debugging.
	//
	// LLM-Specific:
	// - "tokens" (int): Total token count (input + output).
	// - "input_tokens" (int): Input token count.
	// - "output_tokens" (int): Output token count.
	// - "cost" (float64): Estimated cost in USD.
	// - "model" (string): Model identifier (e.g., "gpt-4", "claude-3-opus").
	// - "temperature" (float64): Temperature parameter used.
	// - "max_tokens" (int): Max tokens parameter used.
	// - "finish_reason" (string): Completion reason (e.g., "stop", "length", "tool_calls").
	//
	// Node Classification:
	// - "node_type" (string): Node category (e.g., "llm", "tool", "processor", "validator").
	// - "node_version" (string): Node implementation version.
	//
	// Checkpoint Context:
	// - "checkpoint_id" (string): Checkpoint identifier.
	// - "checkpoint_label" (string): Human-readable checkpoint label.
	// - "state_size" (int): Serialized state size in bytes.
	//
	// Tool Execution:
	// - "tool_name" (string): Name of the tool being executed.
	// - "tool_input" (any): Tool input parameters (if serializable).
	// - "tool_output" (any): Tool output result (if serializable).
	//
	// State Changes:
	// - "delta" (any): State change applied by the node.
	// - "state_version" (int): State version number.
	//
	// Routing Decisions:
	// - "next_node" (string): Next node ID for routing decisions.
	// - "next_nodes" ([]string): Multiple next nodes for fan-out.
	// - "routing_reason" (string): Explanation of routing decision.
	// - "condition" (string): Condition that triggered the route.
	//
	// Custom Application Data:
	// Applications can add domain-specific metadata using their own keys.
	// Use namespaced keys to avoid conflicts (e.g., "app_user_id", "app_request_id").
	//
	// Usage with helper methods:
	//
	// event := Event{RunID: "run-001", NodeID: "llm-node", Msg: "node_end"}
	// enriched := event.
	//     WithDuration(250 * time.Millisecond).
	//     WithNodeType("llm").
	//     WithMeta("tokens", 150).
	//     WithMeta("cost", 0.003).
	//     WithMeta("model", "gpt-4")
	Meta map[string]interface{}
}

Event represents an observability event emitted during workflow execution.

Events provide detailed insight into workflow behavior:
  - Node execution start/complete
  - State changes and transitions
  - Errors and warnings
  - Performance metrics
  - Checkpoint operations

Events are emitted to an Emitter which can:
  - Log to stdout/stderr
  - Send to OpenTelemetry
  - Store in time-series databases
  - Trigger alerts

func (Event) WithDuration

func (e Event) WithDuration(d time.Duration) Event

WithDuration returns a copy of the event with duration_ms metadata (T166).

Sets the "duration_ms" field to the duration in milliseconds as an int64. Preserves all existing metadata fields.

Example:

event := Event{RunID: "run-001", Msg: "node_end"}
enriched := event.WithDuration(250 * time.Millisecond)
// enriched.Meta["duration_ms"] == 250

func (Event) WithError

func (e Event) WithError(err error) Event

WithError returns a copy of the event with error metadata (T166).

Sets the "error" field to the error message string. Preserves all existing metadata fields.

Example:

event := Event{RunID: "run-001", Msg: "error"}
enriched := event.WithError(errors.New("validation failed"))
// enriched.Meta["error"] == "validation failed"

func (Event) WithMeta

func (e Event) WithMeta(key string, value interface{}) Event

WithMeta returns a copy of the event with an additional metadata field (T167).

Sets the specified key-value pair in the metadata map. Preserves all existing metadata fields. If the key already exists, it will be overwritten.

This method supports chaining for fluent API usage:

event := Event{RunID: "run-001"}

enriched := event.
	WithMeta("tokens", 150).
	WithMeta("cost", 0.003).
	WithMeta("model", "gpt-4")

Example:

event := Event{RunID: "run-001", Msg: "llm_call"}
enriched := event.WithMeta("tokens", 150)
// enriched.Meta["tokens"] == 150

func (Event) WithNodeType

func (e Event) WithNodeType(nodeType string) Event

WithNodeType returns a copy of the event with node_type metadata (T167).

Sets the "node_type" field to the provided node type string. Preserves all existing metadata fields.

Common node types:
  - "llm": LLM call nodes
  - "tool": Tool execution nodes
  - "processor": Data processing nodes
  - "validator": Validation nodes
  - "aggregator": Result aggregation nodes

Example:

event := Event{RunID: "run-001", NodeID: "llm-node"}
enriched := event.WithNodeType("llm")
// enriched.Meta["node_type"] == "llm"

type HistoryFilter

type HistoryFilter struct {
	NodeID  string // Filter by node ID (empty = no filter)
	Msg     string // Filter by message (empty = no filter)
	MinStep *int   // Minimum step number (nil = no filter)
	MaxStep *int   // Maximum step number (nil = no filter)
}

HistoryFilter specifies criteria for filtering execution history (T171, T172).

All filter fields are optional. When multiple fields are set, they are combined with AND logic (all conditions must match).

Fields:

  • NodeID: Filter by specific node
  • Msg: Filter by message type (e.g., "node_start", "error")
  • MinStep: Filter events with step >= MinStep (nil = no lower bound)
  • MaxStep: Filter events with step <= MaxStep (nil = no upper bound)

Example usage:

// Get all errors from a specific node
filter := emit.HistoryFilter{
	NodeID: "validator",
	Msg:    "error",
}
errors := emitter.GetHistoryWithFilter("run-001", filter)

// Get events from steps 5-10
minStep, maxStep := 5, 10
filter := emit.HistoryFilter{
	MinStep: &minStep,
	MaxStep: &maxStep,
}
stepEvents := emitter.GetHistoryWithFilter("run-001", filter)

type LogEmitter

type LogEmitter struct {
	// contains filtered or unexported fields
}

LogEmitter implements Emitter by writing structured log output to a writer (T161).

Supports two output modes:

  • Text mode (default): Human-readable format with key=value pairs
  • JSON mode: Machine-readable JSON format, one event per line

Example text output:

[node_start] runID=run-001 step=0 nodeID=nodeA

Example JSON output:

{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_start","meta":null}

Usage:

// Text output to stdout
emitter := emit.NewLogEmitter(os.Stdout, false)

// JSON output to file
f, _ := os.Create("events.jsonl")
defer func() { _ = f.Close() }()
emitter := emit.NewLogEmitter(f, true)

func NewLogEmitter

func NewLogEmitter(writer io.Writer, jsonMode bool) *LogEmitter

NewLogEmitter creates a new LogEmitter (T161, T163).

Parameters:

  • writer: Where to write the log output (e.g., os.Stdout, a file)
  • jsonMode: If true, emit JSON format; if false, emit text format

Returns a LogEmitter that writes structured event data to the provided writer.

func (*LogEmitter) Emit

func (l *LogEmitter) Emit(event Event)

Emit writes an event to the configured writer (T161).

Format depends on jsonMode:

  • JSON mode: Writes the event as a single-line JSON object
  • Text mode: Writes a human-readable format with a [msg] prefix

Example text output:

[node_start] runID=run-001 step=0 nodeID=nodeA
[node_end] runID=run-001 step=0 nodeID=nodeA meta={"delta":{"counter":5}}

Example JSON output:

{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_start","meta":null}
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_end","meta":{"delta":{"counter":5}}}

func (*LogEmitter) EmitBatch

func (l *LogEmitter) EmitBatch(_ context.Context, events []Event) error

EmitBatch sends multiple events in a single operation for improved performance (T107).

For LogEmitter, batching provides efficiency by:

  • Reducing write syscalls (one write per batch vs. per event)
  • Better formatting when viewing multiple related events
  • Maintaining chronological order within the batch

In text mode, events are written with blank lines between them for readability. In JSON mode, events are written as JSONL (one per line) for easy parsing.

Example text output:

[node_start] runID=run-001 step=0 nodeID=nodeA

[node_end] runID=run-001 step=0 nodeID=nodeA

[node_start] runID=run-001 step=1 nodeID=nodeB

Example JSON output:

{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_start","meta":null}
{"runID":"run-001","step":0,"nodeID":"nodeA","msg":"node_end","meta":{"delta":{"counter":5}}}
{"runID":"run-001","step":1,"nodeID":"nodeB","msg":"node_start","meta":null}

This implementation is more efficient than calling Emit repeatedly because:

 1. It can batch multiple events into fewer write operations.
 2. It can optimize formatting across the entire batch.
 3. It reduces locking overhead if the writer is synchronized.

Parameters:

  • ctx: Context for cancellation (currently unused but reserved for future enhancements)
  • events: Slice of events to emit in order

Returns error only if writing fails. Always attempts to write all events.

func (*LogEmitter) Flush

func (l *LogEmitter) Flush(_ context.Context) error

Flush ensures all buffered events are sent to the backend (T108).

For LogEmitter, this is a no-op because:

  • All writes go directly to the underlying io.Writer
  • No internal buffering is maintained by LogEmitter
  • The writer itself handles its own buffering (e.g., os.Stdout, bufio.Writer)

If you need flush control, wrap the writer with bufio.Writer and call Flush on it directly:

buf := bufio.NewWriter(os.Stdout)
emitter := emit.NewLogEmitter(buf, false)

// ... emit events ...

buf.Flush()        // Flush the underlying buffer
emitter.Flush(ctx) // No-op for LogEmitter

This method is provided to satisfy the Emitter interface and enable polymorphic usage with other emitters (e.g., OTelEmitter) that do require flushing.

Parameters:

  • ctx: Context for cancellation (unused; LogEmitter writes are synchronous)

Returns nil (always succeeds).

type NullEmitter

type NullEmitter struct{}

NullEmitter implements Emitter by discarding all events (T165).

This is a no-op emitter for production environments where event logging is not desired. It implements the Emitter interface but does nothing with emitted events.

Use cases:

  • Production deployments where observability overhead is unwanted
  • Testing scenarios where event capture is not needed
  • Disabling event emission without changing code

Example usage:

// Disable all event logging
emitter := emit.NewNullEmitter()
engine := graph.New(reducer, store, emitter, opts)

func NewNullEmitter

func NewNullEmitter() *NullEmitter

NewNullEmitter creates a new NullEmitter (T165).

Returns a NullEmitter that discards all events without any processing. This is safe for concurrent use and has zero overhead.

func (*NullEmitter) Emit

func (n *NullEmitter) Emit(_ Event)

Emit discards the event without any processing (T165).

This method is a no-op that immediately returns. It never errors and performs no I/O or processing.

func (*NullEmitter) EmitBatch

func (n *NullEmitter) EmitBatch(_ context.Context, _ []Event) error

EmitBatch discards multiple events in a single operation.

func (*NullEmitter) Flush

func (n *NullEmitter) Flush(_ context.Context) error

Flush is a no-op for null emitter.

type OTelEmitter

type OTelEmitter struct {
	// contains filtered or unexported fields
}

OTelEmitter implements Emitter by creating OpenTelemetry spans (T109-T111).

Each event becomes a span with:

  • Span name: event.Msg (e.g., "node_start", "node_end")
  • Attributes: runID, step, nodeID, and all event.Meta fields
  • Timestamps: Derived from span creation
  • Status: Set to error if event.Meta["error"] exists

Supports distributed tracing by:

  • Creating child spans for node execution
  • Propagating trace context across service boundaries
  • Recording performance metrics as span attributes
  • Capturing errors with stack traces

Concurrency attributes (T111):

  • step_id: Unique identifier for the execution step
  • order_key: Deterministic ordering key for replay
  • attempt: Retry attempt number (0 for first attempt)

Usage:

// Create tracer from OpenTelemetry provider
tracer := otel.Tracer("langgraph-go")
emitter := emit.NewOTelEmitter(tracer)

// Emit events that become spans
emitter.Emit(Event{
    RunID: "run-001",
    Step: 1,
    NodeID: "nodeA",
    Msg: "node_start",
})

Integration with OpenTelemetry:

// Setup OpenTelemetry provider (application code)
import (
    "go.opentelemetry.io/otel"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Create trace provider with exporter (Jaeger, Zipkin, etc.)
tp := sdktrace.NewTracerProvider(
    sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tp)

// Create emitter
tracer := otel.Tracer("langgraph-go")
emitter := emit.NewOTelEmitter(tracer)

// Use in engine
engine := graph.New[MyState](
    graph.WithEmitter(emitter),
)

func NewOTelEmitter

func NewOTelEmitter(tracer trace.Tracer) *OTelEmitter

NewOTelEmitter creates a new OTelEmitter (T109).

Parameters:

  • tracer: OpenTelemetry tracer from otel.Tracer("service-name")

Returns an OTelEmitter that creates spans for each event.

Example:

tracer := otel.Tracer("langgraph-go")
emitter := emit.NewOTelEmitter(tracer)

func (*OTelEmitter) Emit

func (o *OTelEmitter) Emit(event Event)

Emit creates an OpenTelemetry span for the event (T109).

The span includes:

  • Name: event.Msg (e.g., "node_start", "node_end")
  • Attributes: All event fields and metadata
  • Status: Error if event contains error metadata
  • Timestamps: Start time (now), end time (immediate for instant events)

For performance, the span is immediately ended (not left open). This is appropriate for events representing points in time rather than durations.

If the event contains a "duration_ms" metadata field, the span's end time is adjusted to reflect the actual duration.

func (*OTelEmitter) EmitBatch

func (o *OTelEmitter) EmitBatch(ctx context.Context, events []Event) error

EmitBatch creates multiple spans efficiently (T109).

Batching provides performance benefits by:

  • Amortizing tracer overhead across multiple spans
  • Enabling span processor batch optimizations
  • Reducing context switching overhead
  • Maintaining temporal locality for related events

All spans are created and ended immediately. They are recorded in the OpenTelemetry batch span processor for efficient export.

Parameters:

  • ctx: Context for cancellation and trace propagation
  • events: Events to emit as spans

Returns error if span creation fails (rare, usually indicates misconfiguration).

func (*OTelEmitter) Flush

func (o *OTelEmitter) Flush(ctx context.Context) error

Flush forces export of all pending spans (T110).

This method:

  • Calls ForceFlush on the tracer provider if available
  • Blocks until all spans are exported or timeout occurs
  • Should be called before application shutdown
  • Respects context cancellation and deadlines

OpenTelemetry typically buffers spans in a batch span processor for efficiency. Flush ensures these buffered spans are sent to the backend (Jaeger, Zipkin, etc.) before the application exits.

Usage:

defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := emitter.Flush(ctx); err != nil {
        log.Printf("failed to flush spans: %v", err)
    }
}()

Parameters:

  • ctx: Context with timeout/cancellation

Returns error if flush fails or times out.
