streaming

package
v1.1.8

This package is not in the latest version of its module.

Published: Jan 15, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package streaming provides generic utilities for bidirectional streaming communication with LLM providers.

This package extracts common patterns used in duplex (bidirectional) streaming conversations, including:

  • Response processing state machine for handling provider responses
  • Tool execution interface for streaming tool calls
  • Audio streaming utilities for sending audio chunks to providers
  • Response collection patterns for managing streaming responses

The package is designed to be provider-agnostic, working with any provider that implements the runtime/providers streaming interfaces.

Response Processing

The response state machine (ProcessResponseElement) analyzes stream elements and determines appropriate actions:

  • Continue: informational element, keep waiting
  • Complete: turn finished with valid response
  • Error: error or unexpected empty response
  • ToolCalls: tool calls need execution

Tool Execution

The ToolExecutor interface allows custom tool registry implementations to be plugged in. The package provides helpers for sending tool results back through the streaming pipeline.

Audio Streaming

AudioStreamer provides utilities for streaming audio data in either burst mode (all at once) or real-time mode (paced to match playback speed).

Response Collection

ResponseCollector manages the goroutine pattern for collecting streaming responses from a provider session, with optional tool call handling.

Constants

const (
	// DefaultChunkSize is the default audio chunk size in bytes.
	// 640 bytes = 20ms at 16kHz 16-bit mono (16000 * 2 * 0.02)
	DefaultChunkSize = 640

	// DefaultSampleRate is the default audio sample rate in Hz.
	// 16kHz is required by Gemini Live API.
	DefaultSampleRate = 16000

	// DefaultChunkIntervalMs is the default interval between chunks in milliseconds
	// when streaming in real-time mode.
	DefaultChunkIntervalMs = 20
)

Default audio configuration constants.
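
The arithmetic behind DefaultChunkSize can be checked with a small sketch. The helper chunkBytes is hypothetical and not part of this package; it only reproduces the formula from the constant's comment (sample rate × bytes per sample × channels × interval).

```go
package main

import "fmt"

// chunkBytes is a hypothetical helper (not part of the streaming package).
// It returns how many bytes of raw PCM audio cover intervalMs milliseconds.
func chunkBytes(sampleRate, bytesPerSample, channels, intervalMs int) int {
	return sampleRate * bytesPerSample * channels * intervalMs / 1000
}

func main() {
	// 16 kHz, 16-bit (2 bytes per sample), mono, 20 ms per chunk.
	fmt.Println(chunkBytes(16000, 2, 1, 20)) // 640
}
```

If a different sample rate is used, ChunkSize and ChunkIntervalMs would presumably need to be adjusted together so that each chunk still covers the intended interval.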

const (
	// DefaultTargetFPS is the default target frame rate for image streaming.
	// 1 FPS is suitable for most LLM vision scenarios.
	DefaultTargetFPS = 1.0

	// DefaultImageQuality is the default JPEG quality (1-100).
	DefaultImageQuality = 85
)

Default image streaming configuration constants.

const (
	// DefaultChunkDurationMs is the default video chunk duration in milliseconds.
	// 1000ms (1 second) chunks provide good balance between latency and efficiency.
	DefaultChunkDurationMs = 1000
)

Default video streaming configuration constants.

Variables

var ErrEmptyResponse = errors.New("empty response, likely interrupted")

ErrEmptyResponse is returned when a response element has no content. This typically indicates an interrupted response that wasn't properly handled.

var ErrSessionEnded = errors.New("session ended")

ErrSessionEnded is returned when the streaming session has ended. This is not necessarily a failure; it only indicates that the session is complete.

Functions

func BuildToolResponseElement

func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement

BuildToolResponseElement creates a stream element containing tool results. This element can be sent through the pipeline to:

  1. Forward tool responses to the provider (via metadata["tool_responses"])
  2. Capture tool results in the state store (via metadata["tool_result_messages"])

func DrainStaleMessages

func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)

DrainStaleMessages removes any buffered messages from the output channel. This is useful for clearing state between turns.

Returns the number of messages drained, or an error if the session ended.
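
A non-blocking drain of this kind is commonly written with a select/default loop. The sketch below is an assumption about the general pattern, not the package's actual implementation: drainStale and errSessionEnded are hypothetical stand-ins, and a generic element type replaces stage.StreamElement so the example compiles on its own. It also assumes that a closed channel is what signals an ended session.

```go
package main

import (
	"errors"
	"fmt"
)

// errSessionEnded stands in for the package's ErrSessionEnded.
var errSessionEnded = errors.New("session ended")

// drainStale is a hypothetical stand-in for DrainStaleMessages.
// It discards buffered values without blocking and reports how many
// were dropped. A closed channel is treated as an ended session.
func drainStale[T any](ch <-chan T) (int, error) {
	drained := 0
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return drained, errSessionEnded
			}
			drained++
		default:
			// Nothing is buffered right now, so the channel is clear.
			return drained, nil
		}
	}
}

func main() {
	ch := make(chan string, 4)
	ch <- "stale-1"
	ch <- "stale-2"

	n, err := drainStale(ch)
	fmt.Println(n, err) // 2 <nil>

	close(ch)
	n, err = drainStale(ch)
	fmt.Println(n, err) // 0 session ended
}
```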

func ExecuteAndSend

func ExecuteAndSend(
	ctx context.Context,
	executor ToolExecutor,
	toolCalls []types.MessageToolCall,
	inputChan chan<- stage.StreamElement,
) error

ExecuteAndSend is a convenience function that executes tool calls and sends the results through the pipeline in one operation.

If the executor is nil, this function returns nil (no-op).

func SendEndOfStream

func SendEndOfStream(
	ctx context.Context,
	inputChan chan<- stage.StreamElement,
) error

SendEndOfStream signals that audio input is complete for the current turn. This triggers the provider to generate a response.

func SendImageEndOfStream added in v1.1.8

func SendImageEndOfStream(
	ctx context.Context,
	output chan<- stage.StreamElement,
) error

SendImageEndOfStream signals that image/frame input is complete for the current turn. This triggers the provider to generate a response.

func SendToolResults

func SendToolResults(
	ctx context.Context,
	result *ToolExecutionResult,
	inputChan chan<- stage.StreamElement,
) error

SendToolResults sends tool execution results back through the pipeline to the provider, and includes tool result messages for state store capture.

This matches the behavior of non-streaming mode where tool results are stored as messages. The tool result messages are sent via inputChan with metadata, and DuplexProviderStage forwards them to output for state store capture.

func SendVideoEndOfStream added in v1.1.8

func SendVideoEndOfStream(
	ctx context.Context,
	output chan<- stage.StreamElement,
) error

SendVideoEndOfStream signals that video input is complete for the current turn. This triggers the provider to generate a response.

func WaitForResponse

func WaitForResponse(ctx context.Context, responseDone <-chan error) error

WaitForResponse waits for the response collection to complete. This is a convenience function for blocking until a response is received.

Types

type AudioStreamer

type AudioStreamer struct {
	// ChunkSize is the number of bytes per chunk.
	ChunkSize int

	// ChunkIntervalMs is the interval between chunks in milliseconds
	// when streaming in real-time mode.
	ChunkIntervalMs int
}

AudioStreamer provides utilities for streaming audio data through a pipeline.

func NewAudioStreamer

func NewAudioStreamer() *AudioStreamer

NewAudioStreamer creates a new audio streamer with default settings.

func (*AudioStreamer) SendChunk

func (a *AudioStreamer) SendChunk(
	ctx context.Context,
	chunk []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error

SendChunk sends a single audio chunk through the pipeline.

func (*AudioStreamer) StreamBurst

func (a *AudioStreamer) StreamBurst(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error

StreamBurst sends all audio data as fast as possible without pacing. This is preferred for pre-recorded audio to avoid false turn detections from natural speech pauses.

The provider receives all audio before detecting any turn boundaries, which prevents "user interrupted" signals from arriving mid-utterance.

func (*AudioStreamer) StreamRealtime

func (a *AudioStreamer) StreamRealtime(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error

StreamRealtime sends audio data paced to match real-time playback. Each chunk is sent with a delay matching its duration.

Note: This mode can cause issues with some providers (like Gemini) that detect speech pauses mid-utterance. Use StreamBurst for pre-recorded audio.

type ImageStreamer added in v1.1.8

type ImageStreamer struct {
	// TargetFPS is the target frame rate for realtime streaming.
	// Default: 1.0 (1 frame per second).
	TargetFPS float64
}

ImageStreamer provides utilities for streaming image frames through a pipeline. Use this for realtime video scenarios like webcam feeds or screen sharing.

func NewImageStreamer added in v1.1.8

func NewImageStreamer(targetFPS float64) *ImageStreamer

NewImageStreamer creates a new image streamer with the specified target FPS. Use targetFPS of 0 or less for default (1.0 FPS).

func (*ImageStreamer) SendFrame added in v1.1.8

func (s *ImageStreamer) SendFrame(
	ctx context.Context,
	data []byte,
	mimeType string,
	frameNum int64,
	timestamp time.Time,
	output chan<- stage.StreamElement,
) error

SendFrame sends a single image frame through the pipeline without pacing. This is the burst-mode equivalent: the frame is sent immediately, with no delay.

Parameters:

  • data: Raw image data (JPEG, PNG, etc.)
  • mimeType: MIME type of the image (e.g., "image/jpeg")
  • frameNum: Sequence number for ordering
  • timestamp: When the frame was captured
  • output: Channel that feeds the pipeline's input

func (*ImageStreamer) SendFrameWithDimensions added in v1.1.8

func (s *ImageStreamer) SendFrameWithDimensions(
	ctx context.Context,
	data []byte,
	mimeType string,
	width, height int,
	frameNum int64,
	timestamp time.Time,
	output chan<- stage.StreamElement,
) error

SendFrameWithDimensions sends a frame with explicit width and height. Use this when dimensions are known to avoid decoding overhead downstream.

func (*ImageStreamer) StreamFramesBurst added in v1.1.8

func (s *ImageStreamer) StreamFramesBurst(
	ctx context.Context,
	frames [][]byte,
	mimeType string,
	output chan<- stage.StreamElement,
) error

StreamFramesBurst sends all frames as fast as possible without pacing. Use this for pre-recorded frame sequences where real-time pacing isn't needed.

func (*ImageStreamer) StreamFramesRealtime added in v1.1.8

func (s *ImageStreamer) StreamFramesRealtime(
	ctx context.Context,
	frames [][]byte,
	mimeType string,
	output chan<- stage.StreamElement,
) error

StreamFramesRealtime sends frames paced to match the target FPS. Use this for simulating real-time playback of pre-recorded frames.

type ResponseAction

type ResponseAction int

ResponseAction indicates what action to take after processing a response element.

const (
	// ResponseActionContinue means the element was informational (e.g., interruption signal),
	// and we should continue waiting for the final response.
	ResponseActionContinue ResponseAction = iota
	// ResponseActionComplete means we received a complete response.
	ResponseActionComplete
	// ResponseActionError means an error occurred or the response was empty.
	ResponseActionError
	// ResponseActionToolCalls means the response contains tool calls that need to be executed.
	ResponseActionToolCalls
)

func ProcessResponseElement

func ProcessResponseElement(elem *stage.StreamElement, logPrefix string) (ResponseAction, error)

ProcessResponseElement handles a response element from the pipeline, determining the appropriate action based on interruption signals, turn completion, and errors.

This is the core state machine for duplex streaming response handling. It consolidates the response handling logic needed for bidirectional streaming.

Returns:

  • ResponseAction: what action to take
  • error: any error to return (only set when action is ResponseActionError)
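
A caller typically switches on the returned action inside its receive loop. The sketch below shows that dispatch with local stand-ins so it compiles without the package: action mirrors ResponseAction, element is a hypothetical replacement for stage.StreamElement, and classify is a toy classifier whose rules are assumptions, not the real ProcessResponseElement logic.

```go
package main

import (
	"errors"
	"fmt"
)

// action mirrors ResponseAction locally.
type action int

const (
	actionContinue action = iota
	actionComplete
	actionError
	actionToolCalls
)

// element is a hypothetical stand-in for stage.StreamElement.
type element struct {
	informational bool
	text          string
	toolCalls     []string
}

var errEmpty = errors.New("empty response, likely interrupted")

// classify is a toy classifier used only for this illustration.
func classify(e element) (action, error) {
	switch {
	case e.informational:
		return actionContinue, nil
	case len(e.toolCalls) > 0:
		return actionToolCalls, nil
	case e.text == "":
		return actionError, errEmpty
	default:
		return actionComplete, nil
	}
}

// collect consumes elements until the turn completes or fails.
func collect(elems []element) (string, error) {
	for _, e := range elems {
		act, err := classify(e)
		switch act {
		case actionContinue:
			continue // informational, keep waiting
		case actionToolCalls:
			continue // a real caller would execute tools and send results here
		case actionError:
			return "", err
		case actionComplete:
			return e.text, nil
		}
	}
	return "", errors.New("stream ended before the turn completed")
}

func main() {
	text, err := collect([]element{
		{informational: true},
		{toolCalls: []string{"lookup"}},
		{text: "done"},
	})
	fmt.Println(text, err) // done <nil>
}
```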

func (ResponseAction) String

func (a ResponseAction) String() string

String returns a human-readable representation of the action.

type ResponseCollector

type ResponseCollector struct {
	// contains filtered or unexported fields
}

ResponseCollector manages response collection from a streaming session. It processes streaming elements, handles tool calls, and signals completion.

func NewResponseCollector

func NewResponseCollector(config ResponseCollectorConfig) *ResponseCollector

NewResponseCollector creates a new response collector with the given configuration.

func (*ResponseCollector) Start

func (c *ResponseCollector) Start(
	ctx context.Context,
	outputChan <-chan stage.StreamElement,
	inputChan chan<- stage.StreamElement,
) <-chan error

Start begins collecting responses in a goroutine. Returns a channel that receives nil on success or an error on failure.

The collector will:

  1. Process incoming stream elements
  2. Execute tool calls via the ToolExecutor (if configured)
  3. Send tool results back through inputChan
  4. Signal completion or error through the returned channel

type ResponseCollectorConfig

type ResponseCollectorConfig struct {
	// ToolExecutor is called when tool calls are received.
	// If nil, tool calls will result in an error.
	ToolExecutor ToolExecutor

	// LogPrefix is prepended to log messages for identification.
	LogPrefix string
}

ResponseCollectorConfig configures response collection behavior.

type ToolExecutionResult

type ToolExecutionResult struct {
	// ProviderResponses are formatted for sending back to the streaming provider.
	ProviderResponses []providers.ToolResponse

	// ResultMessages are formatted for state store capture,
	// matching the behavior of non-streaming tool execution.
	ResultMessages []types.Message
}

ToolExecutionResult contains the results of executing tool calls.

type ToolExecutor

type ToolExecutor interface {
	// Execute runs the given tool calls and returns their results.
	// The implementation is responsible for handling execution errors
	// and formatting them appropriately in the result.
	Execute(ctx context.Context, toolCalls []types.MessageToolCall) (*ToolExecutionResult, error)
}

ToolExecutor executes tool calls and returns results. Implementations provide the actual tool registry integration.

type VideoChunk added in v1.1.8

type VideoChunk struct {
	Data       []byte
	IsKeyFrame bool
	Timestamp  time.Time
	Duration   time.Duration
}

VideoChunk represents a video chunk with metadata for batch streaming.

type VideoStreamer added in v1.1.8

type VideoStreamer struct {
	// ChunkDurationMs is the target duration of each video chunk in milliseconds.
	// Default: 1000 (1 second).
	ChunkDurationMs int
}

VideoStreamer provides utilities for streaming video chunks through a pipeline. Use this for encoded video segments (H.264, VP8, etc.) rather than individual frames. For individual image frames, use ImageStreamer instead.

func NewVideoStreamer added in v1.1.8

func NewVideoStreamer(chunkDurationMs int) *VideoStreamer

NewVideoStreamer creates a new video streamer with the specified chunk duration. Use chunkDurationMs of 0 or less for default (1000ms).

func (*VideoStreamer) SendChunk added in v1.1.8

func (s *VideoStreamer) SendChunk(
	ctx context.Context,
	data []byte,
	mimeType string,
	chunkIndex int,
	isKeyFrame bool,
	timestamp time.Time,
	output chan<- stage.StreamElement,
) error

SendChunk sends a single video chunk through the pipeline.

Parameters:

  • data: Encoded video data (H.264, VP8, etc.)
  • mimeType: MIME type of the video (e.g., "video/h264", "video/webm")
  • chunkIndex: Sequence number for ordering
  • isKeyFrame: True if this chunk contains a keyframe (important for decoding)
  • timestamp: When the chunk was captured/created
  • output: Channel that feeds the pipeline's input

func (*VideoStreamer) SendChunkWithDimensions added in v1.1.8

func (s *VideoStreamer) SendChunkWithDimensions(
	ctx context.Context,
	data []byte,
	mimeType string,
	width, height int,
	frameRate float64,
	chunkIndex int,
	isKeyFrame bool,
	timestamp time.Time,
	duration time.Duration,
	output chan<- stage.StreamElement,
) error

SendChunkWithDimensions sends a video chunk with explicit dimensions and frame rate. Use this when video metadata is known to avoid parsing overhead downstream.

func (*VideoStreamer) StreamChunksBurst added in v1.1.8

func (s *VideoStreamer) StreamChunksBurst(
	ctx context.Context,
	chunks []VideoChunk,
	mimeType string,
	output chan<- stage.StreamElement,
) error

StreamChunksBurst sends all video chunks as fast as possible without pacing. Use this for pre-recorded video where real-time pacing isn't needed.

func (*VideoStreamer) StreamChunksRealtime added in v1.1.8

func (s *VideoStreamer) StreamChunksRealtime(
	ctx context.Context,
	chunks []VideoChunk,
	mimeType string,
	output chan<- stage.StreamElement,
) error

StreamChunksRealtime sends video chunks paced according to their duration. Use this for simulating real-time playback of pre-recorded video.
