Documentation
¶
Index ¶
- Variables
- type ExecutionConfig
- type ExecutionContext
- func (ctx *ExecutionContext) AddPendingToolCall(toolCall types.MessageToolCall)
- func (ctx *ExecutionContext) ClearPendingToolCalls()
- func (ctx *ExecutionContext) EmitStreamChunk(chunk providers.StreamChunk) bool
- func (ctx *ExecutionContext) GetPendingToolCall(id string) *types.MessageToolCall
- func (ctx *ExecutionContext) HasPendingToolCalls() bool
- func (ctx *ExecutionContext) InterruptStream(reason string)
- func (ctx *ExecutionContext) IsStreaming() bool
- func (ctx *ExecutionContext) RecordLLMCall(disableTrace bool, response *Response, startTime time.Time, ...)
- func (ctx *ExecutionContext) RemovePendingToolCall(id string) bool
- type ExecutionOptions
- type ExecutionResult
- type ExecutionTrace
- type LLMCall
- type Middleware
- type MiddlewareConfig
- type Pipeline
- func (p *Pipeline) Execute(ctx context.Context, role, content string) (*ExecutionResult, error)
- func (p *Pipeline) ExecuteStream(ctx context.Context, role string, content string) (<-chan providers.StreamChunk, error)
- func (p *Pipeline) ExecuteStreamWithMessage(ctx context.Context, message types.Message) (<-chan providers.StreamChunk, error)
- func (p *Pipeline) ExecuteWithMessage(ctx context.Context, message types.Message) (*ExecutionResult, error)
- func (p *Pipeline) ExecuteWithOptions(opts *ExecutionOptions, role, content string) (*ExecutionResult, error)
- func (p *Pipeline) Shutdown(ctx context.Context) error
- type PipelineConfig
- type PipelineRuntimeConfig
- type ProviderMiddlewareConfig
- type Response
- type ResponseMetadata
- type RetryPolicy
- type StateStoreConfig
- type TemplateMiddlewareConfig
- type ToolPolicy
- type TraceEvent
- type ValidationError
- type ValidatorMiddlewareConfig
Constants ¶
This section is empty.
Variables ¶
var (
ErrPipelineShuttingDown = errors.New("pipeline is shutting down")
)
Sentinel error variables.
Functions ¶
This section is empty.
Types ¶
type ExecutionConfig ¶
type ExecutionConfig struct {
Provider providers.Provider
ToolRegistry *tools.Registry
Temperature float32
MaxTokens int
Seed *int
ToolPolicy *ToolPolicy
}
ExecutionConfig contains configuration for pipeline execution.
type ExecutionContext ¶
type ExecutionContext struct {
// Context for cancellation, deadlines, and request-scoped values
Context context.Context
// Identifiers for organization and tracking
RunID string // Test run or execution batch identifier
SessionID string // Session identifier for grouping related conversations
ConversationID string // Conversation identifier for this specific exchange
// State (mutable by middleware)
SystemPrompt string // Populated by PromptAssemblyMiddleware
Variables map[string]string // Populated by PromptAssemblyMiddleware and ContextExtractionMiddleware
AllowedTools []string // Populated by PromptAssemblyMiddleware
Messages []types.Message // Conversation history + current messages
Tools []types.ToolDef // Available tool definitions
ToolResults []types.MessageToolResult // Executed tool results
PendingToolCalls []types.MessageToolCall // Tool calls awaiting external completion (human-in-the-loop)
Prompt string // Assembled prompt (after variable substitution by TemplateMiddleware)
// Output (populated by middleware)
Trace ExecutionTrace // Complete trace of all LLM calls and events
Response *Response // Convenience pointer to the most recent response (= Trace.LLMCalls[len-1].Response)
RawResponse interface{} // Convenience pointer to most recent raw response (= Trace.LLMCalls[len-1].RawResponse)
// Error tracking (middleware can check this to see if an error occurred earlier in the chain)
Error error // First error encountered during execution (subsequent middleware still run)
// Metadata (for passing data between middleware)
Metadata map[string]interface{}
// Cost tracking (aggregate across all calls)
CostInfo types.CostInfo
// Streaming support
StreamMode bool // If true, use streaming execution
StreamOutput chan providers.StreamChunk // Output channel for streaming chunks
StreamInterrupted bool // Set to true by middleware to stop streaming
InterruptReason string // Reason for interruption
// Middleware control
ShortCircuit bool // Set to true by middleware to intentionally skip remaining middleware
// contains filtered or unexported fields
}
ExecutionContext is the execution state passed through the middleware chain. It contains all the data needed for pipeline execution and is modified by middleware.
func (*ExecutionContext) AddPendingToolCall ¶
func (ctx *ExecutionContext) AddPendingToolCall(toolCall types.MessageToolCall)
AddPendingToolCall adds a tool call to the pending list. Used by middleware when a tool returns ToolStatusPending.
func (*ExecutionContext) ClearPendingToolCalls ¶
func (ctx *ExecutionContext) ClearPendingToolCalls()
ClearPendingToolCalls removes all pending tool calls.
func (*ExecutionContext) EmitStreamChunk ¶
func (ctx *ExecutionContext) EmitStreamChunk(chunk providers.StreamChunk) bool
EmitStreamChunk emits a stream chunk to the output channel. Returns false if the stream has been interrupted or the channel is closed. Middleware that produces chunks should check the return value to know when to stop.
func (*ExecutionContext) GetPendingToolCall ¶
func (ctx *ExecutionContext) GetPendingToolCall(id string) *types.MessageToolCall
GetPendingToolCall retrieves a pending tool call by ID. Returns nil if not found.
func (*ExecutionContext) HasPendingToolCalls ¶
func (ctx *ExecutionContext) HasPendingToolCalls() bool
HasPendingToolCalls returns true if there are any pending tool calls.
func (*ExecutionContext) InterruptStream ¶
func (ctx *ExecutionContext) InterruptStream(reason string)
InterruptStream interrupts the stream with the given reason. Middleware should call this to stop streaming when validation fails, rate limits are hit, etc.
func (*ExecutionContext) IsStreaming ¶
func (ctx *ExecutionContext) IsStreaming() bool
IsStreaming returns true if the execution context is in streaming mode.
func (*ExecutionContext) RecordLLMCall ¶ added in v1.1.0
func (ctx *ExecutionContext) RecordLLMCall(disableTrace bool, response *Response, startTime time.Time, duration time.Duration, costInfo *types.CostInfo, toolCalls []types.MessageToolCall)
RecordLLMCall adds an LLM call to the execution trace if tracing is enabled. This method is called by provider middleware to track all LLM interactions.
func (*ExecutionContext) RemovePendingToolCall ¶
func (ctx *ExecutionContext) RemovePendingToolCall(id string) bool
RemovePendingToolCall removes a tool call from the pending list by ID. Returns true if the tool call was found and removed.
type ExecutionOptions ¶ added in v1.1.2
type ExecutionOptions struct {
// RunID identifies the test run or execution batch
RunID string
// SessionID identifies the session for grouping related conversations
SessionID string
// ConversationID identifies this specific conversation exchange
ConversationID string
// Context for cancellation and timeouts (if nil, uses context.Background())
Context context.Context
}
ExecutionOptions provides optional parameters for pipeline execution. All fields are optional and will use sensible defaults if not provided.
type ExecutionResult ¶
type ExecutionResult struct {
Messages []types.Message `json:"messages"` // All messages including history and responses
Response *Response `json:"response"` // The final response (convenience field)
Trace ExecutionTrace `json:"trace"` // Complete execution trace with all LLM calls
CostInfo types.CostInfo `json:"cost_info"` // Aggregate cost across all LLM calls
Metadata map[string]interface{} `json:"metadata"` // Metadata populated by middleware
}
ExecutionResult is the output of a pipeline execution. It contains the final state after all middleware has been executed.
type ExecutionTrace ¶
type ExecutionTrace struct {
LLMCalls []LLMCall `json:"llm_calls"` // All LLM API calls made during execution
Events []TraceEvent `json:"events,omitempty"` // Other trace events (tool execution, context truncation, etc.)
StartedAt time.Time `json:"started_at"` // When pipeline execution started
CompletedAt *time.Time `json:"completed_at,omitempty"` // When pipeline execution completed (nil if still running)
}
ExecutionTrace captures the complete execution history of a pipeline run. This includes all LLM calls, tool executions, and other significant events.
type LLMCall ¶
type LLMCall struct {
Sequence int `json:"sequence"` // Call number in sequence (1, 2, 3...)
MessageIndex int `json:"message_index"` // Index into ExecutionResult.Messages where assistant response is stored
Request interface{} `json:"request,omitempty"` // Raw request (if debugging enabled)
Response *Response `json:"response"` // Parsed response
RawResponse interface{} `json:"raw_response,omitempty"` // Raw provider response (if debugging enabled)
StartedAt time.Time `json:"started_at"` // When call started
Duration time.Duration `json:"duration"` // How long the call took
Cost types.CostInfo `json:"cost"` // Cost information for this call
ToolCalls []types.MessageToolCall `json:"tool_calls,omitempty"` // If this call triggered tool execution
Error *string `json:"error,omitempty"` // Error message if the call failed (nil if successful)
}
LLMCall represents a single LLM API call within a pipeline execution. In tool-enabled scenarios, multiple calls may occur in sequence.
type Middleware ¶
type Middleware interface {
Process(ctx *ExecutionContext, next func() error) error
// StreamChunk is called for each chunk during streaming execution (if StreamMode is true).
// Middleware can inspect, validate, or modify chunks. Return an error or call ctx.InterruptStream()
// to stop streaming. Most middleware should return nil (no-op).
StreamChunk(ctx *ExecutionContext, chunk *providers.StreamChunk) error
}
Middleware defines the execution interface for pipeline steps.
Middleware executes in a nested chain where each middleware explicitly calls next() to continue the pipeline. This makes the execution flow clear and explicit.
Given the middleware chain [A, B, C], the execution order is:
A.Process(ctx, func() error {
    return B.Process(ctx, func() error {
        return C.Process(ctx, func() error {
            return nil // End of chain
        })
    })
})
Example implementation:
func (m *ProviderMiddleware) Process(ctx *ExecutionContext, next func() error) error {
// Setup/processing logic
response, err := m.provider.Generate(ctx)
if err != nil {
return err
}
ctx.Response = response
// Continue to next middleware
if err := next(); err != nil {
return err
}
// Optional cleanup logic
return nil
}
Error Handling:
- If Process() returns an error, the error is captured in ExecutionContext.Error
- Errors stop the chain - subsequent middleware do not execute
- Middleware can check ExecutionContext.Error to see if earlier steps failed
ExecutionContext is used internally by middleware but users should not create it directly.
type MiddlewareConfig ¶
type MiddlewareConfig struct {
Type string `json:"type"` // Middleware type (e.g., "template", "provider", "validator")
Config map[string]interface{} `json:"config,omitempty"` // Type-specific configuration
}
MiddlewareConfig represents configuration for a specific middleware.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline chains middleware together in sequence.
func NewPipeline ¶
func NewPipeline(middleware ...Middleware) *Pipeline
NewPipeline creates a new pipeline with the given middleware. Uses default runtime configuration.
func NewPipelineWithConfig ¶
func NewPipelineWithConfig(config *PipelineRuntimeConfig, middleware ...Middleware) *Pipeline
NewPipelineWithConfig creates a new pipeline with the given configuration and middleware. If config is nil, uses default configuration. Note: This function does not validate config values for backward compatibility. Use NewPipelineWithConfigValidated for validation.
func NewPipelineWithConfigValidated ¶ added in v1.1.0
func NewPipelineWithConfigValidated(config *PipelineRuntimeConfig, middleware ...Middleware) (*Pipeline, error)
NewPipelineWithConfigValidated creates a new pipeline with validation. Returns an error if config contains invalid values (negative numbers). If config is nil, uses default configuration. If config has zero values for some fields, they are filled with defaults.
func (*Pipeline) Execute ¶
func (p *Pipeline) Execute(ctx context.Context, role, content string) (*ExecutionResult, error)
Execute runs the pipeline with the given role and content, returning the execution result. It creates a fresh internal ExecutionContext for each call, preventing state contamination. The role and content parameters are used to create the initial user message. If role is empty, no message is appended (useful for testing). Returns the ExecutionResult containing messages, response, trace, and metadata.
func (*Pipeline) ExecuteStream ¶
func (p *Pipeline) ExecuteStream(ctx context.Context, role string, content string) (<-chan providers.StreamChunk, error)
ExecuteStream runs the pipeline in streaming mode, returning a channel of stream chunks. It creates a fresh internal ExecutionContext for each call, preventing state contamination. The role and content parameters are used to create the initial user message. If role is empty, no message is appended (useful for testing). The pipeline executes in the background and closes the channel when complete. The final chunk will contain the ExecutionResult in the FinalResult field.
func (*Pipeline) ExecuteStreamWithMessage ¶
func (p *Pipeline) ExecuteStreamWithMessage(ctx context.Context, message types.Message) (<-chan providers.StreamChunk, error)
ExecuteStreamWithMessage runs the pipeline in streaming mode with a complete Message object. This method allows callers to provide a fully-populated message with all fields (Meta, Timestamp, etc.) rather than just role and content. The message is added to the execution context as-is, preserving all fields including Meta, Timestamp, ToolCalls, CostInfo, and Validations.
The pipeline executes in the background and closes the channel when complete. The final chunk will contain the ExecutionResult in the FinalResult field. Returns a channel of StreamChunk objects that will be closed when execution completes.
func (*Pipeline) ExecuteWithMessage ¶
func (p *Pipeline) ExecuteWithMessage(ctx context.Context, message types.Message) (*ExecutionResult, error)
ExecuteWithMessage runs the pipeline with a complete Message object, returning the execution result. This method allows callers to provide a fully-populated message with all fields (Meta, Timestamp, etc.) rather than just role and content. This is useful when you need to preserve metadata or other message properties through the pipeline execution.
The message is added to the execution context as-is, preserving all fields including:
- Meta (metadata, raw responses, validation info)
- Timestamp
- ToolCalls
- CostInfo
- Validations
Middleware can still modify the message during execution if needed. Returns the ExecutionResult containing messages, response, trace, and metadata.
func (*Pipeline) ExecuteWithOptions ¶ added in v1.1.2
func (p *Pipeline) ExecuteWithOptions(opts *ExecutionOptions, role, content string) (*ExecutionResult, error)
ExecuteWithOptions runs the pipeline with the given role, content, and execution options. This method provides fine-grained control over execution including RunID, SessionID, and ConversationID. Returns the ExecutionResult containing messages, response, trace, and metadata.
type PipelineConfig ¶
type PipelineConfig struct {
Stages []string `json:"stages"` // Pipeline stages in order (e.g., ["template", "provider", "validator"])
Middleware []MiddlewareConfig `json:"middleware,omitempty"` // Middleware configurations
}
PipelineConfig represents the complete pipeline configuration for pack format.
type PipelineRuntimeConfig ¶
type PipelineRuntimeConfig struct {
// MaxConcurrentExecutions limits the number of concurrent pipeline executions.
// Default: 100
MaxConcurrentExecutions int
// StreamBufferSize sets the buffer size for streaming output channels.
// Default: 100
StreamBufferSize int
// ExecutionTimeout sets the maximum duration for a single pipeline execution.
// Set to 0 to disable timeout.
// Default: 30 seconds
ExecutionTimeout time.Duration
// GracefulShutdownTimeout sets the maximum time to wait for in-flight executions during shutdown.
// Default: 10 seconds
GracefulShutdownTimeout time.Duration
}
PipelineRuntimeConfig defines runtime configuration options for pipeline execution. All fields have sensible defaults and are optional.
func DefaultPipelineRuntimeConfig ¶
func DefaultPipelineRuntimeConfig() *PipelineRuntimeConfig
DefaultPipelineRuntimeConfig returns a PipelineRuntimeConfig with sensible default values.
type ProviderMiddlewareConfig ¶
type ProviderMiddlewareConfig struct {
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"` // Retry policy
TimeoutMs int `json:"timeout_ms,omitempty"` // Request timeout in milliseconds
DisableTrace bool `json:"disable_trace,omitempty"` // Disable execution tracing (default: false = tracing enabled)
}
ProviderMiddlewareConfig contains configuration for provider middleware.
type Response ¶
type Response struct {
Role string
Content string
Parts []types.ContentPart // Multimodal content parts (text, image, audio, video)
ToolCalls []types.MessageToolCall
FinalResponse string // If tools were used, this is the final response after tools
Metadata ResponseMetadata
}
Response represents the final output from a pipeline execution.
type ResponseMetadata ¶
type ResponseMetadata struct {
Provider string
Model string
Latency time.Duration
TokensInput int
TokensOutput int
Cost float64
}
ResponseMetadata contains metadata about the response.
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int `json:"max_retries"` // Maximum retry attempts
Backoff string `json:"backoff"` // Backoff strategy ("fixed", "exponential")
InitialDelayMs int `json:"initial_delay_ms,omitempty"` // Initial delay in milliseconds
}
RetryPolicy defines retry behavior for provider middleware.
type StateStoreConfig ¶
type StateStoreConfig struct {
Store interface{} // State store implementation (statestore.Store)
ConversationID string // Unique conversation identifier
UserID string // User identifier (optional)
Metadata map[string]interface{} // Additional metadata to store (optional)
}
StateStoreConfig contains configuration for state store middleware.
type TemplateMiddlewareConfig ¶
type TemplateMiddlewareConfig struct {
StrictMode bool `json:"strict_mode"` // Fail on undefined variables
AllowUndefined bool `json:"allow_undefined"` // Allow undefined variables (opposite of strict_mode)
}
TemplateMiddlewareConfig contains configuration for template middleware.
type ToolPolicy ¶
type ToolPolicy struct {
ToolChoice string `json:"tool_choice,omitempty"` // "auto", "required", "none", or specific tool name
MaxRounds int `json:"max_rounds,omitempty"`
MaxToolCallsPerTurn int `json:"max_tool_calls_per_turn,omitempty"`
Blocklist []string `json:"blocklist,omitempty"`
}
ToolPolicy defines constraints on tool usage.
type TraceEvent ¶
type TraceEvent struct {
Type string `json:"type"` // Event type (e.g., "tool_execution", "context_truncation", "validation_failed")
Timestamp time.Time `json:"timestamp"` // When the event occurred
Data interface{} `json:"data"` // Event-specific data
Message string `json:"message,omitempty"` // Human-readable description
}
TraceEvent represents a significant event during pipeline execution.
type ValidationError ¶
type ValidationError struct {
Type string
Details string
Failures []types.ValidationResult // All failed validations (for aggregation)
}
ValidationError represents a validation failure.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
Error returns the error message for this validation error.
type ValidatorMiddlewareConfig ¶
type ValidatorMiddlewareConfig struct {
FailFast bool `json:"fail_fast"` // Stop on first validation error
CollectAllErrors bool `json:"collect_all_errors"` // Collect all errors before failing
}
ValidatorMiddlewareConfig contains configuration for validator middleware.