pipeline

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: Apache-2.0 Imports: 10 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPipelineShuttingDown = errors.New("pipeline is shutting down")
)

Error constants

Functions

This section is empty.

Types

type ExecutionConfig

type ExecutionConfig struct {
	Provider     providers.Provider
	ToolRegistry *tools.Registry
	Temperature  float32
	MaxTokens    int
	Seed         *int
	ToolPolicy   *ToolPolicy
}

ExecutionConfig contains configuration for pipeline execution.

type ExecutionContext

type ExecutionContext struct {
	// Context for cancellation, deadlines, and request-scoped values
	Context context.Context

	// Identifiers for organization and tracking
	RunID          string // Test run or execution batch identifier
	SessionID      string // Session identifier for grouping related conversations
	ConversationID string // Conversation identifier for this specific exchange

	// State (mutable by middleware)
	SystemPrompt     string                    // Populated by PromptAssemblyMiddleware
	Variables        map[string]string         // Populated by PromptAssemblyMiddleware and ContextExtractionMiddleware
	AllowedTools     []string                  // Populated by PromptAssemblyMiddleware
	Messages         []types.Message           // Conversation history + current messages
	Tools            []types.ToolDef           // Available tool definitions
	ToolResults      []types.MessageToolResult // Executed tool results
	PendingToolCalls []types.MessageToolCall   // Tool calls awaiting external completion (human-in-the-loop)
	Prompt           string                    // Assembled prompt (after variable substitution by TemplateMiddleware)

	// Output (populated by middleware)
	Trace       ExecutionTrace // Complete trace of all LLM calls and events
	Response    *Response      // Convenience pointer to the most recent response (= Trace.LLMCalls[len-1].Response)
	RawResponse interface{}    // Convenience pointer to most recent raw response (= Trace.LLMCalls[len-1].RawResponse)

	// Error tracking (middleware can check this to see if an error occurred earlier in the chain)
	Error error // First error encountered during execution (subsequent middleware still run)

	// Metadata (for passing data between middleware)
	Metadata map[string]interface{}

	// Cost tracking (aggregate across all calls)
	CostInfo types.CostInfo

	// Streaming support
	StreamMode        bool                       // If true, use streaming execution
	StreamOutput      chan providers.StreamChunk // Output channel for streaming chunks
	StreamInterrupted bool                       // Set to true by middleware to stop streaming
	InterruptReason   string                     // Reason for interruption

	// Middleware control
	ShortCircuit bool // Set to true by middleware to intentionally skip remaining middleware
	// contains filtered or unexported fields
}

ExecutionContext is the execution state passed through the middleware chain. It contains all the data needed for pipeline execution and is modified by middleware.

func (*ExecutionContext) AddPendingToolCall

func (ctx *ExecutionContext) AddPendingToolCall(toolCall types.MessageToolCall)

AddPendingToolCall adds a tool call to the pending list. Used by middleware when a tool returns ToolStatusPending.

func (*ExecutionContext) ClearPendingToolCalls

func (ctx *ExecutionContext) ClearPendingToolCalls()

ClearPendingToolCalls removes all pending tool calls.

func (*ExecutionContext) EmitStreamChunk

func (ctx *ExecutionContext) EmitStreamChunk(chunk providers.StreamChunk) bool

EmitStreamChunk emits a stream chunk to the output channel. Returns false if the stream has been interrupted or the channel is closed. Middleware that produces chunks should check the return value to know when to stop.

func (*ExecutionContext) GetPendingToolCall

func (ctx *ExecutionContext) GetPendingToolCall(id string) *types.MessageToolCall

GetPendingToolCall retrieves a pending tool call by ID. Returns nil if not found.

func (*ExecutionContext) HasPendingToolCalls

func (ctx *ExecutionContext) HasPendingToolCalls() bool

HasPendingToolCalls returns true if there are any pending tool calls.

func (*ExecutionContext) InterruptStream

func (ctx *ExecutionContext) InterruptStream(reason string)

InterruptStream interrupts the stream with the given reason. Middleware should call this to stop streaming when validation fails, rate limits are hit, etc.

func (*ExecutionContext) IsStreaming

func (ctx *ExecutionContext) IsStreaming() bool

IsStreaming returns true if the execution context is in streaming mode.

func (*ExecutionContext) RecordLLMCall added in v1.1.0

func (ctx *ExecutionContext) RecordLLMCall(disableTrace bool, response *Response, startTime time.Time, duration time.Duration, costInfo *types.CostInfo, toolCalls []types.MessageToolCall)

RecordLLMCall adds an LLM call to the execution trace if tracing is enabled. This method is called by provider middleware to track all LLM interactions.

func (*ExecutionContext) RemovePendingToolCall

func (ctx *ExecutionContext) RemovePendingToolCall(id string) bool

RemovePendingToolCall removes a tool call from the pending list by ID. Returns true if the tool call was found and removed.

type ExecutionOptions added in v1.1.2

type ExecutionOptions struct {
	// RunID identifies the test run or execution batch
	RunID string

	// SessionID identifies the session for grouping related conversations
	SessionID string

	// ConversationID identifies this specific conversation exchange
	ConversationID string

	// Context for cancellation and timeouts (if nil, uses context.Background())
	Context context.Context
}

ExecutionOptions provides optional parameters for pipeline execution. All fields are optional and will use sensible defaults if not provided.

type ExecutionResult

type ExecutionResult struct {
	Messages []types.Message        `json:"messages"`  // All messages including history and responses
	Response *Response              `json:"response"`  // The final response (convenience field)
	Trace    ExecutionTrace         `json:"trace"`     // Complete execution trace with all LLM calls
	CostInfo types.CostInfo         `json:"cost_info"` // Aggregate cost across all LLM calls
	Metadata map[string]interface{} `json:"metadata"`  // Metadata populated by middleware
}

ExecutionResult is the output of a pipeline execution. It contains the final state after all middleware has been executed.

type ExecutionTrace

type ExecutionTrace struct {
	LLMCalls    []LLMCall    `json:"llm_calls"`              // All LLM API calls made during execution
	Events      []TraceEvent `json:"events,omitempty"`       // Other trace events (tool execution, context truncation, etc.)
	StartedAt   time.Time    `json:"started_at"`             // When pipeline execution started
	CompletedAt *time.Time   `json:"completed_at,omitempty"` // When pipeline execution completed (nil if still running)
}

ExecutionTrace captures the complete execution history of a pipeline run. This includes all LLM calls, tool executions, and other significant events.

type LLMCall

type LLMCall struct {
	Sequence     int                     `json:"sequence"`               // Call number in sequence (1, 2, 3...)
	MessageIndex int                     `json:"message_index"`          // Index into ExecutionResult.Messages where assistant response is stored
	Request      interface{}             `json:"request,omitempty"`      // Raw request (if debugging enabled)
	Response     *Response               `json:"response"`               // Parsed response
	RawResponse  interface{}             `json:"raw_response,omitempty"` // Raw provider response (if debugging enabled)
	StartedAt    time.Time               `json:"started_at"`             // When call started
	Duration     time.Duration           `json:"duration"`               // How long the call took
	Cost         types.CostInfo          `json:"cost"`                   // Cost information for this call
	ToolCalls    []types.MessageToolCall `json:"tool_calls,omitempty"`   // If this call triggered tool execution
	Error        *string                 `json:"error,omitempty"`        // Error message if the call failed (nil if successful)
}

LLMCall represents a single LLM API call within a pipeline execution. In tool-enabled scenarios, multiple calls may occur in sequence.

func (*LLMCall) GetError added in v1.1.0

func (l *LLMCall) GetError() error

GetError returns the error as an error type, or nil if no error occurred. This is a convenience method for accessing the Error field as an error type.

func (*LLMCall) SetError added in v1.1.0

func (l *LLMCall) SetError(err error)

SetError sets the error for this LLM call from an error value. If err is nil, clears the error field.

type Middleware

type Middleware interface {
	Process(ctx *ExecutionContext, next func() error) error
	// StreamChunk is called for each chunk during streaming execution (if StreamMode is true).
	// Middleware can inspect, validate, or modify chunks. Return an error or call ctx.InterruptStream()
	// to stop streaming. Most middleware should return nil (no-op).
	StreamChunk(ctx *ExecutionContext, chunk *providers.StreamChunk) error
}

Middleware defines the execution interface for pipeline steps.

Middleware executes in a nested chain where each middleware explicitly calls next() to continue the pipeline. This makes the execution flow clear and explicit.

Given middleware chain: [A, B, C] Execution order is:

A.Process(ctx, func() {
  return B.Process(ctx, func() {
    return C.Process(ctx, func() {
      return nil // End of chain
    })
  })
})

Example implementation:

func (m *ProviderMiddleware) Process(ctx *ExecutionContext, next func() error) error {
  // Setup/processing logic
  response, err := m.provider.Generate(ctx)
  if err != nil {
    return err
  }
  ctx.Response = response

  // Continue to next middleware
  if err := next(); err != nil {
    return err
  }

  // Optional cleanup logic
  return nil
}

Error Handling:

  • If Process() returns an error, the error is captured in ExecutionContext.Error
  • Errors stop the chain - subsequent middleware do not execute
  • Middleware can check ExecutionContext.Error to see if earlier steps failed

ExecutionContext is used internally by middleware but users should not create it directly.

type MiddlewareConfig

type MiddlewareConfig struct {
	Type   string                 `json:"type"`             // Middleware type (e.g., "template", "provider", "validator")
	Config map[string]interface{} `json:"config,omitempty"` // Type-specific configuration
}

MiddlewareConfig represents configuration for a specific middleware

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline chains middleware together in sequence.

func NewPipeline

func NewPipeline(middleware ...Middleware) *Pipeline

NewPipeline creates a new pipeline with the given middleware. Uses default runtime configuration.

func NewPipelineWithConfig

func NewPipelineWithConfig(config *PipelineRuntimeConfig, middleware ...Middleware) *Pipeline

NewPipelineWithConfig creates a new pipeline with the given configuration and middleware. If config is nil, uses default configuration. Note: This function does not validate config values for backward compatibility. Use NewPipelineWithConfigValidated for validation.

func NewPipelineWithConfigValidated added in v1.1.0

func NewPipelineWithConfigValidated(config *PipelineRuntimeConfig, middleware ...Middleware) (*Pipeline, error)

NewPipelineWithConfigValidated creates a new pipeline with validation. Returns an error if config contains invalid values (negative numbers). If config is nil, uses default configuration. If config has zero values for some fields, they are filled with defaults.

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context, role, content string) (*ExecutionResult, error)

Execute runs the pipeline with the given role and content, returning the execution result. It creates a fresh internal ExecutionContext for each call, preventing state contamination. The role and content parameters are used to create the initial user message. If role is empty, no message is appended (useful for testing). Returns the ExecutionResult containing messages, response, trace, and metadata.

func (*Pipeline) ExecuteStream

func (p *Pipeline) ExecuteStream(
	ctx context.Context,
	role string,
	content string,
) (<-chan providers.StreamChunk, error)

ExecuteStream runs the pipeline in streaming mode, returning a channel of stream chunks. It creates a fresh internal ExecutionContext for each call, preventing state contamination. The role and content parameters are used to create the initial user message. If role is empty, no message is appended (useful for testing). The pipeline executes in the background and closes the channel when complete. The final chunk will contain the ExecutionResult in the FinalResult field.

func (*Pipeline) ExecuteStreamWithMessage

func (p *Pipeline) ExecuteStreamWithMessage(
	ctx context.Context,
	message types.Message,
) (<-chan providers.StreamChunk, error)

ExecuteStreamWithMessage runs the pipeline in streaming mode with a complete Message object. This method allows callers to provide a fully-populated message with all fields (Meta, Timestamp, etc.) rather than just role and content. The message is added to the execution context as-is, preserving all fields including Meta, Timestamp, ToolCalls, CostInfo, and Validations.

The pipeline executes in the background and closes the channel when complete. The final chunk will contain the ExecutionResult in the FinalResult field. Returns a channel of StreamChunk objects that will be closed when execution completes.

func (*Pipeline) ExecuteWithMessage

func (p *Pipeline) ExecuteWithMessage(ctx context.Context, message types.Message) (*ExecutionResult, error)

ExecuteWithMessage runs the pipeline with a complete Message object, returning the execution result. This method allows callers to provide a fully-populated message with all fields (Meta, Timestamp, etc.) rather than just role and content. This is useful when you need to preserve metadata or other message properties through the pipeline execution.

The message is added to the execution context as-is, preserving all fields including: - Meta (metadata, raw responses, validation info) - Timestamp - ToolCalls - CostInfo - Validations

Middleware can still modify the message during execution if needed. Returns the ExecutionResult containing messages, response, trace, and metadata.

func (*Pipeline) ExecuteWithOptions added in v1.1.2

func (p *Pipeline) ExecuteWithOptions(opts *ExecutionOptions, role, content string) (*ExecutionResult, error)

ExecuteWithOptions runs the pipeline with the given role, content, and execution options. This method provides fine-grained control over execution including RunID, SessionID, and ConversationID. Returns the ExecutionResult containing messages, response, trace, and metadata.

func (*Pipeline) Shutdown

func (p *Pipeline) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the pipeline, waiting for in-flight executions to complete. Returns an error if shutdown times out according to GracefulShutdownTimeout.

type PipelineConfig

type PipelineConfig struct {
	Stages     []string           `json:"stages"`               // Pipeline stages in order (e.g., ["template", "provider", "validator"])
	Middleware []MiddlewareConfig `json:"middleware,omitempty"` // Middleware configurations
}

PipelineConfig represents the complete pipeline configuration for pack format

type PipelineRuntimeConfig

type PipelineRuntimeConfig struct {
	// MaxConcurrentExecutions limits the number of concurrent pipeline executions.
	// Default: 100
	MaxConcurrentExecutions int

	// StreamBufferSize sets the buffer size for streaming output channels.
	// Default: 100
	StreamBufferSize int

	// ExecutionTimeout sets the maximum duration for a single pipeline execution.
	// Set to 0 to disable timeout.
	// Default: 30 seconds
	ExecutionTimeout time.Duration

	// GracefulShutdownTimeout sets the maximum time to wait for in-flight executions during shutdown.
	// Default: 10 seconds
	GracefulShutdownTimeout time.Duration
}

PipelineRuntimeConfig defines runtime configuration options for pipeline execution. All fields have sensible defaults and are optional.

func DefaultPipelineRuntimeConfig

func DefaultPipelineRuntimeConfig() *PipelineRuntimeConfig

DefaultPipelineRuntimeConfig returns a PipelineRuntimeConfig with sensible default values.

type ProviderMiddlewareConfig

type ProviderMiddlewareConfig struct {
	RetryPolicy  *RetryPolicy `json:"retry_policy,omitempty"`  // Retry policy
	TimeoutMs    int          `json:"timeout_ms,omitempty"`    // Request timeout in milliseconds
	DisableTrace bool         `json:"disable_trace,omitempty"` // Disable execution tracing (default: false = tracing enabled)
}

ProviderMiddlewareConfig contains configuration for provider middleware

type Response

type Response struct {
	Role          string
	Content       string
	Parts         []types.ContentPart // Multimodal content parts (text, image, audio, video)
	ToolCalls     []types.MessageToolCall
	FinalResponse string // If tools were used, this is the final response after tools
	Metadata      ResponseMetadata
}

Response represents the final output from a pipeline execution.

type ResponseMetadata

type ResponseMetadata struct {
	Provider     string
	Model        string
	Latency      time.Duration
	TokensInput  int
	TokensOutput int
	Cost         float64
}

ResponseMetadata contains metadata about the response.

type RetryPolicy

type RetryPolicy struct {
	MaxRetries     int    `json:"max_retries"`                // Maximum retry attempts
	Backoff        string `json:"backoff"`                    // Backoff strategy ("fixed", "exponential")
	InitialDelayMs int    `json:"initial_delay_ms,omitempty"` // Initial delay in milliseconds
}

RetryPolicy defines retry behavior for provider middleware

type StateStoreConfig

type StateStoreConfig struct {
	Store          interface{}            // State store implementation (statestore.Store)
	ConversationID string                 // Unique conversation identifier
	UserID         string                 // User identifier (optional)
	Metadata       map[string]interface{} // Additional metadata to store (optional)
}

StateStoreConfig contains configuration for state store middleware

type TemplateMiddlewareConfig

type TemplateMiddlewareConfig struct {
	StrictMode     bool `json:"strict_mode"`     // Fail on undefined variables
	AllowUndefined bool `json:"allow_undefined"` // Allow undefined variables (opposite of strict_mode)
}

TemplateMiddlewareConfig contains configuration for template middleware

type ToolPolicy

type ToolPolicy struct {
	ToolChoice          string   `json:"tool_choice,omitempty"` // "auto", "required", "none", or specific tool name
	MaxRounds           int      `json:"max_rounds,omitempty"`
	MaxToolCallsPerTurn int      `json:"max_tool_calls_per_turn,omitempty"`
	Blocklist           []string `json:"blocklist,omitempty"`
}

ToolPolicy defines constraints on tool usage.

type TraceEvent

type TraceEvent struct {
	Type      string      `json:"type"`              // Event type (e.g., "tool_execution", "context_truncation", "validation_failed")
	Timestamp time.Time   `json:"timestamp"`         // When the event occurred
	Data      interface{} `json:"data"`              // Event-specific data
	Message   string      `json:"message,omitempty"` // Human-readable description
}

TraceEvent represents a significant event during pipeline execution.

type ValidationError

type ValidationError struct {
	Type     string
	Details  string
	Failures []types.ValidationResult // All failed validations (for aggregation)
}

ValidationError represents a validation failure.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Error returns the error message for this validation error.

type ValidatorMiddlewareConfig

type ValidatorMiddlewareConfig struct {
	FailFast         bool `json:"fail_fast"`          // Stop on first validation error
	CollectAllErrors bool `json:"collect_all_errors"` // Collect all errors before failing
}

ValidatorMiddlewareConfig contains configuration for validator middleware

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL