Documentation
Overview ¶
Package turnexecutors provides execution strategies for different conversation patterns.
This package implements various turn execution modes:
- Scripted: Pre-defined user messages from scenario configuration
- Self-play: AI-generated user messages for autonomous testing
- Pipeline: Direct LLM pipeline execution for single-turn scenarios
Each executor handles the specifics of message generation, provider interaction, validation, and result collection for its execution mode.
Key types:
- TurnExecutor: Base interface for all turn execution strategies
- TurnRequest: Configuration for individual turn execution
Implementations:
- ScriptedExecutor: Executes pre-defined conversation scripts
- SelfPlayExecutor: Generates dynamic user messages via persona LLM
- PipelineExecutor: Direct pipeline execution for simple scenarios
Index ¶
- Constants
- func ConvertTurnPartsToMessageParts(ctx context.Context, turnParts []config.TurnContentPart, baseDir string, ...) ([]types.ContentPart, error)
- func IsMediaLoadError(err error) bool
- func IsSupportedAudioMIMEType(mimeType string) bool
- func IsSupportedImageMIMEType(mimeType string) bool
- func IsSupportedVideoMIMEType(mimeType string) bool
- func ValidateFilePath(filePath, baseDir string) error
- func ValidateFileSize(filePath string, maxSize int64) error
- func ValidateMIMEType(mimeType, contentType string) error
- func ValidateTurnMediaContent(media *config.TurnMediaContent, contentType string) error
- type AudioFileSource
- func (s *AudioFileSource) BitDepth() int
- func (s *AudioFileSource) Channels() int
- func (s *AudioFileSource) Close() error
- func (s *AudioFileSource) Duration() float64
- func (s *AudioFileSource) Format() AudioFormat
- func (s *AudioFileSource) ReadChunk(size int) ([]byte, error)
- func (s *AudioFileSource) Reset() error
- func (s *AudioFileSource) SampleRate() int
- type AudioFormat
- type HTTPMediaLoader
- type MediaErrorType
- type MediaLoadError
- func NewFileError(index int, contentType, source, message string, cause error) *MediaLoadError
- func NewFormatError(index int, contentType, source, message string) *MediaLoadError
- func NewNetworkError(index int, contentType, source, message string, cause error) *MediaLoadError
- func NewSecurityError(index int, contentType, source, message string) *MediaLoadError
- func NewSizeError(index int, contentType, source, message string) *MediaLoadError
- func NewValidationError(index int, contentType, source, message string) *MediaLoadError
- type MessageStreamChunk
- type PipelineExecutor
- type ScriptedExecutor
- type SelfPlayExecutor
- type StateStoreConfig
- type TurnExecutor
- type TurnRequest
Constants ¶
const (
	// DefaultMaxFileSize is the default maximum file size (50MB)
	DefaultMaxFileSize = 50 * 1024 * 1024
	// MaxPathLength is the maximum allowed file path length
	MaxPathLength = 4096
)
Variables ¶
This section is empty.
Functions ¶
func ConvertTurnPartsToMessageParts ¶ added in v1.1.0
func ConvertTurnPartsToMessageParts(
	ctx context.Context,
	turnParts []config.TurnContentPart,
	baseDir string,
	httpLoader *HTTPMediaLoader,
	storageService storage.MediaStorageService,
) ([]types.ContentPart, error)
ConvertTurnPartsToMessageParts converts scenario turn parts to runtime message parts, loading media files from disk, URLs, or storage references as needed. The storageService parameter is optional and only needed when loading from storage references.
func IsMediaLoadError ¶ added in v1.1.0
func IsMediaLoadError(err error) bool
IsMediaLoadError checks if an error is a MediaLoadError
func IsSupportedAudioMIMEType ¶ added in v1.1.0
func IsSupportedAudioMIMEType(mimeType string) bool
IsSupportedAudioMIMEType checks if a MIME type is a supported audio format
func IsSupportedImageMIMEType ¶ added in v1.1.0
func IsSupportedImageMIMEType(mimeType string) bool
IsSupportedImageMIMEType checks if a MIME type is a supported image format
func IsSupportedVideoMIMEType ¶ added in v1.1.0
func IsSupportedVideoMIMEType(mimeType string) bool
IsSupportedVideoMIMEType checks if a MIME type is a supported video format
func ValidateFilePath ¶ added in v1.1.0
func ValidateFilePath(filePath, baseDir string) error
ValidateFilePath validates a file path for security issues
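The documentation does not spell out which checks ValidateFilePath performs. The following is a minimal sketch of one common approach, assuming the goal is to reject over-long paths and paths that escape baseDir. The helper name checkPathWithinBase is hypothetical and is not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// maxPathLength mirrors the documented MaxPathLength constant.
const maxPathLength = 4096

// checkPathWithinBase is a hypothetical helper, not the package's implementation.
// It rejects empty paths, over-long paths, and paths that resolve outside baseDir.
func checkPathWithinBase(filePath, baseDir string) error {
	if filePath == "" {
		return errors.New("empty file path")
	}
	if len(filePath) > maxPathLength {
		return fmt.Errorf("path length %d exceeds %d", len(filePath), maxPathLength)
	}
	full := filePath
	if !filepath.IsAbs(full) {
		full = filepath.Join(baseDir, filePath)
	}
	// Rel expresses the cleaned target relative to the base directory.
	rel, err := filepath.Rel(filepath.Clean(baseDir), filepath.Clean(full))
	if err != nil {
		return fmt.Errorf("cannot relate %q to %q: %w", filePath, baseDir, err)
	}
	// A relative path that starts with ".." points outside the base directory.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return fmt.Errorf("path %q escapes base directory %q", filePath, baseDir)
	}
	return nil
}

func main() {
	fmt.Println(checkPathWithinBase("images/cat.png", "/data/scenarios"))
	fmt.Println(checkPathWithinBase("../../etc/passwd", "/data/scenarios"))
}
```

A lexical check like this does not follow symbolic links; a stricter variant would resolve them first.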
func ValidateFileSize ¶ added in v1.1.0
func ValidateFileSize(filePath string, maxSize int64) error
ValidateFileSize checks if a file size is within limits
func ValidateMIMEType ¶ added in v1.1.0
func ValidateMIMEType(mimeType, contentType string) error
ValidateMIMEType validates that a MIME type matches the expected content type
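As an illustration of what matching a MIME type against a content type can look like, here is a minimal sketch. It assumes the content type is one of the families named in MediaLoadError ("image", "audio", "video") and that a match means the MIME type belongs to that family. The helper checkMIMEFamily is hypothetical; the real function may also consult a list of supported formats.

```go
package main

import (
	"fmt"
	"mime"
	"strings"
)

// checkMIMEFamily is a hypothetical helper, not the package's implementation.
// It parses the MIME type (dropping parameters such as charset) and checks
// that its top-level type equals contentType.
func checkMIMEFamily(mimeType, contentType string) error {
	mediaType, _, err := mime.ParseMediaType(mimeType)
	if err != nil {
		return fmt.Errorf("invalid MIME type %q: %w", mimeType, err)
	}
	if !strings.HasPrefix(mediaType, contentType+"/") {
		return fmt.Errorf("MIME type %q does not match content type %q", mediaType, contentType)
	}
	return nil
}

func main() {
	fmt.Println(checkMIMEFamily("image/png", "image"))
	fmt.Println(checkMIMEFamily("audio/wav", "image"))
}
```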
func ValidateTurnMediaContent ¶ added in v1.1.0
func ValidateTurnMediaContent(media *config.TurnMediaContent, contentType string) error
ValidateTurnMediaContent validates media content configuration for security and correctness
Types ¶
type AudioFileSource ¶ added in v1.1.6
type AudioFileSource struct {
// contains filtered or unexported fields
}
AudioFileSource provides streaming audio from a file for duplex mode. It reads an audio file and returns chunks of raw PCM audio data suitable for streaming to providers such as the Gemini Live API.
func NewAudioFileSource ¶ added in v1.1.6
func NewAudioFileSource(filePath, baseDir string) (*AudioFileSource, error)
NewAudioFileSource creates a new audio file source for streaming. It supports WAV files and will automatically parse headers to determine format. The filePath can be absolute or relative to baseDir.
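To make the header parsing concrete, the sketch below reads the fields of a canonical 44-byte WAV header using only the standard library. It is an illustration under stated assumptions, not the package's parser: it handles only the simplest layout (a 16-byte fmt chunk immediately followed by the data chunk), and the names wavInfo, parseCanonicalWAVHeader, and buildCanonicalHeader are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// wavInfo holds the fields read from a canonical 44-byte WAV header.
type wavInfo struct {
	Channels      int
	SampleRate    int
	BitsPerSample int
	DataSize      int
}

// parseCanonicalWAVHeader is a hypothetical helper. Real WAV files may carry
// extra chunks (for example LIST) that this sketch does not handle.
func parseCanonicalWAVHeader(h []byte) (wavInfo, error) {
	if len(h) < 44 {
		return wavInfo{}, errors.New("header shorter than 44 bytes")
	}
	if !bytes.Equal(h[0:4], []byte("RIFF")) || !bytes.Equal(h[8:12], []byte("WAVE")) {
		return wavInfo{}, errors.New("not a RIFF/WAVE file")
	}
	if !bytes.Equal(h[12:16], []byte("fmt ")) || !bytes.Equal(h[36:40], []byte("data")) {
		return wavInfo{}, errors.New("unsupported chunk layout")
	}
	le := binary.LittleEndian
	return wavInfo{
		Channels:      int(le.Uint16(h[22:24])),
		SampleRate:    int(le.Uint32(h[24:28])),
		BitsPerSample: int(le.Uint16(h[34:36])),
		DataSize:      int(le.Uint32(h[40:44])),
	}, nil
}

// durationSeconds estimates playback length from the data chunk size.
func (w wavInfo) durationSeconds() float64 {
	bytesPerSecond := w.SampleRate * w.Channels * w.BitsPerSample / 8
	if bytesPerSecond == 0 {
		return 0
	}
	return float64(w.DataSize) / float64(bytesPerSecond)
}

// buildCanonicalHeader creates a header for the demo so no file is needed.
func buildCanonicalHeader(channels, sampleRate, bits, dataSize int) []byte {
	le := binary.LittleEndian
	h := make([]byte, 44)
	copy(h[0:4], "RIFF")
	le.PutUint32(h[4:8], uint32(36+dataSize))
	copy(h[8:12], "WAVE")
	copy(h[12:16], "fmt ")
	le.PutUint32(h[16:20], 16) // fmt chunk size
	le.PutUint16(h[20:22], 1)  // 1 = integer PCM
	le.PutUint16(h[22:24], uint16(channels))
	le.PutUint32(h[24:28], uint32(sampleRate))
	le.PutUint32(h[28:32], uint32(sampleRate*channels*bits/8))
	le.PutUint16(h[32:34], uint16(channels*bits/8))
	le.PutUint16(h[34:36], uint16(bits))
	copy(h[36:40], "data")
	le.PutUint32(h[40:44], uint32(dataSize))
	return h
}

func main() {
	info, err := parseCanonicalWAVHeader(buildCanonicalHeader(1, 16000, 16, 32000))
	fmt.Println(info, err, info.durationSeconds())
}
```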
func (*AudioFileSource) BitDepth ¶ added in v1.1.6
func (s *AudioFileSource) BitDepth() int
BitDepth returns the original bit depth of the audio.
func (*AudioFileSource) Channels ¶ added in v1.1.6
func (s *AudioFileSource) Channels() int
Channels returns the number of audio channels.
func (*AudioFileSource) Close ¶ added in v1.1.6
func (s *AudioFileSource) Close() error
Close closes the audio file.
func (*AudioFileSource) Duration ¶ added in v1.1.6
func (s *AudioFileSource) Duration() float64
Duration returns the estimated duration of the audio in seconds.
func (*AudioFileSource) Format ¶ added in v1.1.6
func (s *AudioFileSource) Format() AudioFormat
Format returns the audio format.
func (*AudioFileSource) ReadChunk ¶ added in v1.1.6
func (s *AudioFileSource) ReadChunk(size int) ([]byte, error)
ReadChunk reads up to size bytes of audio data. Returns the audio data as raw bytes suitable for streaming. Returns io.EOF when all data has been read.
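A typical consumer calls ReadChunk in a loop until it sees io.EOF. The sketch below shows that loop against a local interface with the same method signature. The memorySource type and the drain and countChunks helpers are hypothetical stand-ins so the example runs without an audio file.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// chunkSource is a local stand-in matching the documented ReadChunk signature.
type chunkSource interface {
	ReadChunk(size int) ([]byte, error)
}

// memorySource is a hypothetical in-memory source used only for this demo.
type memorySource struct{ r *bytes.Reader }

func (m *memorySource) ReadChunk(size int) ([]byte, error) {
	if size <= 0 {
		return nil, errors.New("chunk size must be positive")
	}
	buf := make([]byte, size)
	n, err := m.r.Read(buf)
	if n == 0 && err != nil {
		return nil, err // io.EOF once all data has been read
	}
	return buf[:n], nil
}

// drain reads chunks until io.EOF and hands each non-empty chunk to send.
// It returns the total number of bytes forwarded.
func drain(src chunkSource, chunkSize int, send func([]byte) error) (int, error) {
	total := 0
	for {
		chunk, err := src.ReadChunk(chunkSize)
		if len(chunk) > 0 {
			if sendErr := send(chunk); sendErr != nil {
				return total, sendErr
			}
			total += len(chunk)
		}
		if errors.Is(err, io.EOF) {
			return total, nil
		}
		if err != nil {
			return total, err
		}
	}
}

// countChunks drains an in-memory source and reports chunk and byte counts.
func countChunks(data []byte, chunkSize int) (chunks, total int, err error) {
	src := &memorySource{r: bytes.NewReader(data)}
	total, err = drain(src, chunkSize, func([]byte) error {
		chunks++
		return nil
	})
	return chunks, total, err
}

func main() {
	chunks, total, err := countChunks(make([]byte, 10), 4)
	fmt.Println(chunks, total, err)
}
```

The loop forwards any data returned alongside io.EOF before stopping, so it works whether a source reports EOF together with the last chunk or on the following call.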
func (*AudioFileSource) Reset ¶ added in v1.1.6
func (s *AudioFileSource) Reset() error
Reset seeks back to the beginning of the audio data.
func (*AudioFileSource) SampleRate ¶ added in v1.1.6
func (s *AudioFileSource) SampleRate() int
SampleRate returns the sample rate of the audio.
type AudioFormat ¶ added in v1.1.6
type AudioFormat int
AudioFormat represents the audio encoding format.
const (
	// AudioFormatPCM16 is 16-bit signed PCM (little-endian).
	AudioFormatPCM16 AudioFormat = iota
	// AudioFormatPCM24 is 24-bit signed PCM (little-endian).
	AudioFormatPCM24
	// AudioFormatPCM32 is 32-bit signed PCM (little-endian).
	AudioFormatPCM32
	// AudioFormatFloat32 is 32-bit float PCM.
	AudioFormatFloat32
)
type HTTPMediaLoader ¶ added in v1.1.0
type HTTPMediaLoader struct {
// contains filtered or unexported fields
}
HTTPMediaLoader handles loading media from HTTP/HTTPS URLs
func NewHTTPMediaLoader ¶ added in v1.1.0
func NewHTTPMediaLoader(timeout time.Duration, maxFileSize int64) *HTTPMediaLoader
NewHTTPMediaLoader creates a new HTTP media loader with the specified timeout and max file size
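The timeout and size limit map naturally onto standard library primitives. The following is a sketch of that idea only, assuming a client-level timeout and a hard cap on the response body. The functions readLimited, fetchMedia, and limitedLen are hypothetical and do not describe how HTTPMediaLoader is implemented.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// readLimited reads at most maxSize bytes and fails if the source holds more.
// Reading maxSize+1 bytes distinguishes "exactly at the limit" from "over".
func readLimited(r io.Reader, maxSize int64) ([]byte, error) {
	data, err := io.ReadAll(io.LimitReader(r, maxSize+1))
	if err != nil {
		return nil, err
	}
	if int64(len(data)) > maxSize {
		return nil, fmt.Errorf("body exceeds %d bytes", maxSize)
	}
	return data, nil
}

// fetchMedia is a hypothetical download helper combining a request timeout
// with the size cap above.
func fetchMedia(ctx context.Context, url string, timeout time.Duration, maxSize int64) ([]byte, error) {
	client := &http.Client{Timeout: timeout}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s", resp.Status)
	}
	return readLimited(resp.Body, maxSize)
}

// limitedLen applies readLimited to a string so the demo needs no network.
func limitedLen(s string, maxSize int64) (int, error) {
	data, err := readLimited(strings.NewReader(s), maxSize)
	return len(data), err
}

func main() {
	fmt.Println(limitedLen("tiny payload", 1024))
	fmt.Println(limitedLen("too large", 4))
}
```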
type MediaErrorType ¶ added in v1.1.0
type MediaErrorType string
MediaErrorType categorizes media loading errors
const (
	// MediaErrorTypeValidation indicates invalid media configuration
	MediaErrorTypeValidation MediaErrorType = "validation"
	// MediaErrorTypeNetwork indicates network/HTTP-related errors
	MediaErrorTypeNetwork MediaErrorType = "network"
	// MediaErrorTypeFile indicates file system errors
	MediaErrorTypeFile MediaErrorType = "file"
	// MediaErrorTypeSecurity indicates security-related errors
	MediaErrorTypeSecurity MediaErrorType = "security"
	// MediaErrorTypeSize indicates file size limit errors
	MediaErrorTypeSize MediaErrorType = "size"
	// MediaErrorTypeFormat indicates unsupported format errors
	MediaErrorTypeFormat MediaErrorType = "format"
)
type MediaLoadError ¶ added in v1.1.0
type MediaLoadError struct {
Type MediaErrorType // Error category
Index int // Content part index where error occurred
Source string // Media source (file path or URL)
ContentType string // Content type (image, audio, video)
Message string // Human-readable error message
Cause error // Underlying error (if any)
}
MediaLoadError represents a structured error for media loading operations
func NewFileError ¶ added in v1.1.0
func NewFileError(index int, contentType, source, message string, cause error) *MediaLoadError
NewFileError creates a file system error
func NewFormatError ¶ added in v1.1.0
func NewFormatError(index int, contentType, source, message string) *MediaLoadError
NewFormatError creates a format error
func NewNetworkError ¶ added in v1.1.0
func NewNetworkError(index int, contentType, source, message string, cause error) *MediaLoadError
NewNetworkError creates a network error
func NewSecurityError ¶ added in v1.1.0
func NewSecurityError(index int, contentType, source, message string) *MediaLoadError
NewSecurityError creates a security error
func NewSizeError ¶ added in v1.1.0
func NewSizeError(index int, contentType, source, message string) *MediaLoadError
NewSizeError creates a file size error
func NewValidationError ¶ added in v1.1.0
func NewValidationError(index int, contentType, source, message string) *MediaLoadError
NewValidationError creates a validation error
func (*MediaLoadError) Error ¶ added in v1.1.0
func (e *MediaLoadError) Error() string
Error implements the error interface
func (*MediaLoadError) Unwrap ¶ added in v1.1.0
func (e *MediaLoadError) Unwrap() error
Unwrap returns the underlying error
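Because MediaLoadError implements both Error and Unwrap, it composes with the standard errors.As and errors.Is helpers. The sketch below demonstrates the pattern with a local stand-in type that copies a subset of the documented fields. The mediaLoadError type and the demoError, category, and causedByMissingFile helpers are hypothetical and exist only to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

// mediaLoadError is a local stand-in mirroring part of MediaLoadError.
type mediaLoadError struct {
	Type    string
	Index   int
	Source  string
	Message string
	Cause   error
}

func (e *mediaLoadError) Error() string {
	return fmt.Sprintf("%s error in part %d (%s): %s", e.Type, e.Index, e.Source, e.Message)
}

// Unwrap exposes the cause so errors.Is and errors.As can see through it.
func (e *mediaLoadError) Unwrap() error { return e.Cause }

// demoError builds a file error and wraps it once more, as a caller might.
func demoError() error {
	inner := &mediaLoadError{
		Type:    "file",
		Index:   2,
		Source:  "images/cat.png",
		Message: "cannot open file",
		Cause:   fs.ErrNotExist,
	}
	return fmt.Errorf("turn 3: %w", inner)
}

// category extracts the error type, or "" if err is not a mediaLoadError.
func category(err error) string {
	var mle *mediaLoadError
	if errors.As(err, &mle) {
		return mle.Type
	}
	return ""
}

// causedByMissingFile checks the underlying cause through the Unwrap chain.
func causedByMissingFile(err error) bool {
	return errors.Is(err, fs.ErrNotExist)
}

func main() {
	err := demoError()
	fmt.Println(category(err), causedByMissingFile(err))
}
```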
type MessageStreamChunk ¶
type MessageStreamChunk struct {
// All messages accumulated so far (user + assistant + tool results)
Messages []types.Message
// Content delta from this specific chunk
Delta string
// Which message is being updated (index in Messages array)
MessageIndex int
// Token count (accumulated)
TokenCount int
// Finish reason (only in final chunk)
FinishReason *string
// Error if streaming failed
Error error
// Metadata
Metadata map[string]interface{}
}
MessageStreamChunk represents a streaming chunk during turn execution
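A consumer of such a stream usually appends each Delta, stops on Error, and records FinishReason from the final chunk. The sketch below shows that loop using a local streamChunk type that copies three of the documented fields. The produce helper is a hypothetical producer used only for the demo.

```go
package main

import (
	"fmt"
	"strings"
)

// streamChunk is a local stand-in with a subset of MessageStreamChunk fields.
type streamChunk struct {
	Delta        string
	FinishReason *string
	Error        error
}

// collect drains the channel, concatenating deltas until it closes or a
// chunk reports an error.
func collect(ch <-chan streamChunk) (text, finish string, err error) {
	var sb strings.Builder
	for chunk := range ch {
		if chunk.Error != nil {
			return sb.String(), "", chunk.Error
		}
		sb.WriteString(chunk.Delta)
		if chunk.FinishReason != nil {
			finish = *chunk.FinishReason
		}
	}
	return sb.String(), finish, nil
}

// produce is a hypothetical producer: it emits the deltas, then a final
// chunk carrying a finish reason, then closes the channel. The channel is
// buffered so the producer never blocks if the consumer returns early.
func produce(deltas ...string) <-chan streamChunk {
	ch := make(chan streamChunk, len(deltas)+1)
	for _, d := range deltas {
		ch <- streamChunk{Delta: d}
	}
	stop := "stop"
	ch <- streamChunk{FinishReason: &stop}
	close(ch)
	return ch
}

func main() {
	text, finish, err := collect(produce("Hel", "lo"))
	fmt.Println(text, finish, err)
}
```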
type PipelineExecutor ¶
type PipelineExecutor struct {
// contains filtered or unexported fields
}
PipelineExecutor executes conversations through the pipeline architecture. It handles both non-streaming and streaming execution, including multi-round tool calls.
func NewPipelineExecutor ¶
func NewPipelineExecutor(toolRegistry *tools.Registry, mediaStorage storage.MediaStorageService) *PipelineExecutor
NewPipelineExecutor creates a new pipeline executor with the specified tool registry and media storage. The mediaStorage parameter enables automatic externalization of large media content to file storage.
func (*PipelineExecutor) Execute ¶
func (e *PipelineExecutor) Execute(
	ctx context.Context,
	req *TurnRequest,
	userMessage *types.Message,
) error
Execute runs the conversation through the pipeline. This is the new flattened API that works directly with message lists.
Input: history (existing conversation) + userMessage (new user input)
Output: all new messages generated (assistant messages, tool results, etc.). Note that the signature returns only an error, so the generated messages are not handed back to the caller as a return value.
type ScriptedExecutor ¶
type ScriptedExecutor struct {
// contains filtered or unexported fields
}
ScriptedExecutor executes turns where the user message is scripted (predefined)
func NewScriptedExecutor ¶
func NewScriptedExecutor(pipelineExecutor *PipelineExecutor) *ScriptedExecutor
NewScriptedExecutor creates a new executor for scripted user turns
func (*ScriptedExecutor) ExecuteTurn ¶
func (e *ScriptedExecutor) ExecuteTurn(ctx context.Context, req TurnRequest) error
ExecuteTurn executes a scripted turn (user message from scenario + AI response)
func (*ScriptedExecutor) ExecuteTurnStream ¶
func (e *ScriptedExecutor) ExecuteTurnStream(ctx context.Context, req TurnRequest) (<-chan MessageStreamChunk, error)
ExecuteTurnStream executes a scripted turn with streaming
type SelfPlayExecutor ¶
type SelfPlayExecutor struct {
// contains filtered or unexported fields
}
SelfPlayExecutor executes turns where the user message is generated by an LLM
func NewSelfPlayExecutor ¶
func NewSelfPlayExecutor(pipelineExecutor *PipelineExecutor, contentProvider selfplay.Provider) *SelfPlayExecutor
NewSelfPlayExecutor creates a new executor for self-play turns
func (*SelfPlayExecutor) ExecuteTurn ¶
func (e *SelfPlayExecutor) ExecuteTurn(ctx context.Context, req TurnRequest) error
ExecuteTurn executes a self-play turn (LLM-generated user message + AI response)
func (*SelfPlayExecutor) ExecuteTurnStream ¶
func (e *SelfPlayExecutor) ExecuteTurnStream(ctx context.Context, req TurnRequest) (<-chan MessageStreamChunk, error)
ExecuteTurnStream executes a self-play turn with streaming
type StateStoreConfig ¶
type StateStoreConfig struct {
Store interface{} // State store implementation (statestore.Store)
UserID string // User identifier (optional)
Metadata map[string]interface{} // Additional metadata to store (optional)
}
StateStoreConfig wraps the state store configuration for turn executors
type TurnExecutor ¶
type TurnExecutor interface {
ExecuteTurn(ctx context.Context, req TurnRequest) error
ExecuteTurnStream(ctx context.Context, req TurnRequest) (<-chan MessageStreamChunk, error)
}
TurnExecutor executes one complete conversation turn (user message + AI response + tools). Messages are persisted to the StateStore, not returned.
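The strategy pattern behind this interface can be sketched with local stand-ins. In the example below, two stub executors write to an in-memory store instead of returning messages, and a selector picks one per request. Every type and function here (memoryStore, turnRequest, turnExecutor, scriptedStub, selfPlayStub, pickExecutor, runDemo) is hypothetical, and the selection rule is an assumption made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// memoryStore is a hypothetical stand-in for a state store.
type memoryStore struct {
	messages map[string][]string
}

func (s *memoryStore) add(conversationID string, msgs ...string) {
	s.messages[conversationID] = append(s.messages[conversationID], msgs...)
}

// turnRequest is a trimmed-down stand-in for TurnRequest.
type turnRequest struct {
	ConversationID  string
	ScriptedContent string
	SelfPlayRole    string
	Store           *memoryStore
}

// turnExecutor mirrors the non-streaming half of the TurnExecutor interface.
type turnExecutor interface {
	ExecuteTurn(ctx context.Context, req turnRequest) error
}

type scriptedStub struct{}

func (scriptedStub) ExecuteTurn(ctx context.Context, req turnRequest) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if req.ScriptedContent == "" {
		return errors.New("scripted turn has no content")
	}
	// Results are persisted, not returned.
	req.Store.add(req.ConversationID, "user: "+req.ScriptedContent, "assistant: (stub reply)")
	return nil
}

type selfPlayStub struct{}

func (selfPlayStub) ExecuteTurn(ctx context.Context, req turnRequest) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	req.Store.add(req.ConversationID, "user: (generated as "+req.SelfPlayRole+")", "assistant: (stub reply)")
	return nil
}

// pickExecutor chooses a strategy; the rule is an assumption for this sketch.
func pickExecutor(req turnRequest) turnExecutor {
	if req.SelfPlayRole != "" {
		return selfPlayStub{}
	}
	return scriptedStub{}
}

// runDemo executes one scripted and one self-play turn on the same conversation.
func runDemo() ([]string, error) {
	store := &memoryStore{messages: map[string][]string{}}
	reqs := []turnRequest{
		{ConversationID: "c1", ScriptedContent: "Hello", Store: store},
		{ConversationID: "c1", SelfPlayRole: "customer", Store: store},
	}
	for _, req := range reqs {
		if err := pickExecutor(req).ExecuteTurn(context.Background(), req); err != nil {
			return nil, err
		}
	}
	return store.messages["c1"], nil
}

func main() {
	msgs, err := runDemo()
	fmt.Println(len(msgs), err)
}
```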
type TurnRequest ¶
type TurnRequest struct {
// Provider and configuration
Provider providers.Provider
Scenario *config.Scenario
Temperature float64
MaxTokens int
Seed *int
Metadata map[string]interface{} // Optional execution metadata (e.g., judge targets/defaults)
// Prompt assembly (moved to pipeline)
PromptRegistry *prompt.Registry
TaskType string
Region string
PromptVars map[string]string // Variable overrides from arena.yaml prompt_configs[].vars
// Base directory for resolving relative file paths in media content
BaseDir string
// State management (StateStore handles all history)
StateStoreConfig *StateStoreConfig // State store configuration
ConversationID string // Conversation identifier for state persistence
// Executor-specific fields (only one should be set depending on executor type)
ScriptedContent string // ScriptedExecutor: text-only user message (legacy)
ScriptedParts []config.TurnContentPart // ScriptedExecutor: multimodal parts (precedence)
SelfPlayRole string // SelfPlayExecutor: the self-play role
SelfPlayPersona string // SelfPlayExecutor: the persona to use
// Assertions to validate after turn execution
Assertions []assertions.AssertionConfig
// Observability
EventBus *events.EventBus
RunID string
}
TurnRequest contains all information needed to execute a turn
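The field comments state that ScriptedParts takes precedence over the legacy ScriptedContent. One way to read that rule is sketched below, assuming that text-only content is wrapped in a single text part when no parts are supplied. The contentPart type and resolveUserParts function are hypothetical; the executor's actual resolution logic is not shown in this documentation.

```go
package main

import "fmt"

// contentPart is a hypothetical stand-in for a multimodal content part.
type contentPart struct {
	Type string
	Text string
}

// resolveUserParts applies the documented precedence: multimodal parts win,
// otherwise legacy text content is wrapped in a single text part.
func resolveUserParts(scriptedContent string, scriptedParts []contentPart) []contentPart {
	if len(scriptedParts) > 0 {
		return scriptedParts
	}
	if scriptedContent == "" {
		return nil
	}
	return []contentPart{{Type: "text", Text: scriptedContent}}
}

func main() {
	legacy := resolveUserParts("Describe this image", nil)
	fmt.Println(len(legacy), legacy[0].Type)

	parts := []contentPart{{Type: "text", Text: "Describe"}, {Type: "image"}}
	fmt.Println(len(resolveUserParts("ignored", parts)))
}
```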