engine

package
v1.1.4
Published: Dec 5, 2025 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Overview

Package engine defines future-proofing interfaces for PromptKit. These interfaces will be implemented in future versions to support distributed execution, optimization, and advanced features.

Package engine orchestrates test execution across scenarios, providers, and configurations.

The engine package is the core execution layer of the Arena testing tool. It manages:

  • Conversation lifecycle and message flow
  • Provider and model configuration
  • Telemetry and metrics collection
  • Result aggregation and validation
  • Concurrent test execution across multiple scenarios

Key types:

  • Engine: Main orchestration struct that executes test runs
  • RunResult: Contains execution results, metrics, and conversation history
  • RunFilters: Filters for selective test execution

Example usage:

eng, _ := engine.NewEngineFromConfigFile("arena.yaml")
defer eng.Close()

plan, _ := eng.GenerateRunPlan(nil, nil, nil)
runIDs, _ := eng.ExecuteRuns(ctx, plan, 4)
for _, runID := range runIDs {
    fmt.Printf("Completed run %s\n", runID)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FormatFileSize added in v1.1.0

func FormatFileSize(bytes int64) string

FormatFileSize formats bytes as human-readable size

func FormatMediaType added in v1.1.0

func FormatMediaType(mediaType string) string

FormatMediaType returns a human-readable label for media type
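
For example (the exact output strings are illustrative assumptions; the doc comments do not pin down a format):

fmt.Println(engine.FormatFileSize(2097152))      // e.g. "2.0 MB" (assumed formatting)
fmt.Println(engine.FormatMediaType("image/png")) // e.g. "Image (PNG)" (assumed label)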

Types

type ArenaConfig

type ArenaConfig struct {
	APIVersion string            `yaml:"apiVersion"`
	Kind       string            `yaml:"kind"`
	Metadata   metav1.ObjectMeta `yaml:"metadata,omitempty"`
	Spec       config.Config     `yaml:"spec"`
}

ArenaConfig represents a K8s-style arena configuration manifest
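
A minimal loading sketch, assuming gopkg.in/yaml.v3 and a manifest file named arena.yaml (both assumptions; in practice NewEngineFromConfigFile performs this loading):

data, err := os.ReadFile("arena.yaml") // hypothetical manifest path
if err != nil {
    log.Fatal(err)
}
var manifest engine.ArenaConfig
if err := yaml.Unmarshal(data, &manifest); err != nil { // gopkg.in/yaml.v3 assumed
    log.Fatal(err)
}
fmt.Println(manifest.APIVersion, manifest.Kind, manifest.Metadata.Name)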

type AssertionsSummary added in v1.1.3

type AssertionsSummary struct {
	Failed  int                                       `json:"failed"`
	Passed  bool                                      `json:"passed"`
	Results []assertions.ConversationValidationResult `json:"results"`
	Total   int                                       `json:"total"`
}

AssertionsSummary matches the structure used for turn-level assertions in message meta

type ConversationExecutor

type ConversationExecutor interface {
	// ExecuteConversation runs a complete conversation based on scenario
	ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult

	// ExecuteConversationStream runs a conversation with streaming
	ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
}

ConversationExecutor orchestrates full conversation flows

func NewDefaultConversationExecutor

func NewDefaultConversationExecutor(
	scriptedExecutor turnexecutors.TurnExecutor,
	selfPlayExecutor turnexecutors.TurnExecutor,
	selfPlayRegistry *selfplay.Registry,
	promptRegistry *prompt.Registry,
) ConversationExecutor

NewDefaultConversationExecutor creates a new conversation executor

type ConversationRequest

type ConversationRequest struct {
	// Required fields
	Provider providers.Provider
	Scenario *config.Scenario
	Config   *config.Config
	Region   string

	// Optional overrides (for future use)
	Temperature *float64 // Override scenario temperature
	MaxTokens   *int     // Override scenario max tokens
	Timeout     *int     // Timeout in seconds

	// For distributed execution and tracing (v0.2.0+)
	RunID    string            // Unique identifier for this run
	Metadata map[string]string // Additional metadata for debugging/tracing

	// Event bus for runtime/TUI events
	EventBus *events.EventBus

	// State management
	StateStoreConfig *StateStoreConfig // Optional state store configuration
	ConversationID   string            // Conversation identifier for state persistence
}

ConversationRequest contains all data needed for conversation execution. Using a request object makes the API extensible without breaking changes.
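
A construction sketch, assuming executor is a ConversationExecutor and that provider, scenario, cfg, and ctx have been obtained elsewhere (e.g. from the registries and configuration loader):

temp := 0.2
req := engine.ConversationRequest{
    Provider:    provider, // providers.Provider resolved from a registry
    Scenario:    scenario, // *config.Scenario loaded from configuration
    Config:      cfg,
    Region:      "us-east-1", // hypothetical region identifier
    Temperature: &temp,       // optional override of the scenario temperature
    RunID:       "run-001",   // hypothetical run identifier for tracing
}
result := executor.ExecuteConversation(ctx, req)
if result.Failed {
    log.Printf("conversation failed: %s", result.Error)
}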

type ConversationResult

type ConversationResult struct {
	Messages     []types.Message         // Flat list of all messages in the conversation
	Cost         types.CostInfo          // Total cost across all messages
	ToolStats    *types.ToolStats        // Tool usage statistics
	Violations   []types.ValidationError // Validation errors
	MediaOutputs []MediaOutput           // Media outputs generated by LLMs

	// Conversation-level assertions
	ConversationAssertionResults []assertions.ConversationValidationResult `json:"conv_assertions_results,omitempty"`

	// Self-play metadata
	SelfPlay  bool   `json:"self_play,omitempty"`
	PersonaID string `json:"persona_id,omitempty"`

	// Error handling
	Error  string `json:"error,omitempty"`  // Error message if execution failed
	Failed bool   `json:"failed,omitempty"` // Whether execution failed (but partial results may be available)
}

ConversationResult contains the outcome of conversation execution

type ConversationStreamChunk

type ConversationStreamChunk struct {
	// Current turn number (0-indexed)
	TurnIndex int

	// Delta content from this specific chunk
	Delta string

	// Token count (accumulated for current turn)
	TokenCount int

	// Finish reason for current turn (only in last chunk of turn)
	FinishReason *string

	// Complete conversation result (accumulated, updated with each chunk)
	Result *ConversationResult

	// Error if streaming failed
	Error error

	// Metadata
	Metadata map[string]interface{}
}

ConversationStreamChunk represents a streaming chunk during conversation execution
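
Consuming the stream might look like the following sketch, assuming executor is a ConversationExecutor and req is a ConversationRequest built as shown above:

stream, err := executor.ExecuteConversationStream(ctx, req)
if err != nil {
    log.Fatal(err)
}
for chunk := range stream {
    if chunk.Error != nil {
        log.Printf("turn %d failed: %v", chunk.TurnIndex, chunk.Error)
        break
    }
    fmt.Print(chunk.Delta) // incremental content for the current turn
    if chunk.FinishReason != nil {
        fmt.Printf("\n[turn %d finished: %s]\n", chunk.TurnIndex, *chunk.FinishReason)
    }
}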

type DefaultConversationExecutor

type DefaultConversationExecutor struct {
	// contains filtered or unexported fields
}

DefaultConversationExecutor implements ConversationExecutor interface

func (*DefaultConversationExecutor) ExecuteConversation

func (ce *DefaultConversationExecutor) ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult

ExecuteConversation runs a complete conversation based on the scenario using the new Turn model

func (*DefaultConversationExecutor) ExecuteConversationStream

func (ce *DefaultConversationExecutor) ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)

ExecuteConversationStream runs a complete conversation with streaming

type DistributedExecutor

type DistributedExecutor interface {
	// Execute runs work units across distributed workers
	Execute(ctx context.Context, units []WorkUnit) error

	// Status returns current execution status
	Status() ExecutionStatus

	// Workers returns information about available workers
	Workers() []WorkerInfo

	// Scale adjusts the number of workers
	Scale(ctx context.Context, workerCount int) error
}

DistributedExecutor coordinates distributed execution of work units. Planned for v0.2.0 (Distributed Execution Support).
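
Since no implementation ships yet, the following is only a sketch of how a future implementation might be monitored (the polling interval and completion check are assumptions):

func monitor(ctx context.Context, exec engine.DistributedExecutor) {
    ticker := time.NewTicker(5 * time.Second) // hypothetical polling interval
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            st := exec.Status()
            fmt.Printf("completed %d/%d (failed %d, workers %d)\n",
                st.Completed, st.Total, st.Failed, st.Workers)
            if st.Completed+st.Failed >= st.Total {
                return
            }
        }
    }
}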

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine manages the execution of prompt testing scenarios across multiple providers, regions, and configurations. It coordinates conversation execution, tool calling, validation, and result collection.

The engine supports both scripted conversations and self-play mode where an LLM simulates user behavior. It handles provider initialization, concurrent execution, and comprehensive result tracking including costs and tool usage.

func NewEngine

func NewEngine(
	cfg *config.Config,
	providerRegistry *providers.Registry,
	promptRegistry *prompt.Registry,
	mcpRegistry *mcp.RegistryImpl,
	convExecutor ConversationExecutor,
) (*Engine, error)

NewEngine creates a new simulation engine from pre-built components. This is the primary constructor for the Engine and is preferred for testing where components can be created and configured independently.

This constructor uses dependency injection, accepting all required registries and executors as parameters. This makes testing easier and follows better architectural practices.

Parameters:

  • cfg: Fully loaded and validated Config object
  • providerRegistry: Registry for looking up providers by ID
  • promptRegistry: Registry for system prompts and task types
  • convExecutor: Executor for full conversations

Returns an initialized Engine ready for test execution.

func NewEngineFromConfigFile

func NewEngineFromConfigFile(configPath string) (*Engine, error)

NewEngineFromConfigFile creates a new simulation engine from a configuration file. It loads the configuration, validates it, initializes all registries, and sets up the execution pipeline for conversation testing.

The configuration file is loaded along with all referenced resources (scenarios, providers, tools, personas), making the Config object fully self-contained.

This constructor performs all necessary initialization steps in the correct order:

  1. Load and validate configuration from file (including all referenced resources)
  2. Build registries from loaded resources
  3. Initialize executors (turn, conversation, self-play if enabled)
  4. Create Engine with all components

Note: Logger verbosity should be configured at application startup, not here. This function does not modify global logger settings.

Parameters:

  • configPath: Path to the arena.yaml configuration file

Returns an initialized Engine ready for test execution, or an error if:

  • Configuration file cannot be read or parsed
  • Configuration validation fails
  • Any resource file cannot be loaded
  • Provider type is unsupported

func (*Engine) Close

func (e *Engine) Close() error

Close shuts down the engine and cleans up resources. This includes closing all MCP server connections and provider HTTP clients.

func (*Engine) EnableMockProviderMode added in v1.1.0

func (e *Engine) EnableMockProviderMode(mockConfigPath string) error

EnableMockProviderMode replaces all providers in the registry with mock providers. This enables testing of scenario behavior without making real API calls. Mock providers can use either file-based configuration for scenario-specific responses or default in-memory responses.

Parameters:

  • mockConfigPath: Optional path to YAML configuration file for mock responses

Returns an error if the mock configuration file cannot be loaded or parsed.
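
A sketch of an offline test setup (passing an empty path is assumed to select the default in-memory responses; the documentation only states that the path is optional):

eng, err := engine.NewEngineFromConfigFile("arena.yaml")
if err != nil {
    log.Fatal(err)
}
defer eng.Close()

// Replace all providers with mocks so no real API calls are made.
if err := eng.EnableMockProviderMode(""); err != nil {
    log.Fatal(err)
}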

func (*Engine) ExecuteRuns

func (e *Engine) ExecuteRuns(ctx context.Context, plan *RunPlan, concurrency int) ([]string, error)

ExecuteRuns executes the combinations in the given RunPlan. Runs are executed concurrently up to the specified concurrency limit, with run IDs collected in order matching the input plan.

Each run executes independently:

  • Loads scenario and provider
  • Executes conversation turns (with self-play if configured)
  • Runs validators on the results
  • Tracks costs, timing, and tool calls
  • Saves results to StateStore

The context can be used to cancel all in-flight executions. Run IDs are returned for all combinations, with errors captured in individual RunResult (accessible via StateStore).

Parameters:

  • ctx: Context for cancellation
  • plan: RunPlan containing combinations to execute
  • concurrency: Maximum number of simultaneous executions

Returns a slice of RunIDs in the same order as plan.Combinations, or an error if execution setup fails. Individual run errors are stored in StateStore, not returned here.
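
For example (assuming eng and plan from the examples above; the concurrency of 4 and the timeout are arbitrary illustrative choices):

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()

runIDs, err := eng.ExecuteRuns(ctx, plan, 4) // at most 4 runs in flight
if err != nil {
    log.Fatalf("execution setup failed: %v", err)
}
// Per-run errors are recorded in the StateStore (see GetStateStore), not returned here.
fmt.Printf("executed %d runs: %v\n", len(runIDs), runIDs)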

func (*Engine) GenerateRunPlan

func (e *Engine) GenerateRunPlan(regionFilter, providerFilter, scenarioFilter []string) (*RunPlan, error)

GenerateRunPlan creates a comprehensive test execution plan from filter criteria. The plan contains all combinations of regions × providers × scenarios that match the provided filters. Scenarios can optionally specify which providers they should be tested against via the `providers` field.

Filter behavior:

  • regionFilter: Empty = all regions from prompt configs (or default)
  • providerFilter: Empty = all registered providers (or scenario-specified providers)
  • scenarioFilter: Empty = all loaded scenarios

Provider selection logic:

  1. If the scenario specifies providers: use those (intersected with the CLI filter if provided)
  2. If the scenario doesn't specify providers: use all arena providers (intersected with the CLI filter)

Returns a RunPlan containing all matching combinations, ready for execution. Each combination represents one independent test run that will be executed and validated separately.
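
For example, restricting the plan to one region and one scenario (assuming eng from the examples above; the identifiers are hypothetical):

plan, err := eng.GenerateRunPlan(
    []string{"us-east-1"},      // regionFilter
    nil,                        // providerFilter: all providers
    []string{"refund-request"}, // scenarioFilter
)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("planned %d combinations\n", len(plan.Combinations))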

func (*Engine) GetStateStore

func (e *Engine) GetStateStore() statestore.Store

GetStateStore returns the engine's state store for accessing run results

func (*Engine) SetEventBus added in v1.1.4

func (e *Engine) SetEventBus(bus *events.EventBus)

SetEventBus configures the shared event bus used for runtime and TUI observability.

type ExecutionStatus

type ExecutionStatus struct {
	Total     int           `json:"total"`      // Total work units
	Completed int           `json:"completed"`  // Completed work units
	Running   int           `json:"running"`    // Currently executing
	Failed    int           `json:"failed"`     // Failed work units
	StartTime time.Time     `json:"start_time"` // Execution start time
	Duration  time.Duration `json:"duration"`   // Total duration so far
	Workers   int           `json:"workers"`    // Active worker count
}

ExecutionStatus represents the state of distributed execution

type MediaOutput added in v1.1.0

type MediaOutput struct {
	Type       string `json:"type"`                // "image", "audio", "video"
	MIMEType   string `json:"mime_type"`           // e.g., "image/jpeg", "audio/mp3"
	SizeBytes  int64  `json:"size_bytes"`          // Size of the media file
	Duration   *int   `json:"duration,omitempty"`  // Duration in seconds (for audio/video)
	Width      *int   `json:"width,omitempty"`     // Width in pixels (for image/video)
	Height     *int   `json:"height,omitempty"`    // Height in pixels (for image/video)
	FilePath   string `json:"file_path,omitempty"` // Path where media was saved
	Thumbnail  string `json:"thumbnail,omitempty"` // Base64-encoded thumbnail for HTML reports
	MessageIdx int    `json:"message_index"`       // Index of message containing this media
	PartIdx    int    `json:"part_index"`          // Index of content part within the message
}

MediaOutput represents media content generated by an LLM during test execution

func CollectMediaOutputs added in v1.1.0

func CollectMediaOutputs(messages []types.Message) []MediaOutput

CollectMediaOutputs extracts media outputs from conversation messages. It returns a slice of MediaOutput for tracking in RunResult.

type MediaOutputStats added in v1.1.0

type MediaOutputStats struct {
	Total          int            `json:"total"`
	ImageCount     int            `json:"image_count"`
	AudioCount     int            `json:"audio_count"`
	VideoCount     int            `json:"video_count"`
	TotalSizeBytes int64          `json:"total_size_bytes"`
	ByType         map[string]int `json:"by_type"`
}

MediaOutputStats contains summary statistics for media outputs

func GetMediaOutputStatistics added in v1.1.0

func GetMediaOutputStatistics(outputs []MediaOutput) MediaOutputStats

GetMediaOutputStatistics calculates summary statistics for media outputs
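
A summarization sketch, assuming result is a RunResult (or ConversationResult) with its Messages populated:

outputs := engine.CollectMediaOutputs(result.Messages)
stats := engine.GetMediaOutputStatistics(outputs)
fmt.Printf("%d media outputs (%d images, %d audio, %d video), %s total\n",
    stats.Total, stats.ImageCount, stats.AudioCount, stats.VideoCount,
    engine.FormatFileSize(stats.TotalSizeBytes))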

type ResultSink

type ResultSink interface {
	// Store saves a work result
	Store(ctx context.Context, result WorkResult) error

	// Retrieve gets a work result by work unit ID
	Retrieve(ctx context.Context, workUnitID string) (WorkResult, error)

	// Query searches for results matching criteria
	Query(ctx context.Context, criteria map[string]interface{}) ([]WorkResult, error)

	// Subscribe to real-time result updates
	Subscribe(ctx context.Context, filter func(WorkResult) bool) (<-chan WorkResult, error)
}

ResultSink handles storage and processing of work results. Planned for v0.2.0 (Distributed Execution Support).
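
Since no implementation ships yet, the following is only a consumption sketch against the planned interface, assuming sink implements ResultSink:

results, err := sink.Subscribe(ctx, func(r engine.WorkResult) bool {
    return !r.Success() // only surface failed work units
})
if err != nil {
    log.Fatal(err)
}
for r := range results {
    log.Printf("work unit %s failed: %v", r.WorkUnitID(), r.Error())
}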

type RunCombination

type RunCombination struct {
	Region     string
	ScenarioID string
	ProviderID string
}

RunCombination represents a single test execution

type RunPlan

type RunPlan struct {
	Combinations []RunCombination
}

RunPlan defines the test execution plan

type RunResult

type RunResult struct {
	RunID      string                  `json:"RunID"`
	PromptPack string                  `json:"PromptPack"`
	Region     string                  `json:"Region"`
	ScenarioID string                  `json:"ScenarioID"`
	ProviderID string                  `json:"ProviderID"`
	Params     map[string]interface{}  `json:"Params"`
	Messages   []types.Message         `json:"Messages"`
	Commit     map[string]interface{}  `json:"Commit"`
	Cost       types.CostInfo          `json:"Cost"`
	ToolStats  *types.ToolStats        `json:"ToolStats"`
	Violations []types.ValidationError `json:"Violations"`
	StartTime  time.Time               `json:"StartTime"`
	EndTime    time.Time               `json:"EndTime"`
	Duration   time.Duration           `json:"Duration"`
	Error      string                  `json:"Error"`
	SelfPlay   bool                    `json:"SelfPlay"`
	PersonaID  string                  `json:"PersonaID"`

	UserFeedback  *statestore.Feedback `json:"UserFeedback"`
	SessionTags   []string             `json:"SessionTags"`
	AssistantRole *SelfPlayRoleInfo    `json:"AssistantRole"`
	UserRole      *SelfPlayRoleInfo    `json:"UserRole"`

	// Media outputs generated by LLMs during test execution
	MediaOutputs []MediaOutput `json:"MediaOutputs,omitempty"`

	// Conversation-level assertions evaluated after the conversation completes (summary format)
	ConversationAssertions AssertionsSummary `json:"conversation_assertions,omitempty"`
}

RunResult contains the complete results of a single test execution
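
An inspection sketch, assuming result is a RunResult retrieved from the state store:

if result.Error != "" {
    fmt.Printf("run %s failed after %s: %s\n", result.RunID, result.Duration, result.Error)
} else {
    fmt.Printf("run %s: scenario=%s provider=%s violations=%d assertions passed=%v\n",
        result.RunID, result.ScenarioID, result.ProviderID,
        len(result.Violations), result.ConversationAssertions.Passed)
}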

type SelfPlayRoleInfo

type SelfPlayRoleInfo struct {
	Provider string
	Model    string
	Region   string
}

SelfPlayRoleInfo contains provider information for self-play roles

type StateStoreConfig

type StateStoreConfig struct {
	Store    interface{}            // State store implementation (statestore.Store)
	UserID   string                 // User identifier (optional)
	Metadata map[string]interface{} // Additional metadata to store (optional)
}

StateStoreConfig wraps the pipeline StateStore configuration for Arena

type WorkResult

type WorkResult interface {
	// WorkUnitID returns the ID of the work unit that produced this result
	WorkUnitID() string

	// Success indicates if the work unit completed successfully
	Success() bool

	// Data returns the result data (conversation, analysis, etc.)
	Data() interface{}

	// Error returns any error that occurred during execution
	Error() error

	// Metadata returns additional metadata about the execution
	Metadata() map[string]interface{}
}

WorkResult represents the result of executing a WorkUnit

type WorkSource

type WorkSource interface {
	// Generate creates work units from configuration
	Generate(ctx context.Context, config interface{}) ([]WorkUnit, error)

	// Stream provides work units as they become available
	Stream(ctx context.Context, config interface{}) (<-chan WorkUnit, error)

	// EstimateWorkload returns expected number of work units
	EstimateWorkload(ctx context.Context, config interface{}) (int, error)
}

WorkSource generates work units for execution. Planned for v0.2.0 (Distributed Execution Support).

type WorkUnit

type WorkUnit interface {
	// ID returns unique identifier for this work unit
	ID() string

	// Execute performs the work unit and returns results
	Execute(ctx context.Context) (WorkResult, error)

	// Dependencies returns work units that must complete before this one
	Dependencies() []string

	// Priority returns execution priority (higher number = higher priority)
	Priority() int

	// EstimatedDuration returns expected execution time
	EstimatedDuration() time.Duration
}

WorkUnit represents a single unit of work in distributed execution. Planned for v0.2.0 (Distributed Execution Support).
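
A minimal sketch of what an implementation of the planned interface might look like (the type and its behavior are hypothetical, not part of the package):

type scenarioWorkUnit struct {
	id string
}

func (w *scenarioWorkUnit) ID() string                       { return w.id }
func (w *scenarioWorkUnit) Dependencies() []string           { return nil }
func (w *scenarioWorkUnit) Priority() int                    { return 0 }
func (w *scenarioWorkUnit) EstimatedDuration() time.Duration { return 30 * time.Second }

func (w *scenarioWorkUnit) Execute(ctx context.Context) (engine.WorkResult, error) {
	// A real implementation would run one scenario/provider/region combination here.
	return nil, errors.New("not implemented")
}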

type WorkerInfo

type WorkerInfo struct {
	ID       string                 `json:"id"`        // Worker identifier
	Status   string                 `json:"status"`    // online, offline, busy
	Capacity int                    `json:"capacity"`  // Max concurrent work units
	Current  int                    `json:"current"`   // Current work units
	LastSeen time.Time              `json:"last_seen"` // Last heartbeat
	Metadata map[string]interface{} `json:"metadata"`  // Additional worker info
}

WorkerInfo contains information about a distributed worker
