Documentation ¶
Overview ¶
Package engine defines future-proofing interfaces for PromptKit. These interfaces will be implemented in future versions to support distributed execution, optimization, and advanced features.
Package engine orchestrates test execution across scenarios, providers, and configurations.
The engine package is the core execution layer of the Arena testing tool. It manages:
- Conversation lifecycle and message flow
- Provider and model configuration
- Telemetry and metrics collection
- Result aggregation and validation
- Concurrent test execution across multiple scenarios
Key types:
- Engine: Main orchestration struct that executes test runs
- RunResult: Contains execution results, metrics, and conversation history
- RunPlan: Plan of run combinations selected for execution
Example usage:
    ctx := context.Background()
    eng, _ := engine.NewEngineFromConfigFile("arena.yaml")
    defer eng.Close()

    plan, _ := eng.GenerateRunPlan(nil, nil, nil, nil)
    runIDs, _ := eng.ExecuteRuns(ctx, plan, 4)
    for _, runID := range runIDs {
        fmt.Printf("completed run %s\n", runID)
    }
Index ¶
- Constants
- func FormatFileSize(bytes int64) string
- func FormatMediaType(mediaType string) string
- type ArenaConfig
- type AssertionsSummary
- type CompositeConversationExecutor
- func (ce *CompositeConversationExecutor) ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult
- func (ce *CompositeConversationExecutor) ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
- func (ce *CompositeConversationExecutor) GetDefaultExecutor() *DefaultConversationExecutor
- func (ce *CompositeConversationExecutor) GetDuplexExecutor() *DuplexConversationExecutor
- func (ce *CompositeConversationExecutor) GetEvalExecutor() *EvalConversationExecutor
- type ConversationExecutor
- type ConversationRequest
- type ConversationResult
- type ConversationStreamChunk
- type DefaultConversationExecutor
- type DistributedExecutor
- type DuplexConversationExecutor
- type Engine
- func (e *Engine) Close() error
- func (e *Engine) ConfigureSessionRecordingFromConfig() error
- func (e *Engine) EnableMockProviderMode(mockConfigPath string) error
- func (e *Engine) EnableSessionRecording(recordingDir string) error
- func (e *Engine) ExecuteRuns(ctx context.Context, plan *RunPlan, concurrency int) ([]string, error)
- func (e *Engine) GenerateRunPlan(regionFilter, providerFilter, scenarioFilter, evalFilter []string) (*RunPlan, error)
- func (e *Engine) GetConfig() *config.Config
- func (e *Engine) GetRecordingDir() string
- func (e *Engine) GetRecordingPath(runID string) string
- func (e *Engine) GetStateStore() statestore.Store
- func (e *Engine) SetEventBus(bus *events.EventBus)
- type EvalConversationExecutor
- type ExecutionStatus
- type MediaOutput
- type MediaOutputStats
- type PackEvalAdapter
- type PackEvalHook
- func (h *PackEvalHook) HasEvals() bool
- func (h *PackEvalHook) RunSessionEvals(ctx context.Context, messages []types.Message, sessionID string) []assertions.ConversationValidationResult
- func (h *PackEvalHook) RunTurnEvals(ctx context.Context, messages []types.Message, turnIndex int, sessionID string) []assertions.ConversationValidationResult
- type ResultSink
- type RunCombination
- type RunPlan
- type RunResult
- type SelfPlayRoleInfo
- type StateStoreConfig
- type WorkResult
- type WorkSource
- type WorkUnit
- type WorkerInfo
Constants ¶
const PackEvalTypePrefix = "pack_eval:"
PackEvalTypePrefix is prepended to eval types when converting to assertion results. Report renderers use this prefix to distinguish pack eval results from scenario assertions.
Variables ¶
This section is empty.
Functions ¶
func FormatFileSize ¶ added in v1.1.0
func FormatFileSize(bytes int64) string
FormatFileSize formats bytes as human-readable size
func FormatMediaType ¶ added in v1.1.0
func FormatMediaType(mediaType string) string
FormatMediaType returns a human-readable label for media type
Types ¶
type ArenaConfig ¶
type ArenaConfig struct {
APIVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata metav1.ObjectMeta `yaml:"metadata,omitempty"`
Spec config.Config `yaml:"spec"`
}
ArenaConfig represents a K8s-style arena configuration manifest
type AssertionsSummary ¶ added in v1.1.3
type AssertionsSummary struct {
Failed int `json:"failed"`
Passed bool `json:"passed"`
Results []assertions.ConversationValidationResult `json:"results"`
Total int `json:"total"`
}
AssertionsSummary matches the structure used for turn-level assertions in message meta
type CompositeConversationExecutor ¶ added in v1.1.6
type CompositeConversationExecutor struct {
// contains filtered or unexported fields
}
CompositeConversationExecutor routes conversation execution to the appropriate executor based on scenario configuration. It selects between:
- EvalConversationExecutor for evaluation mode (recording replay with assertions)
- DefaultConversationExecutor for standard turn-based conversations
- DuplexConversationExecutor for bidirectional streaming scenarios
func NewCompositeConversationExecutor ¶ added in v1.1.6
func NewCompositeConversationExecutor(
    defaultExecutor *DefaultConversationExecutor,
    duplexExecutor *DuplexConversationExecutor,
    evalExecutor *EvalConversationExecutor,
) *CompositeConversationExecutor
NewCompositeConversationExecutor creates a new composite executor.
func (*CompositeConversationExecutor) ExecuteConversation ¶ added in v1.1.6
func (ce *CompositeConversationExecutor) ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult
ExecuteConversation routes to the appropriate executor based on scenario config. If the request is for eval mode, uses EvalConversationExecutor. If the scenario has duplex configuration, uses DuplexConversationExecutor. Otherwise, uses DefaultConversationExecutor.
func (*CompositeConversationExecutor) ExecuteConversationStream ¶ added in v1.1.6
func (ce *CompositeConversationExecutor) ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
ExecuteConversationStream routes streaming execution to the appropriate executor.
func (*CompositeConversationExecutor) GetDefaultExecutor ¶ added in v1.1.6
func (ce *CompositeConversationExecutor) GetDefaultExecutor() *DefaultConversationExecutor
GetDefaultExecutor returns the default executor for direct access if needed.
func (*CompositeConversationExecutor) GetDuplexExecutor ¶ added in v1.1.6
func (ce *CompositeConversationExecutor) GetDuplexExecutor() *DuplexConversationExecutor
GetDuplexExecutor returns the duplex executor for direct access if needed.
func (*CompositeConversationExecutor) GetEvalExecutor ¶ added in v1.1.9
func (ce *CompositeConversationExecutor) GetEvalExecutor() *EvalConversationExecutor
GetEvalExecutor returns the eval executor for direct access if needed.
type ConversationExecutor ¶
type ConversationExecutor interface {
// ExecuteConversation runs a complete conversation based on scenario
ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult
// ExecuteConversationStream runs a conversation with streaming
ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
}
ConversationExecutor orchestrates full conversation flows
func BuildEngineComponents ¶ added in v1.1.9
func BuildEngineComponents(cfg *config.Config) (
    providerRegistry *providers.Registry,
    promptRegistry *prompt.Registry,
    mcpRegistry *mcp.RegistryImpl,
    convExecutor ConversationExecutor,
    adapterReg *adapters.Registry,
    a2aCleanup func(),
    err error,
)
BuildEngineComponents builds all engine components from a loaded Config object. This function creates and initializes:
- MCP registry and tools (if configured)
- Provider registry (for main assistant)
- Prompt registry (if configured)
- Tool registry (static + MCP tools)
- Turn executor
- Self-play provider registry (if enabled)
- Self-play registry (if enabled)
- Conversation executor
This function is exported to enable programmatic creation of Arena engines without requiring file-based configuration. Users can construct a *config.Config programmatically and pass it to this function to get all required registries for use with NewEngine.
Returns all components needed to construct an Engine, or an error if any component fails to build.
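For illustration, a minimal sketch of programmatic construction (the config contents, variable names, and error handling are illustrative, and the cleanup function is assumed to be nil when no A2A resources were created):

    cfg := &config.Config{ /* populated programmatically */ }

    providerReg, promptReg, mcpReg, convExec, adapterReg, cleanup, err := engine.BuildEngineComponents(cfg)
    if err != nil {
        log.Fatalf("build engine components: %v", err)
    }
    if cleanup != nil {
        defer cleanup() // release A2A resources created during component construction
    }

    eng, err := engine.NewEngine(cfg, providerReg, promptReg, mcpReg, convExec, adapterReg)
    if err != nil {
        log.Fatalf("create engine: %v", err)
    }
    defer eng.Close()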
type ConversationRequest ¶
type ConversationRequest struct {
// Required fields
Provider providers.Provider
Scenario *config.Scenario
Eval *config.Eval // Eval configuration (mutually exclusive with Scenario)
Config *config.Config
Region string
// Optional overrides (for future use)
Temperature *float64 // Override scenario temperature
MaxTokens *int // Override scenario max tokens
Timeout *int // Timeout in seconds
// For distributed execution and tracing (v0.2.0+)
RunID string // Unique identifier for this run
Metadata map[string]string // Additional metadata for debugging/tracing
// Event bus for runtime/TUI events
EventBus *events.EventBus
// State management
StateStoreConfig *StateStoreConfig // Optional state store configuration
ConversationID string // Conversation identifier for state persistence
}
ConversationRequest contains all data needed for conversation execution. Using a request object makes the API extensible without breaking changes.
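As a hedged sketch, a request might be assembled like this (the provider, scenario, config, region, and run identifier are placeholders obtained elsewhere):

    req := engine.ConversationRequest{
        Provider: prov,     // provider previously looked up from the provider registry
        Scenario: scenario, // *config.Scenario selected for this run
        Config:   cfg,
        Region:   "us-east-1", // placeholder region
        RunID:    "run-0001",  // placeholder run identifier
        Metadata: map[string]string{"trace": "example"},
    }

    result := executor.ExecuteConversation(ctx, req)
    if result.Failed {
        log.Printf("conversation failed: %s", result.Error)
    }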
type ConversationResult ¶
type ConversationResult struct {
Messages []types.Message // Flat list of all messages in the conversation
Cost types.CostInfo // Total cost across all messages
ToolStats *types.ToolStats // Tool usage statistics
Violations []types.ValidationError // Validation errors
MediaOutputs []MediaOutput // Media outputs generated by LLMs
// Conversation-level assertions
ConversationAssertionResults []assertions.ConversationValidationResult `json:"conv_assertions_results,omitempty"`
// Self-play metadata
SelfPlay bool `json:"self_play,omitempty"`
PersonaID string `json:"persona_id,omitempty"`
// Error handling
Error string `json:"error,omitempty"` // Error message if execution failed
Failed bool `json:"failed,omitempty"` // Whether execution failed (but partial results may be available)
}
ConversationResult contains the outcome of conversation execution
type ConversationStreamChunk ¶
type ConversationStreamChunk struct {
// Current turn number (0-indexed)
TurnIndex int
// Delta content from this specific chunk
Delta string
// Token count (accumulated for current turn)
TokenCount int
// Finish reason for current turn (only in last chunk of turn)
FinishReason *string
// Complete conversation result (accumulated, updated with each chunk)
Result *ConversationResult
// Error if streaming failed
Error error
// Metadata
Metadata map[string]interface{}
}
ConversationStreamChunk represents a streaming chunk during conversation execution
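As a sketch (assuming an executor and request built as in the ConversationRequest example), the stream can be consumed by ranging over the returned channel:

    chunks, err := executor.ExecuteConversationStream(ctx, req)
    if err != nil {
        log.Fatalf("start stream: %v", err)
    }
    for chunk := range chunks {
        if chunk.Error != nil {
            log.Printf("turn %d: stream error: %v", chunk.TurnIndex, chunk.Error)
            continue
        }
        fmt.Print(chunk.Delta) // incremental content for the current turn
        if chunk.FinishReason != nil {
            fmt.Printf("\n[turn %d finished: %s]\n", chunk.TurnIndex, *chunk.FinishReason)
        }
    }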
type DefaultConversationExecutor ¶
type DefaultConversationExecutor struct {
// contains filtered or unexported fields
}
DefaultConversationExecutor implements ConversationExecutor interface
func NewDefaultConversationExecutor ¶
func NewDefaultConversationExecutor(
    scriptedExecutor turnexecutors.TurnExecutor,
    selfPlayExecutor turnexecutors.TurnExecutor,
    selfPlayRegistry *selfplay.Registry,
    promptRegistry *prompt.Registry,
    packEvalHook *PackEvalHook,
) *DefaultConversationExecutor
NewDefaultConversationExecutor creates a new conversation executor
func (*DefaultConversationExecutor) ExecuteConversation ¶
func (ce *DefaultConversationExecutor) ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult
ExecuteConversation runs a complete conversation based on scenario using the new Turn model
func (*DefaultConversationExecutor) ExecuteConversationStream ¶
func (ce *DefaultConversationExecutor) ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
ExecuteConversationStream runs a complete conversation with streaming
type DistributedExecutor ¶
type DistributedExecutor interface {
// Execute runs work units across distributed workers
Execute(ctx context.Context, units []WorkUnit) error
// Status returns current execution status
Status() ExecutionStatus
// Workers returns information about available workers
Workers() []WorkerInfo
// Scale adjusts the number of workers
Scale(ctx context.Context, workerCount int) error
}
DistributedExecutor coordinates distributed execution of work units. Planned for v0.2.0 - Distributed Execution Support.
type DuplexConversationExecutor ¶ added in v1.1.6
type DuplexConversationExecutor struct {
// contains filtered or unexported fields
}
DuplexConversationExecutor handles duplex (bidirectional streaming) conversations. Unlike the standard executor which processes turns sequentially, this executor establishes a persistent streaming session and handles real-time audio I/O.
func NewDuplexConversationExecutor ¶ added in v1.1.6
func NewDuplexConversationExecutor(
    selfPlayRegistry *selfplay.Registry,
    promptRegistry *prompt.Registry,
    toolRegistry *tools.Registry,
    mediaStorage storage.MediaStorageService,
    packEvalHook *PackEvalHook,
) *DuplexConversationExecutor
NewDuplexConversationExecutor creates a new duplex conversation executor.
func (*DuplexConversationExecutor) ExecuteConversation ¶ added in v1.1.6
func (de *DuplexConversationExecutor) ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult
ExecuteConversation runs a duplex conversation based on scenario. For duplex mode, this establishes a streaming session and processes audio turns in real-time.
func (*DuplexConversationExecutor) ExecuteConversationStream ¶ added in v1.1.6
func (de *DuplexConversationExecutor) ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
ExecuteConversationStream runs a duplex conversation with streaming output. For duplex mode, this returns chunks as they arrive from the provider.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine manages the execution of prompt testing scenarios across multiple providers, regions, and configurations. It coordinates conversation execution, tool calling, validation, and result collection.
The engine supports both scripted conversations and self-play mode where an LLM simulates user behavior. It handles provider initialization, concurrent execution, and comprehensive result tracking including costs and tool usage.
func NewEngine ¶
func NewEngine(
    cfg *config.Config,
    providerRegistry *providers.Registry,
    promptRegistry *prompt.Registry,
    mcpRegistry *mcp.RegistryImpl,
    convExecutor ConversationExecutor,
    adapterRegistry *adapters.Registry,
) (*Engine, error)
NewEngine creates a new simulation engine from pre-built components. This is the primary constructor for the Engine and is preferred for testing where components can be created and configured independently.
This constructor uses dependency injection, accepting all required registries and executors as parameters. This makes testing easier and follows better architectural practices.
Parameters:
- cfg: Fully loaded and validated Config object
- providerRegistry: Registry for looking up providers by ID
- promptRegistry: Registry for system prompts and task types
- mcpRegistry: Registry for MCP servers and tools (if configured)
- convExecutor: Executor for full conversations
- adapterRegistry: Registry for recording adapters (used for eval enumeration)
Returns an initialized Engine ready for test execution.
func NewEngineFromConfig ¶ added in v1.1.11
NewEngineFromConfig creates a new Engine from a pre-loaded configuration. This allows CLI or programmatic callers to modify the config before engine creation.
func NewEngineFromConfigFile ¶
NewEngineFromConfigFile creates a new simulation engine from a configuration file. It loads the configuration, validates it, initializes all registries, and sets up the execution pipeline for conversation testing.
The configuration file is loaded along with all referenced resources (scenarios, providers, tools, personas), making the Config object fully self-contained.
This constructor performs all necessary initialization steps in the correct order:
1. Load and validate configuration from file (including all referenced resources)
2. Build registries from loaded resources
3. Initialize executors (turn, conversation, self-play if enabled)
4. Create Engine with all components
Note: Logger verbosity should be configured at application startup, not here. This function does not modify global logger settings.
Parameters:
- configPath: Path to the arena.yaml configuration file
Returns an initialized Engine ready for test execution, or an error if:
- Configuration file cannot be read or parsed
- Configuration validation fails
- Any resource file cannot be loaded
- Provider type is unsupported
func (*Engine) Close ¶
func (e *Engine) Close() error
Close shuts down the engine and cleans up resources. This includes closing all MCP server connections, provider HTTP clients, and the event store if session recording is enabled.
func (*Engine) ConfigureSessionRecordingFromConfig ¶ added in v1.1.6
func (e *Engine) ConfigureSessionRecordingFromConfig() error
ConfigureSessionRecordingFromConfig enables session recording if configured. It reads the recording configuration from the engine's config and enables session recording with the appropriate directory path. Returns nil if recording is not enabled in the config.
func (*Engine) EnableMockProviderMode ¶ added in v1.1.0
func (e *Engine) EnableMockProviderMode(mockConfigPath string) error
EnableMockProviderMode replaces all providers in the registry with mock providers. This enables testing of scenario behavior without making real API calls. Mock providers can use either file-based configuration for scenario-specific responses or default in-memory responses.
Parameters:
- mockConfigPath: Optional path to YAML configuration file for mock responses
Returns an error if the mock configuration file cannot be loaded or parsed.
func (*Engine) EnableSessionRecording ¶ added in v1.1.6
func (e *Engine) EnableSessionRecording(recordingDir string) error
EnableSessionRecording enables session recording for all runs. Recordings are stored in the specified directory as JSONL files, one file per session (using RunID as session ID). Returns an error if the directory cannot be created.
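For example (a sketch; the paths are placeholders, and an empty mock config path is assumed to select the default in-memory responses):

    eng, err := engine.NewEngineFromConfigFile("arena.yaml")
    if err != nil {
        log.Fatal(err)
    }
    defer eng.Close()

    // Replace real providers with mocks; "" is assumed to fall back to in-memory defaults.
    if err := eng.EnableMockProviderMode(""); err != nil {
        log.Fatal(err)
    }
    // Write one JSONL recording per session under ./recordings.
    if err := eng.EnableSessionRecording("./recordings"); err != nil {
        log.Fatal(err)
    }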
func (*Engine) ExecuteRuns ¶
func (e *Engine) ExecuteRuns(ctx context.Context, plan *RunPlan, concurrency int) ([]string, error)
ExecuteRuns executes every combination in the given run plan. Runs are executed concurrently up to the specified concurrency limit, with run IDs collected in order matching the input plan.
Each run executes independently:
- Loads scenario and provider
- Executes conversation turns (with self-play if configured)
- Runs validators on the results
- Tracks costs, timing, and tool calls
- Saves results to StateStore
The context can be used to cancel all in-flight executions. Run IDs are returned for all combinations, with errors captured in individual RunResult (accessible via StateStore).
Parameters:
- ctx: Context for cancellation
- plan: RunPlan containing combinations to execute
- concurrency: Maximum number of simultaneous executions
Returns a slice of RunIDs in the same order as plan.Combinations, or an error if execution setup fails. Individual run errors are stored in StateStore, not returned here.
func (*Engine) GenerateRunPlan ¶
func (e *Engine) GenerateRunPlan(regionFilter, providerFilter, scenarioFilter, evalFilter []string) (*RunPlan, error)
GenerateRunPlan creates a comprehensive test execution plan from filter criteria. The plan contains all combinations of regions × providers × scenarios OR evals that match the provided filters. Scenarios and evals are mutually exclusive.
For scenarios:
- regionFilter: Empty = all regions from prompt configs (or default)
- providerFilter: Empty = all registered providers (or scenario-specified providers)
- scenarioFilter: Empty = all loaded scenarios
For evals:
- evalFilter: Empty = all loaded evals
- Regions and providers are not used (they come from recordings)
Provider selection logic (scenarios only):
1. If scenario specifies providers: use those (intersected with CLI filter if provided)
2. If scenario doesn't specify providers: use all arena providers (intersected with CLI filter)
Returns a RunPlan containing all matching combinations, ready for execution. Each combination represents one independent test run that will be executed and validated separately.
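A sketch of planning and executing a filtered subset (the filter values and concurrency limit are illustrative):

    // Restrict the plan to the "openai" provider and the "checkout-flow" scenario, all regions.
    plan, err := eng.GenerateRunPlan(nil, []string{"openai"}, []string{"checkout-flow"}, nil)
    if err != nil {
        log.Fatal(err)
    }

    runIDs, err := eng.ExecuteRuns(ctx, plan, 4) // at most 4 concurrent runs
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("executed %d runs; per-run results and errors are in the state store", len(runIDs))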
func (*Engine) GetRecordingDir ¶ added in v1.1.6
func (e *Engine) GetRecordingDir() string
GetRecordingDir returns the directory where session recordings are stored. Returns empty string if recording is not enabled.
func (*Engine) GetRecordingPath ¶ added in v1.1.6
func (e *Engine) GetRecordingPath(runID string) string
GetRecordingPath returns the path to the recording file for a given run ID. Returns empty string if recording is not enabled.
func (*Engine) GetStateStore ¶
func (e *Engine) GetStateStore() statestore.Store
GetStateStore returns the engine's state store for accessing run results
func (*Engine) SetEventBus ¶ added in v1.1.4
func (e *Engine) SetEventBus(bus *events.EventBus)
SetEventBus configures the shared event bus used for runtime and TUI observability. If session recording is enabled, the event bus will be wired to the event store.
type EvalConversationExecutor ¶ added in v1.1.9
type EvalConversationExecutor struct {
// contains filtered or unexported fields
}
EvalConversationExecutor handles evaluation mode: replaying saved conversations with assertions. Unlike scenario execution, eval mode:
- Loads turns from recordings (no prompt building)
- Applies assertions to pre-recorded assistant messages
- Skips tool execution (tool calls are metadata only)
- Returns results in the same schema as scenario execution for output parity
func NewEvalConversationExecutor ¶ added in v1.1.9
func NewEvalConversationExecutor(
    adapterRegistry *adapters.Registry,
    assertionRegistry *runtimeValidators.Registry,
    convAssertionReg *assertions.ConversationAssertionRegistry,
    promptRegistry *prompt.Registry,
    providerRegistry *providers.Registry,
    packEvalHook *PackEvalHook,
) *EvalConversationExecutor
NewEvalConversationExecutor creates a new eval conversation executor.
func (*EvalConversationExecutor) ExecuteConversation ¶ added in v1.1.9
func (e *EvalConversationExecutor) ExecuteConversation(ctx context.Context, req ConversationRequest) *ConversationResult
ExecuteConversation runs an evaluation on a saved conversation.
func (*EvalConversationExecutor) ExecuteConversationStream ¶ added in v1.1.9
func (e *EvalConversationExecutor) ExecuteConversationStream(ctx context.Context, req ConversationRequest) (<-chan ConversationStreamChunk, error)
ExecuteConversationStream runs evaluation with streaming output. For eval mode, we don't have true streaming since we're replaying, but we implement this to satisfy the interface.
type ExecutionStatus ¶
type ExecutionStatus struct {
Total int `json:"total"` // Total work units
Completed int `json:"completed"` // Completed work units
Running int `json:"running"` // Currently executing
Failed int `json:"failed"` // Failed work units
StartTime time.Time `json:"start_time"` // Execution start time
Duration time.Duration `json:"duration"` // Total duration so far
Workers int `json:"workers"` // Active worker count
}
ExecutionStatus represents the state of distributed execution
type MediaOutput ¶ added in v1.1.0
type MediaOutput struct {
Type string `json:"type"` // "image", "audio", "video"
MIMEType string `json:"mime_type"` // e.g., "image/jpeg", "audio/mp3"
SizeBytes int64 `json:"size_bytes"` // Size of the media file
Duration *int `json:"duration,omitempty"` // Duration in seconds (for audio/video)
Width *int `json:"width,omitempty"` // Width in pixels (for image/video)
Height *int `json:"height,omitempty"` // Height in pixels (for image/video)
FilePath string `json:"file_path,omitempty"` // Path where media was saved
Thumbnail string `json:"thumbnail,omitempty"` // Base64-encoded thumbnail for HTML reports
MessageIdx int `json:"message_index"` // Index of message containing this media
PartIdx int `json:"part_index"` // Index of content part within the message
}
MediaOutput represents media content generated by an LLM during test execution
func CollectMediaOutputs ¶ added in v1.1.0
func CollectMediaOutputs(messages []types.Message) []MediaOutput
CollectMediaOutputs extracts media outputs from conversation messages. It returns a slice of MediaOutput for tracking in RunResult.
type MediaOutputStats ¶ added in v1.1.0
type MediaOutputStats struct {
Total int `json:"total"`
ImageCount int `json:"image_count"`
AudioCount int `json:"audio_count"`
VideoCount int `json:"video_count"`
TotalSizeBytes int64 `json:"total_size_bytes"`
ByType map[string]int `json:"by_type"`
}
MediaOutputStats contains summary statistics for media outputs
func GetMediaOutputStatistics ¶ added in v1.1.0
func GetMediaOutputStatistics(outputs []MediaOutput) MediaOutputStats
GetMediaOutputStatistics calculates summary statistics for media outputs
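For example, media outputs from a finished conversation can be collected and summarized (a sketch; result is assumed to be a ConversationResult or RunResult with Messages populated, and FormatMediaType is assumed to accept the short type key such as "image"):

    outputs := engine.CollectMediaOutputs(result.Messages)
    stats := engine.GetMediaOutputStatistics(outputs)
    fmt.Printf("%d media outputs (%s total)\n", stats.Total, engine.FormatFileSize(stats.TotalSizeBytes))
    for mediaType, count := range stats.ByType {
        fmt.Printf("  %s: %d\n", engine.FormatMediaType(mediaType), count)
    }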
type PackEvalAdapter ¶ added in v1.1.11
type PackEvalAdapter struct{}
PackEvalAdapter converts evals.EvalResult to assertions.ConversationValidationResult so pack eval results flow through the existing Arena assertion reporting pipeline.
func (*PackEvalAdapter) Convert ¶ added in v1.1.11
func (a *PackEvalAdapter) Convert(results []evals.EvalResult) []assertions.ConversationValidationResult
Convert transforms a slice of EvalResult into ConversationValidationResult entries. Each result is tagged with the PackEvalTypePrefix so renderers can group them separately.
type PackEvalHook ¶ added in v1.1.11
type PackEvalHook struct {
// contains filtered or unexported fields
}
PackEvalHook manages pack eval execution during Arena conversation runs. It wraps an EvalDispatcher and converts results into the assertion format used by Arena's reporting pipeline.
func NewPackEvalHook ¶ added in v1.1.11
func NewPackEvalHook(
    registry *evals.EvalTypeRegistry,
    defs []evals.EvalDef,
    skipEvals bool,
    evalTypeFilter []string,
    taskType string,
) *PackEvalHook
NewPackEvalHook creates a hook for executing pack evals during Arena runs. If skipEvals is true, a NoOpDispatcher is used internally. The evalTypeFilter, when non-empty, restricts execution to matching eval types.
func (*PackEvalHook) HasEvals ¶ added in v1.1.11
func (h *PackEvalHook) HasEvals() bool
HasEvals returns true if there are eval defs to execute.
func (*PackEvalHook) RunSessionEvals ¶ added in v1.1.11
func (h *PackEvalHook) RunSessionEvals(ctx context.Context, messages []types.Message, sessionID string) []assertions.ConversationValidationResult
RunSessionEvals runs session-complete evals after conversation finishes. Returns converted ConversationValidationResult entries.
func (*PackEvalHook) RunTurnEvals ¶ added in v1.1.11
func (h *PackEvalHook) RunTurnEvals(ctx context.Context, messages []types.Message, turnIndex int, sessionID string) []assertions.ConversationValidationResult
RunTurnEvals runs turn-triggered evals after a turn completes. Returns converted ConversationValidationResult entries.
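A sketch of how a caller might drive the hook around a conversation (hook, messages, turnIndex, and sessionID are assumed to come from the surrounding run):

    if hook.HasEvals() {
        // Turn-triggered evals after a turn completes...
        turnResults := hook.RunTurnEvals(ctx, messages, turnIndex, sessionID)
        // ...and session-complete evals once the conversation finishes.
        sessionResults := hook.RunSessionEvals(ctx, messages, sessionID)
        log.Printf("pack evals produced %d results", len(turnResults)+len(sessionResults))
    }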
type ResultSink ¶
type ResultSink interface {
// Store saves a work result
Store(ctx context.Context, result WorkResult) error
// Retrieve gets a work result by work unit ID
Retrieve(ctx context.Context, workUnitID string) (WorkResult, error)
// Query searches for results matching criteria
Query(ctx context.Context, criteria map[string]interface{}) ([]WorkResult, error)
// Subscribe to real-time result updates
Subscribe(ctx context.Context, filter func(WorkResult) bool) (<-chan WorkResult, error)
}
ResultSink handles storage and processing of work results. Planned for v0.2.0 - Distributed Execution Support.
type RunCombination ¶
type RunCombination struct {
Region string
ScenarioID string // For scenario-based runs
EvalID string // For eval-based runs (mutually exclusive with ScenarioID)
ProviderID string // Not used for eval runs (provider comes from recording)
RecordingRef string // For batch evals: specific recording reference ID (resolved by adapter)
}
RunCombination represents a single test execution
type RunPlan ¶
type RunPlan struct {
Combinations []RunCombination
}
RunPlan defines the test execution plan
type RunResult ¶
type RunResult struct {
RunID string `json:"RunID"`
PromptPack string `json:"PromptPack"`
Region string `json:"Region"`
ScenarioID string `json:"ScenarioID"`
ProviderID string `json:"ProviderID"`
Params map[string]interface{} `json:"Params"`
Messages []types.Message `json:"Messages"`
Commit map[string]interface{} `json:"Commit"`
Cost types.CostInfo `json:"Cost"`
ToolStats *types.ToolStats `json:"ToolStats"`
Violations []types.ValidationError `json:"Violations"`
StartTime time.Time `json:"StartTime"`
EndTime time.Time `json:"EndTime"`
Duration time.Duration `json:"Duration"`
Error string `json:"Error"`
SelfPlay bool `json:"SelfPlay"`
PersonaID string `json:"PersonaID"`
UserFeedback *statestore.Feedback `json:"UserFeedback"`
SessionTags []string `json:"SessionTags"`
AssistantRole *SelfPlayRoleInfo `json:"AssistantRole"`
UserRole *SelfPlayRoleInfo `json:"UserRole"`
// Media outputs generated by LLMs during test execution
MediaOutputs []MediaOutput `json:"MediaOutputs,omitempty"`
// Session recording path (if recording was enabled)
RecordingPath string `json:"RecordingPath,omitempty"`
// Conversation-level assertions evaluated after the conversation completes (summary format)
ConversationAssertions AssertionsSummary `json:"conversation_assertions,omitempty"`
}
RunResult contains the complete results of a single test execution
type SelfPlayRoleInfo ¶
SelfPlayRoleInfo contains provider information for self-play roles
type StateStoreConfig ¶
type StateStoreConfig struct {
Store interface{} // State store implementation (statestore.Store)
UserID string // User identifier (optional)
Metadata map[string]interface{} // Additional metadata to store (optional)
}
StateStoreConfig wraps the pipeline StateStore configuration for Arena
type WorkResult ¶
type WorkResult interface {
// WorkUnitID returns the ID of the work unit that produced this result
WorkUnitID() string
// Success indicates if the work unit completed successfully
Success() bool
// Data returns the result data (conversation, analysis, etc.)
Data() interface{}
// Error returns any error that occurred during execution
Error() error
// Metadata returns additional metadata about the execution
Metadata() map[string]interface{}
}
WorkResult represents the result of executing a WorkUnit
type WorkSource ¶
type WorkSource interface {
// Generate creates work units from configuration
Generate(ctx context.Context, config interface{}) ([]WorkUnit, error)
// Stream provides work units as they become available
Stream(ctx context.Context, config interface{}) (<-chan WorkUnit, error)
// EstimateWorkload returns expected number of work units
EstimateWorkload(ctx context.Context, config interface{}) (int, error)
}
WorkSource generates work units for execution. Planned for v0.2.0 - Distributed Execution Support.
type WorkUnit ¶
type WorkUnit interface {
// ID returns unique identifier for this work unit
ID() string
// Execute performs the work unit and returns results
Execute(ctx context.Context) (WorkResult, error)
// Dependencies returns work units that must complete before this one
Dependencies() []string
// Priority returns execution priority (higher number = higher priority)
Priority() int
// EstimatedDuration returns expected execution time
EstimatedDuration() time.Duration
}
WorkUnit represents a single unit of work in distributed execution. Planned for v0.2.0 - Distributed Execution Support.
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"` // Worker identifier
Status string `json:"status"` // online, offline, busy
Capacity int `json:"capacity"` // Max concurrent work units
Current int `json:"current"` // Current work units
LastSeen time.Time `json:"last_seen"` // Last heartbeat
Metadata map[string]interface{} `json:"metadata"` // Additional worker info
}
WorkerInfo contains information about a distributed worker
Source Files ¶
- builder_integration.go
- composite_conversation_executor.go
- conversation_executor.go
- distributed.go
- duplex_conversation_executor.go
- duplex_executor_assertions_integration.go
- duplex_executor_events_integration.go
- duplex_executor_pipeline_integration.go
- duplex_executor_results_integration.go
- duplex_executor_tools_integration.go
- duplex_executor_turns_integration.go
- duplex_executor_types.go
- engine.go
- eval_conversation_executor.go
- execution.go
- interfaces.go
- media_collector.go
- pack_eval_adapter.go
- pack_eval_hook.go
- test_helpers.go
- test_init.go
- types.go