Documentation ¶
Overview ¶
Package streaming provides generic utilities for bidirectional streaming communication with LLM providers.
This package extracts common patterns used in duplex (bidirectional) streaming conversations, including:
- Response processing state machine for handling provider responses
- Tool execution interface for streaming tool calls
- Audio streaming utilities for sending audio chunks to providers
- Response collection patterns for managing streaming responses
The package is designed to be provider-agnostic, working with any provider that implements the runtime/providers streaming interfaces.
Response Processing ¶
The response state machine (ProcessResponseElement) analyzes stream elements and determines appropriate actions:
- Continue: informational element, keep waiting
- Complete: turn finished with valid response
- Error: error or unexpected empty response
- ToolCalls: tool calls need execution
Tool Execution ¶
The ToolExecutor interface allows custom tool registry implementations to be plugged in. The package provides helpers for sending tool results back through the streaming pipeline.
Audio Streaming ¶
AudioStreamer provides utilities for streaming audio data in either burst mode (all at once) or real-time mode (paced to match playback speed).
Response Collection ¶
ResponseCollector manages the goroutine pattern for collecting streaming responses from a provider session, with optional tool call handling.
Index ¶
- Constants
- Variables
- func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement
- func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)
- func ExecuteAndSend(ctx context.Context, executor ToolExecutor, toolCalls []types.MessageToolCall, ...) error
- func SendEndOfStream(ctx context.Context, inputChan chan<- stage.StreamElement) error
- func SendImageEndOfStream(ctx context.Context, output chan<- stage.StreamElement) error
- func SendToolResults(ctx context.Context, result *ToolExecutionResult, ...) error
- func SendVideoEndOfStream(ctx context.Context, output chan<- stage.StreamElement) error
- func WaitForResponse(ctx context.Context, responseDone <-chan error) error
- type AudioStreamer
- func (a *AudioStreamer) SendChunk(ctx context.Context, chunk []byte, sampleRate int, ...) error
- func (a *AudioStreamer) StreamBurst(ctx context.Context, audioData []byte, sampleRate int, ...) error
- func (a *AudioStreamer) StreamRealtime(ctx context.Context, audioData []byte, sampleRate int, ...) error
- type ImageStreamer
- func (s *ImageStreamer) SendFrame(ctx context.Context, data []byte, mimeType string, frameNum int64, ...) error
- func (s *ImageStreamer) SendFrameWithDimensions(ctx context.Context, data []byte, mimeType string, width, height int, ...) error
- func (s *ImageStreamer) StreamFramesBurst(ctx context.Context, frames [][]byte, mimeType string, ...) error
- func (s *ImageStreamer) StreamFramesRealtime(ctx context.Context, frames [][]byte, mimeType string, ...) error
- type ResponseAction
- type ResponseCollector
- type ResponseCollectorConfig
- type ToolExecutionResult
- type ToolExecutor
- type VideoChunk
- type VideoStreamer
- func (s *VideoStreamer) SendChunk(ctx context.Context, data []byte, mimeType string, chunkIndex int, ...) error
- func (s *VideoStreamer) SendChunkWithDimensions(ctx context.Context, data []byte, mimeType string, width, height int, ...) error
- func (s *VideoStreamer) StreamChunksBurst(ctx context.Context, chunks []VideoChunk, mimeType string, ...) error
- func (s *VideoStreamer) StreamChunksRealtime(ctx context.Context, chunks []VideoChunk, mimeType string, ...) error
Constants ¶
const (
	// DefaultChunkSize is the default audio chunk size in bytes.
	// 640 bytes = 20ms at 16kHz 16-bit mono (16000 * 2 * 0.02)
	DefaultChunkSize = 640
	// DefaultSampleRate is the default audio sample rate in Hz.
	// 16kHz is required by Gemini Live API.
	DefaultSampleRate = 16000
	// DefaultChunkIntervalMs is the default interval between chunks in milliseconds
	// when streaming in real-time mode.
	DefaultChunkIntervalMs = 20
)
Default audio configuration constants.
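The arithmetic behind DefaultChunkSize can be checked with a small helper. This is a minimal sketch: chunkBytes is a hypothetical function written for illustration and is not part of the package.

```go
package main

import "fmt"

// chunkBytes returns how many bytes of raw PCM audio cover durationMs
// milliseconds at the given sample rate, sample width, and channel count.
// Hypothetical helper, not part of the streaming package.
func chunkBytes(sampleRate, bytesPerSample, channels, durationMs int) int {
	return sampleRate * bytesPerSample * channels * durationMs / 1000
}

func main() {
	// 16 kHz, 16-bit (2 bytes per sample), mono, 20 ms: the documented defaults.
	fmt.Println(chunkBytes(16000, 2, 1, 20)) // 640
	// The same 20 ms window at 24 kHz needs a larger chunk.
	fmt.Println(chunkBytes(24000, 2, 1, 20)) // 960
}
```

If a provider expects a different sample rate, the chunk size and the chunk interval have to be changed together so that each chunk still covers the intended duration.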
const (
	// DefaultTargetFPS is the default target frame rate for image streaming.
	// 1 FPS is suitable for most LLM vision scenarios.
	DefaultTargetFPS = 1.0
	// DefaultImageQuality is the default JPEG quality (1-100).
	DefaultImageQuality = 85
)
Default image streaming configuration constants.
const (
	// DefaultChunkDurationMs is the default video chunk duration in milliseconds.
	// 1000ms (1 second) chunks provide good balance between latency and efficiency.
	DefaultChunkDurationMs = 1000
)
Default video streaming configuration constants.
Variables ¶
var ErrEmptyResponse = errors.New("empty response, likely interrupted")
ErrEmptyResponse is returned when a response element has no content. This typically indicates an interrupted response that wasn't properly handled.
var ErrSessionEnded = errors.New("session ended")
ErrSessionEnded is returned when the streaming session has ended. This is not necessarily an error; it only indicates that the session is complete.
Functions ¶
func BuildToolResponseElement ¶
func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement
BuildToolResponseElement creates a stream element containing tool results. This element can be sent through the pipeline to:
1. Forward tool responses to the provider (via metadata["tool_responses"])
2. Capture tool results in the state store (via metadata["tool_result_messages"])
func DrainStaleMessages ¶
func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)
DrainStaleMessages removes any buffered messages from the output channel. This is useful for clearing state between turns.
Returns the number of messages drained, or an error if the session ended.
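A non-blocking drain of this kind is usually written as a select with a default branch. The sketch below uses a hypothetical generic drain function over a plain channel; how the real DrainStaleMessages detects a finished session is not shown in this reference, so treating a closed channel as "session ended" is an assumption.

```go
package main

import "fmt"

// drain discards everything currently buffered in ch without blocking.
// It reports how many items were discarded and whether the channel was
// found closed. Hypothetical helper, not the package implementation.
func drain[T any](ch <-chan T) (n int, closed bool) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return n, true // channel closed: nothing more will arrive
			}
			n++
		default:
			return n, false // buffer is empty right now
		}
	}
}

func main() {
	ch := make(chan string, 8)
	ch <- "stale-1"
	ch <- "stale-2"
	n, closed := drain(ch)
	fmt.Println(n, closed) // 2 false
}
```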
func ExecuteAndSend ¶
func ExecuteAndSend(
	ctx context.Context,
	executor ToolExecutor,
	toolCalls []types.MessageToolCall,
	inputChan chan<- stage.StreamElement,
) error
ExecuteAndSend is a convenience function that executes tool calls and sends the results through the pipeline in one operation.
If the executor is nil, this function returns nil (no-op).
func SendEndOfStream ¶
func SendEndOfStream(
	ctx context.Context,
	inputChan chan<- stage.StreamElement,
) error
SendEndOfStream signals that audio input is complete for the current turn. This triggers the provider to generate a response.
func SendImageEndOfStream ¶ added in v1.1.8
func SendImageEndOfStream(
	ctx context.Context,
	output chan<- stage.StreamElement,
) error
SendImageEndOfStream signals that image/frame input is complete for the current turn. This triggers the provider to generate a response.
func SendToolResults ¶
func SendToolResults(
	ctx context.Context,
	result *ToolExecutionResult,
	inputChan chan<- stage.StreamElement,
) error
SendToolResults sends tool execution results back through the pipeline to the provider, and includes tool result messages for state store capture.
This matches the behavior of non-streaming mode where tool results are stored as messages. The tool result messages are sent via inputChan with metadata, and DuplexProviderStage forwards them to output for state store capture.
func SendVideoEndOfStream ¶ added in v1.1.8
func SendVideoEndOfStream(
	ctx context.Context,
	output chan<- stage.StreamElement,
) error
SendVideoEndOfStream signals that video input is complete for the current turn. This triggers the provider to generate a response.
Types ¶
type AudioStreamer ¶
type AudioStreamer struct {
// ChunkSize is the number of bytes per chunk.
ChunkSize int
// ChunkIntervalMs is the interval between chunks in milliseconds
// when streaming in real-time mode.
ChunkIntervalMs int
}
AudioStreamer provides utilities for streaming audio data through a pipeline.
func NewAudioStreamer ¶
func NewAudioStreamer() *AudioStreamer
NewAudioStreamer creates a new audio streamer with default settings.
func (*AudioStreamer) SendChunk ¶
func (a *AudioStreamer) SendChunk(
	ctx context.Context,
	chunk []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error
SendChunk sends a single audio chunk through the pipeline.
func (*AudioStreamer) StreamBurst ¶
func (a *AudioStreamer) StreamBurst(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error
StreamBurst sends all audio data as fast as possible without pacing. This is preferred for pre-recorded audio to avoid false turn detections from natural speech pauses.
The provider receives all audio before detecting any turn boundaries, which prevents "user interrupted" signals from arriving mid-utterance.
func (*AudioStreamer) StreamRealtime ¶
func (a *AudioStreamer) StreamRealtime(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error
StreamRealtime sends audio data paced to match real-time playback. Each chunk is sent with a delay matching its duration.
Note: This mode can cause issues with some providers (like Gemini) that detect speech pauses mid-utterance. Use StreamBurst for pre-recorded audio.
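The difference between the two modes comes down to whether the sender waits one chunk duration between sends. The sketch below shows the chunking and the per-chunk delay only; splitChunks and chunkDuration are hypothetical helpers, and the 16-bit mono PCM layout is an assumption taken from the default constants rather than from the AudioStreamer source.

```go
package main

import (
	"fmt"
	"time"
)

// splitChunks cuts data into consecutive pieces of at most size bytes.
// Hypothetical helper, not part of the streaming package.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var out [][]byte
	for start := 0; start < len(data); start += size {
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		out = append(out, data[start:end])
	}
	return out
}

// chunkDuration returns the playback time of n bytes of audio,
// assuming 16-bit (2 bytes per sample) mono PCM.
func chunkDuration(n, sampleRate int) time.Duration {
	samples := n / 2
	return time.Duration(samples) * time.Second / time.Duration(sampleRate)
}

func main() {
	audio := make([]byte, 1500) // placeholder for real PCM data
	for i, c := range splitChunks(audio, 640) {
		// A real-time sender would sleep for this duration after each send;
		// a burst sender would skip the wait entirely.
		fmt.Printf("chunk %d: %d bytes, pace %v\n", i, len(c), chunkDuration(len(c), 16000))
	}
}
```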
type ImageStreamer ¶ added in v1.1.8
type ImageStreamer struct {
// TargetFPS is the target frame rate for realtime streaming.
// Default: 1.0 (1 frame per second).
TargetFPS float64
}
ImageStreamer provides utilities for streaming image frames through a pipeline. Use this for realtime video scenarios like webcam feeds or screen sharing.
func NewImageStreamer ¶ added in v1.1.8
func NewImageStreamer(targetFPS float64) *ImageStreamer
NewImageStreamer creates a new image streamer with the specified target FPS. Use targetFPS of 0 or less for default (1.0 FPS).
func (*ImageStreamer) SendFrame ¶ added in v1.1.8
func (s *ImageStreamer) SendFrame(
	ctx context.Context,
	data []byte,
	mimeType string,
	frameNum int64,
	timestamp time.Time,
	output chan<- stage.StreamElement,
) error
SendFrame sends a single image frame through the pipeline without pacing. This is the burst-mode equivalent: the frame is sent immediately, without delay.
Parameters:
- data: Raw image data (JPEG, PNG, etc.)
- mimeType: MIME type of the image (e.g., "image/jpeg")
- frameNum: Sequence number for ordering
- timestamp: When the frame was captured
- output: Pipeline input channel (the channel the streamer writes to)
func (*ImageStreamer) SendFrameWithDimensions ¶ added in v1.1.8
func (s *ImageStreamer) SendFrameWithDimensions(
	ctx context.Context,
	data []byte,
	mimeType string,
	width, height int,
	frameNum int64,
	timestamp time.Time,
	output chan<- stage.StreamElement,
) error
SendFrameWithDimensions sends a frame with explicit width and height. Use this when dimensions are known to avoid decoding overhead downstream.
func (*ImageStreamer) StreamFramesBurst ¶ added in v1.1.8
func (s *ImageStreamer) StreamFramesBurst(
	ctx context.Context,
	frames [][]byte,
	mimeType string,
	output chan<- stage.StreamElement,
) error
StreamFramesBurst sends all frames as fast as possible without pacing. Use this for pre-recorded frame sequences where real-time pacing isn't needed.
func (*ImageStreamer) StreamFramesRealtime ¶ added in v1.1.8
func (s *ImageStreamer) StreamFramesRealtime(
	ctx context.Context,
	frames [][]byte,
	mimeType string,
	output chan<- stage.StreamElement,
) error
StreamFramesRealtime sends frames paced to match the target FPS. Use this for simulating real-time playback of pre-recorded frames.
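Pacing to a target FPS amounts to waiting one frame interval between sends. A minimal sketch of that conversion follows; frameInterval is a hypothetical helper, and its fallback to 1.0 FPS for non-positive values mirrors the behavior documented for NewImageStreamer rather than code taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// frameInterval converts a target frame rate into the delay between frames.
// Non-positive rates fall back to 1 FPS. Hypothetical helper.
func frameInterval(targetFPS float64) time.Duration {
	if targetFPS <= 0 {
		targetFPS = 1.0
	}
	return time.Duration(float64(time.Second) / targetFPS)
}

func main() {
	for _, fps := range []float64{1, 2, 0.5, 0} {
		fmt.Printf("%.1f FPS -> one frame every %v\n", fps, frameInterval(fps))
	}
}
```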
type ResponseAction ¶
type ResponseAction int
ResponseAction indicates what action to take after processing a response element.
const (
	// ResponseActionContinue means the element was informational (e.g., interruption signal),
	// and we should continue waiting for the final response.
	ResponseActionContinue ResponseAction = iota
	// ResponseActionComplete means we received a complete response.
	ResponseActionComplete
	// ResponseActionError means an error occurred or the response was empty.
	ResponseActionError
	// ResponseActionToolCalls means the response contains tool calls that need to be executed.
	ResponseActionToolCalls
)
func ProcessResponseElement ¶
func ProcessResponseElement(elem *stage.StreamElement, logPrefix string) (ResponseAction, error)
ProcessResponseElement handles a response element from the pipeline, determining the appropriate action based on interruption signals, turn completion, and errors.
This is the core state machine for duplex streaming response handling. It consolidates the response handling logic needed for bidirectional streaming.
Returns:
- ResponseAction: what action to take
- error: any error to return (only set when action is ResponseActionError)
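The four outcomes can be pictured as a small classification function. The sketch below is illustrative only: the element struct, its fields, and the order of the checks are assumptions, since the real stage.StreamElement layout and the internal logic of ProcessResponseElement are not shown in this reference.

```go
package main

import (
	"errors"
	"fmt"
)

// action is a local stand-in for ResponseAction.
type action int

const (
	actionContinue action = iota
	actionComplete
	actionError
	actionToolCalls
)

// element is a hypothetical stand-in for stage.StreamElement.
type element struct {
	Err       error
	ToolCalls []string
	EndOfTurn bool
	Text      string
}

var errEmpty = errors.New("empty response, likely interrupted")

// classify maps one element to an action, following the documented
// meaning of each action. The check order is an assumption.
func classify(e element) (action, error) {
	switch {
	case e.Err != nil:
		return actionError, e.Err
	case len(e.ToolCalls) > 0:
		return actionToolCalls, nil
	case !e.EndOfTurn:
		return actionContinue, nil // informational, keep waiting
	case e.Text == "":
		return actionError, errEmpty // turn ended with nothing in it
	default:
		return actionComplete, nil
	}
}

func main() {
	stream := []element{
		{}, // informational element
		{ToolCalls: []string{"get_time"}},
		{Text: "It is noon.", EndOfTurn: true},
	}
	for _, e := range stream {
		a, err := classify(e)
		fmt.Println(a, err)
	}
}
```

A caller would typically loop over the output channel, keep reading on the "continue" outcome, run tools on the "tool calls" outcome, and stop on "complete" or "error".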
func (ResponseAction) String ¶
func (a ResponseAction) String() string
String returns a human-readable representation of the action.
type ResponseCollector ¶
type ResponseCollector struct {
// contains filtered or unexported fields
}
ResponseCollector manages response collection from a streaming session. It processes streaming elements, handles tool calls, and signals completion.
func NewResponseCollector ¶
func NewResponseCollector(config ResponseCollectorConfig) *ResponseCollector
NewResponseCollector creates a new response collector with the given configuration.
func (*ResponseCollector) Start ¶
func (c *ResponseCollector) Start(
	ctx context.Context,
	outputChan <-chan stage.StreamElement,
	inputChan chan<- stage.StreamElement,
) <-chan error
Start begins collecting responses in a goroutine. Returns a channel that receives nil on success or an error on failure.
The collector will:
1. Process incoming stream elements
2. Execute tool calls via the ToolExecutor (if configured)
3. Send tool results back through inputChan
4. Signal completion or error through the returned channel
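The general shape of this goroutine pattern can be sketched with plain channels. Everything below is a simplified stand-in: event, collect, and runTurn are hypothetical names, tool execution is left out, and mapping a closed channel to a "session ended" error is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for an already classified stream element.
type event struct {
	done bool  // the turn finished successfully
	err  error // the turn failed
}

var errSessionEnded = errors.New("session ended")

// collect reads events in a goroutine and reports the outcome exactly once
// on the returned channel: nil on success, an error otherwise.
func collect(ctx context.Context, events <-chan event) <-chan error {
	result := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		for {
			select {
			case <-ctx.Done():
				result <- ctx.Err()
				return
			case ev, ok := <-events:
				switch {
				case !ok:
					result <- errSessionEnded
					return
				case ev.err != nil:
					result <- ev.err
					return
				case ev.done:
					result <- nil
					return
				}
				// Informational event: keep waiting.
			}
		}
	}()
	return result
}

// runTurn feeds the given events to collect and waits for the outcome.
func runTurn(events ...event) error {
	ch := make(chan event, len(events))
	for _, ev := range events {
		ch <- ev
	}
	close(ch)
	return <-collect(context.Background(), ch)
}

func main() {
	fmt.Println(runTurn(event{}, event{done: true})) // <nil>
	fmt.Println(runTurn(event{}))                    // session ended
}
```

Returning a buffered channel of capacity one lets the caller decide when to wait for the result while the goroutine is free to exit as soon as the turn is over.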
type ResponseCollectorConfig ¶
type ResponseCollectorConfig struct {
// ToolExecutor is called when tool calls are received.
// If nil, tool calls will result in an error.
ToolExecutor ToolExecutor
// LogPrefix is prepended to log messages for identification.
LogPrefix string
}
ResponseCollectorConfig configures response collection behavior.
type ToolExecutionResult ¶
type ToolExecutionResult struct {
// ProviderResponses are formatted for sending back to the streaming provider.
ProviderResponses []providers.ToolResponse
// ResultMessages are formatted for state store capture,
// matching the behavior of non-streaming tool execution.
ResultMessages []types.Message
}
ToolExecutionResult contains the results of executing tool calls.
type ToolExecutor ¶
type ToolExecutor interface {
// Execute runs the given tool calls and returns their results.
// The implementation is responsible for handling execution errors
// and formatting them appropriately in the result.
Execute(ctx context.Context, toolCalls []types.MessageToolCall) (*ToolExecutionResult, error)
}
ToolExecutor executes tool calls and returns results. Implementations provide the actual tool registry integration.
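One way to satisfy an interface of this shape is an executor backed by a map of functions, where per-tool failures are recorded in the result instead of aborting the batch. The sketch below uses local stand-in types (toolCall, toolResult, mapExecutor); the real types.MessageToolCall and ToolExecutionResult have different fields, so this shows the pattern rather than a drop-in implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// toolCall and toolResult are hypothetical stand-ins for the package types.
type toolCall struct{ ID, Name, Args string }

type toolResult struct {
	CallID  string
	Content string
	IsError bool
}

// mapExecutor looks tools up by name in a map of handler functions.
type mapExecutor map[string]func(ctx context.Context, args string) (string, error)

// Execute runs every call in order. Tool failures become error results;
// only context cancellation is returned as an error.
func (m mapExecutor) Execute(ctx context.Context, calls []toolCall) ([]toolResult, error) {
	results := make([]toolResult, 0, len(calls))
	for _, c := range calls {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		fn, ok := m[c.Name]
		if !ok {
			results = append(results, toolResult{CallID: c.ID, Content: "unknown tool: " + c.Name, IsError: true})
			continue
		}
		out, err := fn(ctx, c.Args)
		if err != nil {
			results = append(results, toolResult{CallID: c.ID, Content: err.Error(), IsError: true})
			continue
		}
		results = append(results, toolResult{CallID: c.ID, Content: out})
	}
	return results, nil
}

// demo runs one successful, one failing, and one unknown tool call.
func demo() []toolResult {
	exec := mapExecutor{
		"echo": func(_ context.Context, args string) (string, error) { return args, nil },
		"fail": func(_ context.Context, _ string) (string, error) { return "", errors.New("boom") },
	}
	res, _ := exec.Execute(context.Background(), []toolCall{
		{ID: "1", Name: "echo", Args: "hi"},
		{ID: "2", Name: "fail"},
		{ID: "3", Name: "missing"},
	})
	return res
}

func main() {
	for _, r := range demo() {
		fmt.Printf("%+v\n", r)
	}
}
```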
type VideoChunk ¶ added in v1.1.8
VideoChunk represents a video chunk with metadata for batch streaming.
type VideoStreamer ¶ added in v1.1.8
type VideoStreamer struct {
// ChunkDurationMs is the target duration of each video chunk in milliseconds.
// Default: 1000 (1 second).
ChunkDurationMs int
}
VideoStreamer provides utilities for streaming video chunks through a pipeline. Use this for encoded video segments (H.264, VP8, etc.) rather than individual frames. For individual image frames, use ImageStreamer instead.
func NewVideoStreamer ¶ added in v1.1.8
func NewVideoStreamer(chunkDurationMs int) *VideoStreamer
NewVideoStreamer creates a new video streamer with the specified chunk duration. Use chunkDurationMs of 0 or less for default (1000ms).
func (*VideoStreamer) SendChunk ¶ added in v1.1.8
func (s *VideoStreamer) SendChunk(
	ctx context.Context,
	data []byte,
	mimeType string,
	chunkIndex int,
	isKeyFrame bool,
	timestamp time.Time,
	output chan<- stage.StreamElement,
) error
SendChunk sends a single video chunk through the pipeline.
Parameters:
- data: Encoded video data (H.264, VP8, etc.)
- mimeType: MIME type of the video (e.g., "video/h264", "video/webm")
- chunkIndex: Sequence number for ordering
- isKeyFrame: True if this chunk contains a keyframe (important for decoding)
- timestamp: When the chunk was captured/created
- output: Pipeline input channel (the channel the streamer writes to)
func (*VideoStreamer) SendChunkWithDimensions ¶ added in v1.1.8
func (s *VideoStreamer) SendChunkWithDimensions(
	ctx context.Context,
	data []byte,
	mimeType string,
	width, height int,
	frameRate float64,
	chunkIndex int,
	isKeyFrame bool,
	timestamp time.Time,
	duration time.Duration,
	output chan<- stage.StreamElement,
) error
SendChunkWithDimensions sends a video chunk with explicit dimensions and frame rate. Use this when video metadata is known to avoid parsing overhead downstream.
func (*VideoStreamer) StreamChunksBurst ¶ added in v1.1.8
func (s *VideoStreamer) StreamChunksBurst(
	ctx context.Context,
	chunks []VideoChunk,
	mimeType string,
	output chan<- stage.StreamElement,
) error
StreamChunksBurst sends all video chunks as fast as possible without pacing. Use this for pre-recorded video where real-time pacing isn't needed.
func (*VideoStreamer) StreamChunksRealtime ¶ added in v1.1.8
func (s *VideoStreamer) StreamChunksRealtime(
	ctx context.Context,
	chunks []VideoChunk,
	mimeType string,
	output chan<- stage.StreamElement,
) error
StreamChunksRealtime sends video chunks paced according to their duration. Use this for simulating real-time playback of pre-recorded video.