Documentation ¶
Overview ¶
Package streaming provides generic utilities for bidirectional streaming communication with LLM providers.
This package extracts common patterns used in duplex (bidirectional) streaming conversations, including:
- Response processing state machine for handling provider responses
- Tool execution interface for streaming tool calls
- Audio streaming utilities for sending audio chunks to providers
- Response collection patterns for managing streaming responses
The package is designed to be provider-agnostic, working with any provider that implements the runtime/providers streaming interfaces.
Response Processing ¶
The response state machine (ProcessResponseElement) analyzes stream elements and determines appropriate actions:
- Continue: informational element, keep waiting
- Complete: turn finished with valid response
- Error: error or unexpected empty response
- ToolCalls: tool calls need execution
Tool Execution ¶
The ToolExecutor interface allows custom tool registry implementations to be plugged in. The package provides helpers for sending tool results back through the streaming pipeline.
Audio Streaming ¶
AudioStreamer provides utilities for streaming audio data in either burst mode (all at once) or real-time mode (paced to match playback speed).
Response Collection ¶
ResponseCollector manages the goroutine pattern for collecting streaming responses from a provider session, with optional tool call handling.
Index ¶
- Constants
- Variables
- func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement
- func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)
- func ExecuteAndSend(ctx context.Context, executor ToolExecutor, toolCalls []types.MessageToolCall, ...) error
- func SendEndOfStream(ctx context.Context, inputChan chan<- stage.StreamElement) error
- func SendToolResults(ctx context.Context, result *ToolExecutionResult, ...) error
- func WaitForResponse(ctx context.Context, responseDone <-chan error) error
- type AudioStreamer
- func (a *AudioStreamer) SendChunk(ctx context.Context, chunk []byte, sampleRate int, ...) error
- func (a *AudioStreamer) StreamBurst(ctx context.Context, audioData []byte, sampleRate int, ...) error
- func (a *AudioStreamer) StreamRealtime(ctx context.Context, audioData []byte, sampleRate int, ...) error
- type ResponseAction
- type ResponseCollector
- type ResponseCollectorConfig
- type ToolExecutionResult
- type ToolExecutor
Constants ¶
const (
	// DefaultChunkSize is the default audio chunk size in bytes.
	// 640 bytes = 20ms at 16kHz 16-bit mono (16000 * 2 * 0.02)
	DefaultChunkSize = 640

	// DefaultSampleRate is the default audio sample rate in Hz.
	// 16kHz is required by Gemini Live API.
	DefaultSampleRate = 16000

	// DefaultChunkIntervalMs is the default interval between chunks in milliseconds
	// when streaming in real-time mode.
	DefaultChunkIntervalMs = 20
)
Default audio configuration constants
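The arithmetic behind DefaultChunkSize can be checked with a small helper. This is only a sketch: chunkBytes is a hypothetical name that is not part of the package, and it assumes uncompressed mono PCM.

```go
package main

import "fmt"

// chunkBytes is a hypothetical helper (not part of the streaming package).
// It returns the number of bytes needed to hold durationMs of mono PCM audio.
func chunkBytes(sampleRate, bytesPerSample, durationMs int) int {
	return sampleRate * bytesPerSample * durationMs / 1000
}

func main() {
	// 16 kHz, 16-bit samples (2 bytes each), 20 ms per chunk.
	fmt.Println(chunkBytes(16000, 2, 20)) // 640
}
```

The result matches DefaultChunkSize, and the 20 ms duration matches DefaultChunkIntervalMs, so one default-sized chunk corresponds to one default interval of audio.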
Variables ¶
var ErrEmptyResponse = errors.New("empty response, likely interrupted")
ErrEmptyResponse is returned when a response element has no content. This typically indicates an interrupted response that wasn't properly handled.
var ErrSessionEnded = errors.New("session ended")
ErrSessionEnded is returned when the streaming session has ended. This is not necessarily an error; it only indicates that the session is complete.
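Because both values are sentinel errors, callers can tell them apart with errors.Is, even after wrapping. The sketch below uses local stand-ins with the same messages; the classify helper and the reactions it returns (for example, retrying after an empty response) are assumptions for illustration, not behavior documented by the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the package's sentinel errors.
var (
	errEmptyResponse = errors.New("empty response, likely interrupted")
	errSessionEnded  = errors.New("session ended")
)

// classify is a hypothetical helper showing one way a caller might react.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errSessionEnded):
		return "session complete"
	case errors.Is(err, errEmptyResponse):
		return "retry turn"
	default:
		return "fail"
	}
}

func main() {
	// errors.Is still matches after the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("turn 3: %w", errSessionEnded)
	fmt.Println(classify(wrapped)) // session complete
}
```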
Functions ¶
func BuildToolResponseElement ¶
func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement
BuildToolResponseElement creates a stream element containing tool results. This element can be sent through the pipeline to:
1. Forward tool responses to the provider (via metadata["tool_responses"])
2. Capture tool results in the state store (via metadata["tool_result_messages"])
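The two metadata keys above can be pictured with a simplified element type. This is a sketch under stated assumptions: streamElement and toolResult are local stand-ins, and the Metadata field shape is assumed rather than taken from stage.StreamElement.

```go
package main

import "fmt"

// streamElement is a simplified stand-in for stage.StreamElement.
// The Metadata field is an assumption made for this sketch.
type streamElement struct {
	Metadata map[string]any
}

// toolResult is a simplified stand-in for ToolExecutionResult.
type toolResult struct {
	ProviderResponses []string // stand-in for []providers.ToolResponse
	ResultMessages    []string // stand-in for []types.Message
}

// buildToolResponseElement carries both views of the results in one element.
func buildToolResponseElement(r *toolResult) streamElement {
	return streamElement{Metadata: map[string]any{
		"tool_responses":       r.ProviderResponses, // forwarded to the provider
		"tool_result_messages": r.ResultMessages,    // captured by the state store
	}}
}

func main() {
	elem := buildToolResponseElement(&toolResult{
		ProviderResponses: []string{"weather: sunny"},
		ResultMessages:    []string{"tool result: sunny"},
	})
	fmt.Println(len(elem.Metadata)) // 2
}
```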
func DrainStaleMessages ¶
func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)
DrainStaleMessages removes any buffered messages from the output channel. This is useful for clearing state between turns.
Returns the number of messages drained, or an error if the session ended.
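A non-blocking drain of this kind is commonly written with select and a default case. The sketch below drains a channel of ints as a stand-in for stream elements; treating a closed channel as "session ended" is an assumption about how the real function detects that condition.

```go
package main

import (
	"errors"
	"fmt"
)

var errSessionEnded = errors.New("session ended") // local stand-in

// drain removes everything currently buffered in ch without blocking.
func drain(ch <-chan int) (int, error) {
	n := 0
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				// Assumption: a closed channel means the session has ended.
				return n, errSessionEnded
			}
			n++
		default:
			// Nothing buffered right now: stop instead of waiting.
			return n, nil
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	n, err := drain(ch)
	fmt.Println(n, err) // 2 <nil>
}
```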
func ExecuteAndSend ¶
func ExecuteAndSend(
	ctx context.Context,
	executor ToolExecutor,
	toolCalls []types.MessageToolCall,
	inputChan chan<- stage.StreamElement,
) error
ExecuteAndSend is a convenience function that executes tool calls and sends the results through the pipeline in one operation.
If the executor is nil, this function returns nil (no-op).
func SendEndOfStream ¶
func SendEndOfStream(
	ctx context.Context,
	inputChan chan<- stage.StreamElement,
) error
SendEndOfStream signals that audio input is complete for the current turn. This triggers the provider to generate a response.
func SendToolResults ¶
func SendToolResults(
	ctx context.Context,
	result *ToolExecutionResult,
	inputChan chan<- stage.StreamElement,
) error
SendToolResults sends tool execution results back through the pipeline to the provider, and includes tool result messages for state store capture.
This matches the behavior of non-streaming mode where tool results are stored as messages. The tool result messages are sent via inputChan with metadata, and DuplexProviderStage forwards them to output for state store capture.
Types ¶
type AudioStreamer ¶
type AudioStreamer struct {
	// ChunkSize is the number of bytes per chunk.
	ChunkSize int

	// ChunkIntervalMs is the interval between chunks in milliseconds
	// when streaming in real-time mode.
	ChunkIntervalMs int
}
AudioStreamer provides utilities for streaming audio data through a pipeline.
func NewAudioStreamer ¶
func NewAudioStreamer() *AudioStreamer
NewAudioStreamer creates a new audio streamer with default settings.
func (*AudioStreamer) SendChunk ¶
func (a *AudioStreamer) SendChunk(
	ctx context.Context,
	chunk []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error
SendChunk sends a single audio chunk through the pipeline.
func (*AudioStreamer) StreamBurst ¶
func (a *AudioStreamer) StreamBurst(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error
StreamBurst sends all audio data as fast as possible without pacing. This is preferred for pre-recorded audio to avoid false turn detections from natural speech pauses.
The provider receives all audio before detecting any turn boundaries, which prevents "user interrupted" signals from arriving mid-utterance.
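Burst mode amounts to slicing the audio into fixed-size chunks and sending them back to back. The sketch below illustrates that idea with plain byte slices; splitChunks and streamBurst are hypothetical names, and the real method sends stage.StreamElement values rather than raw slices.

```go
package main

import (
	"context"
	"fmt"
)

// splitChunks divides data into pieces of at most size bytes.
// The final piece may be shorter than size.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for start := 0; start < len(data); start += size {
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, data[start:end])
	}
	return chunks
}

// streamBurst sends every chunk immediately, with no delay between sends.
func streamBurst(ctx context.Context, data []byte, size int, out chan<- []byte) error {
	for _, c := range splitChunks(data, size) {
		select {
		case out <- c:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	audio := make([]byte, 1500) // 640 + 640 + 220 bytes
	out := make(chan []byte, 8)
	if err := streamBurst(context.Background(), audio, 640, out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(out)) // 3
}
```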
func (*AudioStreamer) StreamRealtime ¶
func (a *AudioStreamer) StreamRealtime(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error
StreamRealtime sends audio data paced to match real-time playback. Each chunk is sent with a delay matching its duration.
Note: This mode can cause issues with some providers (like Gemini) that detect speech pauses mid-utterance. Use StreamBurst for pre-recorded audio.
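Real-time pacing can be sketched by deriving each chunk's playback duration from its size and waiting that long after sending it. The helpers below are hypothetical and assume 16-bit mono PCM; the actual implementation may pace with ChunkIntervalMs instead.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// chunkDuration returns how long n bytes of 16-bit mono PCM take to play.
func chunkDuration(n, sampleRate int) time.Duration {
	if sampleRate <= 0 {
		return 0
	}
	bytesPerSecond := sampleRate * 2 // 2 bytes per 16-bit sample
	return time.Duration(n) * time.Second / time.Duration(bytesPerSecond)
}

// streamRealtime sends each chunk, then waits for that chunk's duration.
func streamRealtime(ctx context.Context, chunks [][]byte, sampleRate int, out chan<- []byte) error {
	for _, c := range chunks {
		select {
		case out <- c:
		case <-ctx.Done():
			return ctx.Err()
		}
		timer := time.NewTimer(chunkDuration(len(c), sampleRate))
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	chunks := [][]byte{make([]byte, 640), make([]byte, 640), make([]byte, 640)}
	out := make(chan []byte, len(chunks))
	began := time.Now()
	if err := streamRealtime(context.Background(), chunks, 16000, out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("per chunk:", chunkDuration(640, 16000), "total:", time.Since(began))
}
```

Both waits select on ctx.Done so that cancelling the context stops the stream between chunks rather than after the whole clip.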
type ResponseAction ¶
type ResponseAction int
ResponseAction indicates what action to take after processing a response element.
const (
	// ResponseActionContinue means the element was informational (e.g., interruption signal),
	// and we should continue waiting for the final response.
	ResponseActionContinue ResponseAction = iota

	// ResponseActionComplete means we received a complete response.
	ResponseActionComplete

	// ResponseActionError means an error occurred or the response was empty.
	ResponseActionError

	// ResponseActionToolCalls means the response contains tool calls that need to be executed.
	ResponseActionToolCalls
)
func ProcessResponseElement ¶
func ProcessResponseElement(elem *stage.StreamElement, logPrefix string) (ResponseAction, error)
ProcessResponseElement handles a response element from the pipeline, determining the appropriate action based on interruption signals, turn completion, and errors.
This is the core state machine for duplex streaming response handling. It consolidates the response handling logic needed for bidirectional streaming.
Returns:
- ResponseAction: what action to take
- error: any error to return (only set when action is ResponseActionError)
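The shape of such a state machine can be sketched with local stand-in types. Everything here is an assumption for illustration: the element fields, the process function, and the order of its checks are hypothetical, and the real ProcessResponseElement may apply different rules.

```go
package main

import (
	"errors"
	"fmt"
)

// action mirrors the four ResponseAction values.
type action int

const (
	actionContinue action = iota
	actionComplete
	actionError
	actionToolCalls
)

var errEmptyResponse = errors.New("empty response, likely interrupted")

// element is a hypothetical, simplified stand-in for stage.StreamElement.
type element struct {
	Interrupted bool
	EndOfTurn   bool
	Text        string
	ToolCalls   []string
	Err         error
}

// process applies assumed rules; it is not the package's actual logic.
func process(e *element) (action, error) {
	switch {
	case e.Err != nil:
		return actionError, e.Err
	case e.Interrupted:
		return actionContinue, nil // informational, keep waiting
	case len(e.ToolCalls) > 0:
		return actionToolCalls, nil
	case e.EndOfTurn && e.Text == "":
		return actionError, errEmptyResponse
	case e.EndOfTurn:
		return actionComplete, nil
	default:
		return actionContinue, nil
	}
}

func main() {
	stream := []element{
		{Interrupted: true},
		{ToolCalls: []string{"get_weather"}},
		{EndOfTurn: true, Text: "It is sunny."},
	}
	for i := range stream {
		act, err := process(&stream[i])
		fmt.Println(act, err)
	}
}
```

As in the documented contract, the error in this sketch is non-nil only when the returned action is the error action.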
func (ResponseAction) String ¶
func (a ResponseAction) String() string
String returns a human-readable representation of the action.
type ResponseCollector ¶
type ResponseCollector struct {
	// contains filtered or unexported fields
}
ResponseCollector manages response collection from a streaming session. It processes streaming elements, handles tool calls, and signals completion.
func NewResponseCollector ¶
func NewResponseCollector(config ResponseCollectorConfig) *ResponseCollector
NewResponseCollector creates a new response collector with the given configuration.
func (*ResponseCollector) Start ¶
func (c *ResponseCollector) Start(
	ctx context.Context,
	outputChan <-chan stage.StreamElement,
	inputChan chan<- stage.StreamElement,
) <-chan error
Start begins collecting responses in a goroutine. Returns a channel that receives nil on success or an error on failure.
The collector will:
1. Process incoming stream elements
2. Execute tool calls via the ToolExecutor (if configured)
3. Send tool results back through inputChan
4. Signal completion or error through the returned channel
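The goroutine-plus-result-channel pattern described above can be sketched as follows. This is a simplified illustration, not the package's implementation: it reads strings instead of stage.StreamElement, treats the literal "done" as the end of a turn, and omits tool execution. The names start and collect are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errSessionEnded = errors.New("session ended") // local stand-in

// start reads from output in a goroutine and reports the outcome exactly once.
func start(ctx context.Context, output <-chan string) <-chan error {
	result := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		for {
			select {
			case <-ctx.Done():
				result <- ctx.Err()
				return
			case elem, ok := <-output:
				if !ok {
					result <- errSessionEnded
					return
				}
				if elem == "done" {
					result <- nil
					return
				}
				// Any other element is informational: keep waiting.
			}
		}
	}()
	return result
}

// collect is a hypothetical helper that feeds elems through start and waits.
func collect(elems []string, closeAfter bool) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	output := make(chan string, len(elems))
	for _, e := range elems {
		output <- e
	}
	if closeAfter {
		close(output)
	}
	return <-start(ctx, output)
}

func main() {
	fmt.Println(collect([]string{"partial", "done"}, false)) // <nil>
	fmt.Println(collect([]string{"partial"}, true))          // session ended
}
```

Buffering the result channel with capacity one lets the goroutine exit even if the caller never reads the outcome.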
type ResponseCollectorConfig ¶
type ResponseCollectorConfig struct {
	// ToolExecutor is called when tool calls are received.
	// If nil, tool calls will result in an error.
	ToolExecutor ToolExecutor

	// LogPrefix is prepended to log messages for identification.
	LogPrefix string
}
ResponseCollectorConfig configures response collection behavior.
type ToolExecutionResult ¶
type ToolExecutionResult struct {
	// ProviderResponses are formatted for sending back to the streaming provider.
	ProviderResponses []providers.ToolResponse

	// ResultMessages are formatted for state store capture,
	// matching the behavior of non-streaming tool execution.
	ResultMessages []types.Message
}
ToolExecutionResult contains the results of executing tool calls.
type ToolExecutor ¶
type ToolExecutor interface {
	// Execute runs the given tool calls and returns their results.
	// The implementation is responsible for handling execution errors
	// and formatting them appropriately in the result.
	Execute(ctx context.Context, toolCalls []types.MessageToolCall) (*ToolExecutionResult, error)
}
ToolExecutor executes tool calls and returns results. Implementations provide the actual tool registry integration.
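One way to back an interface of this shape is a map from tool name to function. The sketch below uses simplified stand-ins for types.MessageToolCall and ToolExecutionResult; mapExecutor, runDemo, and the "error: ..." output format are hypothetical choices made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// toolCall and toolResult are simplified stand-ins for
// types.MessageToolCall and ToolExecutionResult.
type toolCall struct {
	Name string
	Args string
}

type toolResult struct {
	Outputs []string
}

// toolExecutor mirrors the shape of the ToolExecutor interface.
type toolExecutor interface {
	Execute(ctx context.Context, calls []toolCall) (*toolResult, error)
}

// mapExecutor is a hypothetical registry backed by a map of functions.
type mapExecutor map[string]func(args string) (string, error)

func (m mapExecutor) Execute(ctx context.Context, calls []toolCall) (*toolResult, error) {
	res := &toolResult{}
	for _, c := range calls {
		if err := ctx.Err(); err != nil {
			return nil, err // stop early if the caller cancelled
		}
		fn, ok := m[c.Name]
		if !ok {
			// Report the failure inside the result rather than aborting the batch.
			res.Outputs = append(res.Outputs, "error: unknown tool "+c.Name)
			continue
		}
		out, err := fn(c.Args)
		if err != nil {
			out = "error: " + err.Error()
		}
		res.Outputs = append(res.Outputs, out)
	}
	return res, nil
}

// runDemo is a hypothetical helper that exercises the executor.
func runDemo() []string {
	var exec toolExecutor = mapExecutor{
		"echo": func(args string) (string, error) { return args, nil },
	}
	res, err := exec.Execute(context.Background(), []toolCall{
		{Name: "echo", Args: "hello"},
		{Name: "missing"},
	})
	if err != nil {
		return nil
	}
	return res.Outputs
}

func main() {
	fmt.Println(runDemo()) // [hello error: unknown tool missing]
}
```

Folding per-tool failures into the result follows the interface comment, which makes the implementation responsible for formatting execution errors in the result.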