Documentation ¶
Overview ¶
Package middleware provides reusable instrumentation helpers for multi-agent systems.
This package offers middleware and wrapper functions that automatically instrument agent operations following OpenTelemetry semantic conventions for Agentic AI. It is designed to minimize code changes required to add observability to existing agent systems.
Components ¶
The package provides four main components:
Context Propagation: Helpers for passing workflow, task, and agent information through request context and HTTP headers.
Workflow Management: Functions for starting, completing, and failing workflows with automatic context attachment.
HTTP Middleware: Handler middleware for instrumenting agents as tasks, and client middleware for tracking inter-agent calls as handoffs.
Tool Wrappers: Generic wrappers for instrumenting tool/function invocations with timing, status, and error tracking.
Quick Start ¶
Basic usage in an agent system:
// 1. Create a store
store, _ := agentops.Open("postgres", agentops.WithDSN(dsn))
defer store.Close()
// 2. Start a workflow (in orchestrator)
ctx, workflow, _ := middleware.StartWorkflow(ctx, store, "my-workflow",
middleware.WithInitiator("user:123"),
)
defer middleware.CompleteWorkflow(ctx)
// 3. Instrument agent HTTP handlers
handler := middleware.AgentHandler(middleware.AgentHandlerConfig{
AgentID: "synthesis-agent",
AgentType: "synthesis",
Store: store,
})(yourHandler)
// 4. Use instrumented client for inter-agent calls
client := middleware.NewAgentClient(http.DefaultClient, middleware.AgentClientConfig{
FromAgentID: "orchestrator",
Store: store,
})
resp, _ := client.PostJSON(ctx, "http://synthesis:8004/extract", body, "synthesis-agent")
// 5. Wrap tool calls
results, _ := middleware.ToolCall(ctx, "web_search", func() ([]Result, error) {
return searchService.Search(query)
}, middleware.WithToolType("search"))
Context Propagation ¶
The package automatically propagates observability context through:
- Go context.Context: Workflow, task, agent, and store are attached to context
- HTTP Headers: X-AgentOps-Workflow-ID, X-AgentOps-Task-ID, X-AgentOps-Agent-ID, and X-AgentOps-Trace-ID
This enables distributed tracing across agent boundaries:
// In agent A
ctx = middleware.WithWorkflow(ctx, workflow)
// Call agent B - headers automatically set
client.PostJSON(ctx, agentBURL, body, "agent-b")
// In agent B - workflow ID extracted from headers
workflowID := r.Header.Get(middleware.HeaderWorkflowID)
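The round trip described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the context key, inject, and roundTrip are hypothetical names, and only the header name comes from the package constants.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// headerWorkflowID matches the HeaderWorkflowID constant documented by the package.
const headerWorkflowID = "X-AgentOps-Workflow-ID"

// workflowIDKey is a hypothetical unexported context key used only in this sketch.
type workflowIDKey struct{}

// withWorkflowID returns a context that carries the workflow ID.
func withWorkflowID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, workflowIDKey{}, id)
}

// inject copies the workflow ID from the request's context onto its headers.
// It leaves the request untouched when no ID is present.
func inject(req *http.Request) {
	if id, _ := req.Context().Value(workflowIDKey{}).(string); id != "" {
		req.Header.Set(headerWorkflowID, id)
	}
}

// roundTrip sends one request from "agent A" to a local test server standing
// in for "agent B" and returns the workflow ID that agent B read from the headers.
func roundTrip(workflowID string) (string, error) {
	agentB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Agent B echoes the header so the caller can see what arrived.
		io.WriteString(w, r.Header.Get(headerWorkflowID))
	}))
	defer agentB.Close()

	ctx := withWorkflowID(context.Background(), workflowID)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, agentB.URL, nil)
	if err != nil {
		return "", err
	}
	inject(req)
	resp, err := agentB.Client().Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	seen, err := io.ReadAll(resp.Body)
	return string(seen), err
}

func main() {
	seen, err := roundTrip("wf-123")
	fmt.Println(seen, err)
}
```

Keeping the ID in the context and writing it to headers only at the call site means application code never has to handle the headers directly.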
HTTP Middleware ¶
AgentHandler wraps HTTP handlers to automatically create tasks:
mux := http.NewServeMux()
mux.Handle("/extract", middleware.AgentHandlerFunc(cfg, extractHandler))
mux.Handle("/analyze", middleware.AgentHandlerFunc(cfg, analyzeHandler))
Each request creates a task that captures:
- Start/end time and duration
- HTTP status code
- Success/failure status
- Automatic linking to parent workflow
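To show how a handler wrapper can observe the fields listed above, here is a minimal sketch that uses only net/http. The taskRecord type, the instrument function, and the rule that any status of 400 or above counts as a failure are assumptions made for illustration; the package may decide success differently.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// taskRecord is a hypothetical stand-in for the data a task captures.
type taskRecord struct {
	Path     string
	Status   int
	Duration time.Duration
	Failed   bool
}

// statusRecorder remembers the status code written by the wrapped handler.
type statusRecorder struct {
	http.ResponseWriter
	status int
}

// WriteHeader stores the code before passing it to the real ResponseWriter.
func (r *statusRecorder) WriteHeader(code int) {
	r.status = code
	r.ResponseWriter.WriteHeader(code)
}

// instrument wraps next and passes one taskRecord per request to record.
func instrument(next http.Handler, record func(taskRecord)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// Default to 200 because handlers may never call WriteHeader explicitly.
		rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
		start := time.Now()
		next.ServeHTTP(rec, req)
		record(taskRecord{
			Path:     req.URL.Path,
			Status:   rec.status,
			Duration: time.Since(start),
			// Assumption of this sketch: 4xx and 5xx responses count as failures.
			Failed: rec.status >= 400,
		})
	})
}

// run sends one request through an instrumented handler that replies with
// the given status and returns the record that was captured.
func run(status int) taskRecord {
	var got taskRecord
	h := instrument(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(status)
	}), func(t taskRecord) { got = t })
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodPost, "/extract", nil))
	return got
}

func main() {
	ok, bad := run(http.StatusOK), run(http.StatusInternalServerError)
	fmt.Println(ok.Status, ok.Failed, bad.Status, bad.Failed)
}
```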
AgentClient wraps http.Client to track inter-agent calls as handoffs:
client := middleware.NewAgentClient(nil, middleware.AgentClientConfig{
FromAgentID: "orchestrator",
FromAgentType: "orchestration",
Store: store,
})
// This call is recorded as a handoff
resp, err := client.PostJSON(ctx, synthesisURL, body, "synthesis-agent")
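The idea of recording each outbound call as a handoff can be sketched as a thin wrapper around http.Client. The handoff and handoffClient types are hypothetical, the in-memory log replaces the store, and falling back to http.DefaultClient for a nil client is an assumption about what passing nil means.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// handoff is a hypothetical record of one inter-agent call.
type handoff struct {
	From, To string
	Status   int
	Err      error
}

// handoffClient is a simplified stand-in for an instrumented client.
// It is not safe for concurrent use.
type handoffClient struct {
	inner  *http.Client
	fromID string
	log    []handoff
}

// newHandoffClient falls back to http.DefaultClient when c is nil
// (an assumption of this sketch).
func newHandoffClient(c *http.Client, fromID string) *handoffClient {
	if c == nil {
		c = http.DefaultClient
	}
	return &handoffClient{inner: c, fromID: fromID}
}

// Do sends req and appends a handoff entry whether or not the call succeeds.
func (c *handoffClient) Do(req *http.Request, toID string) (*http.Response, error) {
	resp, err := c.inner.Do(req)
	h := handoff{From: c.fromID, To: toID, Err: err}
	if resp != nil {
		h.Status = resp.StatusCode
	}
	c.log = append(c.log, h)
	return resp, err
}

// demo performs one call against a local test server and returns the log.
func demo() ([]handoff, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusAccepted)
	}))
	defer srv.Close()

	c := newHandoffClient(nil, "orchestrator")
	req, err := http.NewRequest(http.MethodPost, srv.URL, nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.Do(req, "synthesis-agent")
	if err != nil {
		return c.log, err
	}
	resp.Body.Close()
	return c.log, nil
}

func main() {
	log, err := demo()
	fmt.Printf("%+v %v\n", log, err)
}
```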
Tool Wrappers ¶
Generic tool wrapper for any function:
result, err := middleware.ToolCall(ctx, "tool_name", func() (ResultType, error) {
return myTool.Execute(args)
}, middleware.WithToolType("api"))
Convenience wrappers for common tool types:
// Search tools
results, _ := middleware.SearchToolCall(ctx, "web_search", query, searchFn)
// Database tools
rows, _ := middleware.DatabaseToolCall(ctx, "user_query", sql, queryFn)
// API tools
data, _ := middleware.APIToolCall(ctx, "weather_api", "GET", url, apiFn)
Workflow Scopes ¶
For simpler workflow management, use WorkflowScope:
err := middleware.WorkflowScope(ctx, store, "my-workflow",
func(ctx context.Context, wf *agentops.Workflow) error {
// Do work...
// Workflow automatically completed on success, failed on error
return nil
},
middleware.WithInitiator("user:123"),
)
Integration with OpenTelemetry ¶
The middleware uses semantic conventions from github.com/agentplexus/omniobserve/semconv/agent, which align with OpenTelemetry's gen_ai.agent.* namespace. This enables integration with OpenTelemetry-compatible observability platforms.
Index ¶
- Constants
- func APIToolCall[T any](ctx context.Context, toolName string, method string, url string, ...) (T, error)
- func AgentHandler(cfg AgentHandlerConfig) func(http.Handler) http.Handler
- func AgentHandlerFunc(cfg AgentHandlerConfig, handler http.HandlerFunc) http.Handler
- func CompleteWorkflow(ctx context.Context, opts ...CompleteWorkflowOption) error
- func DatabaseToolCall[T any](ctx context.Context, toolName string, query string, fn func() (T, error), ...) (T, error)
- func FailWorkflow(ctx context.Context, err error) error
- func RetryToolCall[T any](ctx context.Context, toolName string, maxRetries int, ...) (T, error)
- func SearchToolCall[T any](ctx context.Context, toolName string, query string, fn func() (T, error), ...) (T, error)
- func StartWorkflow(ctx context.Context, store agentops.Store, name string, opts ...WorkflowOption) (context.Context, *agentops.Workflow, error)
- func StoreFromContext(ctx context.Context) agentops.Store
- func TaskFromContext(ctx context.Context) *agentops.Task
- func TaskIDFromContext(ctx context.Context) string
- func ToolCall[T any](ctx context.Context, toolName string, fn func() (T, error), ...) (T, error)
- func ToolCallVoid(ctx context.Context, toolName string, fn func() error, opts ...ToolCallOption) error
- func WithAgent(ctx context.Context, agent AgentInfo) context.Context
- func WithStore(ctx context.Context, store agentops.Store) context.Context
- func WithTask(ctx context.Context, task *agentops.Task) context.Context
- func WithWorkflow(ctx context.Context, workflow *agentops.Workflow) context.Context
- func WorkflowFromContext(ctx context.Context) *agentops.Workflow
- func WorkflowIDFromContext(ctx context.Context) string
- func WorkflowScope(ctx context.Context, store agentops.Store, name string, ...) error
- type AgentClient
- func (c *AgentClient) Do(ctx context.Context, req *http.Request, toAgentID string) (*http.Response, error)
- func (c *AgentClient) Get(ctx context.Context, url string, toAgentID string) (*http.Response, error)
- func (c *AgentClient) Post(ctx context.Context, url string, contentType string, body io.Reader, ...) (*http.Response, error)
- func (c *AgentClient) PostJSON(ctx context.Context, url string, body io.Reader, toAgentID string) (*http.Response, error)
- type AgentClientConfig
- type AgentHandlerConfig
- type AgentInfo
- type CompleteWorkflowConfig
- type CompleteWorkflowOption
- type HTTPToolResponse
- type ToolCallConfig
- type ToolCallOption
- type ToolCallResult
- type WorkflowConfig
- type WorkflowOption
Constants ¶
const (
HeaderWorkflowID = "X-AgentOps-Workflow-ID"
HeaderTaskID = "X-AgentOps-Task-ID"
HeaderAgentID = "X-AgentOps-Agent-ID"
HeaderTraceID = "X-AgentOps-Trace-ID"
)
These constants name the HTTP headers used to propagate context across services.
Variables ¶
This section is empty.
Functions ¶
func APIToolCall ¶
func APIToolCall[T any](ctx context.Context, toolName string, method string, url string, fn func() (T, error), opts ...ToolCallOption) (T, error)
APIToolCall is a convenience wrapper for external API tool calls.
Usage:
data, err := APIToolCall(ctx, "weather_api", "GET", "https://api.weather.com/current", func() (WeatherData, error) {
return weatherClient.GetCurrent(location)
})
func AgentHandler ¶
func AgentHandler(cfg AgentHandlerConfig) func(http.Handler) http.Handler
AgentHandler returns HTTP middleware that instruments a handler as an agent task. It automatically:
- Creates a task when a request arrives
- Extracts workflow ID from headers if present
- Records duration, status, and errors
- Completes or fails the task based on response status
func AgentHandlerFunc ¶
func AgentHandlerFunc(cfg AgentHandlerConfig, handler http.HandlerFunc) http.Handler
AgentHandlerFunc is a convenience wrapper for AgentHandler with http.HandlerFunc.
func CompleteWorkflow ¶
func CompleteWorkflow(ctx context.Context, opts ...CompleteWorkflowOption) error
CompleteWorkflow marks the workflow in context as completed.
func DatabaseToolCall ¶
func DatabaseToolCall[T any](ctx context.Context, toolName string, query string, fn func() (T, error), opts ...ToolCallOption) (T, error)
DatabaseToolCall is a convenience wrapper for database tool calls.
Usage:
rows, err := DatabaseToolCall(ctx, "user_lookup", "SELECT * FROM users WHERE id = ?", func() ([]User, error) {
return db.Query(query, userID)
})
func FailWorkflow ¶
func FailWorkflow(ctx context.Context, err error) error
FailWorkflow marks the workflow in context as failed.
func RetryToolCall ¶
func RetryToolCall[T any](ctx context.Context, toolName string, maxRetries int, fn func(attempt int) (T, error), opts ...ToolCallOption) (T, error)
RetryToolCall wraps a tool call with automatic retry tracking. It increments the retry count on each attempt.
Usage:
result, err := RetryToolCall(ctx, "flaky_api", 3, func(attempt int) (Data, error) {
return flakyAPI.Call()
})
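A retry wrapper of this shape can be sketched as a small generic loop. This is an illustration under stated assumptions rather than the package's logic: the retry function is hypothetical, attempts are numbered from 0, and maxRetries is taken to mean retries after the first attempt, so fn runs at most maxRetries+1 times.

```go
package main

import (
	"errors"
	"fmt"
)

// retry calls fn until it succeeds or the retry budget is used up.
// It returns the last result, the number of attempts made, and the last error.
func retry[T any](maxRetries int, fn func(attempt int) (T, error)) (T, int, error) {
	var (
		result   T
		err      error
		attempts int
	)
	for attempts <= maxRetries {
		result, err = fn(attempts)
		attempts++
		if err == nil {
			return result, attempts, nil
		}
	}
	return result, attempts, err
}

// flaky is a sample tool that fails on its first two attempts.
func flaky(attempt int) (string, error) {
	if attempt < 2 {
		return "", errors.New("temporarily unavailable")
	}
	return "ok", nil
}

func main() {
	v, n, err := retry(3, flaky)
	fmt.Println(v, n, err)
}
```

Passing the attempt number to fn lets the tool adjust its behavior per attempt, for example by widening a timeout, which is presumably why the documented callback takes an attempt argument.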
func SearchToolCall ¶
func SearchToolCall[T any](ctx context.Context, toolName string, query string, fn func() (T, error), opts ...ToolCallOption) (T, error)
SearchToolCall is a convenience wrapper for search tool calls.
Usage:
results, err := SearchToolCall(ctx, "web_search", query, func() ([]SearchResult, error) {
return searchService.Search(query)
})
func StartWorkflow ¶
func StartWorkflow(ctx context.Context, store agentops.Store, name string, opts ...WorkflowOption) (context.Context, *agentops.Workflow, error)
StartWorkflow creates a new workflow and returns a context with the workflow attached. The returned context should be used for all subsequent operations within the workflow.
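func StoreFromContext ¶
func StoreFromContext(ctx context.Context) agentops.Store
StoreFromContext retrieves the store from context.
func TaskFromContext ¶
func TaskFromContext(ctx context.Context) *agentops.Task
TaskFromContext retrieves the task from context.
func TaskIDFromContext ¶
func TaskIDFromContext(ctx context.Context) string
TaskIDFromContext retrieves just the task ID from context.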
func ToolCall ¶
func ToolCall[T any](ctx context.Context, toolName string, fn func() (T, error), opts ...ToolCallOption) (T, error)
ToolCall wraps a function as an instrumented tool call. It automatically records the tool invocation with timing, status, and errors.
Usage:
result, err := ToolCall(ctx, "web_search", func() (any, error) {
return searchService.Search(query)
}, WithToolType("search"), WithToolInput(map[string]any{"query": query}))
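The core of a generic tool wrapper is small enough to sketch directly. The toolRecord type and timedCall function are hypothetical and stand in for whatever the package writes to its store; the sketch only shows how a generic function can time a call and pass the result through unchanged.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// toolRecord is a hypothetical record of one tool invocation.
type toolRecord struct {
	Name     string
	Duration time.Duration
	Err      error
}

// timedCall runs fn, hands a toolRecord to record, and returns fn's results unchanged.
func timedCall[T any](name string, record func(toolRecord), fn func() (T, error)) (T, error) {
	start := time.Now()
	out, err := fn()
	record(toolRecord{Name: name, Duration: time.Since(start), Err: err})
	return out, err
}

// wordCount is a sample tool that always succeeds.
func wordCount() (int, error) { return 3, nil }

// failing is a sample tool that always returns an error.
func failing() (string, error) { return "", errors.New("boom") }

func main() {
	var records []toolRecord
	save := func(r toolRecord) { records = append(records, r) }

	n, _ := timedCall("word_count", save, wordCount)
	_, err := timedCall("broken", save, failing)
	fmt.Println(n, err, len(records))
}
```

Because the type parameter is inferred from fn, callers keep their concrete result type and need no type assertions.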
func ToolCallVoid ¶
func ToolCallVoid(ctx context.Context, toolName string, fn func() error, opts ...ToolCallOption) error
ToolCallVoid wraps a void function as an instrumented tool call. Use this when the tool doesn't return a value.
Usage:
err := ToolCallVoid(ctx, "send_notification", func() error {
return notificationService.Send(message)
}, WithToolType("notification"))
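func WithWorkflow ¶
func WithWorkflow(ctx context.Context, workflow *agentops.Workflow) context.Context
WithWorkflow adds a workflow to the context.
func WorkflowFromContext ¶
func WorkflowFromContext(ctx context.Context) *agentops.Workflow
WorkflowFromContext retrieves the workflow from context.
func WorkflowIDFromContext ¶
func WorkflowIDFromContext(ctx context.Context) string
WorkflowIDFromContext retrieves just the workflow ID from context.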
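Helpers like these usually follow the standard context-key idiom, sketched below. The workflow type stands in for *agentops.Workflow, and returning nil or an empty string when nothing is attached is an assumption; the documentation does not say what the real functions return in that case.

```go
package main

import (
	"context"
	"fmt"
)

// workflow is a hypothetical stand-in for *agentops.Workflow.
type workflow struct{ ID string }

// ctxKey is unexported so that other packages cannot collide with these keys.
type ctxKey int

const workflowKey ctxKey = iota

// withWorkflow returns a copy of ctx that carries wf.
func withWorkflow(ctx context.Context, wf *workflow) context.Context {
	return context.WithValue(ctx, workflowKey, wf)
}

// workflowFrom returns the attached workflow, or nil if there is none.
func workflowFrom(ctx context.Context) *workflow {
	wf, _ := ctx.Value(workflowKey).(*workflow)
	return wf
}

// workflowIDFrom returns the workflow ID, or "" if no workflow is attached.
func workflowIDFrom(ctx context.Context) string {
	if wf := workflowFrom(ctx); wf != nil {
		return wf.ID
	}
	return ""
}

// demo looks up the ID once with a workflow attached and once without.
func demo() (with, without string) {
	base := context.Background()
	ctx := withWorkflow(base, &workflow{ID: "wf-123"})
	return workflowIDFrom(ctx), workflowIDFrom(base)
}

func main() {
	with, without := demo()
	fmt.Printf("%q %q\n", with, without)
}
```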
func WorkflowScope ¶
func WorkflowScope(ctx context.Context, store agentops.Store, name string, fn func(context.Context, *agentops.Workflow) error, opts ...WorkflowOption) error
WorkflowScope provides a convenient way to manage workflow lifecycle. It automatically completes or fails the workflow based on the returned error.
Usage:
err := WorkflowScope(ctx, store, "my-workflow", func(ctx context.Context, wf *agentops.Workflow) error {
// Do work...
return nil
})
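The complete-or-fail behavior can be sketched with a deferred function and a named return value. The tracker type is a hypothetical in-memory substitute for the store, and marking a workflow as failed when fn panics is an extra assumption of this sketch, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// tracker is a hypothetical stand-in for a store that records workflow status.
type tracker struct{ status map[string]string }

// scope marks the named workflow as running, calls fn, and then records
// "completed" or "failed" depending on the error fn returns.
func (t *tracker) scope(name string, fn func() error) (err error) {
	t.status[name] = "running"
	defer func() {
		if r := recover(); r != nil {
			// Assumption: a panic also counts as a failure. Re-raise it afterwards.
			t.status[name] = "failed"
			panic(r)
		}
		if err != nil {
			t.status[name] = "failed"
			return
		}
		t.status[name] = "completed"
	}()
	return fn()
}

// demo runs one succeeding and one failing scope and returns the final statuses.
func demo() map[string]string {
	t := &tracker{status: map[string]string{}}
	_ = t.scope("good", func() error { return nil })
	_ = t.scope("bad", func() error { return errors.New("boom") })
	return t.status
}

func main() {
	fmt.Println(demo())
}
```

Compared with a manual start followed by a deferred completion call, the scoped form cannot forget to mark a failure, because the status is derived from the callback's return value.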
Types ¶
type AgentClient ¶
type AgentClient struct {
// contains filtered or unexported fields
}
AgentClient wraps an http.Client to track inter-agent calls as handoffs.
func NewAgentClient ¶
func NewAgentClient(client *http.Client, cfg AgentClientConfig) *AgentClient
NewAgentClient creates a new AgentClient that tracks handoffs.
func (*AgentClient) Do ¶
func (c *AgentClient) Do(ctx context.Context, req *http.Request, toAgentID string) (*http.Response, error)
Do executes an HTTP request and records it as a handoff to another agent.
func (*AgentClient) Get ¶
func (c *AgentClient) Get(ctx context.Context, url string, toAgentID string) (*http.Response, error)
Get performs a GET request to another agent.
type AgentClientConfig ¶
type AgentClientConfig struct {
// FromAgentID is the ID of the agent making the call.
FromAgentID string
// FromAgentType is the type of the agent making the call.
FromAgentType string
// Store is the agentops store. If nil, attempts to get from context.
Store agentops.Store
}
AgentClientConfig configures the agent HTTP client for tracking handoffs.
type AgentHandlerConfig ¶
type AgentHandlerConfig struct {
// AgentID is the unique identifier of the agent.
AgentID string
// AgentType categorizes the agent's role (e.g., "synthesis", "research").
AgentType string
// AgentName is the human-readable name of the agent.
AgentName string
// DefaultTaskType is the default task type if not specified in the request.
DefaultTaskType string
// TaskNameFromPath uses the URL path as the task name.
TaskNameFromPath bool
// Store is the agentops store. If nil, attempts to get from context.
Store agentops.Store
}
AgentHandlerConfig configures the agent HTTP handler middleware.
type AgentInfo ¶
AgentInfo holds agent identification for context propagation.
func AgentFromContext ¶
AgentFromContext retrieves agent info from context.
type CompleteWorkflowConfig ¶
CompleteWorkflowConfig configures workflow completion.
type CompleteWorkflowOption ¶
type CompleteWorkflowOption func(*CompleteWorkflowConfig)
CompleteWorkflowOption configures workflow completion.
func WithWorkflowOutput ¶
func WithWorkflowOutput(output map[string]any) CompleteWorkflowOption
WithWorkflowOutput sets the workflow output data.
type HTTPToolResponse ¶
type HTTPToolResponse struct {
StatusCode int
Body []byte
Headers map[string][]string
ResponseSize int
}
HTTPToolResponse represents the response from an HTTP tool call.
func HTTPToolCall ¶
func HTTPToolCall(ctx context.Context, toolName string, method string, url string, fn func() (HTTPToolResponse, error), opts ...ToolCallOption) (HTTPToolResponse, error)
HTTPToolCall wraps an HTTP-based tool call with automatic instrumentation. It records HTTP method, URL, status code, and response size.
Usage:
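resp, err := HTTPToolCall(ctx, "external_api", "GET", "https://api.example.com/data", func() (HTTPToolResponse, error) {
// fetchData is a placeholder for code that performs the request and fills in an HTTPToolResponse.
return fetchData("https://api.example.com/data")
})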
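Since the callback passed to HTTPToolCall must return an HTTPToolResponse rather than an *http.Response, callers need a small conversion step. The sketch below shows one way to write it using a local mirror of the struct. The fetch function is hypothetical, and treating ResponseSize as the body length in bytes is an assumption.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// httpToolResponse mirrors the fields of the documented HTTPToolResponse.
type httpToolResponse struct {
	StatusCode   int
	Body         []byte
	Headers      map[string][]string
	ResponseSize int
}

// fetch performs a GET request and converts the result into an httpToolResponse.
func fetch(client *http.Client, url string) (httpToolResponse, error) {
	resp, err := client.Get(url)
	if err != nil {
		return httpToolResponse{}, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return httpToolResponse{StatusCode: resp.StatusCode}, err
	}
	return httpToolResponse{
		StatusCode:   resp.StatusCode,
		Body:         body,
		Headers:      resp.Header,
		ResponseSize: len(body), // assumption: size means body length in bytes
	}, nil
}

// demo serves a fixed payload from a local test server and fetches it.
func demo() (httpToolResponse, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusCreated)
		io.WriteString(w, "hello")
	}))
	defer srv.Close()
	return fetch(srv.Client(), srv.URL)
}

func main() {
	r, err := demo()
	fmt.Println(r.StatusCode, string(r.Body), r.ResponseSize, err)
}
```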
type ToolCallConfig ¶
type ToolCallConfig struct {
// ToolType categorizes the tool (e.g., "search", "database", "api").
ToolType string
// Input is the input data for the tool call.
Input map[string]any
// HTTPMethod is the HTTP method if this is an HTTP-based tool.
HTTPMethod string
// HTTPURL is the URL if this is an HTTP-based tool.
HTTPURL string
// Store is the agentops store. If nil, attempts to get from context.
Store agentops.Store
}
ToolCallConfig configures a tool call invocation.
type ToolCallOption ¶
type ToolCallOption func(*ToolCallConfig)
ToolCallOption configures a tool call.
func WithToolHTTP ¶
func WithToolHTTP(method, url string) ToolCallOption
WithToolHTTP sets HTTP method and URL for HTTP-based tools.
func WithToolInput ¶
func WithToolInput(input map[string]any) ToolCallOption
WithToolInput sets the tool input data.
func WithToolStore ¶
func WithToolStore(store agentops.Store) ToolCallOption
WithToolStore sets the store for the tool call.
func WithToolType ¶
func WithToolType(toolType string) ToolCallOption
WithToolType sets the tool type.
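ToolCallOption follows Go's functional options pattern. The sketch below mirrors ToolCallConfig without its Store field and shows how a variadic list of options is folded into a config. The buildConfig function is a hypothetical helper; how the package applies its options internally is not documented.

```go
package main

import "fmt"

// toolCallConfig mirrors the documented ToolCallConfig, minus the Store field.
type toolCallConfig struct {
	ToolType   string
	Input      map[string]any
	HTTPMethod string
	HTTPURL    string
}

// toolCallOption mutates a config, matching the shape of ToolCallOption.
type toolCallOption func(*toolCallConfig)

// withToolType sets the tool type.
func withToolType(t string) toolCallOption {
	return func(c *toolCallConfig) { c.ToolType = t }
}

// withToolHTTP sets the HTTP method and URL together.
func withToolHTTP(method, url string) toolCallOption {
	return func(c *toolCallConfig) { c.HTTPMethod, c.HTTPURL = method, url }
}

// buildConfig applies opts in order, so later options overwrite earlier ones.
func buildConfig(opts ...toolCallOption) toolCallConfig {
	var cfg toolCallConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(withToolType("api"), withToolHTTP("GET", "https://api.example.com/data"))
	fmt.Printf("%+v\n", cfg)
}
```

The pattern lets new settings be added later without changing the signature of every function that accepts options.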
type ToolCallResult ¶
type ToolCallResult struct {
Output map[string]any
HTTPStatusCode int
ResponseSize int
Error error
}
ToolCallResult holds the result of a tool call for recording.
type WorkflowConfig ¶
type WorkflowConfig struct {
// Initiator identifies what started the workflow (e.g., "user:123", "api_key:abc").
Initiator string
// ParentWorkflowID links to a parent workflow for nested workflows.
ParentWorkflowID string
// Input is the initial input data for the workflow.
Input map[string]any
// Metadata is additional metadata for the workflow.
Metadata map[string]any
// TraceID is an optional trace ID for distributed tracing correlation.
TraceID string
}
WorkflowConfig configures workflow creation.
type WorkflowOption ¶
type WorkflowOption func(*WorkflowConfig)
WorkflowOption configures workflow creation.
func WithInitiator ¶
func WithInitiator(initiator string) WorkflowOption
WithInitiator sets the workflow initiator.
func WithParentWorkflow ¶
func WithParentWorkflow(parentID string) WorkflowOption
WithParentWorkflow sets the parent workflow ID.
func WithTraceID ¶
func WithTraceID(traceID string) WorkflowOption
WithTraceID sets the trace ID for distributed tracing.
func WithWorkflowInput ¶
func WithWorkflowInput(input map[string]any) WorkflowOption
WithWorkflowInput sets the workflow input data.
func WithWorkflowMetadata ¶
func WithWorkflowMetadata(metadata map[string]any) WorkflowOption
WithWorkflowMetadata sets the workflow metadata.