Overview ¶
Package voiceagent implements the Voice Agent WebSocket surface on the Server-Target. It provides three things:
- A Session Manager that tracks active Voice Agent sessions, enforces per-identity and global concurrency limits, and mints HMAC-signed one-time tickets so browser clients can upgrade a WebSocket without sending Authorization headers.
- A wire protocol (see protocol.go) that carries control frames as JSON and audio frames as binary.
- An adapter that bridges the WebSocket to the Framework kernel's internal/voiceagent.Session without the kernel needing to know anything about HTTP.
This file covers the Session Manager and ticket machinery. The wire protocol, WebSocket handler, and adapter live in sibling files.
Index ¶
- Constants
- Variables
- type ActivityDetectionFrame
- type Adapter
- type CascadedAgent
- type CascadedConfig
- type CascadedDeps
- type CascadedProvider
- func (p *CascadedProvider) Close() error
- func (p *CascadedProvider) Connect(ctx context.Context, cfg LiveConfigFrame) error
- func (p *CascadedProvider) Name() string
- func (p *CascadedProvider) Receive(ctx context.Context) (*LiveMessage, error)
- func (p *CascadedProvider) SendAudio(chunk []byte) error
- func (p *CascadedProvider) SendAudioStreamEnd() error
- func (p *CascadedProvider) SendText(text string) error
- type CascadedSTT
- type CascadedTTS
- type ErrorFrame
- type Handler
- type HandlerOptions
- type Identity
- type InterruptedFrame
- type LiveConfigFrame
- type LiveMessage
- type LiveProviderAdapter
- type ManagedSession
- type Options
- type PersonaResolver
- type PongFrame
- type ProviderFactory
- type SequenceStepFrame
- type SessionEndFrame
- type SessionManager
- func (m *SessionManager) Attach(id string) error
- func (m *SessionManager) Create(owner Identity) (*ManagedSession, string, error)
- func (m *SessionManager) Get(id string) (*ManagedSession, error)
- func (m *SessionManager) List(userID string) []*ManagedSession
- func (m *SessionManager) Remove(id string)
- func (m *SessionManager) VerifyTicket(sessionID, ticket string) error
- type StartFrame
- type State
- type StateFrame
- type TextFrame
- type ToolCallFrame
- type ToolResponseFrame
- type TranscriptFrame
Constants ¶
const (
	MsgStart        = "start"
	MsgAudioEnd     = "audio_end"
	MsgText         = "text"
	MsgToolResponse = "tool_response"
	MsgPing         = "ping"
	MsgStop         = "stop"
	MsgAdvanceStep  = "advance_step"
)
Client-to-server message types.
const (
	MsgState            = "state"
	MsgInputTranscript  = "input_transcript"
	MsgOutputTranscript = "output_transcript"
	MsgToolCall         = "tool_call"
	MsgSequenceStep     = "sequence_step"
	MsgInterrupted      = "interrupted"
	MsgError            = "error"
	MsgSessionEnd       = "session_end"
	MsgPong             = "pong"
)
Server-to-client message types.
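As a rough illustration of how these constants might be used, the sketch below peeks at the type field of a JSON control frame before decoding it fully. It assumes every control frame carries a top-level "type" string, as StartFrame and ToolResponseFrame do; the envelope type and classifyClientFrame helper are hypothetical and not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Client-to-server message types, copied from the constants above.
const (
	MsgStart        = "start"
	MsgAudioEnd     = "audio_end"
	MsgText         = "text"
	MsgToolResponse = "tool_response"
	MsgPing         = "ping"
	MsgStop         = "stop"
	MsgAdvanceStep  = "advance_step"
)

// envelope decodes only the discriminator field of a control frame.
// Hypothetical: the real handler may decode frames differently.
type envelope struct {
	Type string `json:"type"`
}

// classifyClientFrame returns the message type of a client control frame.
// It rejects malformed JSON and any type the client is not allowed to send.
func classifyClientFrame(raw []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", fmt.Errorf("decode control frame: %w", err)
	}
	switch env.Type {
	case MsgStart, MsgAudioEnd, MsgText, MsgToolResponse, MsgPing, MsgStop, MsgAdvanceStep:
		return env.Type, nil
	default:
		return "", fmt.Errorf("unknown client message type %q", env.Type)
	}
}

func main() {
	typ, err := classifyClientFrame([]byte(`{"type":"ping"}`))
	fmt.Println(typ, err)
}
```

Once the type is known, the caller can unmarshal the same bytes into the matching frame struct.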
Variables ¶
var (
	ErrSessionNotFound       = errors.New("voiceagent: session not found")
	ErrSessionAlreadyActive  = errors.New("voiceagent: session already has an active WS connection")
	ErrSessionExpired        = errors.New("voiceagent: session ticket expired")
	ErrInvalidTicket         = errors.New("voiceagent: ticket signature or payload invalid")
	ErrGlobalLimitExceeded   = errors.New("voiceagent: global session limit exceeded")
	ErrIdentityLimitExceeded = errors.New("voiceagent: per-identity session limit exceeded")
)
Common errors reported by the session manager.
Functions ¶
This section is empty.
Types ¶
type ActivityDetectionFrame ¶
type ActivityDetectionFrame struct {
Automatic bool `json:"automatic"`
StartSensitivity string `json:"start_sensitivity,omitempty"`
EndSensitivity string `json:"end_sensitivity,omitempty"`
PrefixPaddingMs int32 `json:"prefix_padding_ms,omitempty"`
SilenceDurationMs int32 `json:"silence_duration_ms,omitempty"`
ActivityHandling string `json:"activity_handling,omitempty"`
TurnCoverage string `json:"turn_coverage,omitempty"`
}
ActivityDetectionFrame maps to voiceagent.ActivityDetectionPolicy.
type Adapter ¶
type Adapter struct {
Session *ManagedSession
Conn *websocket.Conn
Provider LiveProviderAdapter
Persona PersonaResolver
// IdleTimeout terminates a session whose readPump and writePump have
// both been silent for the duration. Zero disables the server-side
// idle watchdog. Defaults to 15 minutes when set by the WS handler.
IdleTimeout time.Duration
// OnClose runs after both pumps have returned. Typically removes the
// session from the manager.
OnClose func()
// contains filtered or unexported fields
}
Adapter bridges a live WebSocket conversation to the Framework kernel's voiceagent session. One Adapter instance handles exactly one session, which matches the manager's concurrency model.
The adapter owns two long-lived goroutines:
readPump: WebSocket → kernel (control + audio frames from client)
writePump: kernel → WebSocket (audio + transcript + tool-call frames)
When either pump errors or the provider reports session_end, the adapter closes the socket, calls OnClose, and returns from Run.
type CascadedAgent ¶
type CascadedAgent interface {
Run(ctx context.Context, input flows.AgentInput) (flows.AgentOutput, error)
}
CascadedAgent is the LLM surface the provider uses. In production this is the Genkit agent flow defined by flows.DefineAgentFlow.
func NewAgentFlowAdapter ¶
func NewAgentFlowAdapter(flow *core.Flow[flows.AgentInput, flows.AgentOutput, struct{}]) CascadedAgent
NewAgentFlowAdapter wraps a Genkit agent flow so it satisfies CascadedAgent. Returned as a separate named type to keep the production wiring readable.
type CascadedConfig ¶
type CascadedConfig struct {
// SilenceRMSThreshold is the RMS level (0.0–1.0) below which a frame is
// considered silence. Default 0.02.
SilenceRMSThreshold float64
// SilenceTurnMs is the minimum silence duration before we commit the
// current turn. Default 800 ms.
SilenceTurnMs int
// MinTurnMs is the minimum accumulated non-silence needed to treat a
// buffer as a real turn. Default 300 ms — filters coughs and clicks.
MinTurnMs int
// MaxTurnMs caps a runaway turn before forcing processing. Default
// 30 000 ms.
MaxTurnMs int
// HistoryTurns is how many (user, assistant) turns to keep as rolling
// context for the agent flow. Default 5.
HistoryTurns int
// TTSFormat is the audio format the TTS provider should emit. Default
// "mp3" (provider passthrough, client transcodes if needed).
TTSFormat string
// TTSSpeed is the TTS speech rate. Default 1.0.
TTSSpeed float64
}
CascadedConfig tunes turn detection and processing. Zero values are filled by NewCascadedProvider with sensible defaults.
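The default filling described above could look roughly like the following sketch. The default values are the ones documented on each field; the withDefaults helper is hypothetical, and how NewCascadedProvider actually applies them is not shown in this documentation.

```go
package main

import "fmt"

// CascadedConfig mirrors the documented struct so the sketch compiles alone.
type CascadedConfig struct {
	SilenceRMSThreshold float64
	SilenceTurnMs       int
	MinTurnMs           int
	MaxTurnMs           int
	HistoryTurns        int
	TTSFormat           string
	TTSSpeed            float64
}

// withDefaults (hypothetical) returns a copy of c in which every zero-valued
// field is replaced by its documented default.
func withDefaults(c CascadedConfig) CascadedConfig {
	if c.SilenceRMSThreshold == 0 {
		c.SilenceRMSThreshold = 0.02
	}
	if c.SilenceTurnMs == 0 {
		c.SilenceTurnMs = 800
	}
	if c.MinTurnMs == 0 {
		c.MinTurnMs = 300
	}
	if c.MaxTurnMs == 0 {
		c.MaxTurnMs = 30000
	}
	if c.HistoryTurns == 0 {
		c.HistoryTurns = 5
	}
	if c.TTSFormat == "" {
		c.TTSFormat = "mp3"
	}
	if c.TTSSpeed == 0 {
		c.TTSSpeed = 1.0
	}
	return c
}

func main() {
	cfg := withDefaults(CascadedConfig{SilenceTurnMs: 500, TTSFormat: "wav"})
	fmt.Printf("%+v\n", cfg)
}
```

One consequence of zero-means-default is that a caller cannot request an explicit zero, for example HistoryTurns of 0, through this struct alone.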
type CascadedDeps ¶
type CascadedDeps struct {
STT CascadedSTT
Agent CascadedAgent
TTS CascadedTTS
Config CascadedConfig
}
CascadedDeps bundles everything the bootstrap hands to the provider.
type CascadedProvider ¶
type CascadedProvider struct {
// contains filtered or unexported fields
}
CascadedProvider implements LiveProviderAdapter as a turn-based STT → LLM → TTS pipeline. It is the self-hosted-friendly alternative to real-time providers like Gemini Live and Moshi.
Turn detection uses energy-based silence detection (RMS of the 16 kHz S16 PCM stream). When the caller wants tighter control, it can send an audio_end frame, which triggers immediate processing regardless of the silence heuristic.
The provider is intentionally self-contained and testable with pure fakes — CascadedSTT, CascadedAgent, and CascadedTTS are narrow interfaces the bootstrap wires production implementations into.
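To make the energy-based heuristic concrete, here is a minimal sketch of an RMS silence check over signed 16-bit PCM. It assumes little-endian sample order and normalizes by 32768 so the result is comparable to SilenceRMSThreshold; frameRMS, isSilence, and constantFrame are hypothetical helpers, not the provider's actual code.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// frameRMS returns the root-mean-square level of a signed 16-bit PCM frame,
// normalized to the range 0.0 to 1.0. Little-endian byte order is an
// assumption; a trailing odd byte is ignored.
func frameRMS(pcm []byte) float64 {
	n := len(pcm) / 2
	if n == 0 {
		return 0
	}
	var sumSquares float64
	for i := 0; i < n; i++ {
		sample := int16(binary.LittleEndian.Uint16(pcm[2*i:]))
		v := float64(sample) / 32768.0
		sumSquares += v * v
	}
	return math.Sqrt(sumSquares / float64(n))
}

// isSilence reports whether the frame's RMS level is below the threshold.
func isSilence(pcm []byte, threshold float64) bool {
	return frameRMS(pcm) < threshold
}

// constantFrame builds a test frame in which every sample has the same value.
func constantFrame(amplitude int16, samples int) []byte {
	buf := make([]byte, 2*samples)
	for i := 0; i < samples; i++ {
		binary.LittleEndian.PutUint16(buf[2*i:], uint16(amplitude))
	}
	return buf
}

func main() {
	quiet := constantFrame(100, 320) // 320 samples is 20 ms at 16 kHz
	loud := constantFrame(8000, 320)
	fmt.Println(isSilence(quiet, 0.02), isSilence(loud, 0.02))
}
```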
func NewCascadedProvider ¶
func NewCascadedProvider(deps CascadedDeps) *CascadedProvider
NewCascadedProvider constructs a provider without starting background work. Connect() initializes the goroutine that performs turn processing.
func (*CascadedProvider) Close ¶
func (p *CascadedProvider) Close() error
Close stops the processor loop and drains any pending buffer.
func (*CascadedProvider) Connect ¶
func (p *CascadedProvider) Connect(ctx context.Context, cfg LiveConfigFrame) error
Connect validates that the dependencies required by cfg are present, then starts the processor loop. Unlike Gemini Live, it performs no external handshake.
func (*CascadedProvider) Name ¶
func (p *CascadedProvider) Name() string
Name returns the provider identifier used in logs and in /admin/status.
func (*CascadedProvider) Receive ¶
func (p *CascadedProvider) Receive(ctx context.Context) (*LiveMessage, error)
Receive blocks until the next message is ready or the provider closes.
func (*CascadedProvider) SendAudio ¶
func (p *CascadedProvider) SendAudio(chunk []byte) error
SendAudio appends PCM to the current turn buffer and triggers processing when a silence boundary is reached.
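The silence boundary can be pictured as a small state machine driven by the three CascadedConfig durations. The sketch below is one possible reading, assuming each incoming frame has already been classified as silent or voiced; the turnDetector type and its discard-short-noise behaviour are hypothetical and may differ from the provider's internals.

```go
package main

import "fmt"

// turnDetector (hypothetical) tracks how much voiced audio and trailing
// silence the current turn buffer holds, in milliseconds.
type turnDetector struct {
	silenceTurnMs int // silence needed to close a turn
	minTurnMs     int // voiced audio needed for a turn to count
	maxTurnMs     int // hard cap on total buffered audio

	voicedMs          int
	trailingSilenceMs int
	totalMs           int
}

func (d *turnDetector) reset() {
	d.voicedMs, d.trailingSilenceMs, d.totalMs = 0, 0, 0
}

// observe records one frame and reports whether the buffered audio should be
// committed as a turn. A buffer that reaches the silence boundary with too
// little voiced audio is discarded without committing.
func (d *turnDetector) observe(frameMs int, silent bool) bool {
	d.totalMs += frameMs
	if silent {
		d.trailingSilenceMs += frameMs
	} else {
		d.voicedMs += frameMs
		d.trailingSilenceMs = 0
	}
	atBoundary := d.trailingSilenceMs >= d.silenceTurnMs
	switch {
	case atBoundary && d.voicedMs >= d.minTurnMs:
		d.reset()
		return true
	case atBoundary:
		d.reset() // cough or click: drop it
		return false
	case d.totalMs >= d.maxTurnMs:
		d.reset() // runaway turn: force processing
		return true
	}
	return false
}

func main() {
	d := &turnDetector{silenceTurnMs: 800, minTurnMs: 300, maxTurnMs: 30000}
	committed := false
	for i := 0; i < 20; i++ { // 400 ms of speech
		d.observe(20, false)
	}
	for i := 0; i < 40 && !committed; i++ { // up to 800 ms of silence
		committed = d.observe(20, true)
	}
	fmt.Println("turn committed:", committed)
}
```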
func (*CascadedProvider) SendAudioStreamEnd ¶
func (p *CascadedProvider) SendAudioStreamEnd() error
SendAudioStreamEnd forces the current buffer to be treated as a complete turn, even if silence has not yet been detected.
func (*CascadedProvider) SendText ¶
func (p *CascadedProvider) SendText(text string) error
SendText injects a text turn (skipping STT). Useful for testing and for clients that already have a transcript from their own STT.
type CascadedSTT ¶
type CascadedSTT interface {
Route(ctx context.Context, audio []byte, audioDurationSecs float64, opts stt.TranscribeOpts) (*stt.Result, error)
}
CascadedSTT is the STT surface the provider uses. In production this is the same *internal/router.Router that serves /v1/dictation/transcribe.
type CascadedTTS ¶
type CascadedTTS interface {
Synthesize(ctx context.Context, text string, opts tts.SynthesizeOpts) (*tts.Result, error)
}
CascadedTTS is the TTS surface the provider uses. In production this is the *internal/tts.Router that serves Assist responses.
type ErrorFrame ¶
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler exposes both the HTTP session-creation endpoint and the WS upgrade endpoint under /v1/voiceagent/*.
func New ¶
func New(opts HandlerOptions) (*Handler, error)
New constructs a handler. Manager, Provider, and Persona are required, because the adapter cannot function without a manager, provider, and persona resolver. MaxAllowedClockSkew and IdleTimeout are optional.
func (*Handler) Mount ¶
Mount wires the voiceagent endpoints onto mux:
POST /v1/voiceagent/sessions — create session + mint ticket
GET /v1/voiceagent/sessions — list caller's active sessions
DELETE /v1/voiceagent/sessions/{id} — force close a session
GET /v1/voiceagent/sessions/{id}/ws?ticket=... — upgrade to WebSocket
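Because the ticket travels in the query string rather than in an Authorization header, a client only needs to assemble the upgrade URL once it has a session ID and ticket. The sketch below shows one way to do that with net/url (JoinPath requires Go 1.19 or later); wsUpgradeURL, the example host, and the sample values are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// wsUpgradeURL (hypothetical) builds the WebSocket upgrade URL for a session
// from a base such as "wss://example.test". The ticket is query-escaped so
// characters like '+' and '/' survive the round trip.
func wsUpgradeURL(base, sessionID, ticket string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", fmt.Errorf("parse base URL: %w", err)
	}
	u = u.JoinPath("v1", "voiceagent", "sessions", sessionID, "ws")
	u.RawQuery = url.Values{"ticket": {ticket}}.Encode()
	return u.String(), nil
}

func main() {
	target, err := wsUpgradeURL("wss://example.test", "abc123", "payload.signature")
	if err != nil {
		panic(err)
	}
	fmt.Println(target)
}
```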
type HandlerOptions ¶
type HandlerOptions struct {
Manager *SessionManager
Provider ProviderFactory
Persona PersonaResolver
MaxAllowedClockSkew time.Duration
// IdleTimeout terminates a session that hasn't seen any activity
// (client frame OR provider message) within the duration. Defaults
// to 15 minutes when zero is passed; pass a negative value to
// disable the server-side idle watchdog explicitly.
IdleTimeout time.Duration
}
HandlerOptions configures the WebSocket handler.
type Identity ¶
Identity is the caller's resolved identity from the auth middleware. Stored on each session so closes can verify ownership.
type InterruptedFrame ¶
type InterruptedFrame struct {
Type string `json:"type"` // "interrupted"
}
type LiveConfigFrame ¶
type LiveConfigFrame struct {
Model string
// FallbackModel is forwarded to providers that support same-provider
// fallback (kernel Gemini Live retries the fallback when the primary
// connect fails). Empty disables the fallback.
FallbackModel string
APIKey string
Voice string
SystemPrompt string
RefinementPrompt string
Locale string
// Raw activity-detection passthrough; adapter translates to the
// kernel's internal types.
Automatic bool
StartSensitivity string
EndSensitivity string
PrefixPaddingMs int32
SilenceDurationMs int32
ActivityHandling string
TurnCoverage string
}
LiveConfigFrame is the subset of configuration the adapter derives from a StartFrame and the persona/role resolver. Kept as a separate type so the test double doesn't need to depend on the kernel's concrete LiveConfig (which embeds Google genai types).
type LiveMessage ¶
type LiveMessage struct {
Audio []byte
OutputTranscript string
OutputTranscriptDone bool
InputTranscript string
InputTranscriptDone bool
Interrupted bool
GoAway bool
}
LiveMessage is the subset of kernel/internal/voiceagent.LiveMessage the adapter relays to the client. Matching field names keep the translation trivial.
type LiveProviderAdapter ¶
type LiveProviderAdapter interface {
Connect(ctx context.Context, cfg LiveConfigFrame) error
SendAudio(chunk []byte) error
SendAudioStreamEnd() error
SendText(text string) error
Receive(ctx context.Context) (*LiveMessage, error)
Close() error
Name() string
}
LiveProviderAdapter is the minimal slice of the kernel's LiveProvider interface the Server-Target adapter needs. Keeping it narrow makes it trivial for tests to stub without pulling in the full genai SDK.
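A test stub for this interface can be quite small. The sketch below is one possible fake that records what it is sent and replays canned messages; the types are trimmed local copies so the example compiles on its own, and fakeProvider, newFakeProvider, and receiveAll are hypothetical names rather than the package's own test helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Trimmed copies of the documented types, kept local for a standalone sketch.
type LiveConfigFrame struct{ Model, Voice string }

type LiveMessage struct {
	Audio                []byte
	OutputTranscript     string
	OutputTranscriptDone bool
}

type LiveProviderAdapter interface {
	Connect(ctx context.Context, cfg LiveConfigFrame) error
	SendAudio(chunk []byte) error
	SendAudioStreamEnd() error
	SendText(text string) error
	Receive(ctx context.Context) (*LiveMessage, error)
	Close() error
	Name() string
}

var errFakeClosed = errors.New("fake provider: closed")

// fakeProvider records inputs and replays canned replies in order.
type fakeProvider struct {
	mu        sync.Mutex
	sentAudio [][]byte
	sentText  []string
	replies   chan *LiveMessage
	closeOnce sync.Once
}

func newFakeProvider(canned ...*LiveMessage) *fakeProvider {
	replies := make(chan *LiveMessage, len(canned))
	for _, m := range canned {
		replies <- m
	}
	return &fakeProvider{replies: replies}
}

func (f *fakeProvider) Connect(context.Context, LiveConfigFrame) error { return nil }
func (f *fakeProvider) SendAudioStreamEnd() error                      { return nil }
func (f *fakeProvider) Name() string                                   { return "fake" }

func (f *fakeProvider) SendAudio(chunk []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	// Copy the chunk so later mutation by the caller does not alter the record.
	f.sentAudio = append(f.sentAudio, append([]byte(nil), chunk...))
	return nil
}

func (f *fakeProvider) SendText(text string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.sentText = append(f.sentText, text)
	return nil
}

// Receive returns the next canned reply, or an error once the fake is closed
// and drained, or when ctx is cancelled.
func (f *fakeProvider) Receive(ctx context.Context) (*LiveMessage, error) {
	select {
	case m, ok := <-f.replies:
		if !ok {
			return nil, errFakeClosed
		}
		return m, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Close is safe to call more than once.
func (f *fakeProvider) Close() error {
	f.closeOnce.Do(func() { close(f.replies) })
	return nil
}

// receiveAll drains a provider until Receive fails. Call it only after Close,
// otherwise it blocks once the canned replies run out.
func receiveAll(p LiveProviderAdapter) []*LiveMessage {
	var out []*LiveMessage
	for {
		m, err := p.Receive(context.Background())
		if err != nil {
			return out
		}
		out = append(out, m)
	}
}

func main() {
	var p LiveProviderAdapter = newFakeProvider(&LiveMessage{Audio: []byte{1, 2, 3}})
	_ = p.SendText("hello")
	_ = p.Close()
	fmt.Println(p.Name(), len(receiveAll(p)))
}
```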
type ManagedSession ¶
type ManagedSession struct {
ID string
Owner Identity
CreatedAt time.Time
State State
HasWSClient bool // true once the client has upgraded
}
ManagedSession is the manager's record for one active or pending session. The WebSocket handler and adapter attach additional state (conn, pumps) to this record at handshake time.
type Options ¶
type Options struct {
// TicketSecret signs the one-time WS upgrade tickets. Must be at least
// 16 bytes; when empty, the manager generates a random secret at
// construction time (acceptable only for single-process deployments).
TicketSecret []byte
// TicketTTL limits how long a minted ticket remains valid. Defaults to
// 30 seconds.
TicketTTL time.Duration
// MaxGlobalSessions caps concurrent sessions across all callers.
// Defaults to 100.
MaxGlobalSessions int
// MaxPerIdentitySessions caps concurrent sessions per caller.
// Defaults to 3.
MaxPerIdentitySessions int
// Clock is a time source; nil means time.Now. Tests can override.
Clock func() time.Time
}
Options configures a SessionManager.
type PersonaResolver ¶
type PersonaResolver interface {
Resolve(StartFrame) (LiveConfigFrame, error)
}
PersonaResolver derives a LiveConfigFrame from a StartFrame. The server's persona registry implements this in M5; for M4 a stub resolver is used that echoes the StartFrame through with sensible defaults.
type ProviderFactory ¶
type ProviderFactory interface {
NewProvider() LiveProviderAdapter
}
ProviderFactory builds a Framework kernel voice-agent provider on demand. Each WebSocket session gets its own provider instance so concurrent sessions don't share Gemini Live state.
The concrete production implementation returns a *voiceagent.GeminiLive; tests supply a fake that records frames and replies with canned audio.
type SequenceStepFrame ¶
type SessionEndFrame ¶
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager tracks active Voice Agent sessions.
func NewSessionManager ¶
func NewSessionManager(opts Options) (*SessionManager, error)
NewSessionManager constructs a manager with opts. Unset fields receive sensible defaults.
func (*SessionManager) Attach ¶
func (m *SessionManager) Attach(id string) error
Attach moves a session from pending to active. Returns an error when the session is already attached to another WS client or has been closed.
func (*SessionManager) Create ¶
func (m *SessionManager) Create(owner Identity) (*ManagedSession, string, error)
Create registers a new pending session for the given caller and returns the session record plus a one-time WS upgrade ticket. Fails with a limit error when concurrency caps are exceeded.
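The two concurrency caps can be sketched as a pair of counters behind a mutex. This is an illustration only: the limiter type is hypothetical, callers are keyed by a plain string rather than Identity, and checking the global cap before the per-identity cap is an assumption about ordering that the documentation does not state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Sentinel errors copied from the package variables.
var (
	ErrGlobalLimitExceeded   = errors.New("voiceagent: global session limit exceeded")
	ErrIdentityLimitExceeded = errors.New("voiceagent: per-identity session limit exceeded")
)

// limiter (hypothetical) enforces a global cap and a per-owner cap.
type limiter struct {
	mu          sync.Mutex
	maxGlobal   int
	maxPerOwner int
	total       int
	perOwner    map[string]int
}

func newLimiter(maxGlobal, maxPerOwner int) *limiter {
	return &limiter{
		maxGlobal:   maxGlobal,
		maxPerOwner: maxPerOwner,
		perOwner:    make(map[string]int),
	}
}

// acquire reserves one session slot for owner or returns a limit error.
func (l *limiter) acquire(owner string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.total >= l.maxGlobal {
		return ErrGlobalLimitExceeded
	}
	if l.perOwner[owner] >= l.maxPerOwner {
		return ErrIdentityLimitExceeded
	}
	l.total++
	l.perOwner[owner]++
	return nil
}

// release frees one slot for owner. Releasing an owner with no slots is a
// no-op, which keeps repeated removal safe.
func (l *limiter) release(owner string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.perOwner[owner] == 0 {
		return
	}
	l.perOwner[owner]--
	if l.perOwner[owner] == 0 {
		delete(l.perOwner, owner)
	}
	l.total--
}

func main() {
	l := newLimiter(100, 3) // the documented defaults
	for i := 0; i < 4; i++ {
		fmt.Println(l.acquire("alice"))
	}
}
```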
func (*SessionManager) Get ¶
func (m *SessionManager) Get(id string) (*ManagedSession, error)
Get returns the session with the given ID, or ErrSessionNotFound.
func (*SessionManager) List ¶
func (m *SessionManager) List(userID string) []*ManagedSession
List returns a snapshot of sessions owned by the given user (or all when userID is empty — reserved for admin callers).
func (*SessionManager) Remove ¶
func (m *SessionManager) Remove(id string)
Remove closes and removes the session. Safe to call multiple times.
func (*SessionManager) VerifyTicket ¶
func (m *SessionManager) VerifyTicket(sessionID, ticket string) error
VerifyTicket validates a ticket string against its expected session ID. Returns nil when the ticket is cryptographically valid, un-expired, and bound to this sessionID. Mutates nothing; callers must follow up with Attach to mark the session active.
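The ticket wire format is not documented here, so the following is only a sketch of how an HMAC-signed, expiring, session-bound ticket could work. It assumes a payload of "sessionID|expiryUnix" signed with HMAC-SHA256 and encoded as two base64url parts joined by a dot; mintTicket, verifyTicket, and that layout are hypothetical. Like VerifyTicket, the check mutates nothing, so single use would still have to be enforced by a follow-up Attach.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"
)

// Sentinel errors copied from the package variables.
var (
	ErrSessionExpired = errors.New("voiceagent: session ticket expired")
	ErrInvalidTicket  = errors.New("voiceagent: ticket signature or payload invalid")
)

func sign(secret []byte, payload string) []byte {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(payload))
	return mac.Sum(nil)
}

// mintTicket binds a ticket to sessionID and an expiry (Unix seconds).
// The payload layout is an assumption made for this sketch.
func mintTicket(secret []byte, sessionID string, expiresUnix int64) string {
	payload := sessionID + "|" + strconv.FormatInt(expiresUnix, 10)
	enc := base64.RawURLEncoding
	return enc.EncodeToString([]byte(payload)) + "." + enc.EncodeToString(sign(secret, payload))
}

// verifyTicket checks signature, session binding, and expiry, in that order.
func verifyTicket(secret []byte, sessionID, ticket string, nowUnix int64) error {
	enc := base64.RawURLEncoding
	payloadPart, sigPart, ok := strings.Cut(ticket, ".")
	if !ok {
		return ErrInvalidTicket
	}
	payload, err := enc.DecodeString(payloadPart)
	if err != nil {
		return ErrInvalidTicket
	}
	sig, err := enc.DecodeString(sigPart)
	if err != nil {
		return ErrInvalidTicket
	}
	// hmac.Equal compares in constant time to avoid leaking signature bytes.
	if !hmac.Equal(sig, sign(secret, string(payload))) {
		return ErrInvalidTicket
	}
	sep := strings.LastIndex(string(payload), "|")
	if sep < 0 || string(payload[:sep]) != sessionID {
		return ErrInvalidTicket
	}
	exp, err := strconv.ParseInt(string(payload[sep+1:]), 10, 64)
	if err != nil {
		return ErrInvalidTicket
	}
	if nowUnix > exp {
		return ErrSessionExpired
	}
	return nil
}

func main() {
	secret := []byte("0123456789abcdef") // at least 16 bytes, per Options.TicketSecret
	now := time.Now()
	ticket := mintTicket(secret, "sess-1", now.Add(30*time.Second).Unix())
	fmt.Println(verifyTicket(secret, "sess-1", ticket, now.Unix()))
}
```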
type StartFrame ¶
type StartFrame struct {
Type string `json:"type"` // must be "start"
PersonaID string `json:"persona_id,omitempty"`
RoleID string `json:"role_id,omitempty"`
SequenceID string `json:"sequence_id,omitempty"`
Voice string `json:"voice,omitempty"`
Locale string `json:"locale,omitempty"`
Model string `json:"model,omitempty"`
Thinking string `json:"thinking,omitempty"` // "off" | "low" | "medium" | "high"
// Raw activity-detection policy override. Pipeline translates these to
// the kernel's internal enums.
ActivityDetection *ActivityDetectionFrame `json:"activity_detection,omitempty"`
// Optional durable instruction layered on top of the role's prompt.
SystemPromptOverride string `json:"system_prompt_override,omitempty"`
}
StartFrame is the mandatory first client frame. Fields marked optional fall back to persona/role defaults when omitted.
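For a sense of what the first client frame looks like on the wire, the sketch below marshals a StartFrame using the struct tags shown above. The struct definitions are local copies so the example compiles alone; encodeStart and the sample persona ID are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented frame types.
type ActivityDetectionFrame struct {
	Automatic         bool   `json:"automatic"`
	StartSensitivity  string `json:"start_sensitivity,omitempty"`
	EndSensitivity    string `json:"end_sensitivity,omitempty"`
	PrefixPaddingMs   int32  `json:"prefix_padding_ms,omitempty"`
	SilenceDurationMs int32  `json:"silence_duration_ms,omitempty"`
	ActivityHandling  string `json:"activity_handling,omitempty"`
	TurnCoverage      string `json:"turn_coverage,omitempty"`
}

type StartFrame struct {
	Type                 string                  `json:"type"`
	PersonaID            string                  `json:"persona_id,omitempty"`
	RoleID               string                  `json:"role_id,omitempty"`
	SequenceID           string                  `json:"sequence_id,omitempty"`
	Voice                string                  `json:"voice,omitempty"`
	Locale               string                  `json:"locale,omitempty"`
	Model                string                  `json:"model,omitempty"`
	Thinking             string                  `json:"thinking,omitempty"`
	ActivityDetection    *ActivityDetectionFrame `json:"activity_detection,omitempty"`
	SystemPromptOverride string                  `json:"system_prompt_override,omitempty"`
}

// encodeStart (hypothetical) forces Type to "start" and returns the JSON
// text a client would send as its first control frame.
func encodeStart(f StartFrame) (string, error) {
	f.Type = "start"
	b, err := json.Marshal(f)
	if err != nil {
		return "", fmt.Errorf("encode start frame: %w", err)
	}
	return string(b), nil
}

func main() {
	out, err := encodeStart(StartFrame{
		PersonaID:         "support", // hypothetical persona ID
		Locale:            "en-US",
		ActivityDetection: &ActivityDetectionFrame{Automatic: true, SilenceDurationMs: 600},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Omitted optional fields disappear from the JSON because of omitempty. The automatic field has no omitempty tag, so it is always emitted whenever an activity_detection object is present.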
type State ¶
type State string
State captures the session's manager-level lifecycle. The Framework kernel (internal/voiceagent.Session) has its own finer-grained state; this one only distinguishes pending ticket vs. active connection.
type StateFrame ¶
type ToolCallFrame ¶
type ToolResponseFrame ¶
type ToolResponseFrame struct {
Type string `json:"type"` // "tool_response"
ID string `json:"id"`
Name string `json:"name"`
Response map[string]any `json:"response"`
}
ToolResponseFrame resolves a tool call issued by the server.