Documentation
¶
Overview ¶
Package stage provides the reactive streams architecture for pipeline execution.
It also provides pipeline stages for audio processing.
Index ¶
- Constants
- Variables
- func BatchEmbeddingTexts(texts []string, batchSize int) [][]string
- func CosineSimilarity(a, b []float32) float64
- func DescribeCapabilities(stage Stage) string
- func GetTraceInfo(elem *StreamElement) (traceID string, stageTimes map[string]time.Time)
- func NormalizeEmbedding(embedding []float32) []float32
- func PutElement(elem *StreamElement)
- func ValidateCapabilities(stages []Stage, edges map[string][]string)
- type AudioCapability
- type AudioData
- type AudioFormat
- type AudioResampleConfig
- type AudioResampleStage
- type AudioTurnConfig
- type AudioTurnStage
- type BaseStage
- type BroadcastRouter
- type ByOriginalIndex
- type Capabilities
- type ContentRouter
- type ContentType
- type ContextBuilderPolicy
- type ContextBuilderStage
- type DebugStage
- type DuplexProviderStage
- type EndInputter
- type ExecutionResult
- type ExecutionTrace
- type FilterStage
- type FormatCapable
- type HashRouter
- type ImageData
- type MapStage
- type MediaExternalizerConfig
- type MediaExternalizerStage
- type MergeStage
- type MetricsStage
- type PassthroughStage
- type PipelineBuilder
- func (b *PipelineBuilder) AddStage(stage Stage) *PipelineBuilder
- func (b *PipelineBuilder) Branch(fromStage string, toStages ...string) *PipelineBuilder
- func (b *PipelineBuilder) Build() (*StreamPipeline, error)
- func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder
- func (b *PipelineBuilder) Clone() *PipelineBuilder
- func (b *PipelineBuilder) Connect(fromStage, toStage string) *PipelineBuilder
- func (b *PipelineBuilder) WithConfig(config *PipelineConfig) *PipelineBuilder
- func (b *PipelineBuilder) WithEventEmitter(emitter *events.Emitter) *PipelineBuilder
- type PipelineConfig
- func (c *PipelineConfig) Validate() error
- func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig
- func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig
- func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig
- func (c *PipelineConfig) WithMaxConcurrentPipelines(maxPipelines int) *PipelineConfig
- func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig
- func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig
- func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig
- type Priority
- type PriorityChannel
- type PromptAssemblyStage
- type ProviderConfig
- type ProviderStage
- type QuerySourceType
- type RandomRouter
- type RecordingPosition
- type RecordingStage
- type RecordingStageConfig
- type RelevanceConfig
- type Response
- type ResponseVADConfig
- type ResponseVADStage
- type RoundRobinRouter
- type RouterFunc
- type RouterStage
- type RoutingRule
- type STTStage
- type STTStageConfig
- type ScoredMessage
- type ScoredMessages
- type Stage
- type StageError
- type StageFunc
- type StageMetrics
- type StageType
- type StateStoreLoadStage
- type StateStoreSaveStage
- type StreamElement
- func GetAudioElement(audio *AudioData) *StreamElement
- func GetElement() *StreamElement
- func GetEndOfStreamElement() *StreamElement
- func GetErrorElement(err error) *StreamElement
- func GetImageElement(image *ImageData) *StreamElement
- func GetMessageElement(msg *types.Message) *StreamElement
- func GetTextElement(text string) *StreamElement
- func GetVideoElement(video *VideoData) *StreamElement
- func NewAudioElement(audio *AudioData) StreamElement
- func NewEndOfStreamElement() StreamElement
- func NewErrorElement(err error) StreamElement
- func NewImageElement(image *ImageData) StreamElement
- func NewMessageElement(msg *types.Message) StreamElement
- func NewTextElement(text string) StreamElement
- func NewVideoElement(video *VideoData) StreamElement
- func (e *StreamElement) GetMetadata(key string) interface{}
- func (e *StreamElement) HasContent() bool
- func (e *StreamElement) IsControl() bool
- func (e *StreamElement) IsEmpty() bool
- func (e *StreamElement) Reset()
- func (e *StreamElement) WithMetadata(key string, value interface{}) *StreamElement
- func (e *StreamElement) WithPriority(priority Priority) *StreamElement
- func (e *StreamElement) WithSequence(seq int64) *StreamElement
- func (e *StreamElement) WithSource(source string) *StreamElement
- type StreamPipeline
- type TTSConfig
- type TTSService
- type TTSStage
- type TTSStageWithInterruption
- type TTSStageWithInterruptionConfig
- type TemplateStage
- type TracingStage
- type Transcriber
- type TruncationStrategy
- type VADAccumulatorStage
- type VADConfig
- type ValidationStage
- type VariableProviderStage
- type VideoData
- type WeightedRouter
Examples ¶
Constants ¶
// Default pipeline configuration values.
const (
	// DefaultChannelBufferSize is the default buffer size for channels between stages.
	DefaultChannelBufferSize = 16

	// DefaultMaxConcurrentPipelines is the default maximum number of concurrent pipeline executions.
	DefaultMaxConcurrentPipelines = 100

	// DefaultExecutionTimeoutSeconds is the default execution timeout in seconds.
	DefaultExecutionTimeoutSeconds = 30

	// DefaultGracefulShutdownTimeoutSeconds is the default graceful shutdown timeout in seconds.
	DefaultGracefulShutdownTimeoutSeconds = 10
)
Variables ¶
var (
	// ErrPipelineShuttingDown is returned when attempting to execute a pipeline that is shutting down.
	ErrPipelineShuttingDown = errors.New("pipeline is shutting down")

	// ErrShutdownTimeout is returned when pipeline shutdown times out.
	ErrShutdownTimeout = errors.New("shutdown timeout exceeded")

	// ErrInvalidPipeline is returned when building an invalid pipeline.
	ErrInvalidPipeline = errors.New("invalid pipeline configuration")

	// ErrCyclicDependency is returned when the pipeline DAG contains cycles.
	ErrCyclicDependency = errors.New("cyclic dependency detected in pipeline")

	// ErrStageNotFound is returned when a referenced stage doesn't exist.
	ErrStageNotFound = errors.New("stage not found")

	// ErrDuplicateStageName is returned when multiple stages have the same name.
	ErrDuplicateStageName = errors.New("duplicate stage name")

	// ErrNoStages is returned when trying to build a pipeline with no stages.
	ErrNoStages = errors.New("pipeline must have at least one stage")

	// ErrInvalidChannelBufferSize is returned for invalid buffer size.
	ErrInvalidChannelBufferSize = errors.New("channel buffer size must be non-negative")

	// ErrInvalidMaxConcurrentPipelines is returned for invalid max concurrent pipelines.
	ErrInvalidMaxConcurrentPipelines = errors.New("max concurrent pipelines must be non-negative")

	// ErrInvalidExecutionTimeout is returned for invalid execution timeout.
	ErrInvalidExecutionTimeout = errors.New("execution timeout must be non-negative")

	// ErrInvalidGracefulShutdownTimeout is returned for invalid graceful shutdown timeout.
	ErrInvalidGracefulShutdownTimeout = errors.New("graceful shutdown timeout must be non-negative")
)
Common errors
Functions ¶
func BatchEmbeddingTexts ¶
BatchEmbeddingTexts splits texts into batches of the given size. Useful for respecting embedding provider batch limits.
func CosineSimilarity ¶
CosineSimilarity computes the cosine similarity between two embedding vectors. Returns a value between -1.0 and 1.0, where:
- 1.0 means vectors are identical in direction
- 0.0 means vectors are orthogonal (unrelated)
- -1.0 means vectors are opposite
For text embeddings, values typically range from 0.0 to 1.0, with higher values indicating greater semantic similarity.
Returns 0.0 if vectors have different lengths, are empty, or have zero magnitude.
func DescribeCapabilities ¶
DescribeCapabilities returns a human-readable description of a stage's capabilities. Useful for debugging and logging.
func GetTraceInfo ¶
func GetTraceInfo(elem *StreamElement) (traceID string, stageTimes map[string]time.Time)
GetTraceInfo extracts trace information from an element.
func NormalizeEmbedding ¶
NormalizeEmbedding normalizes an embedding vector to unit length. This can improve similarity comparisons by ensuring all vectors have the same magnitude.
func PutElement ¶
func PutElement(elem *StreamElement)
PutElement returns a StreamElement to the pool for reuse. The element is reset before being returned to the pool to prevent data leaks. After calling PutElement, the caller must not use the element again.
func ValidateCapabilities ¶
ValidateCapabilities checks format compatibility between connected stages. It logs warnings for potential mismatches but does not return errors, as format compatibility can often only be fully determined at runtime.
This function is called during pipeline building to provide early feedback about potential issues.
Types ¶
type AudioCapability ¶
type AudioCapability struct {
// Formats lists accepted audio formats. Empty slice means any format.
Formats []AudioFormat
// SampleRates lists accepted sample rates in Hz. Empty slice means any rate.
SampleRates []int
// Channels lists accepted channel counts. Empty slice means any.
Channels []int
}
AudioCapability describes audio format requirements for a stage.
func (*AudioCapability) AcceptsAudio ¶
func (ac *AudioCapability) AcceptsAudio(audio *AudioData) bool
AcceptsAudio returns true if this capability accepts the given audio data.
func (*AudioCapability) AcceptsChannels ¶
func (ac *AudioCapability) AcceptsChannels(channels int) bool
AcceptsChannels returns true if this capability accepts the given channel count. Returns true if Channels is empty (accepts any).
func (*AudioCapability) AcceptsFormat ¶
func (ac *AudioCapability) AcceptsFormat(format AudioFormat) bool
AcceptsFormat returns true if this capability accepts the given format. Returns true if Formats is empty (accepts any).
func (*AudioCapability) AcceptsSampleRate ¶
func (ac *AudioCapability) AcceptsSampleRate(rate int) bool
AcceptsSampleRate returns true if this capability accepts the given sample rate. Returns true if SampleRates is empty (accepts any).
type AudioData ¶
type AudioData struct {
Samples []byte // Raw audio samples
SampleRate int // Sample rate in Hz (e.g., 16000, 44100)
Channels int // Number of audio channels (1=mono, 2=stereo)
Format AudioFormat // Audio encoding format
Duration time.Duration // Duration of the audio segment
Encoding string // Encoding scheme (e.g., "pcm", "opus")
}
AudioData carries audio samples with metadata.
type AudioFormat ¶
type AudioFormat int
AudioFormat represents the encoding format of audio data.
const ( // AudioFormatPCM16 is 16-bit PCM encoding AudioFormatPCM16 AudioFormat = iota // AudioFormatFloat32 is 32-bit floating point encoding AudioFormatFloat32 // AudioFormatOpus is Opus codec encoding AudioFormatOpus // AudioFormatMP3 is MP3 encoding AudioFormatMP3 // AudioFormatAAC is AAC encoding AudioFormatAAC )
func (AudioFormat) String ¶
func (af AudioFormat) String() string
String returns the string representation of the audio format.
type AudioResampleConfig ¶
type AudioResampleConfig struct {
// TargetSampleRate is the desired output sample rate in Hz.
// Common values: 16000 (Gemini), 24000 (OpenAI TTS), 44100 (CD quality).
TargetSampleRate int
// PassthroughIfSameRate skips resampling if input rate matches target rate.
// Default: true.
PassthroughIfSameRate bool
}
AudioResampleConfig contains configuration for the audio resampling stage.
func DefaultAudioResampleConfig ¶
func DefaultAudioResampleConfig() AudioResampleConfig
DefaultAudioResampleConfig returns sensible defaults for audio resampling.
type AudioResampleStage ¶
type AudioResampleStage struct {
BaseStage
// contains filtered or unexported fields
}
AudioResampleStage resamples audio data to a target sample rate. This is useful for normalizing audio from different sources (TTS, files) to match provider requirements.
This is a Transform stage: audio element → resampled audio element (1:1)
func NewAudioResampleStage ¶
func NewAudioResampleStage(config AudioResampleConfig) *AudioResampleStage
NewAudioResampleStage creates a new audio resampling stage.
func (*AudioResampleStage) GetConfig ¶
func (s *AudioResampleStage) GetConfig() AudioResampleConfig
GetConfig returns the stage configuration.
func (*AudioResampleStage) Process ¶
func (s *AudioResampleStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface. Resamples audio in each element to the target sample rate.
type AudioTurnConfig ¶
type AudioTurnConfig struct {
// VAD is the voice activity detector.
// If nil, a SimpleVAD with default params is created.
VAD audio.VADAnalyzer
// TurnDetector determines when user has finished speaking.
// If nil, VAD state transitions are used for turn detection.
TurnDetector audio.TurnDetector
// InterruptionHandler detects when user interrupts TTS output.
// This should be shared with TTSStageWithInterruption.
// If nil, interruption detection is disabled.
InterruptionHandler *audio.InterruptionHandler
// SilenceDuration is how long silence must persist to trigger turn complete.
// Default: 800ms
SilenceDuration time.Duration
// MinSpeechDuration is minimum speech before turn can complete.
// Default: 200ms
MinSpeechDuration time.Duration
// MaxTurnDuration is maximum turn length before forcing completion.
// Default: 30s
MaxTurnDuration time.Duration
// SampleRate is the audio sample rate for output AudioData.
// Default: 16000
SampleRate int
}
AudioTurnConfig configures the AudioTurnStage.
func DefaultAudioTurnConfig ¶
func DefaultAudioTurnConfig() AudioTurnConfig
DefaultAudioTurnConfig returns sensible defaults for AudioTurnStage.
type AudioTurnStage ¶
type AudioTurnStage struct {
BaseStage
// contains filtered or unexported fields
}
AudioTurnStage detects voice activity and accumulates audio into complete turns. It outputs complete audio utterances when the user stops speaking.
This stage consolidates:
- Voice Activity Detection (VAD)
- Turn boundary detection
- Audio accumulation
- Interruption detection (shared with TTSStageWithInterruption)
This is an Accumulate stage: N audio chunks → 1 audio utterance
func NewAudioTurnStage ¶
func NewAudioTurnStage(config AudioTurnConfig) (*AudioTurnStage, error)
NewAudioTurnStage creates a new audio turn stage.
func (*AudioTurnStage) Process ¶
func (s *AudioTurnStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface. Accumulates audio chunks until turn complete, then emits audio utterance.
type BaseStage ¶
type BaseStage struct {
// contains filtered or unexported fields
}
BaseStage provides common functionality for stage implementations. Stages can embed this to reduce boilerplate.
func NewBaseStage ¶
NewBaseStage creates a new BaseStage with the given name and type.
type BroadcastRouter ¶
type BroadcastRouter struct {
BaseStage
// contains filtered or unexported fields
}
BroadcastRouter sends each element to ALL registered outputs. Useful for fan-out scenarios where all consumers need every element.
func NewBroadcastRouter ¶
func NewBroadcastRouter(name string) *BroadcastRouter
NewBroadcastRouter creates a router that broadcasts to all outputs.
func (*BroadcastRouter) Process ¶
func (r *BroadcastRouter) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process broadcasts each element to all outputs.
func (*BroadcastRouter) RegisterOutput ¶
func (r *BroadcastRouter) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name.
type ByOriginalIndex ¶
type ByOriginalIndex []ScoredMessage
ByOriginalIndex sorts ScoredMessages by their original index (ascending).
func (ByOriginalIndex) Len ¶
func (s ByOriginalIndex) Len() int
func (ByOriginalIndex) Less ¶
func (s ByOriginalIndex) Less(i, j int) bool
func (ByOriginalIndex) Swap ¶
func (s ByOriginalIndex) Swap(i, j int)
type Capabilities ¶
type Capabilities struct {
// ContentTypes lists the content types handled. Empty means any.
ContentTypes []ContentType
// Audio specifies audio-specific requirements. Nil means N/A or any.
Audio *AudioCapability
}
Capabilities describes what a stage accepts or produces.
func AnyCapabilities ¶
func AnyCapabilities() Capabilities
AnyCapabilities returns capabilities that accept any content type.
func AudioCapabilities ¶
func AudioCapabilities(formats []AudioFormat, sampleRates, channels []int) Capabilities
AudioCapabilities returns capabilities for audio content with optional format constraints.
func MessageCapabilities ¶
func MessageCapabilities() Capabilities
MessageCapabilities returns capabilities for message content.
func TextCapabilities ¶
func TextCapabilities() Capabilities
TextCapabilities returns capabilities for text-only content.
func (*Capabilities) AcceptsContentType ¶
func (c *Capabilities) AcceptsContentType(ct ContentType) bool
AcceptsContentType returns true if this capability accepts the given content type. Returns true if ContentTypes is empty (accepts any).
func (*Capabilities) AcceptsElement ¶
func (c *Capabilities) AcceptsElement(elem *StreamElement) bool
AcceptsElement returns true if this capability accepts the given stream element.
type ContentRouter ¶
type ContentRouter struct {
BaseStage
// contains filtered or unexported fields
}
ContentRouter routes elements to different outputs based on predicate rules. Rules are evaluated in order; the first matching rule determines the destination. Elements that don't match any rule are dropped with a warning log.
func NewContentRouter ¶
func NewContentRouter(name string, rules ...RoutingRule) *ContentRouter
NewContentRouter creates a new content-aware router with the given rules.
func (*ContentRouter) Process ¶
func (r *ContentRouter) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process routes elements based on the configured rules.
func (*ContentRouter) RegisterOutput ¶
func (r *ContentRouter) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name. This must be called before Process() to set up routing destinations.
type ContentType ¶
type ContentType int
ContentType describes the type of content a stage handles.
const ( // ContentTypeAny indicates the stage accepts any content type. ContentTypeAny ContentType = iota // ContentTypeText indicates text content. ContentTypeText // ContentTypeAudio indicates audio content. ContentTypeAudio // ContentTypeVideo indicates video content. ContentTypeVideo // ContentTypeImage indicates image content. ContentTypeImage // ContentTypeMessage indicates a complete message. ContentTypeMessage // ContentTypeToolCall indicates a tool invocation. ContentTypeToolCall )
func (ContentType) String ¶
func (ct ContentType) String() string
String returns the string representation of the content type.
type ContextBuilderPolicy ¶
type ContextBuilderPolicy struct {
// TokenBudget is the total number of tokens allowed
// (0 = unlimited, pass-through mode).
TokenBudget int
// ReserveForOutput is the number of tokens reserved for the LLM response;
// it is subtracted from TokenBudget when computing the available budget.
ReserveForOutput int
// Strategy is the truncation strategy to apply when the budget is exceeded.
Strategy TruncationStrategy
// CacheBreakpoints enables prompt caching hints.
CacheBreakpoints bool
// RelevanceConfig for TruncateLeastRelevant strategy (optional).
// If nil when using TruncateLeastRelevant, falls back to TruncateOldest.
RelevanceConfig *RelevanceConfig
}
ContextBuilderPolicy defines token budget and truncation behavior.
type ContextBuilderStage ¶
type ContextBuilderStage struct {
BaseStage
// contains filtered or unexported fields
}
ContextBuilderStage manages token budget and truncates messages if needed.
This stage ensures the conversation context fits within the LLM's token budget by applying truncation strategies when messages exceed the limit.
Token budget calculation:
available = TokenBudget - ReserveForOutput - systemPromptTokens
Truncation strategies (TruncationStrategy):
- TruncateOldest: removes oldest messages first (keeps most recent context)
- TruncateLeastRelevant: removes least relevant messages (requires embeddings) [TODO]
- TruncateSummarize: compresses old messages into summaries [TODO]
- TruncateFail: returns error if budget exceeded (strict mode)
Configuration (ContextBuilderPolicy):
- TokenBudget: total tokens allowed (0 = unlimited, pass-through mode)
- ReserveForOutput: tokens reserved for LLM response
- Strategy: truncation strategy to apply
- CacheBreakpoints: enable prompt caching hints
Metadata added:
- context_truncated: true if truncation was applied
- enable_cache_breakpoints: copied from policy.CacheBreakpoints
This is an Accumulate stage: N input elements → at most N output elements (fewer when truncation is applied)
func NewContextBuilderStage ¶
func NewContextBuilderStage(policy *ContextBuilderPolicy) *ContextBuilderStage
NewContextBuilderStage creates a context builder stage.
func (*ContextBuilderStage) Process ¶
func (s *ContextBuilderStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process enforces token budget and truncates messages if needed.
type DebugStage ¶
type DebugStage struct {
BaseStage
// contains filtered or unexported fields
}
DebugStage logs StreamElements for debugging pipeline state. Useful for development and troubleshooting.
func NewDebugStage ¶
func NewDebugStage(stageName string) *DebugStage
NewDebugStage creates a debug stage that logs elements at a specific pipeline location.
func (*DebugStage) Process ¶
func (s *DebugStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process logs each element as it passes through (passthrough transform).
type DuplexProviderStage ¶
type DuplexProviderStage struct {
BaseStage
// contains filtered or unexported fields
}
DuplexProviderStage handles bidirectional streaming through a session. It forwards elements from input to the provider's session and forwards responses from the session to output.
This stage is PROVIDER-AGNOSTIC. Provider-specific behaviors (interruptions, reconnection, protocol quirks) are handled BY the provider internally.
System Prompt Handling: The first element received may contain a system prompt in metadata["system_prompt"]. This is sent to the session via SendSystemContext() before processing audio/text.
Response Accumulation: Streaming providers send text/audio responses in chunks. This stage accumulates content across chunks and creates a Message on turn completion (FinishReason).
Session Closure: When the session closes unexpectedly, any accumulated content is emitted as a partial response. The executor is responsible for session recreation if needed.
This is a Bidirectional stage: input elements ⟷ session ⟷ output elements
func NewDuplexProviderStage ¶
func NewDuplexProviderStage( provider providers.StreamInputSupport, baseConfig *providers.StreamingInputConfig, ) *DuplexProviderStage
NewDuplexProviderStage creates a new duplex provider stage. The session is created lazily when the first element arrives, using system_prompt from element metadata. This allows the pipeline to be the single source of truth for prompt assembly.
func NewDuplexProviderStageWithEmitter ¶
func NewDuplexProviderStageWithEmitter( provider providers.StreamInputSupport, baseConfig *providers.StreamingInputConfig, emitter *events.Emitter, ) *DuplexProviderStage
NewDuplexProviderStageWithEmitter creates a new duplex provider stage with event emission support. The emitter is used to emit audio.input and audio.output events for session recording.
func (*DuplexProviderStage) Process ¶
func (s *DuplexProviderStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface. Handles bidirectional streaming between input channel and WebSocket session.
For duplex streaming (Gemini Live API), this runs until:
- Context is canceled (user stops the session)
- Session response channel is closed (server ends session)
- Input channel is closed (upstream ends)
If no session is pre-configured, the session is created lazily when the first element arrives. The system_prompt from element metadata is used as the SystemInstruction for session creation.
type EndInputter ¶
type EndInputter interface {
EndInput()
}
EndInputter is an optional interface for sessions that support explicit end-of-input signaling. This is primarily used by mock sessions to trigger responses after all audio has been sent.
type ExecutionResult ¶
type ExecutionResult struct {
Messages []types.Message // All messages in the conversation
Response *Response // The final response
Trace ExecutionTrace // Execution trace
CostInfo types.CostInfo // Cost information
Metadata map[string]interface{} // Additional metadata
}
ExecutionResult represents the final result of a pipeline execution. This matches the existing pipeline.ExecutionResult for compatibility.
type ExecutionTrace ¶
ExecutionTrace captures execution history (for compatibility).
type FilterStage ¶
type FilterStage struct {
BaseStage
// contains filtered or unexported fields
}
FilterStage filters elements based on a predicate function.
func NewFilterStage ¶
func NewFilterStage(name string, predicate func(StreamElement) bool) *FilterStage
NewFilterStage creates a new filter stage.
func (*FilterStage) Process ¶
func (fs *FilterStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process filters elements based on the predicate.
type FormatCapable ¶
type FormatCapable interface {
// InputCapabilities returns what formats/content types this stage accepts.
InputCapabilities() Capabilities
// OutputCapabilities returns what formats/content types this stage produces.
OutputCapabilities() Capabilities
}
FormatCapable is an optional interface that stages can implement to declare their input/output format requirements. Stages that don't implement this are treated as accepting/producing any format.
type HashRouter ¶
type HashRouter struct {
BaseStage
// contains filtered or unexported fields
}
HashRouter routes elements based on consistent hashing of a key. This ensures elements with the same key always go to the same destination.
func NewHashRouter ¶
func NewHashRouter(name string, outputNames []string, keyFunc func(StreamElement) string) *HashRouter
NewHashRouter creates a router that uses consistent hashing. The keyFunc extracts a key from each element (e.g., session ID). Elements with the same key always route to the same destination.
func (*HashRouter) Process ¶
func (r *HashRouter) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process routes elements based on hash of key.
func (*HashRouter) RegisterOutput ¶
func (r *HashRouter) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name.
type ImageData ¶
type ImageData struct {
Data []byte // Raw image data (encoded as JPEG, PNG, etc.)
MIMEType string // MIME type (e.g., "image/jpeg", "image/png")
Width int // Image width in pixels
Height int // Image height in pixels
Format string // Format identifier (e.g., "jpeg", "png", "webp")
}
ImageData carries image data with metadata.
type MapStage ¶
type MapStage struct {
BaseStage
// contains filtered or unexported fields
}
MapStage transforms elements using a mapping function.
Example ¶
ExampleMapStage demonstrates using a map stage to transform elements.
package main

import (
	"context"
	"fmt"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
	"github.com/AltairaLabs/PromptKit/runtime/types"
)

// main demonstrates a MapStage: it builds a one-stage pipeline, feeds it a
// single user message, and prints the transformed content of every element
// that comes out.
func main() {
	// Create a map stage that prefixes the message content. The message is
	// copied before it is modified so the element's original message value
	// is not mutated through the shared pointer.
	prefixStage := stage.NewMapStage("prefix", func(elem stage.StreamElement) (stage.StreamElement, error) {
		if elem.Message != nil {
			msg := *elem.Message
			msg.Content = "TRANSFORMED: " + msg.Content
			elem.Message = &msg
		}
		return elem, nil
	})

	// Build pipeline. Build can fail, so the error is checked instead of
	// risking a nil pipeline below.
	pipeline, err := stage.NewPipelineBuilder().
		Chain(prefixStage).
		Build()
	if err != nil {
		fmt.Printf("Error building pipeline: %v\n", err)
		return
	}

	// Prepare the input: a buffered channel holding one message, closed so
	// the pipeline sees end-of-input.
	input := make(chan stage.StreamElement, 1)
	input <- stage.NewMessageElement(&types.Message{
		Role:    "user",
		Content: "hello",
	})
	close(input)

	// Execute and drain the output channel.
	output, err := pipeline.Execute(context.Background(), input)
	if err != nil {
		fmt.Printf("Error executing pipeline: %v\n", err)
		return
	}
	for elem := range output {
		if elem.Message != nil {
			fmt.Printf("%s\n", elem.Message.Content)
		}
	}
}
Output: TRANSFORMED: hello
func NewMapStage ¶
func NewMapStage(name string, mapFunc func(StreamElement) (StreamElement, error)) *MapStage
NewMapStage creates a new map stage.
func (*MapStage) Process ¶
func (ms *MapStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process transforms each element using the map function.
type MediaExternalizerConfig ¶
type MediaExternalizerConfig struct {
// Enabled is the master switch for externalization.
Enabled bool
// StorageService is where media is stored (S3, GCS, local filesystem, etc.).
// The stage is skipped when no StorageService is configured.
StorageService storage.MediaStorageService
// SizeThresholdKB is the minimum size to externalize, measured on the
// base64 size (0 = externalize all).
SizeThresholdKB int64
// DefaultPolicy is the retention policy name for stored media.
DefaultPolicy string
// RunID, SessionID and ConversationID are not described in the package docs.
// NOTE(review): presumably identifiers associated with the stored media — confirm.
RunID string
SessionID string
ConversationID string
}
MediaExternalizerConfig configures media externalization behavior.
type MediaExternalizerStage ¶
type MediaExternalizerStage struct {
BaseStage
// contains filtered or unexported fields
}
MediaExternalizerStage externalizes large media content to external storage.
When messages contain large inline media (images, audio, video), this stage moves the data to external storage and replaces it with a storage reference. This reduces memory usage and allows for media lifecycle management.
Behavior:
- Skipped if Enabled=false or no StorageService configured
- Only externalizes media exceeding SizeThresholdKB (base64 size)
- Preserves media.StorageReference if already externalized
- Clears media.Data after successful externalization
Configuration:
- Enabled: master switch for externalization
- SizeThresholdKB: minimum size to externalize (0 = externalize all)
- StorageService: where to store media (S3, GCS, local filesystem, etc.)
- DefaultPolicy: retention policy name for stored media
This is a Transform stage: 1 input element → 1 output element (with externalized media)
func NewMediaExternalizerStage ¶
func NewMediaExternalizerStage(config *MediaExternalizerConfig) *MediaExternalizerStage
NewMediaExternalizerStage creates a media externalizer stage.
func (*MediaExternalizerStage) Process ¶
func (s *MediaExternalizerStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process externalizes media from messages if they exceed size threshold.
type MergeStage ¶
type MergeStage struct {
BaseStage
// contains filtered or unexported fields
}
MergeStage merges multiple input channels into a single output channel. This enables fan-in patterns where multiple stages feed into one.
This is an Accumulate stage type that handles multiple inputs (N:1 merge).
func NewMergeStage ¶
func NewMergeStage(name string, inputCount int) *MergeStage
NewMergeStage creates a new merge stage that merges N inputs into 1 output.
func (*MergeStage) Process ¶
func (s *MergeStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface (single input). For a merge stage, this is not typically used; use ProcessMultiple instead.
func (*MergeStage) ProcessMultiple ¶
func (s *MergeStage) ProcessMultiple( ctx context.Context, inputs []<-chan StreamElement, output chan<- StreamElement, ) error
ProcessMultiple processes multiple input channels and merges them into one output. This is a special method for merge stages that differs from the standard Process signature.
type MetricsStage ¶
type MetricsStage struct {
BaseStage
// contains filtered or unexported fields
}
MetricsStage wraps another stage and collects metrics about its performance. This is a transparent wrapper that doesn't modify element flow.
func NewMetricsStage ¶
func NewMetricsStage(wrappedStage Stage) *MetricsStage
NewMetricsStage wraps a stage with metrics collection.
func (*MetricsStage) GetMetrics ¶
func (s *MetricsStage) GetMetrics() StageMetrics
GetMetrics returns the collected metrics.
func (*MetricsStage) Process ¶
func (s *MetricsStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface with metrics collection.
type PassthroughStage ¶
type PassthroughStage struct {
BaseStage
}
PassthroughStage is a simple stage that passes all elements through unchanged. Useful for testing or as a placeholder.
func NewPassthroughStage ¶
func NewPassthroughStage(name string) *PassthroughStage
NewPassthroughStage creates a new passthrough stage.
func (*PassthroughStage) Process ¶
func (ps *PassthroughStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process passes all elements through unchanged.
type PipelineBuilder ¶
type PipelineBuilder struct {
// contains filtered or unexported fields
}
PipelineBuilder constructs a pipeline DAG. It provides methods for creating linear chains and branching topologies.
Example ¶
ExamplePipelineBuilder demonstrates building a simple linear pipeline.
package main
import (
"context"
"fmt"
"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
"github.com/AltairaLabs/PromptKit/runtime/types"
)
func main() {
	// Three passthrough stages, listed in the order elements will traverse them.
	stages := []stage.Stage{
		stage.NewPassthroughStage("input"),
		stage.NewPassthroughStage("process"),
		stage.NewPassthroughStage("output"),
	}

	// Chain links each stage's output to the next stage's input; Build validates the result.
	p, err := stage.NewPipelineBuilder().Chain(stages...).Build()
	if err != nil {
		fmt.Printf("Error building pipeline: %v\n", err)
		return
	}

	// A buffer of one lets the single send complete before anything is reading;
	// closing the channel afterwards marks the end of the input.
	in := make(chan stage.StreamElement, 1)
	msg := &types.Message{
		Role:    "user",
		Content: "Hello, world!",
	}
	in <- stage.NewMessageElement(msg)
	close(in)

	// Run the pipeline over the prepared input.
	out, err := p.Execute(context.Background(), in)
	if err != nil {
		fmt.Printf("Error executing pipeline: %v\n", err)
		return
	}

	// Drain the output, printing only elements that carry a message.
	for elem := range out {
		if elem.Message == nil {
			continue
		}
		fmt.Printf("Received message: %s\n", elem.Message.Content)
	}
}
Output: Received message: Hello, world!
Example (WithConfig) ¶
ExamplePipelineBuilder_withConfig demonstrates building a pipeline with custom configuration.
package main
import (
"fmt"
"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
)
func main() {
	// Start from the defaults and override one setting per step.
	cfg := stage.DefaultPipelineConfig()
	cfg = cfg.WithChannelBufferSize(32)
	cfg = cfg.WithPriorityQueue(true)
	cfg = cfg.WithMetrics(true)

	// Create the builder with the custom config, then attach a two-stage chain.
	builder := stage.NewPipelineBuilderWithConfig(cfg)
	builder = builder.Chain(
		stage.NewPassthroughStage("stage1"),
		stage.NewPassthroughStage("stage2"),
	)

	built, err := builder.Build()
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Pipeline created with %d stages\n", 2)

	// The pipeline is only built here, never executed.
	_ = built
}
Output: Pipeline created with 2 stages
func NewPipelineBuilder ¶
func NewPipelineBuilder() *PipelineBuilder
NewPipelineBuilder creates a new PipelineBuilder with default configuration.
func NewPipelineBuilderWithConfig ¶
func NewPipelineBuilderWithConfig(config *PipelineConfig) *PipelineBuilder
NewPipelineBuilderWithConfig creates a new PipelineBuilder with custom configuration.
func (*PipelineBuilder) AddStage ¶
func (b *PipelineBuilder) AddStage(stage Stage) *PipelineBuilder
AddStage adds a stage to the builder without connecting it. This is useful when building complex topologies manually.
func (*PipelineBuilder) Branch ¶
func (b *PipelineBuilder) Branch(fromStage string, toStages ...string) *PipelineBuilder
Branch creates multiple outgoing connections from a single stage. This allows one stage's output to fan out to multiple downstream stages.
Example:
pipeline := NewPipelineBuilder().
Chain(NewStageA(), NewStageB()).
Branch("stageB", "stageC", "stageD"). // B's output goes to both C and D
Build()
func (*PipelineBuilder) Build ¶
func (b *PipelineBuilder) Build() (*StreamPipeline, error)
Build constructs the pipeline from the builder's configuration. It validates the pipeline structure and returns an error if invalid.
func (*PipelineBuilder) Chain ¶
func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder
Chain creates a linear chain of stages. This is the most common pattern: stage1 -> stage2 -> stage3. Each stage's output is connected to the next stage's input.
Example:
pipeline := NewPipelineBuilder().
Chain(
NewStageA(),
NewStageB(),
NewStageC(),
).
Build()
func (*PipelineBuilder) Clone ¶
func (b *PipelineBuilder) Clone() *PipelineBuilder
Clone creates a deep copy of the builder.
func (*PipelineBuilder) Connect ¶
func (b *PipelineBuilder) Connect(fromStage, toStage string) *PipelineBuilder
Connect creates a directed edge from one stage to another. The output of fromStage will be connected to the input of toStage.
func (*PipelineBuilder) WithConfig ¶
func (b *PipelineBuilder) WithConfig(config *PipelineConfig) *PipelineBuilder
WithConfig sets the pipeline configuration.
func (*PipelineBuilder) WithEventEmitter ¶
func (b *PipelineBuilder) WithEventEmitter(emitter *events.Emitter) *PipelineBuilder
WithEventEmitter sets the event emitter for the pipeline.
type PipelineConfig ¶
type PipelineConfig struct {
// ChannelBufferSize controls buffering between stages.
// Smaller values = lower latency but more backpressure.
// Larger values = higher throughput but more memory usage.
// Default: 16
ChannelBufferSize int
// PriorityQueueEnabled enables priority-based scheduling.
// When enabled, high-priority elements (audio) are processed before low-priority (logs).
// Default: false
PriorityQueueEnabled bool
// MaxConcurrentPipelines limits the number of concurrent pipeline executions.
// This is used by PipelinePool to control concurrency.
// Default: 100
MaxConcurrentPipelines int
// ExecutionTimeout sets the maximum duration for a single pipeline execution.
// Set to 0 to disable timeout.
// Default: 30 seconds
ExecutionTimeout time.Duration
// GracefulShutdownTimeout sets the maximum time to wait for in-flight executions during shutdown.
// Default: 10 seconds
GracefulShutdownTimeout time.Duration
// EnableMetrics enables collection of per-stage metrics (latency, throughput, etc.).
// Default: false
EnableMetrics bool
// EnableTracing enables detailed tracing of element flow through stages.
// Default: false (can be expensive for high-throughput pipelines)
EnableTracing bool
}
PipelineConfig defines configuration options for pipeline execution.
func DefaultPipelineConfig ¶
func DefaultPipelineConfig() *PipelineConfig
DefaultPipelineConfig returns a PipelineConfig with sensible defaults.
func (*PipelineConfig) Validate ¶
func (c *PipelineConfig) Validate() error
Validate checks if the configuration is valid.
func (*PipelineConfig) WithChannelBufferSize ¶
func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig
WithChannelBufferSize sets the channel buffer size.
func (*PipelineConfig) WithExecutionTimeout ¶
func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig
WithExecutionTimeout sets the execution timeout.
func (*PipelineConfig) WithGracefulShutdownTimeout ¶
func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig
WithGracefulShutdownTimeout sets the graceful shutdown timeout.
func (*PipelineConfig) WithMaxConcurrentPipelines ¶
func (c *PipelineConfig) WithMaxConcurrentPipelines(maxPipelines int) *PipelineConfig
WithMaxConcurrentPipelines sets the maximum number of concurrent pipeline executions.
func (*PipelineConfig) WithMetrics ¶
func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig
WithMetrics enables or disables metrics collection.
func (*PipelineConfig) WithPriorityQueue ¶
func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig
WithPriorityQueue enables or disables priority-based scheduling.
func (*PipelineConfig) WithTracing ¶
func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig
WithTracing enables or disables detailed tracing.
type Priority ¶
type Priority int
Priority defines the scheduling priority for stream elements. Higher priority elements are processed before lower priority ones.
const (
// PriorityLow is for non-critical data like logs or metrics
PriorityLow Priority = iota
// PriorityNormal is the default priority for most elements
PriorityNormal
// PriorityHigh is for real-time audio/video that requires low latency
PriorityHigh
// PriorityCritical is for control signals, errors, and system messages
PriorityCritical
)
type PriorityChannel ¶
type PriorityChannel struct {
// contains filtered or unexported fields
}
PriorityChannel is a channel that supports priority-based element delivery. Higher priority elements are delivered before lower priority elements.
func NewPriorityChannel ¶
func NewPriorityChannel(capacity int) *PriorityChannel
NewPriorityChannel creates a new priority channel with the given capacity.
func (*PriorityChannel) Close ¶
func (pc *PriorityChannel) Close()
Close closes the priority channel.
func (*PriorityChannel) Len ¶
func (pc *PriorityChannel) Len() int
Len returns the current number of elements in the channel.
func (*PriorityChannel) Receive ¶
func (pc *PriorityChannel) Receive(ctx context.Context) (StreamElement, bool, error)
Receive receives the highest priority element from the channel. Blocks if the channel is empty.
func (*PriorityChannel) Send ¶
func (pc *PriorityChannel) Send(ctx context.Context, elem StreamElement) error
Send sends an element to the priority channel. Blocks if the channel is at capacity.
type PromptAssemblyStage ¶
type PromptAssemblyStage struct {
BaseStage
// contains filtered or unexported fields
}
PromptAssemblyStage loads and assembles prompts from the prompt registry. It enriches elements with system prompt, allowed tools, and variables.
func NewPromptAssemblyStage ¶
func NewPromptAssemblyStage( promptRegistry *prompt.Registry, taskType string, baseVariables map[string]string, ) *PromptAssemblyStage
NewPromptAssemblyStage creates a new prompt assembly stage.
func (*PromptAssemblyStage) Process ¶
func (s *PromptAssemblyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process loads and assembles the prompt, enriching elements with prompt data.
type ProviderConfig ¶
ProviderConfig contains configuration for the provider stage.
type ProviderStage ¶
type ProviderStage struct {
BaseStage
// contains filtered or unexported fields
}
ProviderStage executes LLM calls and handles tool execution. This is the request/response mode implementation.
func NewProviderStage ¶
func NewProviderStage( provider providers.Provider, toolRegistry *tools.Registry, toolPolicy *pipeline.ToolPolicy, config *ProviderConfig, ) *ProviderStage
NewProviderStage creates a new provider stage for request/response mode.
func (*ProviderStage) Process ¶
func (s *ProviderStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process executes the LLM provider call and handles tool execution.
type QuerySourceType ¶
type QuerySourceType string
QuerySourceType defines how to construct the relevance query.
const (
// QuerySourceLastUser uses the last user message as the query
QuerySourceLastUser QuerySourceType = "last_user"
// QuerySourceLastN concatenates the last N messages as the query
QuerySourceLastN QuerySourceType = "last_n"
// QuerySourceCustom uses a custom query string
QuerySourceCustom QuerySourceType = "custom"
)
type RandomRouter ¶
type RandomRouter struct {
BaseStage
// contains filtered or unexported fields
}
RandomRouter distributes elements randomly across outputs.
func NewRandomRouter ¶
func NewRandomRouter(name string, outputNames []string) *RandomRouter
NewRandomRouter creates a router that distributes elements randomly.
func (*RandomRouter) Process ¶
func (r *RandomRouter) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process distributes elements randomly.
func (*RandomRouter) RegisterOutput ¶
func (r *RandomRouter) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name.
type RecordingPosition ¶
type RecordingPosition string
RecordingPosition indicates where in the pipeline the recording stage is placed.
const (
// RecordingPositionInput records elements entering the pipeline (user input).
RecordingPositionInput RecordingPosition = "input"
// RecordingPositionOutput records elements leaving the pipeline (agent output).
RecordingPositionOutput RecordingPosition = "output"
)
type RecordingStage ¶
type RecordingStage struct {
BaseStage
// contains filtered or unexported fields
}
RecordingStage captures pipeline elements as events for session recording. It observes elements flowing through without modifying them.
func NewRecordingStage ¶
func NewRecordingStage(eventBus *events.EventBus, config RecordingStageConfig) *RecordingStage
NewRecordingStage creates a new recording stage.
func (*RecordingStage) Process ¶
func (rs *RecordingStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process observes elements and records them as events.
func (*RecordingStage) WithConversationID ¶
func (rs *RecordingStage) WithConversationID(conversationID string) *RecordingStage
WithConversationID sets the conversation ID for recorded events.
func (*RecordingStage) WithSessionID ¶
func (rs *RecordingStage) WithSessionID(sessionID string) *RecordingStage
WithSessionID sets the session ID for recorded events.
type RecordingStageConfig ¶
type RecordingStageConfig struct {
// Position indicates where this stage is in the pipeline.
Position RecordingPosition
// SessionID is the session identifier for recorded events.
SessionID string
// ConversationID groups events within a session.
ConversationID string
// IncludeAudio records audio data (may be large).
IncludeAudio bool
// IncludeVideo records video data (may be large).
IncludeVideo bool
// IncludeImages records image data.
IncludeImages bool
}
RecordingStageConfig configures the recording stage behavior.
func DefaultRecordingStageConfig ¶
func DefaultRecordingStageConfig() RecordingStageConfig
DefaultRecordingStageConfig returns sensible defaults.
type RelevanceConfig ¶
type RelevanceConfig struct {
// EmbeddingProvider generates embeddings for similarity scoring.
// Required for relevance-based truncation; if nil, falls back to oldest.
EmbeddingProvider providers.EmbeddingProvider
// MinRecentMessages always keeps the N most recent messages
// regardless of relevance score. Default: 3
MinRecentMessages int
// AlwaysKeepSystemRole keeps all system role messages regardless of score.
AlwaysKeepSystemRole bool
// SimilarityThreshold is the minimum score to consider a message relevant (0.0-1.0).
// Messages below this threshold may be dropped first.
SimilarityThreshold float64
// QuerySource determines what text to compare messages against.
// Default: QuerySourceLastUser
QuerySource QuerySourceType
// LastNCount is the number of messages to use when QuerySource is QuerySourceLastN.
// Default: 3
LastNCount int
// CustomQuery is the query text when QuerySource is QuerySourceCustom.
CustomQuery string
// CacheEmbeddings enables caching of embeddings across truncation calls.
// Useful when context changes incrementally.
CacheEmbeddings bool
}
RelevanceConfig configures embedding-based relevance truncation. Used when TruncationStrategy is TruncateLeastRelevant.
type Response ¶
type Response struct {
Role string
Content string
Parts []types.ContentPart
ToolCalls []types.MessageToolCall
FinalResponse string
}
Response represents a response message (for compatibility with existing pipeline).
type ResponseVADConfig ¶
type ResponseVADConfig struct {
// VAD is the voice activity detector.
// If nil, a SimpleVAD with default params is created.
VAD audio.VADAnalyzer
// SilenceDuration is how long silence must persist after EndOfStream
// to confirm turn completion.
// Default: 500ms
SilenceDuration time.Duration
// MaxWaitDuration is the maximum time to wait for silence after EndOfStream.
// If silence is not detected within this time, EndOfStream is emitted anyway.
// Default: 3s
MaxWaitDuration time.Duration
// SampleRate is the expected audio sample rate.
// Default: 24000 (Gemini output)
SampleRate int
}
ResponseVADConfig configures the ResponseVADStage.
func DefaultResponseVADConfig ¶
func DefaultResponseVADConfig() ResponseVADConfig
DefaultResponseVADConfig returns sensible defaults for ResponseVADStage.
type ResponseVADStage ¶
type ResponseVADStage struct {
BaseStage
// contains filtered or unexported fields
}
ResponseVADStage monitors response audio for silence and delays EndOfStream until actual silence is detected. This decouples turn completion from provider signaling (e.g., Gemini's turnComplete) which may arrive before all audio chunks have been received.
This stage:
1. Passes through all elements immediately (audio, text, messages).
2. When EndOfStream is received from upstream, starts monitoring for silence.
3. Only emits EndOfStream downstream when VAD confirms sustained silence.
4. Has a max wait timeout to prevent indefinite blocking.
This is a Transform stage with buffering: it may hold EndOfStream temporarily.
func NewResponseVADStage ¶
func NewResponseVADStage(config ResponseVADConfig) (*ResponseVADStage, error)
NewResponseVADStage creates a new response VAD stage.
func (*ResponseVADStage) Process ¶
func (s *ResponseVADStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface. Monitors response audio for silence and delays EndOfStream until confirmed.
type RoundRobinRouter ¶
type RoundRobinRouter struct {
BaseStage
// contains filtered or unexported fields
}
RoundRobinRouter distributes elements across outputs in sequence.
func NewRoundRobinRouter ¶
func NewRoundRobinRouter(name string, outputNames []string) *RoundRobinRouter
NewRoundRobinRouter creates a router that cycles through outputs sequentially.
func (*RoundRobinRouter) Process ¶
func (r *RoundRobinRouter) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process distributes elements in round-robin fashion.
func (*RoundRobinRouter) RegisterOutput ¶
func (r *RoundRobinRouter) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name.
type RouterFunc ¶
type RouterFunc func(elem *StreamElement) []string
RouterFunc determines which output channel(s) to route an element to. It returns a slice of output names; an empty slice means the element is dropped.
type RouterStage ¶
type RouterStage struct {
BaseStage
// contains filtered or unexported fields
}
RouterStage routes elements to different output channels based on a routing function. This enables conditional branching and dynamic routing in the pipeline.
This is a special stage type that supports multiple outputs (1:N routing).
func NewRouterStage ¶
func NewRouterStage(name string, routerFunc RouterFunc) *RouterStage
NewRouterStage creates a new router stage with the given routing function.
func (*RouterStage) Process ¶
func (s *RouterStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface. Routes each element to appropriate output channel(s) based on routing function.
func (*RouterStage) RegisterOutput ¶
func (s *RouterStage) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name. This must be called before Process() to set up routing destinations.
type RoutingRule ¶
type RoutingRule struct {
// Name identifies this rule for logging/debugging.
Name string
// Predicate returns true if the element should be routed to this rule's output.
Predicate func(StreamElement) bool
// Output is the destination name for matching elements.
Output string
}
RoutingRule defines a predicate-based routing rule.
func RouteAudio ¶
func RouteAudio(output string, format AudioFormat) RoutingRule
RouteAudio creates a routing rule for audio elements with specific format.
func RouteContentType ¶
func RouteContentType(output string, ct ContentType) RoutingRule
RouteContentType creates a routing rule for elements of a specific content type.
func RouteWhen ¶
func RouteWhen(output string, predicate func(StreamElement) bool) RoutingRule
RouteWhen creates a routing rule with the given predicate.
type STTStage ¶
type STTStage struct {
BaseStage
// contains filtered or unexported fields
}
STTStage transcribes audio to text using a speech-to-text service.
This is a Transform stage: audio element → text element (1:1)
func NewSTTStage ¶
func NewSTTStage(service stt.Service, config STTStageConfig) *STTStage
NewSTTStage creates a new STT stage.
func (*STTStage) Process ¶
func (s *STTStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process implements the Stage interface. Transcribes audio elements to text.
type STTStageConfig ¶
type STTStageConfig struct {
// Language hint for transcription (e.g., "en")
Language string
// SkipEmpty skips transcription for empty audio
SkipEmpty bool
// MinAudioBytes is minimum audio size to transcribe
MinAudioBytes int
}
STTStageConfig configures the STTStage.
func DefaultSTTStageConfig ¶
func DefaultSTTStageConfig() STTStageConfig
DefaultSTTStageConfig returns sensible defaults.
type ScoredMessage ¶
type ScoredMessage struct {
// Index is the original position in the message slice
Index int
// Message is the actual message content
Message types.Message
// Score is the cosine similarity to the query (0.0 to 1.0)
Score float64
// IsProtected indicates if this message should always be kept
// (e.g., recent messages or system messages)
IsProtected bool
// TokenCount is the estimated token count for this message
TokenCount int
}
ScoredMessage pairs a message with its relevance score and metadata. Used during relevance-based truncation to track which messages to keep.
type ScoredMessages ¶
type ScoredMessages []ScoredMessage
ScoredMessages is a sortable slice of ScoredMessage.
func (ScoredMessages) Len ¶
func (s ScoredMessages) Len() int
func (ScoredMessages) Less ¶
func (s ScoredMessages) Less(i, j int) bool
func (ScoredMessages) Swap ¶
func (s ScoredMessages) Swap(i, j int)
type Stage ¶
type Stage interface {
// Name returns a unique identifier for this stage.
// This is used for logging, tracing, and debugging.
Name() string
// Type returns the stage's processing model.
// This helps the pipeline builder understand how the stage behaves.
Type() StageType
// Process is called once when the pipeline starts.
// The stage reads from input, processes elements, and writes to output.
// The stage MUST close output when done (or when input closes).
// Returns an error if processing fails.
Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
Stage is a processing unit in the pipeline DAG. Unlike traditional middleware, stages explicitly declare their I/O characteristics and operate on channels of StreamElements, enabling true streaming execution.
Stages read from an input channel, process elements, and write to an output channel. The stage MUST close the output channel when done (or when input closes).
Example implementation:
// ExampleStage is a minimal Stage implementation that holds only its own name.
type ExampleStage struct {
	name string
}

// Name returns the identifier stored on the stage.
func (s *ExampleStage) Name() string { return s.name }

// Type reports that this stage follows the transform processing model.
func (s *ExampleStage) Type() StageType { return StageTypeTransform }

// Process reads elements from input until it is closed, transforms each one,
// and forwards the result to output. The output channel is closed on every
// return path. If ctx is cancelled while a send is pending, Process stops and
// returns ctx.Err(); otherwise it returns nil once input is exhausted.
func (s *ExampleStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)

	for {
		elem, open := <-input
		if !open {
			// Upstream closed its channel: nothing more to process.
			return nil
		}

		next := s.transform(elem)

		// Deliver the result, giving up if the context is cancelled first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case output <- next:
		}
	}
}
type StageError ¶
StageError wraps an error with stage information.
func NewStageError ¶
func NewStageError(stageName string, stageType StageType, err error) *StageError
NewStageError creates a new StageError.
func (*StageError) Unwrap ¶
func (e *StageError) Unwrap() error
Unwrap returns the underlying error.
type StageFunc ¶
type StageFunc struct {
BaseStage
// contains filtered or unexported fields
}
StageFunc is a functional adapter that allows using a function as a Stage. This is useful for simple transformations without defining a new type.
func NewStageFunc ¶
func NewStageFunc(name string, stageType StageType, fn func(context.Context, <-chan StreamElement, chan<- StreamElement) error) *StageFunc
NewStageFunc creates a new functional stage.
func (*StageFunc) Process ¶
func (sf *StageFunc) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process executes the stage function.
type StageMetrics ¶
type StageMetrics struct {
StageName string
ElementsIn int64
ElementsOut int64
ElementsErrored int64
TotalLatency time.Duration
MinLatency time.Duration
MaxLatency time.Duration
AvgLatency time.Duration
LastUpdated time.Time
// contains filtered or unexported fields
}
StageMetrics contains performance metrics for a stage.
func NewStageMetrics ¶
func NewStageMetrics(stageName string) *StageMetrics
NewStageMetrics creates a new metrics collector for a stage.
func (*StageMetrics) GetMetrics ¶
func (m *StageMetrics) GetMetrics() StageMetrics
GetMetrics returns a copy of the current metrics (thread-safe).
func (*StageMetrics) RecordElement ¶
func (m *StageMetrics) RecordElement(latency time.Duration, hasError bool)
RecordElement records metrics for a processed element.
type StageType ¶
type StageType int
StageType defines the processing model of a stage.
const (
// StageTypeTransform performs 1:1 or 1:N element transformation.
// Each input element produces one or more output elements.
// Examples: validation, prompt assembly, text formatting.
StageTypeTransform StageType = iota
// StageTypeAccumulate performs N:1 accumulation.
// Multiple input elements are collected and combined into one output element.
// Examples: VAD buffering, message accumulation.
StageTypeAccumulate
// StageTypeGenerate performs 0:N generation.
// Generates output elements without consuming input (or consumes once then generates many).
// Examples: LLM streaming response, TTS generation.
StageTypeGenerate
// StageTypeSink is a terminal stage (N:0).
// Consumes input elements but produces no output.
// Examples: state store save, metrics collection, logging.
StageTypeSink
// StageTypeBidirectional supports full duplex communication.
// Reads from input and writes to output concurrently.
// Examples: WebSocket session, duplex provider.
StageTypeBidirectional
)
type StateStoreLoadStage ¶
type StateStoreLoadStage struct {
BaseStage
// contains filtered or unexported fields
}
StateStoreLoadStage loads conversation history from state store.
func NewStateStoreLoadStage ¶
func NewStateStoreLoadStage(config *pipeline.StateStoreConfig) *StateStoreLoadStage
NewStateStoreLoadStage creates a new state store load stage.
func (*StateStoreLoadStage) Process ¶
func (s *StateStoreLoadStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process loads conversation history and emits it before current input.
type StateStoreSaveStage ¶
type StateStoreSaveStage struct {
BaseStage
// contains filtered or unexported fields
}
StateStoreSaveStage saves conversation state to state store.
func NewStateStoreSaveStage ¶
func NewStateStoreSaveStage(config *pipeline.StateStoreConfig) *StateStoreSaveStage
NewStateStoreSaveStage creates a new state store save stage.
func (*StateStoreSaveStage) Process ¶
func (s *StateStoreSaveStage) Process( ctx context.Context, input <-chan StreamElement, output chan<- StreamElement, ) error
Process collects all messages and saves them to state store.
type StreamElement ¶
type StreamElement struct {
// Content types (at most one should be set per element)
Text *string // Text content
Audio *AudioData // Audio samples
Video *VideoData // Video frame
Image *ImageData // Image data
Message *types.Message // Complete message
ToolCall *types.MessageToolCall // Tool invocation
Part *types.ContentPart // Generic content part (text, image, audio, video)
MediaData *types.MediaContent // Media content with MIME type
// Metadata
Sequence int64 // Monotonic sequence number
Timestamp time.Time // When element was created
Source string // Stage that produced this element
Priority Priority // Scheduling priority (for QoS)
Metadata map[string]interface{} // Additional metadata for passing data between stages
// Control signals
EndOfStream bool // No more elements after this
Error error // Error propagation
}
StreamElement is the unit of data flowing through the pipeline. It can carry different types of content and supports backpressure. Each element should contain at most one content type.
Example ¶
ExampleStreamElement demonstrates creating different types of stream elements.
package main
import (
"fmt"
"github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
"github.com/AltairaLabs/PromptKit/runtime/types"
)
func main() {
	// The Text field is a pointer, so it is dereferenced for printing.
	text := stage.NewTextElement("Hello")
	fmt.Printf("Text element: %v\n", *text.Text)

	// A message element wraps a complete types.Message.
	userMsg := &types.Message{
		Role:    "user",
		Content: "Hello",
	}
	msg := stage.NewMessageElement(userMsg)
	fmt.Printf("Message element: %s\n", msg.Message.Content)

	// An error element carries the error in its Error field.
	failure := stage.NewErrorElement(fmt.Errorf("test error"))
	fmt.Printf("Error element: %v\n", failure.Error)
}
Output:
Text element: Hello
Message element: Hello
Error element: test error
func GetAudioElement ¶
func GetAudioElement(audio *AudioData) *StreamElement
GetAudioElement retrieves a StreamElement from the pool and initializes it with audio data. This is a pooled alternative to NewAudioElement.
func GetElement ¶
func GetElement() *StreamElement
GetElement retrieves a StreamElement from the pool or creates a new one. The returned element is reset to its zero state with an initialized Metadata map. Callers should use PutElement when the element is no longer needed.
func GetEndOfStreamElement ¶
func GetEndOfStreamElement() *StreamElement
GetEndOfStreamElement retrieves a StreamElement from the pool and marks it as end-of-stream. This is a pooled alternative to NewEndOfStreamElement.
func GetErrorElement ¶
func GetErrorElement(err error) *StreamElement
GetErrorElement retrieves a StreamElement from the pool and initializes it with an error. This is a pooled alternative to NewErrorElement.
func GetImageElement ¶
func GetImageElement(image *ImageData) *StreamElement
GetImageElement retrieves a StreamElement from the pool and initializes it with image data. This is a pooled alternative to NewImageElement.
func GetMessageElement ¶
func GetMessageElement(msg *types.Message) *StreamElement
GetMessageElement retrieves a StreamElement from the pool and initializes it with a message. This is a pooled alternative to NewMessageElement.
func GetTextElement ¶
func GetTextElement(text string) *StreamElement
GetTextElement retrieves a StreamElement from the pool and initializes it with text content. This is a pooled alternative to NewTextElement.
func GetVideoElement ¶
func GetVideoElement(video *VideoData) *StreamElement
GetVideoElement retrieves a StreamElement from the pool and initializes it with video data. This is a pooled alternative to NewVideoElement.
func NewAudioElement ¶
func NewAudioElement(audio *AudioData) StreamElement
NewAudioElement creates a new StreamElement with audio data.
func NewEndOfStreamElement ¶
func NewEndOfStreamElement() StreamElement
NewEndOfStreamElement creates a new StreamElement marking end of stream.
func NewErrorElement ¶
func NewErrorElement(err error) StreamElement
NewErrorElement creates a new StreamElement with an error.
func NewImageElement ¶
func NewImageElement(image *ImageData) StreamElement
NewImageElement creates a new StreamElement with image data.
func NewMessageElement ¶
func NewMessageElement(msg *types.Message) StreamElement
NewMessageElement creates a new StreamElement with a message.
func NewTextElement ¶
func NewTextElement(text string) StreamElement
NewTextElement creates a new StreamElement with text content.
func NewVideoElement ¶
func NewVideoElement(video *VideoData) StreamElement
NewVideoElement creates a new StreamElement with video data.
func (*StreamElement) GetMetadata ¶
func (e *StreamElement) GetMetadata(key string) interface{}
GetMetadata retrieves metadata by key, returning nil if not found.
func (*StreamElement) HasContent ¶
func (e *StreamElement) HasContent() bool
HasContent returns true if the element contains any content (excluding control signals).
func (*StreamElement) IsControl ¶
func (e *StreamElement) IsControl() bool
IsControl returns true if the element is a control signal (error or end-of-stream).
func (*StreamElement) IsEmpty ¶
func (e *StreamElement) IsEmpty() bool
IsEmpty returns true if the element contains no content.
func (*StreamElement) Reset ¶
func (e *StreamElement) Reset()
Reset clears all fields of the StreamElement to their zero values. This is called automatically by PutElement before the element is returned to the pool. The Metadata map is cleared but retained to avoid reallocation.
func (*StreamElement) WithMetadata ¶
func (e *StreamElement) WithMetadata(key string, value interface{}) *StreamElement
WithMetadata adds metadata to this element.
func (*StreamElement) WithPriority ¶
func (e *StreamElement) WithPriority(priority Priority) *StreamElement
WithPriority sets the priority for this element.
func (*StreamElement) WithSequence ¶
func (e *StreamElement) WithSequence(seq int64) *StreamElement
WithSequence sets the sequence number for this element.
func (*StreamElement) WithSource ¶
func (e *StreamElement) WithSource(source string) *StreamElement
WithSource sets the source stage name for this element.
type StreamPipeline ¶
type StreamPipeline struct {
// contains filtered or unexported fields
}
StreamPipeline represents an executable pipeline of stages. It manages the DAG of stages, creates channels between them, and orchestrates execution.
func (*StreamPipeline) Execute ¶
func (p *StreamPipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error)
Execute starts the pipeline execution with the given input channel. Returns an output channel that will receive all elements from terminal stages. The pipeline executes in background goroutines and closes the output channel when complete.
func (*StreamPipeline) ExecuteSync ¶
func (p *StreamPipeline) ExecuteSync(ctx context.Context, input ...StreamElement) (*ExecutionResult, error)
ExecuteSync runs the pipeline synchronously and returns the accumulated result. This is a convenience method for request/response mode where you want a single result. It converts the streaming execution into a blocking call.
type TTSConfig ¶
type TTSConfig struct {
// SkipEmpty skips synthesis for empty or whitespace-only text
SkipEmpty bool
// MinTextLength is the minimum text length to synthesize (0 = no minimum)
MinTextLength int
}
TTSConfig contains configuration for TTS stage.
func DefaultTTSConfig ¶
func DefaultTTSConfig() TTSConfig
DefaultTTSConfig returns sensible defaults for TTS configuration.
type TTSService ¶
type TTSService interface {
// Synthesize converts text to audio bytes.
Synthesize(ctx context.Context, text string) ([]byte, error)
// MIMEType returns the MIME type of the synthesized audio.
MIMEType() string
}
TTSService converts text to audio.
type TTSStage ¶
type TTSStage struct {
BaseStage
// contains filtered or unexported fields
}
TTSStage synthesizes audio for streaming text elements. It reads text elements from input and adds audio data to them.
This is a Transform stage: text element → text+audio element (1:1)
func NewTTSStage ¶
func NewTTSStage(tts TTSService, config TTSConfig) *TTSStage
NewTTSStage creates a new TTS stage.
func (*TTSStage) Process ¶
func (s *TTSStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process implements the Stage interface. Synthesizes audio for each text element and adds it to the element.
type TTSStageWithInterruption ¶
type TTSStageWithInterruption struct {
BaseStage
// contains filtered or unexported fields
}
TTSStageWithInterruption synthesizes text to audio with interruption support. When the user starts speaking (detected via the shared InterruptionHandler), synthesis is stopped and pending output is discarded.
This is a Transform stage: text element → audio element (1:1)
func NewTTSStageWithInterruption ¶
func NewTTSStageWithInterruption(service tts.Service, config TTSStageWithInterruptionConfig) *TTSStageWithInterruption
NewTTSStageWithInterruption creates a new TTS stage with interruption support.
func (*TTSStageWithInterruption) Process ¶
func (s *TTSStageWithInterruption) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process implements the Stage interface. Synthesizes audio for text elements with interruption support.
type TTSStageWithInterruptionConfig ¶
type TTSStageWithInterruptionConfig struct {
// Voice is the voice ID to use
Voice string
// Speed is the speech rate (0.5-2.0)
Speed float64
// InterruptionHandler for detecting user interrupts during TTS output.
// Should be shared with AudioTurnStage.
InterruptionHandler *audio.InterruptionHandler
// SkipEmpty skips synthesis for empty text
SkipEmpty bool
// MinTextLength is minimum text length to synthesize
MinTextLength int
}
TTSStageWithInterruptionConfig configures TTSStageWithInterruption.
func DefaultTTSStageWithInterruptionConfig ¶
func DefaultTTSStageWithInterruptionConfig() TTSStageWithInterruptionConfig
DefaultTTSStageWithInterruptionConfig returns sensible defaults.
type TemplateStage ¶
type TemplateStage struct {
BaseStage
}
TemplateStage substitutes {{variable}} placeholders in messages and metadata.
This stage reads variables from the element's metadata["variables"] map and replaces all occurrences of {{variable_name}} in:
- metadata["system_prompt"] - the system prompt for the LLM
- message.Content - the message text content
- message.Parts[].Text - individual content parts
Variables are typically set by:
- PromptAssemblyStage (from base_variables in config)
- VariableProviderStage (from dynamic variable providers)
Example:
Input: "Hello {{name}}, the topic is {{topic}}"
Variables: {"name": "Alice", "topic": "AI"}
Output: "Hello Alice, the topic is AI"
This is a Transform stage: 1 input element → 1 output element
func NewTemplateStage ¶
func NewTemplateStage() *TemplateStage
NewTemplateStage creates a template substitution stage.
func (*TemplateStage) Process ¶
func (s *TemplateStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process substitutes variables in messages and system prompt metadata.
type TracingStage ¶
type TracingStage struct {
BaseStage
// contains filtered or unexported fields
}
TracingStage wraps another stage and adds element-level tracing. Each element gets a trace ID and timing information.
func NewTracingStage ¶
func NewTracingStage(wrappedStage Stage, traceIDGen func() string) *TracingStage
NewTracingStage wraps a stage with tracing support.
func (*TracingStage) Process ¶
func (s *TracingStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process implements the Stage interface with tracing.
type Transcriber ¶
Transcriber converts audio bytes to text. Follows Go naming convention for single-method interfaces.
type TruncationStrategy ¶
type TruncationStrategy string
TruncationStrategy defines how to handle messages when over token budget.
const (
// TruncateOldest drops oldest messages first
TruncateOldest TruncationStrategy = "oldest"
// TruncateLeastRelevant drops least relevant messages (requires embeddings)
TruncateLeastRelevant TruncationStrategy = "relevance"
// TruncateSummarize compresses old messages into summaries
TruncateSummarize TruncationStrategy = "summarize"
// TruncateFail returns error if over budget
TruncateFail TruncationStrategy = "fail"
)
type VADAccumulatorStage ¶
type VADAccumulatorStage struct {
BaseStage
// contains filtered or unexported fields
}
VADAccumulatorStage reads streaming audio chunks, detects turn boundaries via VAD, and emits a single Message element with the transcribed text.
This is an Accumulate stage: N audio chunks → 1 message element
func NewVADAccumulatorStage ¶
func NewVADAccumulatorStage(analyzer audio.VADAnalyzer, transcriber Transcriber, config VADConfig) *VADAccumulatorStage
NewVADAccumulatorStage creates a new VAD accumulator stage.
func (*VADAccumulatorStage) Process ¶
func (s *VADAccumulatorStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process implements the Stage interface. Accumulates audio chunks until turn complete, then transcribes and emits a message.
type VADConfig ¶
type VADConfig struct {
// Threshold for silence detection (0.0 = silence, 1.0 = speech)
Threshold float64
// MinSpeechDuration is the minimum duration of speech before turn can complete
MinSpeechDuration time.Duration
// MaxTurnDuration is the maximum duration before forcing turn completion
MaxTurnDuration time.Duration
// SilenceDuration is how long silence must persist to trigger turn complete
SilenceDuration time.Duration
}
VADConfig contains configuration for VAD accumulator stage.
func DefaultVADConfig ¶
func DefaultVADConfig() VADConfig
DefaultVADConfig returns sensible defaults for VAD configuration.
type ValidationStage ¶
type ValidationStage struct {
BaseStage
// contains filtered or unexported fields
}
ValidationStage validates responses using configured validators.
func NewValidationStage ¶
func NewValidationStage(registry *validators.Registry, suppressExceptions bool) *ValidationStage
NewValidationStage creates a new validation stage.
func (*ValidationStage) Process ¶
func (s *ValidationStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process validates response elements and attaches results to metadata.
type VariableProviderStage ¶
type VariableProviderStage struct {
BaseStage
// contains filtered or unexported fields
}
VariableProviderStage resolves variables from dynamic providers and adds them to metadata.
This stage calls each registered variable provider to fetch dynamic variables (e.g., from environment, external services, databases) and merges them into the element's metadata["variables"] map for use by TemplateStage.
Provider resolution order:
- Variables from earlier stages (e.g., PromptAssemblyStage base_variables)
- Each provider is called in sequence; later providers can override earlier values
Error handling:
- If any provider fails, the stage returns an error and aborts the pipeline
- This ensures variable resolution failures are surfaced early
Example providers:
- Environment variable provider: reads from OS environment
- Config provider: reads from configuration files
- External API provider: fetches user context from external services
This is a Transform stage: 1 input element → 1 output element (with enriched metadata)
func NewVariableProviderStage ¶
func NewVariableProviderStage(providers ...variables.Provider) *VariableProviderStage
NewVariableProviderStage creates a variable provider stage.
func (*VariableProviderStage) Process ¶
func (s *VariableProviderStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process resolves variables from all providers and merges them into element metadata.
type VideoData ¶
type VideoData struct {
Data []byte // Raw video frame data or encoded video segment
MIMEType string // MIME type (e.g., "video/mp4", "video/webm")
Width int // Frame width in pixels
Height int // Frame height in pixels
FrameRate float64 // Frames per second
Duration time.Duration // Duration of the video segment
Timestamp time.Time // Timestamp of this frame
Format string // Format identifier (e.g., "h264", "vp8")
IsKeyFrame bool // True if this is a key frame
}
VideoData carries video frame data with metadata.
type WeightedRouter ¶
type WeightedRouter struct {
BaseStage
// contains filtered or unexported fields
}
WeightedRouter distributes elements across outputs based on configured weights.
func NewWeightedRouter ¶
func NewWeightedRouter(name string, weights map[string]float64) *WeightedRouter
NewWeightedRouter creates a router that distributes elements based on weights. Weights are normalized to sum to 1.0. Example: {"primary": 0.7, "secondary": 0.3} routes 70% to primary, 30% to secondary.
func (*WeightedRouter) Process ¶
func (r *WeightedRouter) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
Process distributes elements based on weights.
func (*WeightedRouter) RegisterOutput ¶
func (r *WeightedRouter) RegisterOutput(name string, output chan<- StreamElement)
RegisterOutput registers an output channel with a name.
Source Files
¶
- builder.go
- capabilities.go
- capabilities_validation.go
- config.go
- element.go
- element_pool.go
- errors.go
- pipeline.go
- router_strategies.go
- similarity.go
- stage.go
- stages_advanced.go
- stages_core.go
- stages_duplex_provider_integration.go
- stages_provider.go
- stages_recording.go
- stages_resample.go
- stages_speech_integration.go
- stages_tts_integration.go
- stages_utilities.go
- stages_vad_integration.go