package stage
v1.1.6
Published: Dec 23, 2025 License: Apache-2.0 Imports: 29 Imported by: 0

README

Pipeline Stage Architecture

This package implements Phase 1 of the Pipeline Streaming Architecture Proposal: the foundation for reactive streams-based pipeline execution.

Overview

The stage architecture introduces a new way to build pipelines using a Directed Acyclic Graph (DAG) of stages instead of a linear middleware chain. This enables:

  • True streaming execution: Stages process elements as they arrive, not after accumulation
  • Explicit concurrency: Each stage runs in its own goroutine with clear lifecycle
  • Backpressure support: Channel-based communication naturally handles slow consumers
  • Flexible topologies: Support for branching, fan-in, fan-out patterns
  • Backward compatibility: Existing middleware can be wrapped as stages
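The backpressure point can be illustrated with plain Go channels. The sketch below is not taken from the package; `trySend` and `fillUntilFull` are hypothetical helpers that only show how a bounded channel stops accepting elements once its buffer is full and resumes once a consumer receives.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the buffer had room.
// (Hypothetical helper, used only to make the blocking point observable.)
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// fillUntilFull pushes values until the buffered channel refuses one
// and returns how many values were accepted.
func fillUntilFull(ch chan int) int {
	n := 0
	for trySend(ch, n) {
		n++
	}
	return n
}

func main() {
	// 16 mirrors the documented default channel buffer size.
	ch := make(chan int, 16)
	fmt.Println("accepted before the producer would block:", fillUntilFull(ch))

	<-ch // a consumer drains one element
	fmt.Println("room after one receive:", trySend(ch, 99))
}
```

A real stage would use a blocking send inside a `select` with `ctx.Done()`, so a slow consumer pauses the producer instead of dropping elements.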

Core Concepts

StreamElement

The fundamental unit of data flowing through the pipeline. Each element can carry:

  • Content: Text, Audio, Video, Image, Message, ToolCall, or generic Parts
  • Metadata: Key-value pairs for passing state between stages
  • Priority: For QoS-aware scheduling (Low, Normal, High, Critical)
  • Control signals: Error propagation, end-of-stream markers
// Create different types of elements
textElem := stage.NewTextElement("Hello")
msgElem := stage.NewMessageElement(types.Message{Role: "user", Content: "Hello"})
audioElem := stage.NewAudioElement(&stage.AudioData{
    Samples: audioBytes,
    SampleRate: 16000,
    Format: stage.AudioFormatPCM16,
})
errorElem := stage.NewErrorElement(err)
Stage

A processing unit that transforms streaming elements. Unlike middleware, stages:

  • Declare their I/O characteristics via StageType
  • Operate on channels, not shared context
  • Run concurrently in separate goroutines
  • Must close their output channel when done
type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Stage Types:

  • Transform: 1:1 or 1:N transformation (validation, enrichment)
  • Accumulate: N:1 accumulation (VAD buffering, message collection)
  • Generate: 0:N generation (LLM streaming, TTS)
  • Sink: N:0 terminal stage (state store save, metrics)
  • Bidirectional: Full duplex (WebSocket session)
PipelineBuilder

Constructs the pipeline DAG using a fluent API:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage1,
        stage2,
        stage3,
    ).
    Branch("stage2", "stage4", "stage5").  // Fan out from stage2
    Build()
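The implementation status below lists cycle detection as part of the builder's DAG validation. As a rough illustration of that idea only (the package's actual algorithm is not shown here), a depth-first search over an adjacency map can detect cycles. `hasCycle` and the stage names are hypothetical.

```go
package main

import "fmt"

// Visit states for the depth-first search.
const (
	unvisited = iota
	inProgress
	done
)

// hasCycle reports whether the directed graph described by edges
// (stage name -> downstream stage names) contains a cycle.
func hasCycle(edges map[string][]string) bool {
	state := map[string]int{} // missing keys default to unvisited

	var visit func(node string) bool
	visit = func(node string) bool {
		switch state[node] {
		case inProgress:
			return true // reached a node that is still on the current path
		case done:
			return false
		}
		state[node] = inProgress
		for _, next := range edges[node] {
			if visit(next) {
				return true
			}
		}
		state[node] = done
		return false
	}

	for node := range edges {
		if visit(node) {
			return true
		}
	}
	return false
}

func main() {
	branching := map[string][]string{
		"stage1": {"stage2"},
		"stage2": {"stage3", "stage4", "stage5"},
	}
	looped := map[string][]string{
		"stage1": {"stage2"},
		"stage2": {"stage1"},
	}
	fmt.Println(hasCycle(branching), hasCycle(looped))
}
```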
StreamPipeline

The executable pipeline that:

  • Manages goroutine lifecycle
  • Creates channels between stages
  • Handles errors and shutdown
  • Emits events for observability
// Streaming execution
output, err := pipeline.Execute(ctx, input)
for elem := range output {
    // Process elements as they arrive
}

// Synchronous execution (convenience wrapper)
result, err := pipeline.ExecuteSync(ctx, elements...)

Implementation Status

✅ Completed (Phase 1: Foundation)
  1. Core Types

    • StreamElement with full content type support (Text, Audio, Video, Image, Message, ToolCall)
    • Priority enum for QoS
    • Helper functions for creating elements
  2. Stage Interface

    • Stage interface with Name, Type, Process methods
    • StageType enum (Transform, Accumulate, Generate, Sink, Bidirectional)
    • BaseStage for reducing boilerplate
    • Utility stages: PassthroughStage, FilterStage, MapStage
    • StageFunc for functional-style stages
  3. PipelineBuilder

    • Fluent API with Chain(), Connect(), Branch()
    • DAG validation (cycle detection, missing stages, duplicate names)
    • Configuration support
  4. StreamPipeline

    • Concurrent stage execution
    • Channel-based communication
    • Error propagation
    • Graceful shutdown with timeout
    • Execute() for streaming, ExecuteSync() for request/response
  5. Configuration

    • PipelineConfig with sensible defaults
    • Channel buffer size (default: 16)
    • Priority queue support (optional)
    • Execution timeouts
    • Metrics and tracing flags
  6. Events & Observability

    • New event types: stage.started, stage.completed, stage.failed
    • Event data includes stage name, type, duration, errors
    • Compatible with existing EventEmitter and EventBus
  7. Error Handling

    • StageError wraps errors with stage context
    • Error elements propagate through pipeline
    • First error captured and returned
✅ Completed (Phase 2: Core Request/Response Stages)
  1. PromptAssemblyStage (stages_core.go)

    • Loads prompts from registry with variable substitution
    • Extracts validators from prompt configuration
    • Emits system prompt and allowed tools via metadata
  2. ValidationStage (stages_core.go)

    • Dynamic validator execution from metadata
    • Supports multiple validators in sequence
    • Emits validation results with pass/fail status
  3. StateStoreLoadStage (stages_core.go)

    • Loads conversation history from state store
    • Marks historical messages with from_history flag
    • Prepends history to current messages
  4. StateStoreSaveStage (stages_core.go)

    • Saves conversation state after processing
    • Merges metadata from messages
    • Updates state store with new messages
  5. ProviderStage (stages_provider.go)

    • Request/response LLM execution
    • Multi-round tool call support
    • Full tool execution via toolRegistry.ExecuteAsync()
    • Policy enforcement (blocklist, ToolChoice)
    • Complete/Failed/Pending status handling
✅ Completed (Phase 3: Middleware Removal)
  1. Middleware Removed
    • Legacy Middleware interface has been removed
    • All pipeline execution now uses native stages
    • SDK and Arena fully migrated to stage-based pipelines
✅ Completed (Phase 4: Streaming Stages)
  1. VADAccumulatorStage (stages_streaming.go)

    • Buffers audio chunks until turn complete
    • VAD-based silence detection
    • Transcribes buffered audio to text
    • Configurable thresholds and durations
  2. TTSStage (stages_streaming.go)

    • Synthesizes audio for text elements
    • Adds AudioData to StreamElements
    • Configurable min text length and empty handling
    • Resilient error handling
  3. DuplexProviderStage (stages_streaming.go)

    • Bidirectional WebSocket streaming
    • Concurrent input/output forwarding
    • StreamElement ⟷ StreamChunk conversion
    • Handles audio and text elements
✅ Completed (Phase 5: Advanced Features)
  1. RouterStage (stages_advanced.go)

    • Dynamic routing with configurable routing functions
    • Multiple output channel support
    • Thread-safe output management
  2. MergeStage (stages_advanced.go)

    • Fan-in patterns (N:1 merging)
    • Concurrent processing of multiple inputs
    • Adds merge_input_index metadata
  3. StageMetrics + MetricsStage (stages_advanced.go)

    • Per-stage performance monitoring
    • Tracks latency (min/max/avg), throughput, errors
    • Thread-safe metrics collection
    • Transparent wrapper pattern
  4. PriorityChannel (stages_advanced.go)

    • Priority-based scheduling (Critical/High/Normal/Low)
    • Separate queues per priority level
    • Context-aware blocking operations
  5. TracingStage (stages_advanced.go)

    • Element-level distributed tracing
    • Trace ID generation and propagation
    • Per-stage timing information
    • GetTraceInfo() helper function
✅ Completed (Phase 6b: Utility Stages) - Dec 14, 2024

Package: runtime/pipeline/stage/
File: stages_utilities.go (612 LOC)
Commit: 7509d76 - feat(pipeline): implement Phase 6b - 5 utility stages for complete middleware parity

Implemented Stages:

  • DebugStage - Pipeline debugging with JSON snapshot logging
  • TemplateStage - Variable substitution ({{variable}} → value)
  • VariableProviderStage - Dynamic variable resolution from providers
  • MediaExternalizerStage - External media storage for large content
  • ContextBuilderStage - Token budget management with truncation strategies

Features:

  • All 5 remaining middleware now have stage equivalents
  • Channel-based communication with proper cleanup
  • Context cancellation support
  • Metadata propagation throughout pipeline
  • Configurable behavior (thresholds, policies, strategies)

Status: Phase 6b complete. All middleware have stage equivalents.
Lines Added: ~612 LOC in stages_utilities.go

📊 Implementation Statistics
  • Total LOC: 4,233 lines across 13 files
  • Stages Implemented: 20+ production stages (core, streaming, advanced, utilities)
  • Test Status: All existing pipeline tests pass
  • Backward Compatibility: ✅ Verified with 5 CI examples (50 test scenarios)
  • TODOs Remaining: 2 (embedding-based relevance, LLM summarization in ContextBuilderStage)
  • Commits: 8 commits across all phases
🎯 Future Enhancements (Optional)
  • Performance benchmarking suite
  • Element pooling for GC optimization
  • Grafana/Prometheus metrics exporters
  • Additional router strategies (weighted, round-robin)

Usage Examples

Simple Linear Pipeline
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewPassthroughStage("input"),
        stage.NewPassthroughStage("process"),
        stage.NewPassthroughStage("output"),
    ).
    Build()

input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(types.Message{Role: "user", Content: "Hello"})
close(input)

output, err := pipeline.Execute(ctx, input)
if err != nil {
    log.Fatalf("execute: %v", err) // handle the error before consuming output
}
for elem := range output {
    // Process streaming output
}
Custom Stage
type UppercaseStage struct {
    stage.BaseStage
}

func NewUppercaseStage() *UppercaseStage {
    return &UppercaseStage{
        BaseStage: stage.NewBaseStage("uppercase", stage.StageTypeTransform),
    }
}

func (s *UppercaseStage) Process(ctx context.Context, input <-chan stage.StreamElement, output chan<- stage.StreamElement) error {
    defer close(output)

    for elem := range input {
        if elem.Text != nil {
            upper := strings.ToUpper(*elem.Text)
            elem.Text = &upper
        }

        select {
        case output <- elem:
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}
Branching Pipeline
pipeline := stage.NewPipelineBuilder().
    Chain(inputStage, processorStage).
    Branch("processor",
        "textOutput",    // Text elements go here
        "audioOutput",   // Audio elements go here
    ).
    Build()

Configuration

config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(32).           // Larger buffers = more throughput
    WithPriorityQueue(true).             // Enable priority scheduling
    WithExecutionTimeout(60 * time.Second).
    WithMetrics(true).                   // Enable per-stage metrics
    WithTracing(true)                    // Enable element-level tracing

pipeline := stage.NewPipelineBuilderWithConfig(config).
    Chain(stages...).
    Build()

Testing

All core pipeline tests pass with the new architecture:

go test ./runtime/pipeline -v
# PASS
# ok  	github.com/AltairaLabs/PromptKit/runtime/pipeline	0.184s

Example tests demonstrate usage:

go test ./runtime/pipeline/stage -run=Example -v
# PASS
# ok  	github.com/AltairaLabs/PromptKit/runtime/pipeline/stage	0.185s

Creating Custom Stages

Use the stage architecture directly:

pipeline := stage.NewPipelineBuilder().
    Chain(/* your stages */).
    Build()
Stage Implementation Pattern
type MyStage struct {
    stage.BaseStage
}

func NewMyStage() *MyStage {
    return &MyStage{
        BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
    }
}

func (s *MyStage) Process(ctx context.Context, input <-chan stage.StreamElement, output chan<- stage.StreamElement) error {
    defer close(output)

    for elem := range input {
        // Transform elements
        if elem.Message != nil {
            // Modify message
        }

        select {
        case output <- elem:
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}
Common Patterns
Pattern 1: Message Transformation
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        if elem.Message != nil {
            elem.Message = transform(elem.Message)
        }
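        // Honor cancellation so the stage cannot block forever on a stalled consumer.
        select {
        case output <- elem:
        case <-ctx.Done():
            return ctx.Err()
        }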
    }
    return nil
}
Pattern 2: State Access via Metadata
elem.Metadata["system_prompt"] = "..."
elem.Metadata["allowed_tools"] = []string{...}
Pattern 3: Error Handling
if err != nil {
    output <- stage.NewErrorElement(err)
    return err
}

Architecture Benefits

  1. True Streaming: Elements flow through as soon as they're ready, no accumulation
  2. Lower Latency: Parallel stage execution, no sequential blocking
  3. Backpressure: Slow consumers naturally slow producers via channel blocking
  4. Clear Concurrency: Each stage = one goroutine, explicit lifecycle
  5. Flexible Topology: DAG supports branching, fan-in, fan-out
  6. Better Testing: Stages are independently testable units
  7. Observability: Per-stage events, metrics, tracing

Documentation

Overview

Package stage provides the reactive streams architecture for pipeline execution.

Package stage provides pipeline stages for audio processing.

Constants

const (
	// DefaultChannelBufferSize is the default buffer size for channels between stages.
	DefaultChannelBufferSize = 16
	// DefaultMaxConcurrentPipelines is the default maximum number of concurrent pipeline executions.
	DefaultMaxConcurrentPipelines = 100
	// DefaultExecutionTimeoutSeconds is the default execution timeout in seconds.
	DefaultExecutionTimeoutSeconds = 30
	// DefaultGracefulShutdownTimeoutSeconds is the default graceful shutdown timeout in seconds.
	DefaultGracefulShutdownTimeoutSeconds = 10
)

Variables

var (
	// ErrPipelineShuttingDown is returned when attempting to execute a pipeline that is shutting down.
	ErrPipelineShuttingDown = errors.New("pipeline is shutting down")

	// ErrShutdownTimeout is returned when pipeline shutdown times out.
	ErrShutdownTimeout = errors.New("shutdown timeout exceeded")

	// ErrInvalidPipeline is returned when building an invalid pipeline.
	ErrInvalidPipeline = errors.New("invalid pipeline configuration")

	// ErrCyclicDependency is returned when the pipeline DAG contains cycles.
	ErrCyclicDependency = errors.New("cyclic dependency detected in pipeline")

	// ErrStageNotFound is returned when a referenced stage doesn't exist.
	ErrStageNotFound = errors.New("stage not found")

	// ErrDuplicateStageName is returned when multiple stages have the same name.
	ErrDuplicateStageName = errors.New("duplicate stage name")

	// ErrNoStages is returned when trying to build a pipeline with no stages.
	ErrNoStages = errors.New("pipeline must have at least one stage")

	// ErrInvalidChannelBufferSize is returned for invalid buffer size.
	ErrInvalidChannelBufferSize = errors.New("channel buffer size must be non-negative")

	// ErrInvalidMaxConcurrentPipelines is returned for invalid max concurrent pipelines.
	ErrInvalidMaxConcurrentPipelines = errors.New("max concurrent pipelines must be non-negative")

	// ErrInvalidExecutionTimeout is returned for invalid execution timeout.
	ErrInvalidExecutionTimeout = errors.New("execution timeout must be non-negative")

	// ErrInvalidGracefulShutdownTimeout is returned for invalid graceful shutdown timeout.
	ErrInvalidGracefulShutdownTimeout = errors.New("graceful shutdown timeout must be non-negative")
)

Common errors

Functions

func BatchEmbeddingTexts

func BatchEmbeddingTexts(texts []string, batchSize int) [][]string

BatchEmbeddingTexts splits texts into batches of the given size. Useful for respecting embedding provider batch limits.
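A minimal sketch of this kind of batching, written against plain slices. `batchTexts` is a hypothetical re-implementation; in particular, returning nil for a non-positive batch size or empty input is an assumption, since the documented function's edge-case behavior is not stated here.

```go
package main

import "fmt"

// batchTexts splits texts into consecutive batches of at most batchSize items.
// Assumption: a non-positive batchSize or empty input yields nil.
func batchTexts(texts []string, batchSize int) [][]string {
	if batchSize <= 0 || len(texts) == 0 {
		return nil
	}
	batches := make([][]string, 0, (len(texts)+batchSize-1)/batchSize)
	for start := 0; start < len(texts); start += batchSize {
		end := start + batchSize
		if end > len(texts) {
			end = len(texts) // the final batch may be shorter
		}
		batches = append(batches, texts[start:end])
	}
	return batches
}

func main() {
	batches := batchTexts([]string{"a", "b", "c", "d", "e"}, 2)
	for i, b := range batches {
		fmt.Println(i, b)
	}
}
```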

func CosineSimilarity

func CosineSimilarity(a, b []float32) float64

CosineSimilarity computes the cosine similarity between two embedding vectors. Returns a value between -1.0 and 1.0, where:

  • 1.0 means vectors are identical in direction
  • 0.0 means vectors are orthogonal (unrelated)
  • -1.0 means vectors are opposite

For text embeddings, values typically range from 0.0 to 1.0, with higher values indicating greater semantic similarity.

Returns 0.0 if vectors have different lengths, are empty, or have zero magnitude.
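The documented behavior can be sketched as follows. `cosine` is a hypothetical stand-alone version that follows the rules above (0.0 for mismatched lengths, empty input, or zero magnitude); it is not the package's source.

```go
package main

import (
	"fmt"
	"math"
)

// cosine computes dot(a, b) / (|a| * |b|), accumulating in float64.
func cosine(a, b []float32) float64 {
	if len(a) == 0 || len(a) != len(b) {
		return 0
	}
	var dot, normA, normB float64
	for i := range a {
		x, y := float64(a[i]), float64(b[i])
		dot += x * y
		normA += x * x
		normB += y * y
	}
	if normA == 0 || normB == 0 {
		return 0 // zero-magnitude vectors have no direction
	}
	return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}

func main() {
	fmt.Println(cosine([]float32{1, 0}, []float32{1, 0}))  // same direction
	fmt.Println(cosine([]float32{1, 0}, []float32{0, 1}))  // orthogonal
	fmt.Println(cosine([]float32{1, 0}, []float32{-1, 0})) // opposite
}
```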

func DescribeCapabilities

func DescribeCapabilities(stage Stage) string

DescribeCapabilities returns a human-readable description of a stage's capabilities. Useful for debugging and logging.

func GetTraceInfo

func GetTraceInfo(elem *StreamElement) (traceID string, stageTimes map[string]time.Time)

GetTraceInfo extracts trace information from an element.

func NormalizeEmbedding

func NormalizeEmbedding(embedding []float32) []float32

NormalizeEmbedding normalizes an embedding vector to unit length. This can improve similarity comparisons by ensuring all vectors have the same magnitude.
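Unit-length normalization divides each component by the vector's Euclidean norm. The sketch below is a hypothetical `normalize`; returning a zero vector unchanged is an assumption, since the documented function's handling of that case is not described here.

```go
package main

import (
	"fmt"
	"math"
)

// normalize returns a copy of v scaled to unit length.
// Assumption: a zero-magnitude vector is returned as an unchanged copy.
func normalize(v []float32) []float32 {
	var sumSquares float64
	for _, x := range v {
		sumSquares += float64(x) * float64(x)
	}
	out := make([]float32, len(v))
	if sumSquares == 0 {
		copy(out, v)
		return out
	}
	norm := math.Sqrt(sumSquares)
	for i, x := range v {
		out[i] = float32(float64(x) / norm)
	}
	return out
}

func main() {
	// A 3-4-5 triangle: the result should be close to [0.6 0.8].
	fmt.Println(normalize([]float32{3, 4}))
}
```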

func PutElement

func PutElement(elem *StreamElement)

PutElement returns a StreamElement to the pool for reuse. The element is reset before being returned to the pool to prevent data leaks. After calling PutElement, the caller must not use the element again.
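The reset-before-reuse contract can be sketched with `sync.Pool`. Everything below is illustrative: `elem`, `getElem`, and `putElem` are hypothetical local names, and the real StreamElement has more fields than shown.

```go
package main

import (
	"fmt"
	"sync"
)

// elem is a simplified stand-in for a pooled stream element.
type elem struct {
	Text     *string
	Metadata map[string]any
}

var elemPool = sync.Pool{
	New: func() any { return &elem{} },
}

// getElem fetches an element from the pool (or allocates a fresh one).
func getElem() *elem {
	return elemPool.Get().(*elem)
}

// putElem clears every field before returning the element to the pool,
// so data from one use can never leak into the next.
func putElem(e *elem) {
	*e = elem{}
	elemPool.Put(e)
}

func main() {
	e := getElem()
	text := "hello"
	e.Text = &text
	putElem(e) // e must not be used after this point

	next := getElem()
	fmt.Println(next.Text == nil, next.Metadata == nil)
}
```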

func ValidateCapabilities

func ValidateCapabilities(stages []Stage, edges map[string][]string)

ValidateCapabilities checks format compatibility between connected stages. It logs warnings for potential mismatches but does not return errors, as format compatibility can often only be fully determined at runtime.

This function is called during pipeline building to provide early feedback about potential issues.

Types

type AudioCapability

type AudioCapability struct {
	// Formats lists accepted audio formats. Empty slice means any format.
	Formats []AudioFormat
	// SampleRates lists accepted sample rates in Hz. Empty slice means any rate.
	SampleRates []int
	// Channels lists accepted channel counts. Empty slice means any.
	Channels []int
}

AudioCapability describes audio format requirements for a stage.

func (*AudioCapability) AcceptsAudio

func (ac *AudioCapability) AcceptsAudio(audio *AudioData) bool

AcceptsAudio returns true if this capability accepts the given audio data.

func (*AudioCapability) AcceptsChannels

func (ac *AudioCapability) AcceptsChannels(channels int) bool

AcceptsChannels returns true if this capability accepts the given channel count. Returns true if Channels is empty (accepts any).

func (*AudioCapability) AcceptsFormat

func (ac *AudioCapability) AcceptsFormat(format AudioFormat) bool

AcceptsFormat returns true if this capability accepts the given format. Returns true if Formats is empty (accepts any).

func (*AudioCapability) AcceptsSampleRate

func (ac *AudioCapability) AcceptsSampleRate(rate int) bool

AcceptsSampleRate returns true if this capability accepts the given sample rate. Returns true if SampleRates is empty (accepts any).
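The "empty slice means any" convention shared by these methods can be sketched with one small helper. `acceptsInt` is a hypothetical function, not part of the package.

```go
package main

import "fmt"

// acceptsInt reports whether v is allowed.
// An empty allow-list is treated as "accept anything".
func acceptsInt(allowed []int, v int) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == v {
			return true
		}
	}
	return false
}

func main() {
	rates := []int{16000, 24000}
	fmt.Println(acceptsInt(rates, 16000)) // listed rate
	fmt.Println(acceptsInt(rates, 44100)) // unlisted rate
	fmt.Println(acceptsInt(nil, 44100))   // no constraint
}
```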

type AudioData

type AudioData struct {
	Samples    []byte        // Raw audio samples
	SampleRate int           // Sample rate in Hz (e.g., 16000, 44100)
	Channels   int           // Number of audio channels (1=mono, 2=stereo)
	Format     AudioFormat   // Audio encoding format
	Duration   time.Duration // Duration of the audio segment
	Encoding   string        // Encoding scheme (e.g., "pcm", "opus")
}

AudioData carries audio samples with metadata.

type AudioFormat

type AudioFormat int

AudioFormat represents the encoding format of audio data.

const (
	// AudioFormatPCM16 is 16-bit PCM encoding
	AudioFormatPCM16 AudioFormat = iota
	// AudioFormatFloat32 is 32-bit floating point encoding
	AudioFormatFloat32
	// AudioFormatOpus is Opus codec encoding
	AudioFormatOpus
	// AudioFormatMP3 is MP3 encoding
	AudioFormatMP3
	// AudioFormatAAC is AAC encoding
	AudioFormatAAC
)

func (AudioFormat) String

func (af AudioFormat) String() string

String returns the string representation of the audio format.

type AudioResampleConfig

type AudioResampleConfig struct {
	// TargetSampleRate is the desired output sample rate in Hz.
	// Common values: 16000 (Gemini), 24000 (OpenAI TTS), 44100 (CD quality).
	TargetSampleRate int

	// PassthroughIfSameRate skips resampling if input rate matches target rate.
	// Default: true.
	PassthroughIfSameRate bool
}

AudioResampleConfig contains configuration for the audio resampling stage.

func DefaultAudioResampleConfig

func DefaultAudioResampleConfig() AudioResampleConfig

DefaultAudioResampleConfig returns sensible defaults for audio resampling.

type AudioResampleStage

type AudioResampleStage struct {
	BaseStage
	// contains filtered or unexported fields
}

AudioResampleStage resamples audio data to a target sample rate. This is useful for normalizing audio from different sources (TTS, files) to match provider requirements.

This is a Transform stage: audio element → resampled audio element (1:1)

func NewAudioResampleStage

func NewAudioResampleStage(config AudioResampleConfig) *AudioResampleStage

NewAudioResampleStage creates a new audio resampling stage.

func (*AudioResampleStage) GetConfig

func (s *AudioResampleStage) GetConfig() AudioResampleConfig

GetConfig returns the stage configuration.

func (*AudioResampleStage) Process

func (s *AudioResampleStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Resamples audio in each element to the target sample rate.

type AudioTurnConfig

type AudioTurnConfig struct {
	// VAD is the voice activity detector.
	// If nil, a SimpleVAD with default params is created.
	VAD audio.VADAnalyzer

	// TurnDetector determines when user has finished speaking.
	// If nil, VAD state transitions are used for turn detection.
	TurnDetector audio.TurnDetector

	// InterruptionHandler detects when user interrupts TTS output.
	// This should be shared with TTSStageWithInterruption.
	// If nil, interruption detection is disabled.
	InterruptionHandler *audio.InterruptionHandler

	// SilenceDuration is how long silence must persist to trigger turn complete.
	// Default: 800ms
	SilenceDuration time.Duration

	// MinSpeechDuration is minimum speech before turn can complete.
	// Default: 200ms
	MinSpeechDuration time.Duration

	// MaxTurnDuration is maximum turn length before forcing completion.
	// Default: 30s
	MaxTurnDuration time.Duration

	// SampleRate is the audio sample rate for output AudioData.
	// Default: 16000
	SampleRate int
}

AudioTurnConfig configures the AudioTurnStage.

func DefaultAudioTurnConfig

func DefaultAudioTurnConfig() AudioTurnConfig

DefaultAudioTurnConfig returns sensible defaults for AudioTurnStage.

type AudioTurnStage

type AudioTurnStage struct {
	BaseStage
	// contains filtered or unexported fields
}

AudioTurnStage detects voice activity and accumulates audio into complete turns. It outputs complete audio utterances when the user stops speaking.

This stage consolidates:

  • Voice Activity Detection (VAD)
  • Turn boundary detection
  • Audio accumulation
  • Interruption detection (shared with TTSStageWithInterruption)

This is an Accumulate stage: N audio chunks → 1 audio utterance

func NewAudioTurnStage

func NewAudioTurnStage(config AudioTurnConfig) (*AudioTurnStage, error)

NewAudioTurnStage creates a new audio turn stage.

func (*AudioTurnStage) Process

func (s *AudioTurnStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Accumulates audio chunks until turn complete, then emits audio utterance.

type BaseStage

type BaseStage struct {
	// contains filtered or unexported fields
}

BaseStage provides common functionality for stage implementations. Stages can embed this to reduce boilerplate.

func NewBaseStage

func NewBaseStage(name string, stageType StageType) BaseStage

NewBaseStage creates a new BaseStage with the given name and type.

func (*BaseStage) Name

func (b *BaseStage) Name() string

Name returns the stage name.

func (*BaseStage) Type

func (b *BaseStage) Type() StageType

Type returns the stage type.

type BroadcastRouter

type BroadcastRouter struct {
	BaseStage
	// contains filtered or unexported fields
}

BroadcastRouter sends each element to ALL registered outputs. Useful for fan-out scenarios where all consumers need every element.

func NewBroadcastRouter

func NewBroadcastRouter(name string) *BroadcastRouter

NewBroadcastRouter creates a router that broadcasts to all outputs.

func (*BroadcastRouter) Process

func (r *BroadcastRouter) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process broadcasts each element to all outputs.

func (*BroadcastRouter) RegisterOutput

func (r *BroadcastRouter) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name.

type ByOriginalIndex

type ByOriginalIndex []ScoredMessage

ByOriginalIndex sorts ScoredMessages by their original index (ascending).

func (ByOriginalIndex) Len

func (s ByOriginalIndex) Len() int

func (ByOriginalIndex) Less

func (s ByOriginalIndex) Less(i, j int) bool

func (ByOriginalIndex) Swap

func (s ByOriginalIndex) Swap(i, j int)

type Capabilities

type Capabilities struct {
	// ContentTypes lists the content types handled. Empty means any.
	ContentTypes []ContentType
	// Audio specifies audio-specific requirements. Nil means N/A or any.
	Audio *AudioCapability
}

Capabilities describes what a stage accepts or produces.

func AnyCapabilities

func AnyCapabilities() Capabilities

AnyCapabilities returns capabilities that accept any content type.

func AudioCapabilities

func AudioCapabilities(formats []AudioFormat, sampleRates, channels []int) Capabilities

AudioCapabilities returns capabilities for audio content with optional format constraints.

func MessageCapabilities

func MessageCapabilities() Capabilities

MessageCapabilities returns capabilities for message content.

func TextCapabilities

func TextCapabilities() Capabilities

TextCapabilities returns capabilities for text-only content.

func (*Capabilities) AcceptsContentType

func (c *Capabilities) AcceptsContentType(ct ContentType) bool

AcceptsContentType returns true if this capability accepts the given content type. Returns true if ContentTypes is empty (accepts any).

func (*Capabilities) AcceptsElement

func (c *Capabilities) AcceptsElement(elem *StreamElement) bool

AcceptsElement returns true if this capability accepts the given stream element.

type ContentRouter

type ContentRouter struct {
	BaseStage
	// contains filtered or unexported fields
}

ContentRouter routes elements to different outputs based on predicate rules. Rules are evaluated in order; the first matching rule determines the destination. Elements that don't match any rule are dropped with a warning log.
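First-match routing can be sketched as an ordered scan over predicates. The `rule` type and `route` function are hypothetical; the shape of the package's RoutingRule is not shown in this section.

```go
package main

import "fmt"

// rule pairs a predicate with the name of a destination output.
type rule struct {
	output string
	match  func(kind string) bool
}

// route returns the destination of the first matching rule.
// ok is false when nothing matches, in which case the element would be dropped.
func route(rules []rule, kind string) (output string, ok bool) {
	for _, r := range rules {
		if r.match(kind) {
			return r.output, true
		}
	}
	return "", false
}

func main() {
	rules := []rule{
		{output: "audioOutput", match: func(k string) bool { return k == "audio" }},
		{output: "textOutput", match: func(k string) bool { return k == "text" || k == "audio" }},
	}
	// "audio" satisfies both rules, but the earlier rule wins.
	fmt.Println(route(rules, "audio"))
	fmt.Println(route(rules, "text"))
	fmt.Println(route(rules, "video"))
}
```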

func NewContentRouter

func NewContentRouter(name string, rules ...RoutingRule) *ContentRouter

NewContentRouter creates a new content-aware router with the given rules.

func (*ContentRouter) Process

func (r *ContentRouter) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process routes elements based on the configured rules.

func (*ContentRouter) RegisterOutput

func (r *ContentRouter) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name. This must be called before Process() to set up routing destinations.

type ContentType

type ContentType int

ContentType describes the type of content a stage handles.

const (
	// ContentTypeAny indicates the stage accepts any content type.
	ContentTypeAny ContentType = iota
	// ContentTypeText indicates text content.
	ContentTypeText
	// ContentTypeAudio indicates audio content.
	ContentTypeAudio
	// ContentTypeVideo indicates video content.
	ContentTypeVideo
	// ContentTypeImage indicates image content.
	ContentTypeImage
	// ContentTypeMessage indicates a complete message.
	ContentTypeMessage
	// ContentTypeToolCall indicates a tool invocation.
	ContentTypeToolCall
)

func (ContentType) String

func (ct ContentType) String() string

String returns the string representation of the content type.

type ContextBuilderPolicy

type ContextBuilderPolicy struct {
	TokenBudget      int
	ReserveForOutput int
	Strategy         TruncationStrategy
	CacheBreakpoints bool

	// RelevanceConfig for TruncateLeastRelevant strategy (optional).
	// If nil when using TruncateLeastRelevant, falls back to TruncateOldest.
	RelevanceConfig *RelevanceConfig
}

ContextBuilderPolicy defines token budget and truncation behavior.

type ContextBuilderStage

type ContextBuilderStage struct {
	BaseStage
	// contains filtered or unexported fields
}

ContextBuilderStage manages token budget and truncates messages if needed.

This stage ensures the conversation context fits within the LLM's token budget by applying truncation strategies when messages exceed the limit.

Token budget calculation:

available = TokenBudget - ReserveForOutput - systemPromptTokens

Truncation strategies (TruncationStrategy):

  • TruncateOldest: removes oldest messages first (keeps most recent context)
  • TruncateLeastRelevant: removes least relevant messages (requires embeddings) [TODO]
  • TruncateSummarize: compresses old messages into summaries [TODO]
  • TruncateFail: returns error if budget exceeded (strict mode)

Configuration (ContextBuilderPolicy):

  • TokenBudget: total tokens allowed (0 = unlimited, pass-through mode)
  • ReserveForOutput: tokens reserved for LLM response
  • Strategy: truncation strategy to apply
  • CacheBreakpoints: enable prompt caching hints

Metadata added:

  • context_truncated: true if truncation was applied
  • enable_cache_breakpoints: copied from policy.CacheBreakpoints

This is an Accumulate stage: N input elements → N (possibly fewer) output elements
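The budget formula and the TruncateOldest strategy can be sketched as below. This is an illustration under stated assumptions: per-message token counts are supplied directly, `availableTokens` and `truncateOldest` are hypothetical names, and the real stage's token counting and pass-through handling may differ.

```go
package main

import "fmt"

// availableTokens applies: available = TokenBudget - ReserveForOutput - systemPromptTokens.
func availableTokens(budget, reserveForOutput, systemPromptTokens int) int {
	return budget - reserveForOutput - systemPromptTokens
}

// truncateOldest keeps the longest suffix of messages (the most recent ones)
// whose token counts fit within available. truncated reports whether any
// message was dropped.
func truncateOldest(msgTokens []int, available int) (kept []int, truncated bool) {
	total := 0
	start := len(msgTokens)
	for i := len(msgTokens) - 1; i >= 0; i-- {
		if total+msgTokens[i] > available {
			break
		}
		total += msgTokens[i]
		start = i
	}
	return msgTokens[start:], start > 0
}

func main() {
	available := availableTokens(100, 10, 15)
	kept, truncated := truncateOldest([]int{50, 40, 30, 20}, available)
	fmt.Println(available, kept, truncated)
}
```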

func NewContextBuilderStage

func NewContextBuilderStage(policy *ContextBuilderPolicy) *ContextBuilderStage

NewContextBuilderStage creates a context builder stage.

func (*ContextBuilderStage) Process

func (s *ContextBuilderStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process enforces token budget and truncates messages if needed.

type DebugStage

type DebugStage struct {
	BaseStage
	// contains filtered or unexported fields
}

DebugStage logs StreamElements for debugging pipeline state. Useful for development and troubleshooting.

func NewDebugStage

func NewDebugStage(stageName string) *DebugStage

NewDebugStage creates a debug stage that logs elements at a specific pipeline location.

func (*DebugStage) Process

func (s *DebugStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process logs each element as it passes through (passthrough transform).

type DuplexProviderStage

type DuplexProviderStage struct {
	BaseStage
	// contains filtered or unexported fields
}

DuplexProviderStage handles bidirectional streaming through a session. It forwards elements from input to the provider's session and forwards responses from the session to output.

This stage is PROVIDER-AGNOSTIC. Provider-specific behaviors (interruptions, reconnection, protocol quirks) are handled BY the provider internally.

System Prompt Handling: The first element received may contain a system prompt in metadata["system_prompt"]. This is sent to the session via SendSystemContext() before processing audio/text.

Response Accumulation: Streaming providers send text/audio responses in chunks. This stage accumulates content across chunks and creates a Message on turn completion (FinishReason).

Session Closure: When the session closes unexpectedly, any accumulated content is emitted as a partial response. The executor is responsible for session recreation if needed.

This is a Bidirectional stage: input elements ⟷ session ⟷ output elements
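The response accumulation and partial-response behavior described above might look roughly like this. It is a simplified sketch over a slice instead of a live session; the `chunk` type and `accumulate` function are hypothetical names chosen for illustration and do not come from this package.

```go
package main

import (
	"fmt"
	"strings"
)

// chunk is a hypothetical streaming response fragment. A non-empty
// FinishReason marks the end of a turn.
type chunk struct {
	Text         string
	FinishReason string
}

// accumulate joins chunk text into one message per completed turn. Text left
// over when the stream ends without a FinishReason is returned as partial.
func accumulate(chunks []chunk) (messages []string, partial string) {
	var buf strings.Builder
	for _, c := range chunks {
		buf.WriteString(c.Text)
		if c.FinishReason != "" {
			messages = append(messages, buf.String())
			buf.Reset()
		}
	}
	return messages, buf.String()
}

func main() {
	msgs, partial := accumulate([]chunk{
		{Text: "Hel"},
		{Text: "lo", FinishReason: "stop"},
		{Text: "Wor"}, // session closes before this turn completes
	})
	fmt.Printf("%q %q\n", msgs, partial) // prints ["Hello"] "Wor"
}
```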

func NewDuplexProviderStage

func NewDuplexProviderStage(
	provider providers.StreamInputSupport,
	baseConfig *providers.StreamingInputConfig,
) *DuplexProviderStage

NewDuplexProviderStage creates a new duplex provider stage. The session is created lazily when the first element arrives, using system_prompt from element metadata. This allows the pipeline to be the single source of truth for prompt assembly.

func NewDuplexProviderStageWithEmitter

func NewDuplexProviderStageWithEmitter(
	provider providers.StreamInputSupport,
	baseConfig *providers.StreamingInputConfig,
	emitter *events.Emitter,
) *DuplexProviderStage

NewDuplexProviderStageWithEmitter creates a new duplex provider stage with event emission support. The emitter is used to emit audio.input and audio.output events for session recording.

func (*DuplexProviderStage) Process

func (s *DuplexProviderStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Handles bidirectional streaming between input channel and WebSocket session.

For duplex streaming (Gemini Live API), this runs until one of the following occurs:

  • Context is canceled (user stops the session)
  • Session response channel is closed (server ends session)
  • Input channel is closed (upstream ends)

If no session is pre-configured, the session is created lazily when the first element arrives. The system_prompt from element metadata is used as the SystemInstruction for session creation.

type EndInputter

type EndInputter interface {
	EndInput()
}

EndInputter is an optional interface for sessions that support explicit end-of-input signaling. This is primarily used by mock sessions to trigger responses after all audio has been sent.
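Optional interfaces like this one are usually detected with a type assertion. The sketch below shows that pattern; `mockSession`, `plainSession`, and `signalEndOfInput` are hypothetical names for illustration, and how the package itself performs the check is an assumption.

```go
package main

import "fmt"

// EndInputter mirrors the optional interface documented above.
type EndInputter interface {
	EndInput()
}

// mockSession is a hypothetical session that supports end-of-input signaling.
type mockSession struct{ ended bool }

func (m *mockSession) EndInput() { m.ended = true }

// plainSession is a hypothetical session without EndInput support.
type plainSession struct{}

// signalEndOfInput calls EndInput only when the session implements the
// optional interface, and reports whether it did.
func signalEndOfInput(session any) bool {
	if ei, ok := session.(EndInputter); ok {
		ei.EndInput()
		return true
	}
	return false
}

func main() {
	m := &mockSession{}
	fmt.Println(signalEndOfInput(m), m.ended)     // prints "true true"
	fmt.Println(signalEndOfInput(plainSession{})) // prints "false"
}
```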

type ExecutionResult

type ExecutionResult struct {
	Messages []types.Message        // All messages in the conversation
	Response *Response              // The final response
	Trace    ExecutionTrace         // Execution trace
	CostInfo types.CostInfo         // Cost information
	Metadata map[string]interface{} // Additional metadata
}

ExecutionResult represents the final result of a pipeline execution. This matches the existing pipeline.ExecutionResult for compatibility.

type ExecutionTrace

type ExecutionTrace struct {
	StartedAt   time.Time
	CompletedAt *time.Time
	Duration    time.Duration
}

ExecutionTrace captures execution history (for compatibility).

type FilterStage

type FilterStage struct {
	BaseStage
	// contains filtered or unexported fields
}

FilterStage filters elements based on a predicate function.

func NewFilterStage

func NewFilterStage(name string, predicate func(StreamElement) bool) *FilterStage

NewFilterStage creates a new filter stage.

func (*FilterStage) Process

func (fs *FilterStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error

Process filters elements based on the predicate.
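A predicate filter over channels can be sketched as below. This is an illustration of the general pattern, not the package's implementation: strings stand in for StreamElement, and `filter` and `runFilter` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// filter forwards values that satisfy keep and closes out when in is drained
// or ctx is canceled. Strings stand in for StreamElement here.
func filter(ctx context.Context, in <-chan string, out chan<- string, keep func(string) bool) error {
	defer close(out)
	for v := range in {
		if !keep(v) {
			continue
		}
		select {
		case out <- v:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// runFilter feeds items through filter and collects whatever comes out.
func runFilter(items []string, keep func(string) bool) []string {
	in := make(chan string, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	out := make(chan string, len(items))
	_ = filter(context.Background(), in, out, keep)
	var got []string
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	nonEmpty := func(s string) bool { return s != "" }
	fmt.Println(runFilter([]string{"a", "", "b"}, nonEmpty)) // prints "[a b]"
}
```

Closing the output in a deferred call keeps the "stage must close its output" contract intact on every return path.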

type FormatCapable

type FormatCapable interface {
	// InputCapabilities returns what formats/content types this stage accepts.
	InputCapabilities() Capabilities
	// OutputCapabilities returns what formats/content types this stage produces.
	OutputCapabilities() Capabilities
}

FormatCapable is an optional interface that stages can implement to declare their input/output format requirements. Stages that don't implement this are treated as accepting/producing any format.

type HashRouter

type HashRouter struct {
	BaseStage
	// contains filtered or unexported fields
}

HashRouter routes elements based on consistent hashing of a key. This ensures elements with the same key always go to the same destination.

func NewHashRouter

func NewHashRouter(name string, outputNames []string, keyFunc func(StreamElement) string) *HashRouter

NewHashRouter creates a router that uses consistent hashing. The keyFunc extracts a key from each element (e.g., session ID). Elements with the same key always route to the same destination.
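The "same key, same destination" property can be illustrated with a plain hash-modulo scheme. Note that this sketch swaps in FNV-1a modulo the output count, which is simpler than ring-based consistent hashing; which algorithm the package actually uses is not stated here. `pickOutput` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickOutput maps a key to one of the output names using FNV-1a modulo the
// number of outputs. The same key always yields the same name as long as the
// outputs slice is unchanged. outputs must be non-empty.
func pickOutput(key string, outputs []string) string {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return outputs[int(h.Sum32()%uint32(len(outputs)))]
}

func main() {
	outputs := []string{"worker-0", "worker-1", "worker-2"}
	first := pickOutput("session-42", outputs)
	second := pickOutput("session-42", outputs)
	fmt.Println(first == second) // prints "true"
}
```

Unlike a true consistent hash, a modulo scheme remaps most keys when the number of outputs changes, so it only fits a fixed set of destinations.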

func (*HashRouter) Process

func (r *HashRouter) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process routes elements based on hash of key.

func (*HashRouter) RegisterOutput

func (r *HashRouter) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name.

type ImageData

type ImageData struct {
	Data     []byte // Raw image data (encoded as JPEG, PNG, etc.)
	MIMEType string // MIME type (e.g., "image/jpeg", "image/png")
	Width    int    // Image width in pixels
	Height   int    // Image height in pixels
	Format   string // Format identifier (e.g., "jpeg", "png", "webp")
}

ImageData carries image data with metadata.

type MapStage

type MapStage struct {
	BaseStage
	// contains filtered or unexported fields
}

MapStage transforms elements using a mapping function.

Example

ExampleMapStage demonstrates using a map stage to transform elements.

package main

import (
	"context"
	"fmt"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
	"github.com/AltairaLabs/PromptKit/runtime/types"
)

func main() {
	// Create a map stage that prefixes message content (despite its name, it does not uppercase)
	uppercaseStage := stage.NewMapStage("uppercase", func(elem stage.StreamElement) (stage.StreamElement, error) {
		if elem.Message != nil {
			msg := *elem.Message
			msg.Content = "TRANSFORMED: " + msg.Content
			elem.Message = &msg
		}
		return elem, nil
	})

	// Build pipeline
	pipeline, _ := stage.NewPipelineBuilder().
		Chain(uppercaseStage).
		Build()

	// Execute
	input := make(chan stage.StreamElement, 1)
	input <- stage.NewMessageElement(&types.Message{
		Role:    "user",
		Content: "hello",
	})
	close(input)

	output, _ := pipeline.Execute(context.Background(), input)
	for elem := range output {
		if elem.Message != nil {
			fmt.Printf("%s\n", elem.Message.Content)
		}
	}
}
Output:

TRANSFORMED: hello

func NewMapStage

func NewMapStage(name string, mapFunc func(StreamElement) (StreamElement, error)) *MapStage

NewMapStage creates a new map stage.

func (*MapStage) Process

func (ms *MapStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error

Process transforms each element using the map function.

type MediaExternalizerConfig

type MediaExternalizerConfig struct {
	Enabled         bool
	StorageService  storage.MediaStorageService
	SizeThresholdKB int64
	DefaultPolicy   string
	RunID           string
	SessionID       string
	ConversationID  string
}

MediaExternalizerConfig configures media externalization behavior.

type MediaExternalizerStage

type MediaExternalizerStage struct {
	BaseStage
	// contains filtered or unexported fields
}

MediaExternalizerStage externalizes large media content to external storage.

When messages contain large inline media (images, audio, video), this stage moves the data to external storage and replaces it with a storage reference. This reduces memory usage and allows for media lifecycle management.

Behavior:

  • Skipped if Enabled=false or no StorageService configured
  • Only externalizes media exceeding SizeThresholdKB (base64 size)
  • Preserves media.StorageReference if already externalized
  • Clears media.Data after successful externalization

Configuration:

  • Enabled: master switch for externalization
  • SizeThresholdKB: minimum size to externalize (0 = externalize all)
  • StorageService: where to store media (S3, GCS, local filesystem, etc.)
  • DefaultPolicy: retention policy name for stored media

This is a Transform stage: 1 input element → 1 output element (with externalized media)
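The threshold rule above (compare the base64-encoded size against SizeThresholdKB, with 0 meaning "externalize all") can be sketched as follows. `shouldExternalize` is a hypothetical helper, and treating 1 KB as 1024 bytes is an assumption.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// shouldExternalize reports whether raw media bytes exceed thresholdKB once
// base64-encoded. A threshold of 0 externalizes everything. Treating 1 KB as
// 1024 bytes is an assumption of this sketch.
func shouldExternalize(raw []byte, thresholdKB int64) bool {
	if thresholdKB == 0 {
		return true
	}
	encoded := int64(base64.StdEncoding.EncodedLen(len(raw)))
	return encoded > thresholdKB*1024
}

func main() {
	small := make([]byte, 600)  // 800 bytes when base64-encoded
	large := make([]byte, 3000) // 4000 bytes when base64-encoded
	// prints "false true true"
	fmt.Println(shouldExternalize(small, 1), shouldExternalize(large, 1), shouldExternalize(small, 0))
}
```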

func NewMediaExternalizerStage

func NewMediaExternalizerStage(config *MediaExternalizerConfig) *MediaExternalizerStage

NewMediaExternalizerStage creates a media externalizer stage.

func (*MediaExternalizerStage) Process

func (s *MediaExternalizerStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process externalizes media from messages if they exceed size threshold.

type MergeStage

type MergeStage struct {
	BaseStage
	// contains filtered or unexported fields
}

MergeStage merges multiple input channels into a single output channel. This enables fan-in patterns where multiple stages feed into one.

This is an Accumulate stage type that handles multiple inputs (N:1 merge).

func NewMergeStage

func NewMergeStage(name string, inputCount int) *MergeStage

NewMergeStage creates a new merge stage that merges N inputs into 1 output.

func (*MergeStage) Process

func (s *MergeStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface (single input). Merge stages do not typically use it; use ProcessMultiple instead.

func (*MergeStage) ProcessMultiple

func (s *MergeStage) ProcessMultiple(
	ctx context.Context,
	inputs []<-chan StreamElement,
	output chan<- StreamElement,
) error

ProcessMultiple processes multiple input channels and merges them into one output. This is a special method for merge stages that differs from the standard Process signature.
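A common way to implement an N:1 merge is one goroutine per input plus a WaitGroup that closes the output once every input is drained. The sketch below shows that pattern with ints standing in for StreamElement; `merge`, `source`, and `mergeAll` are hypothetical helpers and not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// merge copies every value from inputs to out and closes out once all inputs
// are drained. Ints stand in for StreamElement.
func merge(ctx context.Context, inputs []<-chan int, out chan<- int) error {
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(in)
	}
	wg.Wait()
	close(out)
	return ctx.Err()
}

// source returns a closed, pre-filled channel.
func source(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mergeAll runs merge in the background and returns the merged values sorted,
// because arrival order across inputs is not deterministic.
func mergeAll(inputs ...<-chan int) []int {
	out := make(chan int)
	go merge(context.Background(), inputs, out)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergeAll(source(1, 2), source(3), source(4, 5))) // prints "[1 2 3 4 5]"
}
```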

type MetricsStage

type MetricsStage struct {
	BaseStage
	// contains filtered or unexported fields
}

MetricsStage wraps another stage and collects metrics about its performance. This is a transparent wrapper that doesn't modify element flow.

func NewMetricsStage

func NewMetricsStage(wrappedStage Stage) *MetricsStage

NewMetricsStage wraps a stage with metrics collection.

func (*MetricsStage) GetMetrics

func (s *MetricsStage) GetMetrics() StageMetrics

GetMetrics returns the collected metrics.

func (*MetricsStage) Process

func (s *MetricsStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface with metrics collection.

type PassthroughStage

type PassthroughStage struct {
	BaseStage
}

PassthroughStage is a simple stage that passes all elements through unchanged. Useful for testing or as a placeholder.

func NewPassthroughStage

func NewPassthroughStage(name string) *PassthroughStage

NewPassthroughStage creates a new passthrough stage.

func (*PassthroughStage) Process

func (ps *PassthroughStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error

Process passes all elements through unchanged.

type PipelineBuilder

type PipelineBuilder struct {
	// contains filtered or unexported fields
}

PipelineBuilder constructs a pipeline DAG. It provides methods for creating linear chains and branching topologies.

Example

ExamplePipelineBuilder demonstrates building a simple linear pipeline.

package main

import (
	"context"
	"fmt"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
	"github.com/AltairaLabs/PromptKit/runtime/types"
)

func main() {
	// Create some simple stages
	inputStage := stage.NewPassthroughStage("input")
	processStage := stage.NewPassthroughStage("process")
	outputStage := stage.NewPassthroughStage("output")

	// Build a linear pipeline
	pipeline, err := stage.NewPipelineBuilder().
		Chain(inputStage, processStage, outputStage).
		Build()

	if err != nil {
		fmt.Printf("Error building pipeline: %v\n", err)
		return
	}

	// Create input channel with a message
	input := make(chan stage.StreamElement, 1)
	input <- stage.NewMessageElement(&types.Message{
		Role:    "user",
		Content: "Hello, world!",
	})
	close(input)

	// Execute pipeline
	ctx := context.Background()
	output, err := pipeline.Execute(ctx, input)
	if err != nil {
		fmt.Printf("Error executing pipeline: %v\n", err)
		return
	}

	// Consume output
	for elem := range output {
		if elem.Message != nil {
			fmt.Printf("Received message: %s\n", elem.Message.Content)
		}
	}
}
Output:

Received message: Hello, world!

Example (WithConfig)

ExamplePipelineBuilder_withConfig demonstrates building a pipeline with custom configuration.

package main

import (
	"fmt"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
)

func main() {
	// Create custom config
	config := stage.DefaultPipelineConfig().
		WithChannelBufferSize(32).
		WithPriorityQueue(true).
		WithMetrics(true)

	// Build pipeline with config
	pipeline, err := stage.NewPipelineBuilderWithConfig(config).
		Chain(
			stage.NewPassthroughStage("stage1"),
			stage.NewPassthroughStage("stage2"),
		).
		Build()

	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Pipeline created with %d stages\n", 2)
	_ = pipeline
}
Output:

Pipeline created with 2 stages

func NewPipelineBuilder

func NewPipelineBuilder() *PipelineBuilder

NewPipelineBuilder creates a new PipelineBuilder with default configuration.

func NewPipelineBuilderWithConfig

func NewPipelineBuilderWithConfig(config *PipelineConfig) *PipelineBuilder

NewPipelineBuilderWithConfig creates a new PipelineBuilder with custom configuration.

func (*PipelineBuilder) AddStage

func (b *PipelineBuilder) AddStage(stage Stage) *PipelineBuilder

AddStage adds a stage to the builder without connecting it. This is useful when building complex topologies manually.

func (*PipelineBuilder) Branch

func (b *PipelineBuilder) Branch(fromStage string, toStages ...string) *PipelineBuilder

Branch creates multiple outgoing connections from a single stage. This allows one stage's output to fan out to multiple downstream stages.

Example:

pipeline, err := NewPipelineBuilder().
    Chain(NewStageA(), NewStageB()).
    Branch("stageB", "stageC", "stageD"). // B's output goes to both C and D
    Build()

func (*PipelineBuilder) Build

func (b *PipelineBuilder) Build() (*StreamPipeline, error)

Build constructs the pipeline from the builder's configuration. It validates the pipeline structure and returns an error if invalid.

func (*PipelineBuilder) Chain

func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder

Chain creates a linear chain of stages. This is the most common pattern: stage1 -> stage2 -> stage3. Each stage's output is connected to the next stage's input.

Example:

pipeline, err := NewPipelineBuilder().
    Chain(
        NewStageA(),
        NewStageB(),
        NewStageC(),
    ).
    Build()

func (*PipelineBuilder) Clone

func (b *PipelineBuilder) Clone() *PipelineBuilder

Clone creates a deep copy of the builder.

func (*PipelineBuilder) Connect

func (b *PipelineBuilder) Connect(fromStage, toStage string) *PipelineBuilder

Connect creates a directed edge from one stage to another. The output of fromStage will be connected to the input of toStage.

func (*PipelineBuilder) WithConfig

func (b *PipelineBuilder) WithConfig(config *PipelineConfig) *PipelineBuilder

WithConfig sets the pipeline configuration.

func (*PipelineBuilder) WithEventEmitter

func (b *PipelineBuilder) WithEventEmitter(emitter *events.Emitter) *PipelineBuilder

WithEventEmitter sets the event emitter for the pipeline.

type PipelineConfig

type PipelineConfig struct {
	// ChannelBufferSize controls buffering between stages.
	// Smaller values = lower latency but more backpressure.
	// Larger values = higher throughput but more memory usage.
	// Default: 16
	ChannelBufferSize int

	// PriorityQueueEnabled enables priority-based scheduling.
	// When enabled, high-priority elements (audio) are processed before low-priority (logs).
	// Default: false
	PriorityQueueEnabled bool

	// MaxConcurrentPipelines limits the number of concurrent pipeline executions.
	// This is used by PipelinePool to control concurrency.
	// Default: 100
	MaxConcurrentPipelines int

	// ExecutionTimeout sets the maximum duration for a single pipeline execution.
	// Set to 0 to disable timeout.
	// Default: 30 seconds
	ExecutionTimeout time.Duration

	// GracefulShutdownTimeout sets the maximum time to wait for in-flight executions during shutdown.
	// Default: 10 seconds
	GracefulShutdownTimeout time.Duration

	// EnableMetrics enables collection of per-stage metrics (latency, throughput, etc.).
	// Default: false
	EnableMetrics bool

	// EnableTracing enables detailed tracing of element flow through stages.
	// Default: false (can be expensive for high-throughput pipelines)
	EnableTracing bool
}

PipelineConfig defines configuration options for pipeline execution.
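As one illustration of how a setting such as ExecutionTimeout is typically applied, the sketch below derives a context from the timeout and treats 0 as "no deadline". This is an assumption about usage, not the package's code; `withExecutionTimeout` and `hasDeadline` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withExecutionTimeout returns a context bounded by timeout, or a plain
// cancelable context when timeout is 0 (timeout disabled).
func withExecutionTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout == 0 {
		return context.WithCancel(parent)
	}
	return context.WithTimeout(parent, timeout)
}

// hasDeadline reports whether a context built for the given timeout carries
// a deadline.
func hasDeadline(timeout time.Duration) bool {
	ctx, cancel := withExecutionTimeout(context.Background(), timeout)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

func main() {
	fmt.Println(hasDeadline(30 * time.Second)) // prints "true"
	fmt.Println(hasDeadline(0))                // prints "false"
}
```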

func DefaultPipelineConfig

func DefaultPipelineConfig() *PipelineConfig

DefaultPipelineConfig returns a PipelineConfig with sensible defaults.

func (*PipelineConfig) Validate

func (c *PipelineConfig) Validate() error

Validate checks if the configuration is valid.

func (*PipelineConfig) WithChannelBufferSize

func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig

WithChannelBufferSize sets the channel buffer size.

func (*PipelineConfig) WithExecutionTimeout

func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig

WithExecutionTimeout sets the execution timeout.

func (*PipelineConfig) WithGracefulShutdownTimeout

func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig

WithGracefulShutdownTimeout sets the graceful shutdown timeout.

func (*PipelineConfig) WithMaxConcurrentPipelines

func (c *PipelineConfig) WithMaxConcurrentPipelines(maxPipelines int) *PipelineConfig

WithMaxConcurrentPipelines sets the maximum number of concurrent pipeline executions.

func (*PipelineConfig) WithMetrics

func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig

WithMetrics enables or disables metrics collection.

func (*PipelineConfig) WithPriorityQueue

func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig

WithPriorityQueue enables or disables priority-based scheduling.

func (*PipelineConfig) WithTracing

func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig

WithTracing enables or disables detailed tracing.

type Priority

type Priority int

Priority defines the scheduling priority for stream elements. Higher priority elements are processed before lower priority ones.

const (
	// PriorityLow is for non-critical data like logs or metrics
	PriorityLow Priority = iota
	// PriorityNormal is the default priority for most elements
	PriorityNormal
	// PriorityHigh is for real-time audio/video that requires low latency
	PriorityHigh
	// PriorityCritical is for control signals, errors, and system messages
	PriorityCritical
)

type PriorityChannel

type PriorityChannel struct {
	// contains filtered or unexported fields
}

PriorityChannel is a channel that supports priority-based element delivery. Higher priority elements are delivered before lower priority elements.
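The ordering behavior can be sketched with the standard library's container/heap. This illustration covers only the ordering: it omits the blocking, capacity, and context handling that Send and Receive provide, and keeping FIFO order within a priority level is an assumption. The `item`, `itemHeap`, and `drainByPriority` names are hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Priority levels mirror the constants documented above.
type Priority int

const (
	PriorityLow Priority = iota
	PriorityNormal
	PriorityHigh
	PriorityCritical
)

// item is a hypothetical queued element; seq preserves arrival order within
// a priority level.
type item struct {
	label    string
	priority Priority
	seq      int
}

// itemHeap implements heap.Interface, ordering by priority, then arrival.
type itemHeap []item

func (h itemHeap) Len() int { return len(h) }
func (h itemHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h itemHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *itemHeap) Push(x any)   { *h = append(*h, x.(item)) }
func (h *itemHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// drainByPriority pushes items in arrival order and pops them highest
// priority first.
func drainByPriority(labels []string, priorities []Priority) []string {
	h := &itemHeap{}
	for i, l := range labels {
		heap.Push(h, item{label: l, priority: priorities[i], seq: i})
	}
	var out []string
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(item).label)
	}
	return out
}

func main() {
	fmt.Println(drainByPriority(
		[]string{"log", "audio", "error", "text"},
		[]Priority{PriorityLow, PriorityHigh, PriorityCritical, PriorityNormal},
	)) // prints "[error audio text log]"
}
```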

func NewPriorityChannel

func NewPriorityChannel(capacity int) *PriorityChannel

NewPriorityChannel creates a new priority channel with the given capacity.

func (*PriorityChannel) Close

func (pc *PriorityChannel) Close()

Close closes the priority channel.

func (*PriorityChannel) Len

func (pc *PriorityChannel) Len() int

Len returns the current number of elements in the channel.

func (*PriorityChannel) Receive

func (pc *PriorityChannel) Receive(ctx context.Context) (StreamElement, bool, error)

Receive receives the highest priority element from the channel. Blocks if the channel is empty.

func (*PriorityChannel) Send

func (pc *PriorityChannel) Send(ctx context.Context, elem StreamElement) error

Send sends an element to the priority channel. Blocks if the channel is at capacity.

type PromptAssemblyStage

type PromptAssemblyStage struct {
	BaseStage
	// contains filtered or unexported fields
}

PromptAssemblyStage loads and assembles prompts from the prompt registry. It enriches elements with system prompt, allowed tools, and variables.

func NewPromptAssemblyStage

func NewPromptAssemblyStage(
	promptRegistry *prompt.Registry,
	taskType string,
	baseVariables map[string]string,
) *PromptAssemblyStage

NewPromptAssemblyStage creates a new prompt assembly stage.

func (*PromptAssemblyStage) Process

func (s *PromptAssemblyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error

Process loads and assembles the prompt, enriching elements with prompt data.

type ProviderConfig

type ProviderConfig struct {
	MaxTokens    int
	Temperature  float32
	Seed         *int
	DisableTrace bool
}

ProviderConfig contains configuration for the provider stage.

type ProviderStage

type ProviderStage struct {
	BaseStage
	// contains filtered or unexported fields
}

ProviderStage executes LLM calls and handles tool execution. This is the request/response mode implementation.

func NewProviderStage

func NewProviderStage(
	provider providers.Provider,
	toolRegistry *tools.Registry,
	toolPolicy *pipeline.ToolPolicy,
	config *ProviderConfig,
) *ProviderStage

NewProviderStage creates a new provider stage for request/response mode.

func (*ProviderStage) Process

func (s *ProviderStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process executes the LLM provider call and handles tool execution.

type QuerySourceType

type QuerySourceType string

QuerySourceType defines how to construct the relevance query.

const (
	// QuerySourceLastUser uses the last user message as the query
	QuerySourceLastUser QuerySourceType = "last_user"
	// QuerySourceLastN concatenates the last N messages as the query
	QuerySourceLastN QuerySourceType = "last_n"
	// QuerySourceCustom uses a custom query string
	QuerySourceCustom QuerySourceType = "custom"
)

type RandomRouter

type RandomRouter struct {
	BaseStage
	// contains filtered or unexported fields
}

RandomRouter distributes elements randomly across outputs.

func NewRandomRouter

func NewRandomRouter(name string, outputNames []string) *RandomRouter

NewRandomRouter creates a router that distributes elements randomly.

func (*RandomRouter) Process

func (r *RandomRouter) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process distributes elements randomly.

func (*RandomRouter) RegisterOutput

func (r *RandomRouter) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name.

type RecordingPosition

type RecordingPosition string

RecordingPosition indicates where in the pipeline the recording stage is placed.

const (
	// RecordingPositionInput records elements entering the pipeline (user input).
	RecordingPositionInput RecordingPosition = "input"
	// RecordingPositionOutput records elements leaving the pipeline (agent output).
	RecordingPositionOutput RecordingPosition = "output"
)

type RecordingStage

type RecordingStage struct {
	BaseStage
	// contains filtered or unexported fields
}

RecordingStage captures pipeline elements as events for session recording. It observes elements flowing through without modifying them.

func NewRecordingStage

func NewRecordingStage(eventBus *events.EventBus, config RecordingStageConfig) *RecordingStage

NewRecordingStage creates a new recording stage.

func (*RecordingStage) Process

func (rs *RecordingStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process observes elements and records them as events.

func (*RecordingStage) WithConversationID

func (rs *RecordingStage) WithConversationID(conversationID string) *RecordingStage

WithConversationID sets the conversation ID for recorded events.

func (*RecordingStage) WithSessionID

func (rs *RecordingStage) WithSessionID(sessionID string) *RecordingStage

WithSessionID sets the session ID for recorded events.

type RecordingStageConfig

type RecordingStageConfig struct {
	// Position indicates where this stage is in the pipeline.
	Position RecordingPosition

	// SessionID is the session identifier for recorded events.
	SessionID string

	// ConversationID groups events within a session.
	ConversationID string

	// IncludeAudio records audio data (may be large).
	IncludeAudio bool

	// IncludeVideo records video data (may be large).
	IncludeVideo bool

	// IncludeImages records image data.
	IncludeImages bool
}

RecordingStageConfig configures the recording stage behavior.

func DefaultRecordingStageConfig

func DefaultRecordingStageConfig() RecordingStageConfig

DefaultRecordingStageConfig returns sensible defaults.

type RelevanceConfig

type RelevanceConfig struct {
	// EmbeddingProvider generates embeddings for similarity scoring.
	// Required for relevance-based truncation; if nil, falls back to oldest.
	EmbeddingProvider providers.EmbeddingProvider

	// MinRecentMessages always keeps the N most recent messages
	// regardless of relevance score. Default: 3
	MinRecentMessages int

	// AlwaysKeepSystemRole keeps all system role messages regardless of score.
	AlwaysKeepSystemRole bool

	// SimilarityThreshold is the minimum score to consider a message relevant (0.0-1.0).
	// Messages below this threshold may be dropped first.
	SimilarityThreshold float64

	// QuerySource determines what text to compare messages against.
	// Default: QuerySourceLastUser
	QuerySource QuerySourceType

	// LastNCount is the number of messages to use when QuerySource is QuerySourceLastN.
	// Default: 3
	LastNCount int

	// CustomQuery is the query text when QuerySource is QuerySourceCustom.
	CustomQuery string

	// CacheEmbeddings enables caching of embeddings across truncation calls.
	// Useful when context changes incrementally.
	CacheEmbeddings bool
}

RelevanceConfig configures embedding-based relevance truncation. Used when TruncationStrategy is TruncateLeastRelevant.

type Response

type Response struct {
	Role          string
	Content       string
	Parts         []types.ContentPart
	ToolCalls     []types.MessageToolCall
	FinalResponse string
}

Response represents a response message (for compatibility with existing pipeline).

type ResponseVADConfig

type ResponseVADConfig struct {
	// VAD is the voice activity detector.
	// If nil, a SimpleVAD with default params is created.
	VAD audio.VADAnalyzer

	// SilenceDuration is how long silence must persist after EndOfStream
	// to confirm turn completion.
	// Default: 500ms
	SilenceDuration time.Duration

	// MaxWaitDuration is the maximum time to wait for silence after EndOfStream.
	// If silence is not detected within this time, EndOfStream is emitted anyway.
	// Default: 3s
	MaxWaitDuration time.Duration

	// SampleRate is the expected audio sample rate.
	// Default: 24000 (Gemini output)
	SampleRate int
}

ResponseVADConfig configures the ResponseVADStage.

func DefaultResponseVADConfig

func DefaultResponseVADConfig() ResponseVADConfig

DefaultResponseVADConfig returns sensible defaults for ResponseVADStage.

type ResponseVADStage

type ResponseVADStage struct {
	BaseStage
	// contains filtered or unexported fields
}

ResponseVADStage monitors response audio for silence and delays EndOfStream until actual silence is detected. This decouples turn completion from provider signaling (e.g., Gemini's turnComplete) which may arrive before all audio chunks have been received.

This stage:

  1. Passes through all elements immediately (audio, text, messages)
  2. When EndOfStream is received from upstream, starts monitoring for silence
  3. Only emits EndOfStream downstream when VAD confirms sustained silence
  4. Has a max wait timeout to prevent indefinite blocking

This is a Transform stage with buffering: it may hold EndOfStream temporarily.
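The interaction between SilenceDuration and MaxWaitDuration can be illustrated with a deterministic simulation. The sketch below works on a precomputed list of voiced/silent frames instead of real audio and timers; `endOfStreamDelay`, `delayWithDefaults`, and the 100ms frame length are assumptions made for illustration, not the stage's actual logic.

```go
package main

import (
	"fmt"
	"time"
)

// endOfStreamDelay simulates the wait after an upstream EndOfStream. Each
// entry in speech describes one audio frame of length frame (true = voice
// detected). It returns how long the end-of-stream marker would be held:
// until silence has lasted silenceDur, or until maxWait, whichever is first.
func endOfStreamDelay(speech []bool, frame, silenceDur, maxWait time.Duration) time.Duration {
	var elapsed, silent time.Duration
	for _, voiced := range speech {
		elapsed += frame
		if voiced {
			silent = 0 // any voice activity restarts the silence window
		} else {
			silent += frame
		}
		if silent >= silenceDur {
			return elapsed
		}
		if elapsed >= maxWait {
			return maxWait
		}
	}
	// No more audio arrives: treat the remaining time as silence.
	wait := elapsed + (silenceDur - silent)
	if wait > maxWait {
		return maxWait
	}
	return wait
}

// delayWithDefaults applies the documented defaults (500ms silence, 3s max
// wait) with an assumed 100ms frame length.
func delayWithDefaults(speech []bool) time.Duration {
	return endOfStreamDelay(speech, 100*time.Millisecond, 500*time.Millisecond, 3*time.Second)
}

func main() {
	// Two voiced frames, then silence: released after 200ms + 500ms.
	fmt.Println(delayWithDefaults([]bool{true, true, false, false, false, false, false})) // prints "700ms"

	// Continuous speech: released when the max wait expires.
	talking := make([]bool, 40)
	for i := range talking {
		talking[i] = true
	}
	fmt.Println(delayWithDefaults(talking)) // prints "3s"
}
```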

func NewResponseVADStage

func NewResponseVADStage(config ResponseVADConfig) (*ResponseVADStage, error)

NewResponseVADStage creates a new response VAD stage.

func (*ResponseVADStage) Process

func (s *ResponseVADStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Monitors response audio for silence and delays EndOfStream until confirmed.

type RoundRobinRouter

type RoundRobinRouter struct {
	BaseStage
	// contains filtered or unexported fields
}

RoundRobinRouter distributes elements across outputs in sequence.

func NewRoundRobinRouter

func NewRoundRobinRouter(name string, outputNames []string) *RoundRobinRouter

NewRoundRobinRouter creates a router that cycles through outputs sequentially.
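Sequential cycling amounts to a cursor that wraps around the output list. A minimal sketch follows; the `roundRobin` type is hypothetical and, unlike a real router stage, is not safe for concurrent use.

```go
package main

import "fmt"

// roundRobin cycles through a fixed, non-empty list of output names.
type roundRobin struct {
	outputs []string
	next    int
}

// pick returns the next output name and advances the cursor, wrapping to the
// start after the last output.
func (r *roundRobin) pick() string {
	name := r.outputs[r.next]
	r.next = (r.next + 1) % len(r.outputs)
	return name
}

func main() {
	r := &roundRobin{outputs: []string{"a", "b", "c"}}
	for i := 0; i < 4; i++ {
		fmt.Print(r.pick(), " ")
	}
	fmt.Println() // the loop prints "a b c a "
}
```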

func (*RoundRobinRouter) Process

func (r *RoundRobinRouter) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process distributes elements in round-robin fashion.

func (*RoundRobinRouter) RegisterOutput

func (r *RoundRobinRouter) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name.

type RouterFunc

type RouterFunc func(elem *StreamElement) []string

RouterFunc determines which output channel(s) to route an element to. Returns a slice of output names. Empty slice means drop the element.
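A routing function of this shape might be written and dispatched as in the sketch below. The reduced `element` type, the output names, and the `dispatch` and `countDeliveries` helpers are hypothetical; the real RouterFunc takes a *StreamElement and the stage's own dispatch logic may differ.

```go
package main

import "fmt"

// element is a hypothetical, reduced stand-in for StreamElement.
type element struct {
	Kind string
	Body string
}

// routerFunc mirrors the shape of RouterFunc for the stand-in type.
type routerFunc func(elem *element) []string

// byKind sends audio to two destinations, text to one, and drops the rest.
func byKind(elem *element) []string {
	switch elem.Kind {
	case "audio":
		return []string{"stt", "recorder"}
	case "text":
		return []string{"llm"}
	default:
		return nil // empty result: the element is dropped
	}
}

// dispatch delivers elem to every registered output named by route and
// returns the number of deliveries.
func dispatch(route routerFunc, outputs map[string]chan<- element, elem element) int {
	sent := 0
	for _, name := range route(&elem) {
		if ch, ok := outputs[name]; ok {
			ch <- elem
			sent++
		}
	}
	return sent
}

// countDeliveries registers three buffered outputs and reports how many of
// them receive an element of the given kind.
func countDeliveries(kind string) int {
	outputs := map[string]chan<- element{
		"stt":      make(chan element, 1),
		"recorder": make(chan element, 1),
		"llm":      make(chan element, 1),
	}
	return dispatch(byKind, outputs, element{Kind: kind})
}

func main() {
	// prints "2 1 0"
	fmt.Println(countDeliveries("audio"), countDeliveries("text"), countDeliveries("video"))
}
```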

type RouterStage

type RouterStage struct {
	BaseStage
	// contains filtered or unexported fields
}

RouterStage routes elements to different output channels based on a routing function. This enables conditional branching and dynamic routing in the pipeline.

This is a special stage type that supports multiple outputs (1:N routing).

func NewRouterStage

func NewRouterStage(name string, routerFunc RouterFunc) *RouterStage

NewRouterStage creates a new router stage with the given routing function.

func (*RouterStage) Process

func (s *RouterStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Routes each element to appropriate output channel(s) based on routing function.

func (*RouterStage) RegisterOutput

func (s *RouterStage) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name. This must be called before Process() to set up routing destinations.

type RoutingRule

type RoutingRule struct {
	// Name identifies this rule for logging/debugging.
	Name string
	// Predicate returns true if the element should be routed to this rule's output.
	Predicate func(StreamElement) bool
	// Output is the destination name for matching elements.
	Output string
}

RoutingRule defines a predicate-based routing rule.

func RouteAudio

func RouteAudio(output string, format AudioFormat) RoutingRule

RouteAudio creates a routing rule for audio elements with specific format.

func RouteContentType

func RouteContentType(output string, ct ContentType) RoutingRule

RouteContentType creates a routing rule for elements of a specific content type.

func RouteWhen

func RouteWhen(output string, predicate func(StreamElement) bool) RoutingRule

RouteWhen creates a routing rule with the given predicate.

type STTStage

type STTStage struct {
	BaseStage
	// contains filtered or unexported fields
}

STTStage transcribes audio to text using a speech-to-text service.

This is a Transform stage: audio element → text element (1:1)

func NewSTTStage

func NewSTTStage(service stt.Service, config STTStageConfig) *STTStage

NewSTTStage creates a new STT stage.

func (*STTStage) Process

func (s *STTStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Transcribes audio elements to text.

type STTStageConfig

type STTStageConfig struct {
	// Language hint for transcription (e.g., "en")
	Language string

	// SkipEmpty skips transcription for empty audio
	SkipEmpty bool

	// MinAudioBytes is minimum audio size to transcribe
	MinAudioBytes int
}

STTStageConfig configures the STTStage.

func DefaultSTTStageConfig

func DefaultSTTStageConfig() STTStageConfig

DefaultSTTStageConfig returns sensible defaults.

type ScoredMessage

type ScoredMessage struct {
	// Index is the original position in the message slice
	Index int

	// Message is the actual message content
	Message types.Message

	// Score is the cosine similarity to the query (0.0 to 1.0)
	Score float64

	// IsProtected indicates if this message should always be kept
	// (e.g., recent messages or system messages)
	IsProtected bool

	// TokenCount is the estimated token count for this message
	TokenCount int
}

ScoredMessage pairs a message with its relevance score and metadata. Used during relevance-based truncation to track which messages to keep.

type ScoredMessages

type ScoredMessages []ScoredMessage

ScoredMessages is a sortable slice of ScoredMessage.

func (ScoredMessages) Len

func (s ScoredMessages) Len() int

func (ScoredMessages) Less

func (s ScoredMessages) Less(i, j int) bool

func (ScoredMessages) Swap

func (s ScoredMessages) Swap(i, j int)
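Len, Less, and Swap together satisfy sort.Interface, so a ScoredMessages value can be passed to sort.Sort. The sketch below uses local copies of the types with the Message field omitted, and it assumes Less orders by descending Score; the actual ordering is not documented here. rankedIndices is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// ScoredMessage is a reduced stand-in; the Message field is omitted.
type ScoredMessage struct {
	Index       int
	Score       float64
	IsProtected bool
	TokenCount  int
}

type ScoredMessages []ScoredMessage

func (s ScoredMessages) Len() int { return len(s) }

// Less puts higher scores first. This ordering is an assumption.
func (s ScoredMessages) Less(i, j int) bool { return s[i].Score > s[j].Score }

func (s ScoredMessages) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// rankedIndices sorts a copy and returns the original indices in rank order.
func rankedIndices(msgs ScoredMessages) []int {
	sorted := make(ScoredMessages, len(msgs))
	copy(sorted, msgs)
	sort.Sort(sorted)

	idx := make([]int, len(sorted))
	for i, m := range sorted {
		idx[i] = m.Index
	}
	return idx
}

func main() {
	msgs := ScoredMessages{
		{Index: 0, Score: 0.2},
		{Index: 1, Score: 0.9},
		{Index: 2, Score: 0.5},
	}
	fmt.Println(rankedIndices(msgs))
}
```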

type Stage

type Stage interface {
	// Name returns a unique identifier for this stage.
	// This is used for logging, tracing, and debugging.
	Name() string

	// Type returns the stage's processing model.
	// This helps the pipeline builder understand how the stage behaves.
	Type() StageType

	// Process is called once when the pipeline starts.
	// The stage reads from input, processes elements, and writes to output.
	// The stage MUST close output when done (or when input closes).
	// Returns an error if processing fails.
	Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Stage is a processing unit in the pipeline DAG. Unlike traditional middleware, stages explicitly declare their I/O characteristics and operate on channels of StreamElements, enabling true streaming execution.

Stages read from an input channel, process elements, and write to an output channel. The stage MUST close the output channel when done (or when input closes).

Example implementation:

type ExampleStage struct {
    name string
}

func (s *ExampleStage) Name() string {
    return s.name
}

func (s *ExampleStage) Type() StageType {
    return StageTypeTransform
}

func (s *ExampleStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)

    for elem := range input {
        // Process element
        processedElem := s.transform(elem)

        // Write to output
        select {
        case output <- processedElem:
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}

type StageError

type StageError struct {
	StageName string
	StageType StageType
	Err       error
}

StageError wraps an error with stage information.

func NewStageError

func NewStageError(stageName string, stageType StageType, err error) *StageError

NewStageError creates a new StageError.

func (*StageError) Error

func (e *StageError) Error() string

Error returns the error message.

func (*StageError) Unwrap

func (e *StageError) Unwrap() error

Unwrap returns the underlying error.
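Because StageError exposes Unwrap, it works with the standard errors.Is and errors.As helpers. The sketch below uses a local copy of the type without the StageType field; the message format in Error, the errTimeout sentinel, and the two helper functions are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// StageError is a reduced stand-in for stage.StageError.
type StageError struct {
	StageName string
	Err       error
}

// Error uses an assumed message format.
func (e *StageError) Error() string { return fmt.Sprintf("stage %s: %v", e.StageName, e.Err) }

func (e *StageError) Unwrap() error { return e.Err }

// errTimeout is a hypothetical sentinel error.
var errTimeout = errors.New("upstream timeout")

// failingStageName reports which stage failed if err wraps a *StageError.
func failingStageName(err error) (string, bool) {
	var se *StageError
	if errors.As(err, &se) {
		return se.StageName, true
	}
	return "", false
}

// isTimeout looks through the wrapper chain for the sentinel.
func isTimeout(err error) bool { return errors.Is(err, errTimeout) }

func main() {
	err := fmt.Errorf("pipeline failed: %w", &StageError{StageName: "stt", Err: errTimeout})

	name, ok := failingStageName(err)
	fmt.Println(name, ok, isTimeout(err))
}
```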

type StageFunc

type StageFunc struct {
	BaseStage
	// contains filtered or unexported fields
}

StageFunc is a functional adapter that allows using a function as a Stage. This is useful for simple transformations without defining a new type.

func NewStageFunc

func NewStageFunc(name string, stageType StageType, fn func(context.Context, <-chan StreamElement, chan<- StreamElement) error) *StageFunc

NewStageFunc creates a new functional stage.

func (*StageFunc) Process

func (sf *StageFunc) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error

Process executes the stage function.
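The adapter pattern behind StageFunc can be sketched without the package. Here stageFunc, upper, and runTexts are hypothetical local names, StreamElement is reduced to a Text field, and it is assumed that the wrapped function (not the adapter) closes the output channel.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// StreamElement is a trimmed-down stand-in for stage.StreamElement.
type StreamElement struct {
	Text *string
}

type processFn func(context.Context, <-chan StreamElement, chan<- StreamElement) error

// stageFunc adapts a plain function to a Name/Process pair.
type stageFunc struct {
	name string
	fn   processFn
}

func (sf *stageFunc) Name() string { return sf.name }

func (sf *stageFunc) Process(ctx context.Context, in <-chan StreamElement, out chan<- StreamElement) error {
	return sf.fn(ctx, in, out)
}

// upper upper-cases text elements and closes the output when input ends.
func upper(ctx context.Context, in <-chan StreamElement, out chan<- StreamElement) error {
	defer close(out)
	for elem := range in {
		if elem.Text != nil {
			u := strings.ToUpper(*elem.Text)
			elem.Text = &u
		}
		select {
		case out <- elem:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// runTexts feeds texts through the stage using buffered channels and
// collects the resulting text values.
func runTexts(s *stageFunc, texts ...string) ([]string, error) {
	in := make(chan StreamElement, len(texts))
	for i := range texts {
		in <- StreamElement{Text: &texts[i]}
	}
	close(in)

	out := make(chan StreamElement, len(texts))
	if err := s.Process(context.Background(), in, out); err != nil {
		return nil, err
	}
	var got []string
	for e := range out {
		if e.Text != nil {
			got = append(got, *e.Text)
		}
	}
	return got, nil
}

func main() {
	s := &stageFunc{name: "upper", fn: upper}
	got, err := runTexts(s, "hello", "world")
	fmt.Println(s.Name(), got, err)
}
```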

type StageMetrics

type StageMetrics struct {
	StageName       string
	ElementsIn      int64
	ElementsOut     int64
	ElementsErrored int64
	TotalLatency    time.Duration
	MinLatency      time.Duration
	MaxLatency      time.Duration
	AvgLatency      time.Duration
	LastUpdated     time.Time
	// contains filtered or unexported fields
}

StageMetrics contains performance metrics for a stage.

func NewStageMetrics

func NewStageMetrics(stageName string) *StageMetrics

NewStageMetrics creates a new metrics collector for a stage.

func (*StageMetrics) GetMetrics

func (m *StageMetrics) GetMetrics() StageMetrics

GetMetrics returns a copy of the current metrics (thread-safe).

func (*StageMetrics) RecordElement

func (m *StageMetrics) RecordElement(latency time.Duration, hasError bool)

RecordElement records metrics for a processed element.

func (*StageMetrics) Reset

func (m *StageMetrics) Reset()

Reset resets all metrics to zero.
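A thread-safe collector of this shape is commonly built around a mutex. The following is a sketch under that assumption, not the package's implementation: metrics, snapshot, and recordMillis are hypothetical names, and only a subset of the documented fields is tracked.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// metrics is a hypothetical, reduced analogue of StageMetrics.
type metrics struct {
	mu              sync.Mutex
	elementsIn      int64
	elementsErrored int64
	total, min, max time.Duration
}

// snapshot is a lock-free copy that is safe to read after it is returned.
type snapshot struct {
	ElementsIn, ElementsErrored int64
	Min, Max, Avg               time.Duration
}

// RecordElement updates counters and latency bounds under the lock.
func (m *metrics) RecordElement(latency time.Duration, hasError bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.elementsIn++
	if hasError {
		m.elementsErrored++
	}
	m.total += latency
	if m.elementsIn == 1 || latency < m.min {
		m.min = latency
	}
	if latency > m.max {
		m.max = latency
	}
}

// Snapshot copies the current values and derives the average.
func (m *metrics) Snapshot() snapshot {
	m.mu.Lock()
	defer m.mu.Unlock()

	s := snapshot{ElementsIn: m.elementsIn, ElementsErrored: m.elementsErrored, Min: m.min, Max: m.max}
	if m.elementsIn > 0 {
		s.Avg = m.total / time.Duration(m.elementsIn)
	}
	return s
}

// recordMillis is a convenience wrapper for latencies given in milliseconds.
func recordMillis(m *metrics, ms int64, hasError bool) {
	m.RecordElement(time.Duration(ms)*time.Millisecond, hasError)
}

func main() {
	m := &metrics{}
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(ms int64) {
			defer wg.Done()
			recordMillis(m, ms, ms == 40)
		}(int64(i * 10))
	}
	wg.Wait()

	s := m.Snapshot()
	fmt.Println(s.ElementsIn, s.ElementsErrored, s.Min, s.Max, s.Avg)
}
```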

type StageType

type StageType int

StageType defines the processing model of a stage.

const (
	// StageTypeTransform performs 1:1 or 1:N element transformation.
	// Each input element produces one or more output elements.
	// Examples: validation, prompt assembly, text formatting.
	StageTypeTransform StageType = iota

	// StageTypeAccumulate performs N:1 accumulation.
	// Multiple input elements are collected and combined into one output element.
	// Examples: VAD buffering, message accumulation.
	StageTypeAccumulate

	// StageTypeGenerate performs 0:N generation.
	// Generates output elements without consuming input (or consumes once then generates many).
	// Examples: LLM streaming response, TTS generation.
	StageTypeGenerate

	// StageTypeSink is a terminal stage (N:0).
	// Consumes input elements but produces no output.
	// Examples: state store save, metrics collection, logging.
	StageTypeSink

	// StageTypeBidirectional supports full duplex communication.
	// Both reads from input and writes to output concurrently.
	// Examples: WebSocket session, duplex provider.
	StageTypeBidirectional
)

func (StageType) String

func (st StageType) String() string

String returns the string representation of the stage type.
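An iota-based enum with a String method typically follows the pattern below. The constant names are shortened local copies, and the returned strings ("transform", "accumulate", and so on) are assumptions; the values the package actually returns are not listed in this documentation.

```go
package main

import "fmt"

// StageType is a local copy of the enum for illustration.
type StageType int

const (
	Transform StageType = iota
	Accumulate
	Generate
	Sink
	Bidirectional
)

// stageTypeNames holds assumed display names, indexed by constant value.
var stageTypeNames = [...]string{"transform", "accumulate", "generate", "sink", "bidirectional"}

// String returns the display name, or a numeric fallback for unknown values.
func (st StageType) String() string {
	if st < 0 || int(st) >= len(stageTypeNames) {
		return fmt.Sprintf("StageType(%d)", int(st))
	}
	return stageTypeNames[st]
}

func main() {
	// Printing with %v or %s calls String automatically.
	fmt.Printf("%v %s %v\n", Transform, Sink, StageType(99))
}
```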

type StateStoreLoadStage

type StateStoreLoadStage struct {
	BaseStage
	// contains filtered or unexported fields
}

StateStoreLoadStage loads conversation history from the state store.

func NewStateStoreLoadStage

func NewStateStoreLoadStage(config *pipeline.StateStoreConfig) *StateStoreLoadStage

NewStateStoreLoadStage creates a new state store load stage.

func (*StateStoreLoadStage) Process

func (s *StateStoreLoadStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process loads conversation history and emits it before current input.

type StateStoreSaveStage

type StateStoreSaveStage struct {
	BaseStage
	// contains filtered or unexported fields
}

StateStoreSaveStage saves conversation state to the state store.

func NewStateStoreSaveStage

func NewStateStoreSaveStage(config *pipeline.StateStoreConfig) *StateStoreSaveStage

NewStateStoreSaveStage creates a new state store save stage.

func (*StateStoreSaveStage) Process

func (s *StateStoreSaveStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process collects all messages and saves them to the state store.

type StreamElement

type StreamElement struct {
	// Content types (at most one should be set per element)
	Text      *string                // Text content
	Audio     *AudioData             // Audio samples
	Video     *VideoData             // Video frame
	Image     *ImageData             // Image data
	Message   *types.Message         // Complete message
	ToolCall  *types.MessageToolCall // Tool invocation
	Part      *types.ContentPart     // Generic content part (text, image, audio, video)
	MediaData *types.MediaContent    // Media content with MIME type

	// Metadata
	Sequence  int64                  // Monotonic sequence number
	Timestamp time.Time              // When element was created
	Source    string                 // Stage that produced this element
	Priority  Priority               // Scheduling priority (for QoS)
	Metadata  map[string]interface{} // Additional metadata for passing data between stages

	// Control signals
	EndOfStream bool  // No more elements after this
	Error       error // Error propagation
}

StreamElement is the unit of data flowing through the pipeline. It can carry different types of content, but each element should contain at most one content type. Backpressure comes from the channels that carry elements between stages.

Example

ExampleStreamElement demonstrates creating different types of stream elements.

package main

import (
	"fmt"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
	"github.com/AltairaLabs/PromptKit/runtime/types"
)

func main() {
	// Text element
	textElem := stage.NewTextElement("Hello")
	fmt.Printf("Text element: %v\n", *textElem.Text)

	// Message element
	msgElem := stage.NewMessageElement(&types.Message{
		Role:    "user",
		Content: "Hello",
	})
	fmt.Printf("Message element: %s\n", msgElem.Message.Content)

	// Error element
	errElem := stage.NewErrorElement(fmt.Errorf("test error"))
	fmt.Printf("Error element: %v\n", errElem.Error)

}
Output:

Text element: Hello
Message element: Hello
Error element: test error

func GetAudioElement

func GetAudioElement(audio *AudioData) *StreamElement

GetAudioElement retrieves a StreamElement from the pool and initializes it with audio data. This is a pooled alternative to NewAudioElement.

func GetElement

func GetElement() *StreamElement

GetElement retrieves a StreamElement from the pool or creates a new one. The returned element is reset to its zero state with an initialized Metadata map. Callers should use PutElement when the element is no longer needed.

func GetEndOfStreamElement

func GetEndOfStreamElement() *StreamElement

GetEndOfStreamElement retrieves a StreamElement from the pool and marks it as end-of-stream. This is a pooled alternative to NewEndOfStreamElement.

func GetErrorElement

func GetErrorElement(err error) *StreamElement

GetErrorElement retrieves a StreamElement from the pool and initializes it with an error. This is a pooled alternative to NewErrorElement.

func GetImageElement

func GetImageElement(image *ImageData) *StreamElement

GetImageElement retrieves a StreamElement from the pool and initializes it with image data. This is a pooled alternative to NewImageElement.

func GetMessageElement

func GetMessageElement(msg *types.Message) *StreamElement

GetMessageElement retrieves a StreamElement from the pool and initializes it with a message. This is a pooled alternative to NewMessageElement.

func GetTextElement

func GetTextElement(text string) *StreamElement

GetTextElement retrieves a StreamElement from the pool and initializes it with text content. This is a pooled alternative to NewTextElement.

func GetVideoElement

func GetVideoElement(video *VideoData) *StreamElement

GetVideoElement retrieves a StreamElement from the pool and initializes it with video data. This is a pooled alternative to NewVideoElement.

func NewAudioElement

func NewAudioElement(audio *AudioData) StreamElement

NewAudioElement creates a new StreamElement with audio data.

func NewEndOfStreamElement

func NewEndOfStreamElement() StreamElement

NewEndOfStreamElement creates a new StreamElement marking end of stream.

func NewErrorElement

func NewErrorElement(err error) StreamElement

NewErrorElement creates a new StreamElement with an error.

func NewImageElement

func NewImageElement(image *ImageData) StreamElement

NewImageElement creates a new StreamElement with image data.

func NewMessageElement

func NewMessageElement(msg *types.Message) StreamElement

NewMessageElement creates a new StreamElement with a message.

func NewTextElement

func NewTextElement(text string) StreamElement

NewTextElement creates a new StreamElement with text content.

func NewVideoElement

func NewVideoElement(video *VideoData) StreamElement

NewVideoElement creates a new StreamElement with video data.

func (*StreamElement) GetMetadata

func (e *StreamElement) GetMetadata(key string) interface{}

GetMetadata retrieves metadata by key, returning nil if not found.

func (*StreamElement) HasContent

func (e *StreamElement) HasContent() bool

HasContent returns true if the element contains any content (excluding control signals).

func (*StreamElement) IsControl

func (e *StreamElement) IsControl() bool

IsControl returns true if the element is a control signal (error or end-of-stream).

func (*StreamElement) IsEmpty

func (e *StreamElement) IsEmpty() bool

IsEmpty returns true if the element contains no content.

func (*StreamElement) Reset

func (e *StreamElement) Reset()

Reset clears all fields of the StreamElement to their zero values. This is called automatically by PutElement before returning to the pool. The Metadata map is cleared but retained to avoid reallocation.
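The get, reset, and put cycle described for pooled elements is the usual sync.Pool pattern. The sketch below assumes that is what sits behind GetElement and PutElement; element, getElement, and putElement are hypothetical local names, and only two fields are modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// element is a reduced stand-in for stage.StreamElement.
type element struct {
	Text     *string
	Metadata map[string]interface{}
}

// Reset clears the fields but keeps the map allocation for reuse.
func (e *element) Reset() {
	e.Text = nil
	for k := range e.Metadata {
		delete(e.Metadata, k)
	}
}

// pool hands out elements with an initialized Metadata map.
var pool = sync.Pool{
	New: func() interface{} {
		return &element{Metadata: make(map[string]interface{})}
	},
}

func getElement() *element { return pool.Get().(*element) }

// putElement resets the element before returning it to the pool.
// The caller must not touch e afterwards.
func putElement(e *element) {
	e.Reset()
	pool.Put(e)
}

func main() {
	e := getElement()
	text := "hello"
	e.Text = &text
	e.Metadata["trace"] = "abc"
	putElement(e)

	next := getElement()
	fmt.Println(next.Text == nil, len(next.Metadata))
	putElement(next)
}
```

Whether the second getElement call returns the same object is up to the runtime; either way the caller sees an element in its zero state with an empty, non-nil Metadata map.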

func (*StreamElement) WithMetadata

func (e *StreamElement) WithMetadata(key string, value interface{}) *StreamElement

WithMetadata adds metadata to this element.

func (*StreamElement) WithPriority

func (e *StreamElement) WithPriority(priority Priority) *StreamElement

WithPriority sets the priority for this element.

func (*StreamElement) WithSequence

func (e *StreamElement) WithSequence(seq int64) *StreamElement

WithSequence sets the sequence number for this element.

func (*StreamElement) WithSource

func (e *StreamElement) WithSource(source string) *StreamElement

WithSource sets the source stage name for this element.
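Since each With method has a pointer receiver and returns *StreamElement, calls can be chained. A minimal sketch of that style, using a hypothetical local element type and assuming the methods mutate the receiver in place and return it:

```go
package main

import "fmt"

// element is a reduced stand-in for stage.StreamElement.
type element struct {
	Sequence int64
	Source   string
	Metadata map[string]interface{}
}

// WithMetadata stores a key/value pair, creating the map if needed.
func (e *element) WithMetadata(key string, value interface{}) *element {
	if e.Metadata == nil {
		e.Metadata = make(map[string]interface{})
	}
	e.Metadata[key] = value
	return e
}

func (e *element) WithSequence(seq int64) *element {
	e.Sequence = seq
	return e
}

func (e *element) WithSource(source string) *element {
	e.Source = source
	return e
}

// GetMetadata returns nil for a missing key; reading a nil map is safe in Go.
func (e *element) GetMetadata(key string) interface{} {
	return e.Metadata[key]
}

func main() {
	e := (&element{}).
		WithSource("stt").
		WithSequence(7).
		WithMetadata("lang", "en")

	fmt.Println(e.Source, e.Sequence, e.GetMetadata("lang"), e.GetMetadata("missing"))
}
```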

type StreamPipeline

type StreamPipeline struct {
	// contains filtered or unexported fields
}

StreamPipeline represents an executable pipeline of stages. It manages the DAG of stages, creates channels between them, and orchestrates execution.

func (*StreamPipeline) Execute

func (p *StreamPipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error)

Execute starts the pipeline execution with the given input channel. Returns an output channel that will receive all elements from terminal stages. The pipeline executes in background goroutines and closes the output channel when complete.

func (*StreamPipeline) ExecuteSync

func (p *StreamPipeline) ExecuteSync(ctx context.Context, input ...StreamElement) (*ExecutionResult, error)

ExecuteSync runs the pipeline synchronously and returns the accumulated result. This is a convenience method for request/response mode where you want a single result. It converts the streaming execution into a blocking call.
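Turning a streaming result into a blocking one amounts to draining the output channel until it closes. The sketch below shows that idea with local types only; collect and collectAll are hypothetical helpers, and stopping at the first error element is an assumption about how errors might be surfaced, not documented ExecuteSync behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// StreamElement is a trimmed-down stand-in for stage.StreamElement.
type StreamElement struct {
	Text  *string
	Error error
}

// errBoom is a hypothetical error used for the demonstration.
var errBoom = errors.New("boom")

// collect blocks until out is closed, an error element arrives,
// or the context is cancelled.
func collect(ctx context.Context, out <-chan StreamElement) ([]string, error) {
	var texts []string
	for {
		select {
		case elem, ok := <-out:
			if !ok {
				return texts, nil
			}
			if elem.Error != nil {
				return texts, elem.Error
			}
			if elem.Text != nil {
				texts = append(texts, *elem.Text)
			}
		case <-ctx.Done():
			return texts, ctx.Err()
		}
	}
}

// collectAll simulates a finished pipeline by pre-filling a closed channel.
func collectAll(elems ...StreamElement) ([]string, error) {
	out := make(chan StreamElement, len(elems))
	for _, e := range elems {
		out <- e
	}
	close(out)
	return collect(context.Background(), out)
}

func main() {
	a, b := "Hello", "world"
	texts, err := collectAll(StreamElement{Text: &a}, StreamElement{Text: &b})
	fmt.Println(texts, err)

	texts, err = collectAll(StreamElement{Text: &a}, StreamElement{Error: errBoom})
	fmt.Println(texts, err)
}
```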

func (*StreamPipeline) Shutdown

func (p *StreamPipeline) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the pipeline, waiting for in-flight executions to complete.

type TTSConfig

type TTSConfig struct {
	// SkipEmpty skips synthesis for empty or whitespace-only text
	SkipEmpty bool

	// MinTextLength is the minimum text length to synthesize (0 = no minimum)
	MinTextLength int
}

TTSConfig contains configuration for the TTS stage.

func DefaultTTSConfig

func DefaultTTSConfig() TTSConfig

DefaultTTSConfig returns sensible defaults for TTS configuration.

type TTSService

type TTSService interface {
	// Synthesize converts text to audio bytes.
	Synthesize(ctx context.Context, text string) ([]byte, error)

	// MIMEType returns the MIME type of the synthesized audio.
	MIMEType() string
}

TTSService converts text to audio.

type TTSStage

type TTSStage struct {
	BaseStage
	// contains filtered or unexported fields
}

TTSStage synthesizes audio for streaming text elements. It reads text elements from input and adds audio data to them.

This is a Transform stage: text element → text+audio element (1:1)

func NewTTSStage

func NewTTSStage(tts TTSService, config TTSConfig) *TTSStage

NewTTSStage creates a new TTS stage.

func (*TTSStage) Process

func (s *TTSStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Synthesizes audio for each text element and adds it to the element.

type TTSStageWithInterruption

type TTSStageWithInterruption struct {
	BaseStage
	// contains filtered or unexported fields
}

TTSStageWithInterruption synthesizes text to audio with interruption support. When the user starts speaking (detected via shared InterruptionHandler), synthesis is stopped and pending output is discarded.

This is a Transform stage: text element → audio element (1:1)

func NewTTSStageWithInterruption

func NewTTSStageWithInterruption(
	service tts.Service,
	config TTSStageWithInterruptionConfig,
) *TTSStageWithInterruption

NewTTSStageWithInterruption creates a new TTS stage with interruption support.

func (*TTSStageWithInterruption) Process

func (s *TTSStageWithInterruption) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. Synthesizes audio for text elements with interruption support.

type TTSStageWithInterruptionConfig

type TTSStageWithInterruptionConfig struct {
	// Voice is the voice ID to use
	Voice string

	// Speed is the speech rate (0.5-2.0)
	Speed float64

	// InterruptionHandler for detecting user interrupts during TTS output.
	// Should be shared with AudioTurnStage.
	InterruptionHandler *audio.InterruptionHandler

	// SkipEmpty skips synthesis for empty text
	SkipEmpty bool

	// MinTextLength is minimum text length to synthesize
	MinTextLength int
}

TTSStageWithInterruptionConfig configures TTSStageWithInterruption.

func DefaultTTSStageWithInterruptionConfig

func DefaultTTSStageWithInterruptionConfig() TTSStageWithInterruptionConfig

DefaultTTSStageWithInterruptionConfig returns sensible defaults.

type TemplateStage

type TemplateStage struct {
	BaseStage
}

TemplateStage substitutes {{variable}} placeholders in messages and metadata.

This stage reads variables from the element's metadata["variables"] map and replaces all occurrences of {{variable_name}} in:

  • metadata["system_prompt"] - the system prompt for the LLM
  • message.Content - the message text content
  • message.Parts[].Text - individual content parts

Variables are typically set by:

  • PromptAssemblyStage (from base_variables in config)
  • VariableProviderStage (from dynamic variable providers)

Example:

Input: "Hello {{name}}, the topic is {{topic}}"
Variables: {"name": "Alice", "topic": "AI"}
Output: "Hello Alice, the topic is AI"

This is a Transform stage: 1 input element → 1 output element
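The substitution in the example above can be reproduced with a short regular-expression sketch. The substitute function and the placeholder pattern are assumptions for illustration: the pattern accepts only word characters between the braces, and unknown placeholders are left untouched, which may differ from what TemplateStage does.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches {{name}} where name consists of word characters.
var placeholder = regexp.MustCompile(`\{\{(\w+)\}\}`)

// substitute replaces every known {{name}} with its value and leaves
// unknown placeholders as they are (an assumption of this sketch).
func substitute(text string, vars map[string]string) string {
	return placeholder.ReplaceAllStringFunc(text, func(match string) string {
		name := match[2 : len(match)-2] // strip the surrounding braces
		if v, ok := vars[name]; ok {
			return v
		}
		return match
	})
}

func main() {
	vars := map[string]string{"name": "Alice", "topic": "AI"}
	fmt.Println(substitute("Hello {{name}}, the topic is {{topic}}", vars))
}
```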

func NewTemplateStage

func NewTemplateStage() *TemplateStage

NewTemplateStage creates a template substitution stage.

func (*TemplateStage) Process

func (s *TemplateStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process substitutes variables in messages and system prompt metadata.

type TracingStage

type TracingStage struct {
	BaseStage
	// contains filtered or unexported fields
}

TracingStage wraps another stage and adds element-level tracing. Each element gets a trace ID and timing information.

func NewTracingStage

func NewTracingStage(wrappedStage Stage, traceIDGen func() string) *TracingStage

NewTracingStage wraps a stage with tracing support.

func (*TracingStage) Process

func (s *TracingStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface with tracing.

type Transcriber

type Transcriber interface {
	Transcribe(ctx context.Context, audio []byte) (string, error)
}

Transcriber converts audio bytes to text. Follows Go naming convention for single-method interfaces.

type TruncationStrategy

type TruncationStrategy string

TruncationStrategy defines how to handle messages when over token budget.

const (
	// TruncateOldest drops oldest messages first
	TruncateOldest TruncationStrategy = "oldest"
	// TruncateLeastRelevant drops least relevant messages (requires embeddings)
	TruncateLeastRelevant TruncationStrategy = "relevance"
	// TruncateSummarize compresses old messages into summaries
	TruncateSummarize TruncationStrategy = "summarize"
	// TruncateFail returns error if over budget
	TruncateFail TruncationStrategy = "fail"
)
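As an illustration of how two of these strategies differ, the sketch below applies "oldest" and "fail" to a list of token-counted messages. The message type and the truncate function are hypothetical, the relevance and summarize strategies are deliberately not covered, and the package's own truncation logic may handle protected messages or other details not modeled here.

```go
package main

import "fmt"

type TruncationStrategy string

const (
	TruncateOldest TruncationStrategy = "oldest"
	TruncateFail   TruncationStrategy = "fail"
)

// message is a hypothetical stand-in with a precomputed token count.
type message struct {
	Content string
	Tokens  int
}

func totalTokens(msgs []message) int {
	total := 0
	for _, m := range msgs {
		total += m.Tokens
	}
	return total
}

// truncate returns msgs unchanged when they fit the budget; otherwise it
// applies the chosen strategy.
func truncate(msgs []message, budget int, strategy TruncationStrategy) ([]message, error) {
	total := totalTokens(msgs)
	if total <= budget {
		return msgs, nil
	}
	switch strategy {
	case TruncateOldest:
		// Drop from the front (oldest) until the remainder fits.
		start := 0
		for start < len(msgs) && total > budget {
			total -= msgs[start].Tokens
			start++
		}
		return msgs[start:], nil
	case TruncateFail:
		return nil, fmt.Errorf("messages use %d tokens, budget is %d", total, budget)
	default:
		return nil, fmt.Errorf("strategy %q is not covered by this sketch", strategy)
	}
}

func main() {
	msgs := []message{{"first", 50}, {"second", 40}, {"third", 30}}

	kept, err := truncate(msgs, 80, TruncateOldest)
	fmt.Println(len(kept), err)

	_, err = truncate(msgs, 80, TruncateFail)
	fmt.Println(err)
}
```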

type VADAccumulatorStage

type VADAccumulatorStage struct {
	BaseStage
	// contains filtered or unexported fields
}

VADAccumulatorStage reads streaming audio chunks, detects turn boundaries via VAD, and emits a single Message element with the transcribed text.

This is an Accumulate stage: N audio chunks → 1 message element

func NewVADAccumulatorStage

func NewVADAccumulatorStage(
	analyzer audio.VADAnalyzer,
	transcriber Transcriber,
	config VADConfig,
) *VADAccumulatorStage

NewVADAccumulatorStage creates a new VAD accumulator stage.

func (*VADAccumulatorStage) Process

func (s *VADAccumulatorStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process implements the Stage interface. It accumulates audio chunks until the turn is complete, then transcribes them and emits a message.

type VADConfig

type VADConfig struct {
	// Threshold for silence detection (0.0 = silence, 1.0 = speech)
	Threshold float64

	// MinSpeechDuration is the minimum duration of speech before turn can complete
	MinSpeechDuration time.Duration

	// MaxTurnDuration is the maximum duration before forcing turn completion
	MaxTurnDuration time.Duration

	// SilenceDuration is how long silence must persist to trigger turn complete
	SilenceDuration time.Duration
}

VADConfig contains configuration for the VAD accumulator stage.

func DefaultVADConfig

func DefaultVADConfig() VADConfig

DefaultVADConfig returns sensible defaults for VAD configuration.
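One plausible way the four VADConfig fields could combine into a turn-completion decision is sketched below. This is an interpretation of the field comments, not the stage's actual algorithm: turnTracker, observe, demoConfig, and completedAt are hypothetical, the config values are placeholders rather than the package defaults, and a frame counts as speech when its probability is at or above Threshold.

```go
package main

import (
	"fmt"
	"time"
)

// VADConfig mirrors the documented fields.
type VADConfig struct {
	Threshold         float64
	MinSpeechDuration time.Duration
	MaxTurnDuration   time.Duration
	SilenceDuration   time.Duration
}

// turnTracker accumulates per-turn durations.
type turnTracker struct {
	speech, silence, total time.Duration
}

// observe folds one frame into the tracker and reports whether the turn
// should complete after it.
func (t *turnTracker) observe(cfg VADConfig, speechProb float64, frame time.Duration) bool {
	t.total += frame
	if speechProb >= cfg.Threshold {
		t.speech += frame
		t.silence = 0 // speech interrupts any running silence
	} else {
		t.silence += frame
	}
	if t.total >= cfg.MaxTurnDuration {
		return true // forced completion
	}
	return t.speech >= cfg.MinSpeechDuration && t.silence >= cfg.SilenceDuration
}

// demoConfig returns placeholder values, not the package defaults.
func demoConfig() VADConfig {
	return VADConfig{
		Threshold:         0.5,
		MinSpeechDuration: 200 * time.Millisecond,
		MaxTurnDuration:   10 * time.Second,
		SilenceDuration:   300 * time.Millisecond,
	}
}

// completedAt returns the index of the frame that completes the turn, or -1.
func completedAt(cfg VADConfig, frameMillis int64, probs ...float64) int {
	var t turnTracker
	frame := time.Duration(frameMillis) * time.Millisecond
	for i, p := range probs {
		if t.observe(cfg, p, frame) {
			return i
		}
	}
	return -1
}

func main() {
	// Two speech frames followed by three silent frames, 100 ms each.
	fmt.Println(completedAt(demoConfig(), 100, 0.9, 0.9, 0.1, 0.1, 0.1))
}
```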

type ValidationStage

type ValidationStage struct {
	BaseStage
	// contains filtered or unexported fields
}

ValidationStage validates responses using configured validators.

func NewValidationStage

func NewValidationStage(registry *validators.Registry, suppressExceptions bool) *ValidationStage

NewValidationStage creates a new validation stage.

func (*ValidationStage) Process

func (s *ValidationStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process validates response elements and attaches results to metadata.

type VariableProviderStage

type VariableProviderStage struct {
	BaseStage
	// contains filtered or unexported fields
}

VariableProviderStage resolves variables from dynamic providers and adds them to metadata.

This stage calls each registered variable provider to fetch dynamic variables (e.g., from environment, external services, databases) and merges them into the element's metadata["variables"] map for use by TemplateStage.

Provider resolution order:

  1. Variables from earlier stages (e.g., PromptAssemblyStage base_variables)
  2. Each provider is called in sequence; later providers can override earlier values

Error handling:

  • If any provider fails, the stage returns an error and aborts the pipeline
  • This ensures variable resolution failures are surfaced early

Example providers:

  • Environment variable provider: reads from OS environment
  • Config provider: reads from configuration files
  • External API provider: fetches user context from external services

This is a Transform stage: 1 input element → 1 output element (with enriched metadata)
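The resolution order and error handling described above can be sketched as a simple merge loop. The provider type here is a hypothetical function type (the real variables.Provider interface is not shown in this documentation), and resolve and errUnavailable are likewise names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// provider is a hypothetical stand-in for a variable provider.
type provider func() (map[string]string, error)

// errUnavailable is a hypothetical failure used in the demonstration.
var errUnavailable = errors.New("provider unavailable")

// resolve starts from variables set by earlier stages, then applies each
// provider in order so later providers override earlier values.
// The first provider error aborts resolution.
func resolve(base map[string]string, providers ...provider) (map[string]string, error) {
	merged := make(map[string]string, len(base))
	for k, v := range base {
		merged[k] = v
	}
	for i, p := range providers {
		vars, err := p()
		if err != nil {
			return nil, fmt.Errorf("variable provider %d: %w", i, err)
		}
		for k, v := range vars {
			merged[k] = v
		}
	}
	return merged, nil
}

func main() {
	base := map[string]string{"name": "Alice", "topic": "AI"}
	env := func() (map[string]string, error) { return map[string]string{"topic": "Go"}, nil }
	api := func() (map[string]string, error) { return map[string]string{"region": "eu"}, nil }
	broken := func() (map[string]string, error) { return nil, errUnavailable }

	vars, err := resolve(base, env, api)
	fmt.Println(vars["name"], vars["topic"], vars["region"], err)

	_, err = resolve(base, env, broken)
	fmt.Println(err)
}
```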

func NewVariableProviderStage

func NewVariableProviderStage(providers ...variables.Provider) *VariableProviderStage

NewVariableProviderStage creates a variable provider stage.

func (*VariableProviderStage) Process

func (s *VariableProviderStage) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process resolves variables from all providers and merges them into element metadata.

type VideoData

type VideoData struct {
	Data       []byte        // Raw video frame data or encoded video segment
	MIMEType   string        // MIME type (e.g., "video/mp4", "video/webm")
	Width      int           // Frame width in pixels
	Height     int           // Frame height in pixels
	FrameRate  float64       // Frames per second
	Duration   time.Duration // Duration of the video segment
	Timestamp  time.Time     // Timestamp of this frame
	Format     string        // Format identifier (e.g., "h264", "vp8")
	IsKeyFrame bool          // True if this is a key frame
}

VideoData carries video frame data with metadata.

type WeightedRouter

type WeightedRouter struct {
	BaseStage
	// contains filtered or unexported fields
}

WeightedRouter distributes elements across outputs based on configured weights.

func NewWeightedRouter

func NewWeightedRouter(name string, weights map[string]float64) *WeightedRouter

NewWeightedRouter creates a router that distributes elements based on weights. Weights are normalized to sum to 1.0. Example: {"primary": 0.7, "secondary": 0.3} routes 70% to primary, 30% to secondary.
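Normalization and weighted selection can be sketched as follows. normalize and pick are hypothetical helpers, not the router's internals: pick maps a uniform draw r in [0, 1) onto cumulative weights, and iterating names in sorted order is a choice made here to keep the result deterministic for a given draw.

```go
package main

import (
	"fmt"
	"sort"
)

// normalize scales weights so they sum to 1.0. A non-positive total
// yields an empty map.
func normalize(weights map[string]float64) map[string]float64 {
	total := 0.0
	for _, w := range weights {
		total += w
	}
	out := make(map[string]float64, len(weights))
	if total <= 0 {
		return out
	}
	for name, w := range weights {
		out[name] = w / total
	}
	return out
}

// pick returns the output whose cumulative weight range contains r.
func pick(norm map[string]float64, r float64) string {
	names := make([]string, 0, len(norm))
	for n := range norm {
		names = append(names, n)
	}
	if len(names) == 0 {
		return ""
	}
	sort.Strings(names)

	acc := 0.0
	for _, n := range names {
		acc += norm[n]
		if r < acc {
			return n
		}
	}
	return names[len(names)-1] // guards against floating-point rounding
}

func main() {
	norm := normalize(map[string]float64{"primary": 7, "secondary": 3})
	// Fixed draws for a repeatable demo; a router would use random draws.
	for _, r := range []float64{0.10, 0.65, 0.95} {
		fmt.Printf("draw %.2f -> %s\n", r, pick(norm, r))
	}
}
```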

func (*WeightedRouter) Process

func (r *WeightedRouter) Process(
	ctx context.Context,
	input <-chan StreamElement,
	output chan<- StreamElement,
) error

Process distributes elements based on weights.

func (*WeightedRouter) RegisterOutput

func (r *WeightedRouter) RegisterOutput(name string, output chan<- StreamElement)

RegisterOutput registers an output channel with a name.
