voiceagent

package
v0.28.1
Published: May 1, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package voiceagent implements the Voice Agent WebSocket surface on the Server-Target. It provides three things:

  1. A Session Manager that tracks active Voice Agent sessions, enforces per-identity and global concurrency limits, and mints HMAC-signed one-time tickets so browser clients can upgrade a WebSocket without sending Authorization headers.
  2. A wire protocol (see protocol.go) that carries control frames as JSON and audio frames as binary.
  3. An adapter that bridges the WebSocket to the Framework kernel's internal/voiceagent.Session without the kernel needing to know anything about HTTP.

This file covers the Session Manager and ticket machinery. The wire protocol, WebSocket handler, and adapter live in sibling files.
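
The ticket idea from item 1 can be sketched with the standard library alone. This is an illustration, not the package's actual format: the "<sessionID>.<expiry>.<signature>" layout, the helper names mintTicket and verifyTicket, and the local error values are all assumptions, and the sketch assumes session IDs contain no dots.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"
)

// Local stand-ins for the package's ErrInvalidTicket and ErrSessionExpired.
var (
	errInvalidTicket = errors.New("ticket signature or payload invalid")
	errTicketExpired = errors.New("session ticket expired")
)

// sign returns the base64url-encoded HMAC-SHA256 of payload under secret.
func sign(secret []byte, payload string) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(payload))
	return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

// mintTicket signs the session ID together with a Unix expiry.
// The ticket layout is hypothetical, not the package's wire format.
func mintTicket(secret []byte, sessionID string, expiresUnix int64) string {
	payload := sessionID + "." + strconv.FormatInt(expiresUnix, 10)
	return payload + "." + sign(secret, payload)
}

// verifyTicket checks binding, signature, and expiry without mutating state.
func verifyTicket(secret []byte, sessionID, ticket string, nowUnix int64) error {
	parts := strings.Split(ticket, ".")
	if len(parts) != 3 || parts[0] != sessionID {
		return errInvalidTicket
	}
	want := sign(secret, parts[0]+"."+parts[1])
	if !hmac.Equal([]byte(want), []byte(parts[2])) { // constant-time compare
		return errInvalidTicket
	}
	expires, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return errInvalidTicket
	}
	if nowUnix >= expires {
		return errTicketExpired
	}
	return nil
}

func main() {
	secret := []byte("0123456789abcdef") // 16 bytes, the documented minimum
	now := time.Now().Unix()
	ticket := mintTicket(secret, "sess-1", now+30) // 30 s mirrors the default TicketTTL
	fmt.Println(verifyTicket(secret, "sess-1", ticket, now))    // valid
	fmt.Println(verifyTicket(secret, "sess-2", ticket, now))    // wrong session
	fmt.Println(verifyTicket(secret, "sess-1", ticket, now+31)) // expired
}
```

Because the signature covers both the session ID and the expiry, a browser can pass the ticket as a query parameter instead of an Authorization header. Single use is not enforced here; in the documented design that falls to Attach, which marks the session active after verification.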

Constants

const (
	MsgStart        = "start"
	MsgAudioEnd     = "audio_end"
	MsgText         = "text"
	MsgToolResponse = "tool_response"
	MsgPing         = "ping"
	MsgStop         = "stop"
	MsgAdvanceStep  = "advance_step"
)

Client-to-server message types.

const (
	MsgState            = "state"
	MsgInputTranscript  = "input_transcript"
	MsgOutputTranscript = "output_transcript"
	MsgToolCall         = "tool_call"
	MsgSequenceStep     = "sequence_step"
	MsgInterrupted      = "interrupted"
	MsgError            = "error"
	MsgSessionEnd       = "session_end"
	MsgPong             = "pong"
)

Server-to-client message types.
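
One plausible way to route JSON control frames is to decode the shared "type" field first and then decode the full frame. The sketch below assumes that approach; the envelope type and the describe helper are hypothetical, and textFrame is a local copy of the package's TextFrame.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope reads only the discriminator that every control frame carries.
type envelope struct {
	Type string `json:"type"`
}

// textFrame is a local copy of the package's TextFrame.
type textFrame struct {
	Type string `json:"type"`
	Text string `json:"text"`
}

// describe decodes one client-to-server control frame and summarizes it.
func describe(raw []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", fmt.Errorf("decode envelope: %w", err)
	}
	switch env.Type {
	case "text":
		var f textFrame
		if err := json.Unmarshal(raw, &f); err != nil {
			return "", fmt.Errorf("decode text frame: %w", err)
		}
		return "text turn: " + f.Text, nil
	case "ping":
		// Assumption: the server answers a ping with a pong frame.
		return "ping: reply with pong", nil
	case "start", "audio_end", "tool_response", "stop", "advance_step":
		return "control: " + env.Type, nil
	default:
		return "", fmt.Errorf("unknown message type %q", env.Type)
	}
}

func main() {
	for _, raw := range []string{
		`{"type":"text","text":"hello"}`,
		`{"type":"ping"}`,
		`{"type":"bogus"}`,
	} {
		fmt.Println(describe([]byte(raw)))
	}
}
```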

Variables

var (
	ErrSessionNotFound       = errors.New("voiceagent: session not found")
	ErrSessionAlreadyActive  = errors.New("voiceagent: session already has an active WS connection")
	ErrSessionExpired        = errors.New("voiceagent: session ticket expired")
	ErrInvalidTicket         = errors.New("voiceagent: ticket signature or payload invalid")
	ErrGlobalLimitExceeded   = errors.New("voiceagent: global session limit exceeded")
	ErrIdentityLimitExceeded = errors.New("voiceagent: per-identity session limit exceeded")
)

Common errors reported by the session manager.
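
Callers would typically match these sentinels with errors.Is. The sketch below redeclares them locally and maps each to an HTTP status; the mapping itself is an assumption for illustration and is not taken from the package's handler.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local copies of the documented sentinel errors.
var (
	errSessionNotFound       = errors.New("voiceagent: session not found")
	errSessionAlreadyActive  = errors.New("voiceagent: session already has an active WS connection")
	errSessionExpired        = errors.New("voiceagent: session ticket expired")
	errInvalidTicket         = errors.New("voiceagent: ticket signature or payload invalid")
	errGlobalLimitExceeded   = errors.New("voiceagent: global session limit exceeded")
	errIdentityLimitExceeded = errors.New("voiceagent: per-identity session limit exceeded")
)

// statusFor picks an HTTP status for a manager error (hypothetical mapping).
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errSessionNotFound):
		return http.StatusNotFound
	case errors.Is(err, errSessionAlreadyActive):
		return http.StatusConflict
	case errors.Is(err, errSessionExpired), errors.Is(err, errInvalidTicket):
		return http.StatusUnauthorized
	case errors.Is(err, errIdentityLimitExceeded):
		return http.StatusTooManyRequests
	case errors.Is(err, errGlobalLimitExceeded):
		return http.StatusServiceUnavailable
	default:
		return http.StatusInternalServerError
	}
}

// wrap adds context while keeping the sentinel matchable via errors.Is.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

func main() {
	err := wrap("create session", errIdentityLimitExceeded)
	fmt.Println(err, "->", statusFor(err))
}
```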

Functions

This section is empty.

Types

type ActivityDetectionFrame

type ActivityDetectionFrame struct {
	Automatic         bool   `json:"automatic"`
	StartSensitivity  string `json:"start_sensitivity,omitempty"`
	EndSensitivity    string `json:"end_sensitivity,omitempty"`
	PrefixPaddingMs   int32  `json:"prefix_padding_ms,omitempty"`
	SilenceDurationMs int32  `json:"silence_duration_ms,omitempty"`
	ActivityHandling  string `json:"activity_handling,omitempty"`
	TurnCoverage      string `json:"turn_coverage,omitempty"`
}

ActivityDetectionFrame maps to voiceagent.ActivityDetectionPolicy.

type Adapter

type Adapter struct {
	Session  *ManagedSession
	Conn     *websocket.Conn
	Provider LiveProviderAdapter
	Persona  PersonaResolver
	// IdleTimeout terminates a session whose readPump and writePump have
	// both been silent for the duration. Zero disables the server-side
	// idle watchdog. Defaults to 15 minutes when set by the WS handler.
	IdleTimeout time.Duration
	// OnClose runs after both pumps have returned. Typically removes the
	// session from the manager.
	OnClose func()
	// contains filtered or unexported fields
}

Adapter bridges a live WebSocket conversation to the Framework kernel's voiceagent session. One Adapter instance handles exactly one session, which matches the manager's concurrency model.

The adapter owns two long-lived goroutines:

readPump:  WebSocket → kernel (control + audio frames from client)
writePump: kernel → WebSocket (audio + transcript + tool-call frames)

When either pump errors or the provider reports session_end, the adapter closes the socket, calls OnClose, and returns from Run.
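
The two-pump lifecycle can be sketched without a real socket. In this hypothetical version, channels stand in for the WebSocket and the provider, and runPumps and demo are illustrative names; the point is only that the first pump to exit cancels the other, and that the close callback runs after both have returned.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runPumps relays messages in both directions until either side stops, then
// calls onClose once. It returns the number of messages relayed.
func runPumps(parent context.Context, fromClient, fromProvider <-chan string, onClose func()) int64 {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var relayed int64
	var wg sync.WaitGroup
	pump := func(src <-chan string) {
		defer wg.Done()
		defer cancel() // the first pump to exit stops the other one
		for {
			select {
			case <-ctx.Done():
				return
			case _, ok := <-src:
				if !ok {
					return // source closed: socket error or session end
				}
				atomic.AddInt64(&relayed, 1)
			}
		}
	}

	wg.Add(2)
	go pump(fromClient)   // readPump: client -> kernel
	go pump(fromProvider) // writePump: kernel -> client
	wg.Wait()
	onClose() // runs only after both pumps have returned
	return atomic.LoadInt64(&relayed)
}

// demo closes the client side after two frames while the provider stays silent.
func demo() (int64, int) {
	fromClient := make(chan string, 2)
	fromClient <- "start"
	fromClient <- "ping"
	close(fromClient)
	fromProvider := make(chan string)

	closeCalls := 0
	relayed := runPumps(context.Background(), fromClient, fromProvider, func() { closeCalls++ })
	return relayed, closeCalls
}

func main() {
	relayed, closeCalls := demo()
	fmt.Println("relayed:", relayed, "onClose calls:", closeCalls)
}
```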

func (*Adapter) Run

func (a *Adapter) Run(parent context.Context)

Run blocks until the session ends. The first frame from the client MUST be a StartFrame; if it isn't, the adapter closes with an error.

type CascadedAgent

type CascadedAgent interface {
	Run(ctx context.Context, input flows.AgentInput) (flows.AgentOutput, error)
}

CascadedAgent is the LLM surface the provider uses. In production this is the Genkit agent flow defined by flows.DefineAgentFlow.

func NewAgentFlowAdapter

func NewAgentFlowAdapter(flow *core.Flow[flows.AgentInput, flows.AgentOutput, struct{}]) CascadedAgent

NewAgentFlowAdapter wraps a Genkit agent flow so it satisfies CascadedAgent. Returned as a separate named type to keep the production wiring readable.

type CascadedConfig

type CascadedConfig struct {
	// SilenceRMSThreshold is the RMS level (0.0–1.0) below which a frame is
	// considered silence. Default 0.02.
	SilenceRMSThreshold float64
	// SilenceTurnMs is the minimum silence duration before we commit the
	// current turn. Default 800 ms.
	SilenceTurnMs int
	// MinTurnMs is the minimum accumulated non-silence needed to treat a
	// buffer as a real turn. Default 300 ms — filters coughs and clicks.
	MinTurnMs int
	// MaxTurnMs caps a runaway turn before forcing processing. Default
	// 30 000 ms.
	MaxTurnMs int
	// HistoryTurns is how many (user, assistant) turns to keep as rolling
	// context for the agent flow. Default 5.
	HistoryTurns int
	// TTSFormat is the audio format the TTS provider should emit. Default
	// "mp3" (provider passthrough, client transcodes if needed).
	TTSFormat string
	// TTSSpeed is the TTS speech rate. Default 1.0.
	TTSSpeed float64
}

CascadedConfig tunes turn detection and processing. Zero values are filled by NewCascadedProvider with sensible defaults.
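
Default filling of this kind is usually a field-by-field zero check. The sketch below uses a local copy of the struct and the defaults stated in the field comments; withDefaults is a hypothetical helper, not the package's code.

```go
package main

import "fmt"

// cascadedConfig is a local copy of the documented CascadedConfig fields.
type cascadedConfig struct {
	SilenceRMSThreshold float64
	SilenceTurnMs       int
	MinTurnMs           int
	MaxTurnMs           int
	HistoryTurns        int
	TTSFormat           string
	TTSSpeed            float64
}

// withDefaults replaces zero values with the documented defaults.
func withDefaults(c cascadedConfig) cascadedConfig {
	if c.SilenceRMSThreshold == 0 {
		c.SilenceRMSThreshold = 0.02
	}
	if c.SilenceTurnMs == 0 {
		c.SilenceTurnMs = 800
	}
	if c.MinTurnMs == 0 {
		c.MinTurnMs = 300
	}
	if c.MaxTurnMs == 0 {
		c.MaxTurnMs = 30000
	}
	if c.HistoryTurns == 0 {
		c.HistoryTurns = 5
	}
	if c.TTSFormat == "" {
		c.TTSFormat = "mp3"
	}
	if c.TTSSpeed == 0 {
		c.TTSSpeed = 1.0
	}
	return c
}

func main() {
	// Only MaxTurnMs is set; every other field falls back to its default.
	fmt.Printf("%+v\n", withDefaults(cascadedConfig{MaxTurnMs: 10000}))
}
```

One consequence of zero-means-default is that a caller cannot request a literal zero, such as HistoryTurns of 0, through this struct alone.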

type CascadedDeps

type CascadedDeps struct {
	STT    CascadedSTT
	Agent  CascadedAgent
	TTS    CascadedTTS
	Config CascadedConfig
}

CascadedDeps bundles everything the bootstrap hands to the provider.

type CascadedProvider

type CascadedProvider struct {
	// contains filtered or unexported fields
}

CascadedProvider implements LiveProviderAdapter as a turn-based STT → LLM → TTS pipeline. It is the self-hosted-friendly alternative to real-time providers like Gemini Live and Moshi.

Turn detection uses energy-based silence detection (RMS of the 16 kHz S16 PCM stream). When the caller wants tighter control it can send an audio_end frame which triggers immediate processing regardless of the silence heuristic.

The provider is intentionally self-contained and testable with pure fakes — CascadedSTT, CascadedAgent, and CascadedTTS are narrow interfaces the bootstrap wires production implementations into.
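
The energy check can be illustrated as follows. This is a minimal sketch that assumes little-endian, mono S16 samples at 16 kHz; byte order and channel count are not stated in the source, and rms, isSilence, frameMs, and encodeS16 are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeS16 packs samples as little-endian signed 16-bit PCM.
func encodeS16(samples []int16) []byte {
	out := make([]byte, 2*len(samples))
	for i, s := range samples {
		binary.LittleEndian.PutUint16(out[2*i:], uint16(s))
	}
	return out
}

// rms returns the root-mean-square level of a PCM frame, scaled to 0.0..1.0.
func rms(pcm []byte) float64 {
	n := len(pcm) / 2
	if n == 0 {
		return 0
	}
	var sum float64
	for i := 0; i < n; i++ {
		s := float64(int16(binary.LittleEndian.Uint16(pcm[2*i:]))) / 32768
		sum += s * s
	}
	return math.Sqrt(sum / float64(n))
}

// isSilence reports whether the frame's level is below threshold.
func isSilence(pcm []byte, threshold float64) bool {
	return rms(pcm) < threshold
}

// frameMs returns the frame duration, assuming mono audio at 16 kHz
// (16000 samples/s * 2 bytes = 32 bytes per millisecond).
func frameMs(pcm []byte) int {
	return len(pcm) / 32
}

func main() {
	quiet := encodeS16(make([]int16, 320)) // 20 ms of digital silence
	loud := make([]int16, 320)
	for i := range loud {
		loud[i] = 16384 // half of full scale
	}
	fmt.Println(frameMs(quiet), isSilence(quiet, 0.02))
	fmt.Println(rms(encodeS16(loud)), isSilence(encodeS16(loud), 0.02))
}
```

A turn detector built on this would add up frameMs over consecutive silent frames and commit the turn once the total reaches SilenceTurnMs.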

func NewCascadedProvider

func NewCascadedProvider(deps CascadedDeps) *CascadedProvider

NewCascadedProvider constructs a provider without starting background work. Connect() initializes the goroutine that performs turn processing.

func (*CascadedProvider) Close

func (p *CascadedProvider) Close() error

Close stops the processor loop and drains any pending buffer.

func (*CascadedProvider) Connect

func (p *CascadedProvider) Connect(ctx context.Context, cfg LiveConfigFrame) error

Connect checks that the dependencies required by cfg are present and starts the processor loop. Unlike Gemini Live, it performs no external handshake.

func (*CascadedProvider) Name

func (p *CascadedProvider) Name() string

Name returns the provider identifier used in logs and in /admin/status.

func (*CascadedProvider) Receive

func (p *CascadedProvider) Receive(ctx context.Context) (*LiveMessage, error)

Receive blocks until the next message is ready or the provider closes.

func (*CascadedProvider) SendAudio

func (p *CascadedProvider) SendAudio(chunk []byte) error

SendAudio appends PCM to the current turn buffer and triggers processing when a silence boundary is reached.

func (*CascadedProvider) SendAudioStreamEnd

func (p *CascadedProvider) SendAudioStreamEnd() error

SendAudioStreamEnd forces the current buffer to be treated as a complete turn, even if silence has not yet been detected.

func (*CascadedProvider) SendText

func (p *CascadedProvider) SendText(text string) error

SendText injects a text turn (skipping STT). Useful for testing and for clients that already have a transcript from their own STT.

type CascadedSTT

type CascadedSTT interface {
	Route(ctx context.Context, audio []byte, audioDurationSecs float64, opts stt.TranscribeOpts) (*stt.Result, error)
}

CascadedSTT is the STT surface the provider uses. In production this is the same *internal/router.Router that serves /v1/dictation/transcribe.

type CascadedTTS

type CascadedTTS interface {
	Synthesize(ctx context.Context, text string, opts tts.SynthesizeOpts) (*tts.Result, error)
}

CascadedTTS is the TTS surface the provider uses. In production this is the *internal/tts.Router that serves Assist responses.

type ErrorFrame

type ErrorFrame struct {
	Type    string `json:"type"` // "error"
	Code    string `json:"code"`
	Message string `json:"message"`
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler exposes both the HTTP session-creation endpoint and the WS upgrade endpoint under /v1/voiceagent/*.

func New

func New(opts HandlerOptions) (*Handler, error)

New constructs a handler. Manager, Provider, and Persona are required, because the adapter cannot function without a manager, provider, and persona resolver; MaxAllowedClockSkew and IdleTimeout are optional.

func (*Handler) Mount

func (h *Handler) Mount(mux *http.ServeMux)

Mount wires the voiceagent endpoints onto mux:

POST   /v1/voiceagent/sessions        — create session + mint ticket
GET    /v1/voiceagent/sessions        — list caller's active sessions
DELETE /v1/voiceagent/sessions/{id}   — force close a session
GET    /v1/voiceagent/sessions/{id}/ws?ticket=... — upgrade to WebSocket
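
On the client side, the upgrade URL can be assembled from the session ID and ticket returned by the POST endpoint. The sketch below is an assumption about client code, not part of this package; wsURL is a hypothetical helper, and it assumes session IDs are URL-safe.

```go
package main

import (
	"fmt"
	"net/url"
)

// wsURL builds the WebSocket upgrade URL for a session, switching the
// scheme from http(s) to ws(s) and passing the ticket as a query parameter.
func wsURL(base, sessionID, ticket string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", fmt.Errorf("parse base URL: %w", err)
	}
	switch u.Scheme {
	case "https":
		u.Scheme = "wss"
	case "http":
		u.Scheme = "ws"
	default:
		return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	u.Path = "/v1/voiceagent/sessions/" + sessionID + "/ws"
	u.RawQuery = url.Values{"ticket": {ticket}}.Encode()
	return u.String(), nil
}

func main() {
	// example.com, "abc", and the ticket value are placeholders.
	fmt.Println(wsURL("https://example.com", "abc", "abc.1700000030.c2ln"))
}
```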

type HandlerOptions

type HandlerOptions struct {
	Manager             *SessionManager
	Provider            ProviderFactory
	Persona             PersonaResolver
	MaxAllowedClockSkew time.Duration
	// IdleTimeout terminates a session that hasn't seen any activity
	// (client frame OR provider message) within the duration. Zero
	// selects the default of 15 minutes; pass a negative value to
	// disable the server-side idle watchdog.
	IdleTimeout time.Duration
}

HandlerOptions configures the WebSocket handler.
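
One reading that reconciles HandlerOptions.IdleTimeout with Adapter.IdleTimeout (where zero disables the watchdog) is that the handler normalizes the option before handing it to the adapter. The sketch below assumes that reading; adapterIdleTimeout is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

const defaultIdleTimeout = 15 * time.Minute

// adapterIdleTimeout maps the handler option to the adapter value, where
// zero means "watchdog disabled".
func adapterIdleTimeout(opt time.Duration) time.Duration {
	switch {
	case opt == 0:
		return defaultIdleTimeout // unset: use the documented default
	case opt < 0:
		return 0 // negative: disable explicitly
	default:
		return opt
	}
}

func main() {
	for _, d := range []time.Duration{0, -1, 5 * time.Second} {
		fmt.Println(d, "->", adapterIdleTimeout(d))
	}
}
```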

type Identity

type Identity struct {
	UserID string
	OrgID  string
	Plan   string
	Role   string
}

Identity is the caller's resolved identity from the auth middleware. Stored on each session so closes can verify ownership.

type InterruptedFrame

type InterruptedFrame struct {
	Type string `json:"type"` // "interrupted"
}

type LiveConfigFrame

type LiveConfigFrame struct {
	Model string
	// FallbackModel is forwarded to providers that support same-provider
	// fallback (kernel Gemini Live retries the fallback when the primary
	// connect fails). Empty disables the fallback.
	FallbackModel    string
	APIKey           string
	Voice            string
	SystemPrompt     string
	RefinementPrompt string
	Locale           string
	// Raw activity-detection passthrough; adapter translates to the
	// kernel's internal types.
	Automatic         bool
	StartSensitivity  string
	EndSensitivity    string
	PrefixPaddingMs   int32
	SilenceDurationMs int32
	ActivityHandling  string
	TurnCoverage      string
}

LiveConfigFrame is the subset of configuration the adapter derives from a StartFrame and the persona/role resolver. Kept as a separate type so the test double doesn't need to depend on the kernel's concrete LiveConfig (which embeds Google genai types).

type LiveMessage

type LiveMessage struct {
	Audio                []byte
	OutputTranscript     string
	OutputTranscriptDone bool
	InputTranscript      string
	InputTranscriptDone  bool
	Interrupted          bool
	GoAway               bool
}

LiveMessage is the subset of kernel/internal/voiceagent.LiveMessage the adapter relays to the client. Matching field names keep the translation trivial.

type LiveProviderAdapter

type LiveProviderAdapter interface {
	Connect(ctx context.Context, cfg LiveConfigFrame) error
	SendAudio(chunk []byte) error
	SendAudioStreamEnd() error
	SendText(text string) error
	Receive(ctx context.Context) (*LiveMessage, error)
	Close() error
	Name() string
}

LiveProviderAdapter is the minimal slice of the kernel's LiveProvider interface the Server-Target adapter needs. Keeping it narrow makes it trivial for tests to stub without pulling in the full genai SDK.

type ManagedSession

type ManagedSession struct {
	ID          string
	Owner       Identity
	CreatedAt   time.Time
	State       State
	HasWSClient bool // true once the client has upgraded
}

ManagedSession is the manager's record for one active or pending session. The WebSocket handler and adapter attach additional fields (conn, pumps) to this struct at handshake time.

type Options

type Options struct {
	// TicketSecret signs the one-time WS upgrade tickets. Must be at least
	// 16 bytes; when empty, the manager generates a random secret at
	// construction time (acceptable only for single-process deployments).
	TicketSecret []byte
	// TicketTTL limits how long a minted ticket remains valid. Defaults to
	// 30 seconds.
	TicketTTL time.Duration
	// MaxGlobalSessions caps concurrent sessions across all callers.
	// Defaults to 100.
	MaxGlobalSessions int
	// MaxPerIdentitySessions caps concurrent sessions per caller.
	// Defaults to 3.
	MaxPerIdentitySessions int
	// Clock is a time source; nil means time.Now. Tests can override.
	Clock func() time.Time
}

Options configures a SessionManager.
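
The TicketSecret rule (at least 16 bytes, random when empty) might be enforced along these lines. This is a sketch under assumptions: ticketSecret is a hypothetical helper, the 32-byte generated length is arbitrary, and whether the real constructor rejects short secrets with an error is not confirmed by the source.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
)

const minSecretLen = 16

var errSecretTooShort = errors.New("ticket secret must be at least 16 bytes")

// ticketSecret returns the secret to sign with and reports whether it was
// randomly generated because none was configured.
func ticketSecret(configured []byte) ([]byte, bool, error) {
	if len(configured) == 0 {
		secret := make([]byte, 32) // length chosen for this sketch only
		if _, err := rand.Read(secret); err != nil {
			return nil, false, fmt.Errorf("generate ticket secret: %w", err)
		}
		return secret, true, nil
	}
	if len(configured) < minSecretLen {
		return nil, false, errSecretTooShort
	}
	return configured, false, nil
}

func main() {
	secret, generated, err := ticketSecret(nil)
	fmt.Println(len(secret), generated, err)

	_, _, err = ticketSecret([]byte("too-short"))
	fmt.Println(err)
}
```

A generated secret lives only in one process, so tickets minted by one replica would fail verification on another. That is why the field comment limits the empty-secret mode to single-process deployments.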

type PersonaResolver

type PersonaResolver interface {
	Resolve(StartFrame) (LiveConfigFrame, error)
}

PersonaResolver derives a LiveConfigFrame from a StartFrame. The server's persona registry implements this in M5; for M4 a stub resolver is used that echoes the StartFrame through with sensible defaults.

type PongFrame

type PongFrame struct {
	Type string `json:"type"` // "pong"
}

type ProviderFactory

type ProviderFactory interface {
	NewProvider() LiveProviderAdapter
}

ProviderFactory builds a Framework kernel voice-agent provider on demand. Each WebSocket session gets its own provider instance so concurrent sessions don't share Gemini Live state.

The concrete production implementation returns a *voiceagent.GeminiLive; tests supply a fake that records frames and replies with canned audio.

type SequenceStepFrame

type SequenceStepFrame struct {
	Type   string `json:"type"` // "sequence_step"
	StepID string `json:"step_id"`
	Status string `json:"status"` // "entered" | "completed"
}

type SessionEndFrame

type SessionEndFrame struct {
	Type   string `json:"type"`   // "session_end"
	Reason string `json:"reason"` // "idle" | "go_away" | "client" | "error" | "shutdown"
}

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager tracks active Voice Agent sessions.

func NewSessionManager

func NewSessionManager(opts Options) (*SessionManager, error)

NewSessionManager constructs a manager with opts. Unset fields receive sensible defaults.

func (*SessionManager) Attach

func (m *SessionManager) Attach(id string) error

Attach moves a session from pending to active. Returns an error when the session is already attached to another WS client or has been closed.

func (*SessionManager) Create

func (m *SessionManager) Create(owner Identity) (*ManagedSession, string, error)

Create registers a new pending session for the given caller and returns the session record plus a one-time WS upgrade ticket. Fails with ErrGlobalLimitExceeded or ErrIdentityLimitExceeded when the corresponding concurrency cap is reached.
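
The two concurrency caps can be sketched as a mutex-guarded counter pair. This is an illustration only: the limiter type is hypothetical, and checking the global cap before the per-identity cap is an assumption, not confirmed by the source.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errGlobalLimit   = errors.New("global session limit exceeded")
	errIdentityLimit = errors.New("per-identity session limit exceeded")
)

// limiter counts sessions globally and per user ID.
type limiter struct {
	mu        sync.Mutex
	maxGlobal int
	maxPerID  int
	total     int
	perID     map[string]int
}

func newLimiter(maxGlobal, maxPerID int) *limiter {
	return &limiter{maxGlobal: maxGlobal, maxPerID: maxPerID, perID: make(map[string]int)}
}

// acquire reserves one session slot for userID or returns a limit error.
func (l *limiter) acquire(userID string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.total >= l.maxGlobal {
		return errGlobalLimit
	}
	if l.perID[userID] >= l.maxPerID {
		return errIdentityLimit
	}
	l.total++
	l.perID[userID]++
	return nil
}

// release frees one slot; calling it for a user with no slots is a no-op.
func (l *limiter) release(userID string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.perID[userID] == 0 {
		return
	}
	l.perID[userID]--
	if l.perID[userID] == 0 {
		delete(l.perID, userID)
	}
	l.total--
}

func main() {
	l := newLimiter(100, 3) // the documented defaults
	for i := 0; i < 4; i++ {
		fmt.Println(l.acquire("alice"))
	}
}
```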

func (*SessionManager) Get

func (m *SessionManager) Get(id string) (*ManagedSession, error)

Get returns the session with the given ID, or ErrSessionNotFound.

func (*SessionManager) List

func (m *SessionManager) List(userID string) []*ManagedSession

List returns a snapshot of sessions owned by the given user (or all when userID is empty — reserved for admin callers).

func (*SessionManager) Remove

func (m *SessionManager) Remove(id string)

Remove closes and removes the session. Safe to call multiple times.

func (*SessionManager) VerifyTicket

func (m *SessionManager) VerifyTicket(sessionID, ticket string) error

VerifyTicket validates a ticket string against its expected session ID. Returns nil when the ticket is cryptographically valid, un-expired, and bound to this sessionID. Mutates nothing; callers must follow up with Attach to mark the session active.

type StartFrame

type StartFrame struct {
	Type       string `json:"type"` // must be "start"
	PersonaID  string `json:"persona_id,omitempty"`
	RoleID     string `json:"role_id,omitempty"`
	SequenceID string `json:"sequence_id,omitempty"`
	Voice      string `json:"voice,omitempty"`
	Locale     string `json:"locale,omitempty"`
	Model      string `json:"model,omitempty"`
	Thinking   string `json:"thinking,omitempty"` // "off" | "low" | "medium" | "high"
	// Raw activity-detection policy override. Pipeline translates these to
	// the kernel's internal enums.
	ActivityDetection *ActivityDetectionFrame `json:"activity_detection,omitempty"`
	// Optional durable instruction layered on top of the role's prompt.
	SystemPromptOverride string `json:"system_prompt_override,omitempty"`
}

StartFrame is the mandatory first client frame. Fields marked optional fall back to persona/role defaults when omitted.
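
The effect of the omitempty tags is easiest to see by encoding a frame. The sketch below uses trimmed local copies of StartFrame and ActivityDetectionFrame (only a few fields each); encodeStart and parseStart are hypothetical helpers, and the persona and voice values are placeholders.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Trimmed local copies of the documented frame types.
type activityDetectionFrame struct {
	Automatic         bool  `json:"automatic"`
	SilenceDurationMs int32 `json:"silence_duration_ms,omitempty"`
}

type startFrame struct {
	Type              string                  `json:"type"`
	PersonaID         string                  `json:"persona_id,omitempty"`
	Voice             string                  `json:"voice,omitempty"`
	ActivityDetection *activityDetectionFrame `json:"activity_detection,omitempty"`
}

var errNotStart = errors.New(`first frame must have type "start"`)

// encodeStart forces the type field and marshals the frame.
func encodeStart(f startFrame) (string, error) {
	f.Type = "start"
	b, err := json.Marshal(f)
	return string(b), err
}

// parseStart decodes a frame and rejects anything that is not a start frame.
func parseStart(raw []byte) (startFrame, error) {
	var f startFrame
	if err := json.Unmarshal(raw, &f); err != nil {
		return startFrame{}, fmt.Errorf("decode start frame: %w", err)
	}
	if f.Type != "start" {
		return startFrame{}, errNotStart
	}
	return f, nil
}

func main() {
	minimal, _ := encodeStart(startFrame{PersonaID: "p1"})
	fmt.Println(minimal)

	tuned, _ := encodeStart(startFrame{
		PersonaID:         "p1",
		ActivityDetection: &activityDetectionFrame{Automatic: true, SilenceDurationMs: 500},
	})
	fmt.Println(tuned)

	_, err := parseStart([]byte(`{"type":"ping"}`))
	fmt.Println(err)
}
```

Omitted optional fields never reach the wire, which lets the server distinguish "not set, use the persona default" from an explicit value. The automatic field has no omitempty tag, so it is always sent once an activity_detection object is present.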

type State

type State string

State captures the session's manager-level lifecycle. The Framework kernel (internal/voiceagent.Session) has its own finer-grained state; this one only distinguishes pending ticket vs. active connection.

const (
	StatePendingWS State = "pending_ws"
	StateActive    State = "active"
	StateClosed    State = "closed"
)
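
The manager-level lifecycle amounts to a small state machine. The sketch below models only the Attach transition; which error the real Attach returns for a closed session is an assumption, and the names are local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

type state string

const (
	statePendingWS state = "pending_ws"
	stateActive    state = "active"
	stateClosed    state = "closed"
)

var (
	errAlreadyActive = errors.New("session already has an active WS connection")
	errNotFound      = errors.New("session not found")
)

// attach returns the state after a WebSocket client tries to attach.
func attach(s state) (state, error) {
	switch s {
	case statePendingWS:
		return stateActive, nil // ticket holder upgrades: pending -> active
	case stateActive:
		return s, errAlreadyActive // a second client must not take over
	default:
		return s, errNotFound // closed sessions are treated as gone (assumption)
	}
}

func main() {
	s := statePendingWS
	s, err := attach(s)
	fmt.Println(s, err)
	s, err = attach(s)
	fmt.Println(s, err)
}
```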

type StateFrame

type StateFrame struct {
	Type  string `json:"type"` // "state"
	State string `json:"state"`
}

type TextFrame

type TextFrame struct {
	Type string `json:"type"` // "text"
	Text string `json:"text"`
}

TextFrame carries an injected text turn from the client.

type ToolCallFrame

type ToolCallFrame struct {
	Type string         `json:"type"` // "tool_call"
	ID   string         `json:"id"`
	Name string         `json:"name"`
	Args map[string]any `json:"args,omitempty"`
}

type ToolResponseFrame

type ToolResponseFrame struct {
	Type     string         `json:"type"` // "tool_response"
	ID       string         `json:"id"`
	Name     string         `json:"name"`
	Response map[string]any `json:"response"`
}

ToolResponseFrame resolves a tool call issued by the server.

type TranscriptFrame

type TranscriptFrame struct {
	Type string `json:"type"` // "input_transcript" | "output_transcript"
	Text string `json:"text"`
	Done bool   `json:"done"`
}
