middleware

package
v0.5.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 19, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package middleware provides reusable instrumentation helpers for multi-agent systems.

This package offers middleware and wrapper functions that automatically instrument agent operations following OpenTelemetry semantic conventions for Agentic AI. It is designed to minimize code changes required to add observability to existing agent systems.

Components

The package provides four main components:

  • Context Propagation: Helpers for passing workflow, task, and agent information through request context and HTTP headers.

  • Workflow Management: Functions for starting, completing, and failing workflows with automatic context attachment.

  • HTTP Middleware: Handler middleware for instrumenting agents as tasks, and client middleware for tracking inter-agent calls as handoffs.

  • Tool Wrappers: Generic wrappers for instrumenting tool/function invocations with timing, status, and error tracking.

Quick Start

Basic usage in an agent system:

// 1. Create a store
store, _ := agentops.Open("postgres", agentops.WithDSN(dsn))
defer store.Close()

// 2. Start a workflow (in orchestrator)
ctx, workflow, _ := middleware.StartWorkflow(ctx, store, "my-workflow",
    middleware.WithInitiator("user:123"),
)
defer middleware.CompleteWorkflow(ctx)

// 3. Instrument agent HTTP handlers
handler := middleware.AgentHandler(middleware.AgentHandlerConfig{
    AgentID:   "synthesis-agent",
    AgentType: "synthesis",
    Store:     store,
})(yourHandler)

// 4. Use instrumented client for inter-agent calls
client := middleware.NewAgentClient(http.DefaultClient, middleware.AgentClientConfig{
    FromAgentID: "orchestrator",
    Store:       store,
})
resp, _ := client.PostJSON(ctx, "http://synthesis:8004/extract", body, "synthesis-agent")

// 5. Wrap tool calls
results, _ := middleware.ToolCall(ctx, "web_search", func() ([]Result, error) {
    return searchService.Search(query)
}, middleware.WithToolType("search"))

Context Propagation

The package automatically propagates observability context through:

  • Go context.Context: Workflow, task, agent, and store are attached to context
  • HTTP Headers: X-AgentOps-Workflow-ID, X-AgentOps-Task-ID, X-AgentOps-Agent-ID, and X-AgentOps-Trace-ID

This enables distributed tracing across agent boundaries:

// In agent A
ctx = middleware.WithWorkflow(ctx, workflow)

// Call agent B - headers automatically set
client.PostJSON(ctx, agentBURL, body, "agent-b")

// In agent B - workflow ID extracted from headers
workflowID := r.Header.Get(middleware.HeaderWorkflowID)
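
To make the propagation path concrete, here is a minimal, standard-library-only sketch of the idea: the caller copies a workflow ID from its context onto a request header, and the callee rebuilds a context from that header. The header name comes from the package constants; the helpers inject, extract, and roundTrip, and the workflowKey type, are hypothetical names for illustration and are not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Header name as listed in the package constants.
const headerWorkflowID = "X-AgentOps-Workflow-ID"

// workflowKey is a hypothetical unexported context key type.
type workflowKey struct{}

// inject copies the workflow ID from ctx (if any) onto the outgoing request.
func inject(ctx context.Context, req *http.Request) {
	if id, ok := ctx.Value(workflowKey{}).(string); ok && id != "" {
		req.Header.Set(headerWorkflowID, id)
	}
}

// extract returns the request context, extended with the workflow ID
// when the propagation header is present.
func extract(r *http.Request) context.Context {
	if id := r.Header.Get(headerWorkflowID); id != "" {
		return context.WithValue(r.Context(), workflowKey{}, id)
	}
	return r.Context()
}

// roundTrip plays agent A calling agent B in-process and reports the
// workflow ID that agent B recovered from the headers.
func roundTrip(workflowID string) string {
	var seen string
	agentB := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen, _ = extract(r).Value(workflowKey{}).(string)
	})

	ctx := context.WithValue(context.Background(), workflowKey{}, workflowID)
	req := httptest.NewRequest(http.MethodPost, "http://agent-b/extract", nil)
	inject(ctx, req)
	agentB.ServeHTTP(httptest.NewRecorder(), req)
	return seen
}

func main() {
	fmt.Println(roundTrip("wf-42")) // prints "wf-42"
}
```

A request without the header leaves the context untouched, so the callee can tell a propagated call from a standalone one.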

HTTP Middleware

AgentHandler, and its convenience form AgentHandlerFunc, wrap HTTP handlers to automatically create tasks:

mux := http.NewServeMux()
mux.Handle("/extract", middleware.AgentHandlerFunc(cfg, extractHandler))
mux.Handle("/analyze", middleware.AgentHandlerFunc(cfg, analyzeHandler))

Each request creates a task that captures:

  • Start/end time and duration
  • HTTP status code
  • Success/failure status
  • Automatic linking to parent workflow
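
The fields listed above can be captured with a small wrapper around http.ResponseWriter. The following is a rough sketch using only the standard library, not the package's actual code: taskRecord, statusWriter, instrument, and run are hypothetical names, and treating any status below 400 as success is an assumption, since the documentation does not state the threshold.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// taskRecord is a hypothetical stand-in for what a task might capture.
type taskRecord struct {
	Name     string
	Status   int
	Duration time.Duration
	Success  bool
}

// statusWriter remembers the status code written by the wrapped handler.
type statusWriter struct {
	http.ResponseWriter
	status int
}

func (w *statusWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

// instrument times the wrapped handler and passes a taskRecord to record.
func instrument(record func(taskRecord), next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Default to 200 because handlers may never call WriteHeader.
		sw := &statusWriter{ResponseWriter: w, status: http.StatusOK}
		start := time.Now()
		next.ServeHTTP(sw, r)
		record(taskRecord{
			Name:     r.URL.Path,
			Status:   sw.status,
			Duration: time.Since(start),
			Success:  sw.status < 400, // assumption: 4xx and 5xx count as failure
		})
	})
}

// run sends one in-process request through the middleware and returns the record.
func run(path string, code int) taskRecord {
	var got taskRecord
	h := instrument(func(t taskRecord) { got = t },
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(code)
		}))
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodPost, path, nil))
	return got
}

func main() {
	ok := run("/extract", http.StatusOK)
	bad := run("/analyze", http.StatusInternalServerError)
	fmt.Println(ok.Name, ok.Status, ok.Success)    // prints "/extract 200 true"
	fmt.Println(bad.Name, bad.Status, bad.Success) // prints "/analyze 500 false"
}
```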

AgentClient wraps http.Client to track inter-agent calls as handoffs:

client := middleware.NewAgentClient(nil, middleware.AgentClientConfig{
    FromAgentID:   "orchestrator",
    FromAgentType: "orchestration",
    Store:         store,
})

// This call is recorded as a handoff
resp, err := client.PostJSON(ctx, synthesisURL, body, "synthesis-agent")
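
As an illustration of what recording a call as a handoff can involve, the sketch below wraps an http.Client and notes the source agent, target agent, and response status of each call. It is an assumption-laden outline, not the package's AgentClient: the handoff struct, the record callback, and the stubTransport used to avoid real network traffic are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// handoff is a hypothetical record of one inter-agent call.
type handoff struct {
	From, To string
	Status   int
	Err      error
}

// agentClient wraps an http.Client and reports every call to record.
type agentClient struct {
	client *http.Client
	from   string
	record func(handoff)
}

// Do executes the request and records it as a handoff to toAgentID.
func (c *agentClient) Do(ctx context.Context, req *http.Request, toAgentID string) (*http.Response, error) {
	resp, err := c.client.Do(req.WithContext(ctx))
	h := handoff{From: c.from, To: toAgentID, Err: err}
	if resp != nil {
		h.Status = resp.StatusCode
	}
	c.record(h)
	return resp, err
}

// stubTransport lets the example run without a network.
type stubTransport func(*http.Request) (*http.Response, error)

func (f stubTransport) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// demoHandoff performs one stubbed call and returns what was recorded.
func demoHandoff() handoff {
	var got handoff
	c := &agentClient{
		client: &http.Client{Transport: stubTransport(func(r *http.Request) (*http.Response, error) {
			return &http.Response{
				StatusCode: http.StatusOK,
				Body:       http.NoBody,
				Header:     make(http.Header),
				Request:    r,
			}, nil
		})},
		from:   "orchestrator",
		record: func(h handoff) { got = h },
	}
	req, err := http.NewRequest(http.MethodPost, "http://synthesis:8004/extract", nil)
	if err != nil {
		return handoff{Err: err}
	}
	resp, err := c.Do(context.Background(), req, "synthesis-agent")
	if err == nil {
		resp.Body.Close()
	}
	return got
}

func main() {
	h := demoHandoff()
	fmt.Println(h.From, "->", h.To, h.Status) // prints "orchestrator -> synthesis-agent 200"
}
```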

Tool Wrappers

Generic tool wrapper for any function:

result, err := middleware.ToolCall(ctx, "tool_name", func() (ResultType, error) {
    return myTool.Execute(args)
}, middleware.WithToolType("api"))

Convenience wrappers for common tool types:

// Search tools
results, _ := middleware.SearchToolCall(ctx, "web_search", query, searchFn)

// Database tools
rows, _ := middleware.DatabaseToolCall(ctx, "user_query", sql, queryFn)

// API tools
data, _ := middleware.APIToolCall(ctx, "weather_api", "GET", url, apiFn)

Workflow Scopes

For simpler workflow management, use WorkflowScope:

err := middleware.WorkflowScope(ctx, store, "my-workflow",
    func(ctx context.Context, wf *agentops.Workflow) error {
        // Do work...
        // Workflow automatically completed on success, failed on error
        return nil
    },
    middleware.WithInitiator("user:123"),
)

Integration with OpenTelemetry

The middleware uses semantic conventions from github.com/agentplexus/omniobserve/semconv/agent, which align with OpenTelemetry's gen_ai.agent.* namespace. This enables integration with OpenTelemetry-compatible observability platforms.

Index

Constants

const (
	HeaderWorkflowID = "X-AgentOps-Workflow-ID"
	HeaderTaskID     = "X-AgentOps-Task-ID"
	HeaderAgentID    = "X-AgentOps-Agent-ID"
	HeaderTraceID    = "X-AgentOps-Trace-ID"
)

PropagationHeaders are HTTP headers used to propagate context across services.

Variables

This section is empty.

Functions

func APIToolCall

func APIToolCall[T any](ctx context.Context, toolName string, method string, url string, fn func() (T, error), opts ...ToolCallOption) (T, error)

APIToolCall is a convenience wrapper for external API tool calls.

Usage:

data, err := APIToolCall(ctx, "weather_api", "GET", "https://api.weather.com/current", func() (WeatherData, error) {
    return weatherClient.GetCurrent(location)
})

func AgentHandler

func AgentHandler(cfg AgentHandlerConfig) func(http.Handler) http.Handler

AgentHandler returns HTTP middleware that instruments a handler as an agent task. It automatically:

  • Creates a task when a request arrives
  • Extracts workflow ID from headers if present
  • Records duration, status, and errors
  • Completes or fails the task based on response status

func AgentHandlerFunc

func AgentHandlerFunc(cfg AgentHandlerConfig, handler http.HandlerFunc) http.Handler

AgentHandlerFunc is a convenience wrapper for AgentHandler with http.HandlerFunc.

func CompleteWorkflow

func CompleteWorkflow(ctx context.Context, opts ...CompleteWorkflowOption) error

CompleteWorkflow marks the workflow in context as completed.

func DatabaseToolCall

func DatabaseToolCall[T any](ctx context.Context, toolName string, query string, fn func() (T, error), opts ...ToolCallOption) (T, error)

DatabaseToolCall is a convenience wrapper for database tool calls.

Usage:

const query = "SELECT * FROM users WHERE id = ?"
rows, err := DatabaseToolCall(ctx, "user_lookup", query, func() ([]User, error) {
    return db.Query(query, userID)
})

func FailWorkflow

func FailWorkflow(ctx context.Context, err error) error

FailWorkflow marks the workflow in context as failed.

func RetryToolCall

func RetryToolCall[T any](ctx context.Context, toolName string, maxRetries int, fn func(attempt int) (T, error), opts ...ToolCallOption) (T, error)

RetryToolCall wraps a tool call with automatic retry tracking. It increments the retry count on each attempt.

Usage:

result, err := RetryToolCall(ctx, "flaky_api", 3, func(attempt int) (Data, error) {
    return flakyAPI.Call()
})
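
The retry behaviour can be pictured with the generic loop below. This is only a sketch under stated assumptions: the documentation does not say whether attempts are numbered from 0 or 1, nor whether maxRetries counts the first call. Here attempts start at 0 and the function is called at most maxRetries+1 times. retryCall, flaky, and errFlaky are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// retryCall calls fn until it succeeds or the retry budget is spent.
// It returns the last result, the number of calls made, and the last error.
// Assumption: attempt is 0-based and maxRetries excludes the first call.
func retryCall[T any](maxRetries int, fn func(attempt int) (T, error)) (T, int, error) {
	var out T
	var err error
	calls := 0
	for attempt := 0; attempt <= maxRetries; attempt++ {
		calls++
		out, err = fn(attempt)
		if err == nil {
			break
		}
	}
	return out, calls, err
}

var errFlaky = errors.New("temporary failure")

// flaky returns a tool that fails for the first `failures` attempts.
func flaky(failures int) func(int) (string, error) {
	return func(attempt int) (string, error) {
		if attempt < failures {
			return "", errFlaky
		}
		return "ok", nil
	}
}

func main() {
	out, calls, err := retryCall(3, flaky(2))
	fmt.Println(out, calls, err) // prints "ok 3 <nil>"
}
```

Passing the attempt number to the callback lets a tool adjust its behaviour per attempt, for example by backing off or switching endpoints.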

func SearchToolCall

func SearchToolCall[T any](ctx context.Context, toolName string, query string, fn func() (T, error), opts ...ToolCallOption) (T, error)

SearchToolCall is a convenience wrapper for search tool calls.

Usage:

results, err := SearchToolCall(ctx, "web_search", query, func() ([]SearchResult, error) {
    return searchService.Search(query)
})

func StartWorkflow

func StartWorkflow(ctx context.Context, store agentops.Store, name string, opts ...WorkflowOption) (context.Context, *agentops.Workflow, error)

StartWorkflow creates a new workflow and returns a context with the workflow attached. The returned context should be used for all subsequent operations within the workflow.

func StoreFromContext

func StoreFromContext(ctx context.Context) agentops.Store

StoreFromContext retrieves the store from context.

func TaskFromContext

func TaskFromContext(ctx context.Context) *agentops.Task

TaskFromContext retrieves the task from context.

func TaskIDFromContext

func TaskIDFromContext(ctx context.Context) string

TaskIDFromContext retrieves just the task ID from context.

func ToolCall

func ToolCall[T any](ctx context.Context, toolName string, fn func() (T, error), opts ...ToolCallOption) (T, error)

ToolCall wraps a function as an instrumented tool call. It automatically records the tool invocation with timing, status, and errors.

Usage:

result, err := ToolCall(ctx, "web_search", func() (any, error) {
    return searchService.Search(query)
}, WithToolType("search"), WithToolInput(map[string]any{"query": query}))
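
For readers curious how a generic wrapper can record timing, status, and errors without changing the wrapped function's return type, here is a minimal sketch. It does not reproduce the package's internals; the invocation struct and the record callback are hypothetical stand-ins for whatever the store persists.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// invocation is a hypothetical record of one tool call.
type invocation struct {
	Tool     string
	Duration time.Duration
	Success  bool
	Err      error
}

// toolCall runs fn, reports the outcome to record, and returns fn's
// result unchanged so callers keep their concrete type T.
func toolCall[T any](record func(invocation), name string, fn func() (T, error)) (T, error) {
	start := time.Now()
	out, err := fn()
	record(invocation{
		Tool:     name,
		Duration: time.Since(start),
		Success:  err == nil,
		Err:      err,
	})
	return out, err
}

func main() {
	var log []invocation
	record := func(i invocation) { log = append(log, i) }

	n, _ := toolCall(record, "word_count", func() (int, error) { return 3, nil })
	_, err := toolCall(record, "broken", func() (string, error) {
		return "", errors.New("boom")
	})

	fmt.Println(n, err) // prints "3 boom"
	for _, i := range log {
		fmt.Println(i.Tool, i.Success)
	}
}
```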

func ToolCallVoid

func ToolCallVoid(ctx context.Context, toolName string, fn func() error, opts ...ToolCallOption) error

ToolCallVoid wraps a void function as an instrumented tool call. Use this when the tool doesn't return a value.

Usage:

err := ToolCallVoid(ctx, "send_notification", func() error {
    return notificationService.Send(message)
}, WithToolType("notification"))

func WithAgent

func WithAgent(ctx context.Context, agent AgentInfo) context.Context

WithAgent adds agent info to the context.

func WithStore

func WithStore(ctx context.Context, store agentops.Store) context.Context

WithStore adds the store to the context.

func WithTask

func WithTask(ctx context.Context, task *agentops.Task) context.Context

WithTask adds a task to the context.

func WithWorkflow

func WithWorkflow(ctx context.Context, workflow *agentops.Workflow) context.Context

WithWorkflow adds a workflow to the context.

func WorkflowFromContext

func WorkflowFromContext(ctx context.Context) *agentops.Workflow

WorkflowFromContext retrieves the workflow from context.

func WorkflowIDFromContext

func WorkflowIDFromContext(ctx context.Context) string

WorkflowIDFromContext retrieves just the workflow ID from context.

func WorkflowScope

func WorkflowScope(ctx context.Context, store agentops.Store, name string, fn func(context.Context, *agentops.Workflow) error, opts ...WorkflowOption) error

WorkflowScope provides a convenient way to manage workflow lifecycle. It automatically completes or fails the workflow based on the returned error.

Usage:

err := WorkflowScope(ctx, store, "my-workflow", func(ctx context.Context, wf *agentops.Workflow) error {
    // Do work...
    return nil
})
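
The lifecycle that WorkflowScope automates can be sketched as follows: start a workflow, run the callback, then mark the workflow completed or failed depending on the returned error. The workflow struct, its status strings, and the scope and demo functions are hypothetical; the real package works with agentops.Workflow and a store.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// workflow is a hypothetical in-memory stand-in for a stored workflow.
type workflow struct {
	Name   string
	Status string
	Err    error
}

// scope runs fn and sets the final status from fn's error.
func scope(ctx context.Context, name string, fn func(context.Context, *workflow) error) (*workflow, error) {
	wf := &workflow{Name: name, Status: "running"}
	if err := fn(ctx, wf); err != nil {
		wf.Status, wf.Err = "failed", err
		return wf, err
	}
	wf.Status = "completed"
	return wf, nil
}

// demo returns the final status for a succeeding or failing callback.
func demo(fail bool) string {
	wf, _ := scope(context.Background(), "my-workflow", func(context.Context, *workflow) error {
		if fail {
			return errors.New("step failed")
		}
		return nil
	})
	return wf.Status
}

func main() {
	fmt.Println(demo(false), demo(true)) // prints "completed failed"
}
```

Compared with pairing StartWorkflow with a deferred CompleteWorkflow, this shape keeps the failure path in one place, because the callback's error alone decides the outcome.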

Types

type AgentClient

type AgentClient struct {
	// contains filtered or unexported fields
}

AgentClient wraps an http.Client to track inter-agent calls as handoffs.

func NewAgentClient

func NewAgentClient(client *http.Client, cfg AgentClientConfig) *AgentClient

NewAgentClient creates a new AgentClient that tracks handoffs.

func (*AgentClient) Do

func (c *AgentClient) Do(ctx context.Context, req *http.Request, toAgentID string) (*http.Response, error)

Do executes an HTTP request and records it as a handoff to another agent.

func (*AgentClient) Get

func (c *AgentClient) Get(ctx context.Context, url string, toAgentID string) (*http.Response, error)

Get performs a GET request to another agent.

func (*AgentClient) Post

func (c *AgentClient) Post(ctx context.Context, url string, contentType string, body io.Reader, toAgentID string) (*http.Response, error)

Post performs a POST request to another agent.

func (*AgentClient) PostJSON

func (c *AgentClient) PostJSON(ctx context.Context, url string, body io.Reader, toAgentID string) (*http.Response, error)

PostJSON performs a POST request with JSON content type to another agent.

type AgentClientConfig

type AgentClientConfig struct {
	// FromAgentID is the ID of the agent making the call.
	FromAgentID string

	// FromAgentType is the type of the agent making the call.
	FromAgentType string

	// Store is the agentops store. If nil, attempts to get from context.
	Store agentops.Store
}

AgentClientConfig configures the agent HTTP client for tracking handoffs.

type AgentHandlerConfig

type AgentHandlerConfig struct {
	// AgentID is the unique identifier of the agent.
	AgentID string

	// AgentType categorizes the agent's role (e.g., "synthesis", "research").
	AgentType string

	// AgentName is the human-readable name of the agent.
	AgentName string

	// DefaultTaskType is the default task type if not specified in the request.
	DefaultTaskType string

	// TaskNameFromPath uses the URL path as the task name.
	TaskNameFromPath bool

	// Store is the agentops store. If nil, attempts to get from context.
	Store agentops.Store
}

AgentHandlerConfig configures the agent HTTP handler middleware.

type AgentInfo

type AgentInfo struct {
	ID   string
	Type string
	Name string
}

AgentInfo holds agent identification for context propagation.

func AgentFromContext

func AgentFromContext(ctx context.Context) AgentInfo

AgentFromContext retrieves agent info from context.
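
A common way to implement a WithAgent / AgentFromContext pair is an unexported key type plus a type assertion that falls back to the zero value. The sketch below shows that pattern. The lowercase function names and the agentKey type are hypothetical, and returning a zero AgentInfo when nothing is attached is an assumption about behaviour the documentation does not spell out.

```go
package main

import (
	"context"
	"fmt"
)

// AgentInfo mirrors the struct documented above.
type AgentInfo struct {
	ID   string
	Type string
	Name string
}

// agentKey is unexported so other packages cannot overwrite the value.
type agentKey struct{}

// withAgent returns a child context carrying the agent info.
func withAgent(ctx context.Context, a AgentInfo) context.Context {
	return context.WithValue(ctx, agentKey{}, a)
}

// agentFromContext returns the attached agent info, or the zero value.
func agentFromContext(ctx context.Context) AgentInfo {
	a, _ := ctx.Value(agentKey{}).(AgentInfo)
	return a
}

// lookup reads agent info from a context that may or may not carry it.
func lookup(attach bool) AgentInfo {
	ctx := context.Background()
	if attach {
		ctx = withAgent(ctx, AgentInfo{ID: "synthesis-agent", Type: "synthesis"})
	}
	return agentFromContext(ctx)
}

func main() {
	fmt.Println(lookup(true).ID)             // prints "synthesis-agent"
	fmt.Println(lookup(false) == AgentInfo{}) // prints "true"
}
```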

type CompleteWorkflowConfig

type CompleteWorkflowConfig struct {
	Output   map[string]any
	Metadata map[string]any
}

CompleteWorkflowConfig configures workflow completion.

type CompleteWorkflowOption

type CompleteWorkflowOption func(*CompleteWorkflowConfig)

CompleteWorkflowOption configures workflow completion.

func WithWorkflowOutput

func WithWorkflowOutput(output map[string]any) CompleteWorkflowOption

WithWorkflowOutput sets the workflow output data.

type HTTPToolResponse

type HTTPToolResponse struct {
	StatusCode   int
	Body         []byte
	Headers      map[string][]string
	ResponseSize int
}

HTTPToolResponse represents the response from an HTTP tool call.

func HTTPToolCall

func HTTPToolCall(ctx context.Context, toolName string, method string, url string, fn func() (HTTPToolResponse, error), opts ...ToolCallOption) (HTTPToolResponse, error)

HTTPToolCall wraps an HTTP-based tool call with automatic instrumentation. It records HTTP method, URL, status code, and response size.

Usage:

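// fetchData is a hypothetical helper that performs the request and returns an HTTPToolResponse.
resp, err := HTTPToolCall(ctx, "external_api", "GET", "https://api.example.com/data", func() (HTTPToolResponse, error) {
    return fetchData("https://api.example.com/data")
})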
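
Per the signature above, the callback returns an HTTPToolResponse rather than an *http.Response, so callers typically need a small conversion step. One possible sketch is shown here. The fromHTTP and sample helpers are hypothetical, and setting ResponseSize to the number of body bytes read is an assumption about what that field is meant to hold.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// HTTPToolResponse mirrors the struct documented above.
type HTTPToolResponse struct {
	StatusCode   int
	Body         []byte
	Headers      map[string][]string
	ResponseSize int
}

// fromHTTP drains and closes resp.Body and copies the fields across.
func fromHTTP(resp *http.Response) (HTTPToolResponse, error) {
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return HTTPToolResponse{}, err
	}
	return HTTPToolResponse{
		StatusCode:   resp.StatusCode,
		Body:         body,
		Headers:      resp.Header,
		ResponseSize: len(body), // assumption: size means body bytes
	}, nil
}

// sample builds an in-memory response so the example needs no network.
func sample() *http.Response {
	rec := httptest.NewRecorder()
	rec.Header().Set("Content-Type", "application/json")
	rec.WriteHeader(http.StatusCreated)
	rec.WriteString(`{"ok":true}`)
	return rec.Result()
}

func main() {
	out, err := fromHTTP(sample())
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println(out.StatusCode, out.ResponseSize, string(out.Body))
}
```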

type ToolCallConfig

type ToolCallConfig struct {
	// ToolType categorizes the tool (e.g., "search", "database", "api").
	ToolType string

	// Input is the input data for the tool call.
	Input map[string]any

	// HTTPMethod is the HTTP method if this is an HTTP-based tool.
	HTTPMethod string

	// HTTPURL is the URL if this is an HTTP-based tool.
	HTTPURL string

	// Store is the agentops store. If nil, attempts to get from context.
	Store agentops.Store
}

ToolCallConfig configures a tool call invocation.

type ToolCallOption

type ToolCallOption func(*ToolCallConfig)

ToolCallOption configures a tool call.

func WithToolHTTP

func WithToolHTTP(method, url string) ToolCallOption

WithToolHTTP sets HTTP method and URL for HTTP-based tools.

func WithToolInput

func WithToolInput(input map[string]any) ToolCallOption

WithToolInput sets the tool input data.

func WithToolStore

func WithToolStore(store agentops.Store) ToolCallOption

WithToolStore sets the store for the tool call.

func WithToolType

func WithToolType(toolType string) ToolCallOption

WithToolType sets the tool type.

type ToolCallResult

type ToolCallResult struct {
	Output         map[string]any
	HTTPStatusCode int
	ResponseSize   int
	Error          error
}

ToolCallResult holds the result of a tool call for recording.

type WorkflowConfig

type WorkflowConfig struct {
	// Initiator identifies what started the workflow (e.g., "user:123", "api_key:abc").
	Initiator string

	// ParentWorkflowID links to a parent workflow for nested workflows.
	ParentWorkflowID string

	// Input is the initial input data for the workflow.
	Input map[string]any

	// Metadata is additional metadata for the workflow.
	Metadata map[string]any

	// TraceID is an optional trace ID for distributed tracing correlation.
	TraceID string
}

WorkflowConfig configures workflow creation.

type WorkflowOption

type WorkflowOption func(*WorkflowConfig)

WorkflowOption configures workflow creation.

func WithInitiator

func WithInitiator(initiator string) WorkflowOption

WithInitiator sets the workflow initiator.

func WithParentWorkflow

func WithParentWorkflow(parentID string) WorkflowOption

WithParentWorkflow sets the parent workflow ID.

func WithTraceID

func WithTraceID(traceID string) WorkflowOption

WithTraceID sets the trace ID for distributed tracing.

func WithWorkflowInput

func WithWorkflowInput(input map[string]any) WorkflowOption

WithWorkflowInput sets the workflow input data.

func WithWorkflowMetadata

func WithWorkflowMetadata(metadata map[string]any) WorkflowOption

WithWorkflowMetadata sets the workflow metadata.
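
WorkflowOption follows Go's functional options pattern: each option is a function that mutates a WorkflowConfig. The sketch below shows how such options are typically defined and applied. The type definitions match the documentation, while the option bodies and the buildConfig helper are assumptions about an implementation that is not shown here.

```go
package main

import "fmt"

// WorkflowConfig mirrors the struct documented above.
type WorkflowConfig struct {
	Initiator        string
	ParentWorkflowID string
	Input            map[string]any
	Metadata         map[string]any
	TraceID          string
}

// WorkflowOption mutates a WorkflowConfig.
type WorkflowOption func(*WorkflowConfig)

// The option bodies below are assumed; only the signatures are documented.
func WithInitiator(initiator string) WorkflowOption {
	return func(c *WorkflowConfig) { c.Initiator = initiator }
}

func WithParentWorkflow(parentID string) WorkflowOption {
	return func(c *WorkflowConfig) { c.ParentWorkflowID = parentID }
}

func WithTraceID(traceID string) WorkflowOption {
	return func(c *WorkflowConfig) { c.TraceID = traceID }
}

// buildConfig is a hypothetical helper that applies options in order,
// so a later option overrides an earlier one for the same field.
func buildConfig(opts ...WorkflowOption) WorkflowConfig {
	var cfg WorkflowConfig
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

func main() {
	cfg := buildConfig(WithInitiator("user:123"), WithTraceID("trace-1"))
	fmt.Println(cfg.Initiator, cfg.TraceID) // prints "user:123 trace-1"
}
```

ToolCallOption and CompleteWorkflowOption have the same shape, each over its own config struct.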
