streaming

package streaming (v1.1.6)

Warning: This package is not in the latest version of its module.
Published: Dec 23, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package streaming provides generic utilities for bidirectional streaming communication with LLM providers.

This package extracts common patterns used in duplex (bidirectional) streaming conversations, including:

  • Response processing state machine for handling provider responses
  • Tool execution interface for streaming tool calls
  • Audio streaming utilities for sending audio chunks to providers
  • Response collection patterns for managing streaming responses

The package is designed to be provider-agnostic, working with any provider that implements the runtime/providers streaming interfaces.

Response Processing

The response state machine (ProcessResponseElement) analyzes stream elements and determines appropriate actions:

  • Continue: informational element, keep waiting
  • Complete: turn finished with valid response
  • Error: error or unexpected empty response
  • ToolCalls: tool calls need execution

Tool Execution

The ToolExecutor interface allows custom tool registry implementations to be plugged in. The package provides helpers for sending tool results back through the streaming pipeline.

Audio Streaming

AudioStreamer provides utilities for streaming audio data in either burst mode (all at once) or real-time mode (paced to match playback speed).

Response Collection

ResponseCollector manages the goroutine pattern for collecting streaming responses from a provider session, with optional tool call handling.

Index

Constants

const (
	// DefaultChunkSize is the default audio chunk size in bytes.
	// 640 bytes = 20ms at 16kHz 16-bit mono (16000 * 2 * 0.02)
	DefaultChunkSize = 640

	// DefaultSampleRate is the default audio sample rate in Hz.
	// 16kHz is required by Gemini Live API.
	DefaultSampleRate = 16000

	// DefaultChunkIntervalMs is the default interval between chunks in milliseconds
	// when streaming in real-time mode.
	DefaultChunkIntervalMs = 20
)

Default audio configuration constants

Variables

var ErrEmptyResponse = errors.New("empty response, likely interrupted")

ErrEmptyResponse is returned when a response element has no content. This typically indicates an interrupted response that wasn't properly handled.

var ErrSessionEnded = errors.New("session ended")

ErrSessionEnded is returned when the streaming session has ended. It does not necessarily indicate a failure; it only signals that the session is complete.

Functions

func BuildToolResponseElement

func BuildToolResponseElement(result *ToolExecutionResult) stage.StreamElement

BuildToolResponseElement creates a stream element containing tool results. This element can be sent through the pipeline to:

  1. Forward tool responses to the provider (via metadata["tool_responses"]).
  2. Capture tool results in the state store (via metadata["tool_result_messages"]).
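The two metadata keys named above carry two views of the same results. The following is a sketch under stated assumptions: StreamElement, ToolResponse, and Message are local stand-ins (the real stage, providers, and types packages define their own fields), the Metadata field name is assumed, and only the key names come from this documentation.

```go
package main

import "fmt"

// Hypothetical stand-ins; the real types have different fields.
type ToolResponse struct {
	ToolCallID string
	Result     string
}

type Message struct {
	Role    string
	Content string
}

type ToolExecutionResult struct {
	ProviderResponses []ToolResponse
	ResultMessages    []Message
}

type StreamElement struct {
	Metadata map[string]interface{} // assumed field name
}

// buildToolResponseElement attaches both views of the results under the
// metadata keys named in the documentation.
func buildToolResponseElement(result *ToolExecutionResult) StreamElement {
	return StreamElement{Metadata: map[string]interface{}{
		"tool_responses":       result.ProviderResponses, // forwarded to the provider
		"tool_result_messages": result.ResultMessages,    // captured by the state store
	}}
}

func main() {
	res := &ToolExecutionResult{
		ProviderResponses: []ToolResponse{{ToolCallID: "call-1", Result: `{"temp":21}`}},
		ResultMessages:    []Message{{Role: "tool", Content: `{"temp":21}`}},
	}
	elem := buildToolResponseElement(res)
	fmt.Println(len(elem.Metadata)) // 2
}
```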

func DrainStaleMessages

func DrainStaleMessages(outputChan <-chan stage.StreamElement) (int, error)

DrainStaleMessages removes any buffered messages from the output channel. This is useful for clearing state between turns.

Returns the number of messages drained, or an error if the session ended.

func ExecuteAndSend

func ExecuteAndSend(
	ctx context.Context,
	executor ToolExecutor,
	toolCalls []types.MessageToolCall,
	inputChan chan<- stage.StreamElement,
) error

ExecuteAndSend is a convenience function that executes tool calls and sends the results through the pipeline in one operation.

If the executor is nil, this function returns nil (no-op).

func SendEndOfStream

func SendEndOfStream(
	ctx context.Context,
	inputChan chan<- stage.StreamElement,
) error

SendEndOfStream signals that audio input is complete for the current turn. This triggers the provider to generate a response.

func SendToolResults

func SendToolResults(
	ctx context.Context,
	result *ToolExecutionResult,
	inputChan chan<- stage.StreamElement,
) error

SendToolResults sends tool execution results back through the pipeline to the provider, along with the tool result messages for state store capture.

This matches the behavior of non-streaming mode, where tool results are stored as messages. The tool result messages are sent via inputChan in the element's metadata, and DuplexProviderStage forwards them to its output for state store capture.

func WaitForResponse

func WaitForResponse(ctx context.Context, responseDone <-chan error) error

WaitForResponse waits for the response collection to complete. This is a convenience function for blocking until a response is received.

Types

type AudioStreamer

type AudioStreamer struct {
	// ChunkSize is the number of bytes per chunk.
	ChunkSize int

	// ChunkIntervalMs is the interval between chunks in milliseconds
	// when streaming in real-time mode.
	ChunkIntervalMs int
}

AudioStreamer provides utilities for streaming audio data through a pipeline.

func NewAudioStreamer

func NewAudioStreamer() *AudioStreamer

NewAudioStreamer creates a new audio streamer with default settings.

func (*AudioStreamer) SendChunk

func (a *AudioStreamer) SendChunk(
	ctx context.Context,
	chunk []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error

SendChunk sends a single audio chunk through the pipeline.

func (*AudioStreamer) StreamBurst

func (a *AudioStreamer) StreamBurst(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error

StreamBurst sends all audio data as fast as possible without pacing. This is preferred for pre-recorded audio to avoid false turn detections from natural speech pauses.

The provider receives all audio before detecting any turn boundaries, which prevents "user interrupted" signals from arriving mid-utterance.

func (*AudioStreamer) StreamRealtime

func (a *AudioStreamer) StreamRealtime(
	ctx context.Context,
	audioData []byte,
	sampleRate int,
	inputChan chan<- stage.StreamElement,
) error

StreamRealtime sends audio data paced to match real-time playback. Each chunk is sent with a delay matching its duration.

Note: This mode can cause problems with providers (such as Gemini) that detect speech pauses mid-utterance and treat them as turn boundaries. Use StreamBurst for pre-recorded audio.

type ResponseAction

type ResponseAction int

ResponseAction indicates what action to take after processing a response element.

const (
	// ResponseActionContinue means the element was informational (e.g., interruption signal),
	// and we should continue waiting for the final response.
	ResponseActionContinue ResponseAction = iota
	// ResponseActionComplete means we received a complete response.
	ResponseActionComplete
	// ResponseActionError means an error occurred or the response was empty.
	ResponseActionError
	// ResponseActionToolCalls means the response contains tool calls that need to be executed.
	ResponseActionToolCalls
)

func ProcessResponseElement

func ProcessResponseElement(elem *stage.StreamElement, logPrefix string) (ResponseAction, error)

ProcessResponseElement handles a response element from the pipeline, determining the appropriate action based on interruption signals, turn completion, and errors.

This is the core state machine for duplex streaming response handling. It consolidates the response handling logic needed for bidirectional streaming.

Returns:

  • ResponseAction: what action to take
  • error: any error to return (only set when action is ResponseActionError)

func (ResponseAction) String

func (a ResponseAction) String() string

String returns a human-readable representation of the action.

type ResponseCollector

type ResponseCollector struct {
	// contains filtered or unexported fields
}

ResponseCollector manages response collection from a streaming session. It processes streaming elements, handles tool calls, and signals completion.

func NewResponseCollector

func NewResponseCollector(config ResponseCollectorConfig) *ResponseCollector

NewResponseCollector creates a new response collector with the given configuration.

func (*ResponseCollector) Start

func (c *ResponseCollector) Start(
	ctx context.Context,
	outputChan <-chan stage.StreamElement,
	inputChan chan<- stage.StreamElement,
) <-chan error

Start begins collecting responses in a goroutine. Returns a channel that receives nil on success or an error on failure.

The collector will:

  1. Process incoming stream elements.
  2. Execute tool calls via the ToolExecutor (if configured).
  3. Send tool results back through inputChan.
  4. Signal completion or error through the returned channel.

type ResponseCollectorConfig

type ResponseCollectorConfig struct {
	// ToolExecutor is called when tool calls are received.
	// If nil, tool calls will result in an error.
	ToolExecutor ToolExecutor

	// LogPrefix is prepended to log messages for identification.
	LogPrefix string
}

ResponseCollectorConfig configures response collection behavior.

type ToolExecutionResult

type ToolExecutionResult struct {
	// ProviderResponses are formatted for sending back to the streaming provider.
	ProviderResponses []providers.ToolResponse

	// ResultMessages are formatted for state store capture,
	// matching the behavior of non-streaming tool execution.
	ResultMessages []types.Message
}

ToolExecutionResult contains the results of executing tool calls.

type ToolExecutor

type ToolExecutor interface {
	// Execute runs the given tool calls and returns their results.
	// The implementation is responsible for handling execution errors
	// and formatting them appropriately in the result.
	Execute(ctx context.Context, toolCalls []types.MessageToolCall) (*ToolExecutionResult, error)
}

ToolExecutor executes tool calls and returns results. Implementations provide the actual tool registry integration.
