processors

package
v0.2.0
Published: May 16, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

README

Processors

Package processors provides the frame processor abstraction and built-in processors: voice pipeline (VAD/turn, STT, LLM, TTS), echo, filters, aggregators, audio, and frameworks (RTVI, external chain).

Purpose

  • Processor: Interface for pipeline nodes; ProcessFrame(ctx, f, dir) with Direction (Downstream/Upstream); SetNext/SetPrev, Setup/Cleanup, Name.
  • BaseProcessor: Default linking and forward behavior; embed and override ProcessFrame (and optionally Setup/Cleanup).
  • FilterProcessor: Forwards frames only when FilterFunc returns true.
  • AIServiceBase: Base for AI stages; handles StartFrame/EndFrame/CancelFrame via Start/Stop/Cancel; ServiceSettings (model, voice).
  • FrameQueue / QueuedProcessor: Priority queue (system frames first) and processor that drains the queue in a goroutine.
  • Subpackages: voice (full STT→LLM→TTS pipeline), echo, filters (identity, wake, stt_mute, etc.), aggregator, aggregators (llmtext, gated, dtmf, etc.), audio (VAD, buffer, filter), frameworks (rtvi, external chain).

Voice pipeline flow

flowchart LR
    In["Audio input"] --> Turn["voice.Turn\nVAD + silence"]
    Turn -->|"user segment"| STT["voice.STT"]
    STT -->|"TranscriptionFrame"| LLM["voice.LLM"]
    LLM -->|"LLMTextFrame"| Boundary["Sentence boundary"]
    Boundary -->|"sentence"| TTS["voice.TTS"]
    TTS -->|"TTSAudioRawFrame"| Out["Output"]

Processor types (root + subpackages)

graph TD
    Processor["Processor interface"] --> Base["BaseProcessor"]
    Processor --> Filter["FilterProcessor"]
    Processor --> Voice["voice\nSTT, LLM, TTS, Turn"]
    Processor --> Echo["echo"]
    Processor --> Agg["aggregator"]
    Base --> AIBase["AIServiceBase"]
    Queue["FrameQueue\nQueuedProcessor"] --> Base
    Voice --> AIBase
    Filters["filters"] --> Filter
    Aggregators["aggregators\nllmtext, gated, dtmf"] --> Base
    Audio["audio\nVAD, buffer, filter"] --> Base
    Frameworks["frameworks\nrtvi, external_chain"] --> Base

Exported symbols (root package)

| Symbol | Type | Description |
| --- | --- | --- |
| Processor | interface | ProcessFrame, SetNext, SetPrev, Setup, Cleanup, Name |
| Direction | type | Downstream, Upstream |
| BaseProcessor | struct | Linking; NewBaseProcessor, Next, Prev, PushDownstream, PushUpstream; default ProcessFrame forwards to next/prev |
| FilterFunc | type | func(ctx, f Frame, dir Direction) bool |
| FilterProcessor | struct | Forwards when filter returns true; NewFilterProcessor |
| ServiceSettings | struct | Model, Voice |
| AIServiceBase | struct | Embeds BaseProcessor; Start/Stop/Cancel on lifecycle frames; NewAIServiceBase, Settings, ApplySettings |
| QueuedItem | struct | Frame, Dir |
| FrameQueue | struct | System/data queues; NewFrameQueue, Put, Get, Close |
| QueuedProcessor | struct | Runs processor from queue in goroutine; NewQueuedProcessor, Run |

Concurrency

  • BaseProcessor / FilterProcessor: No internal goroutines; single-threaded along the pipeline chain.
  • FrameQueue: Protected by sync.Mutex; Put/Get safe for concurrent use; Get blocks until item or closed.
  • QueuedProcessor.Run: One goroutine drains the queue and calls ProcessFrame; started by caller.
  • voice: STT/LLM/TTS may call external APIs (blocking or streaming); turn detection and interruption may use internal goroutines (see voice package).

Files (root)

| File | Description |
| --- | --- |
| processor.go | Processor, Direction, BaseProcessor |
| filter.go | FilterFunc, FilterProcessor |
| ai_base.go | ServiceSettings, AIServiceBase |
| queue.go | QueuedItem, FrameQueue, QueuedProcessor |

Subpackages

| Path | Description |
| --- | --- |
| voice/ | Turn (VAD + silence), STT, LLM, TTS; full voice pipeline; register |
| echo/ | Echo processor (for testing) |
| filters/ | identity, null, wake_check, wake_notifier, stt_mute, function_filter; register |
| aggregator/ | Base aggregator |
| aggregators/ | llmtext, gated, gatedcontext, dtmf, llmfullresponse, llmcontextsummarizer, userresponse |
| audio/ | VAD processor, audio buffer, audio filter; register |
| logger/ | Logging processor |
| frameworks/ | RTVI serialize/processor, external_chain; register |

Documentation

Overview

Package processors: AIServiceBase provides a base for AI services with settings, Start/Stop/Cancel lifecycle, and optional metrics sync (mirrors upstream ai_service.py).

Package processors provides FilterProcessor for conditional frame forwarding.

Package processors provides the frame processor abstraction and built-in processors.

Types

type AIServiceBase

type AIServiceBase struct {
	*BaseProcessor
	// contains filtered or unexported fields
}

AIServiceBase embeds BaseProcessor and adds settings plus Start/Stop/Cancel lifecycle. ProcessFrame handles StartFrame, EndFrame, and CancelFrame by calling Start, Stop, Cancel then forwarding. Subtypes override Start, Stop, Cancel for initialization/cleanup.
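The dispatch described above can be sketched without the real package. Everything in this block is a local stand-in: the frame types, the `hooks` interface, `safely`, and `dispatch` are hypothetical names, and whether the real ProcessFrame returns a recovered panic as an error is an assumption. Because Go embedding does not give virtual dispatch, the sketch passes the hooks as an interface so that an overriding type's methods are the ones called.

```go
package main

import "fmt"

// Local stand-ins for the frames package, which this page does not show.
type Frame interface{}
type StartFrame struct{}
type EndFrame struct{}
type CancelFrame struct{}
type TextFrame struct{ Text string }

// hooks is a hypothetical interface covering the three lifecycle methods.
type hooks interface {
	Start(*StartFrame)
	Stop(*EndFrame)
	Cancel(*CancelFrame)
}

// recorder logs lifecycle calls; its Cancel panics to exercise recovery.
type recorder struct{ events []string }

func (r *recorder) Start(*StartFrame)   { r.events = append(r.events, "start") }
func (r *recorder) Stop(*EndFrame)      { r.events = append(r.events, "stop") }
func (r *recorder) Cancel(*CancelFrame) { panic("cancel failed") }

// safely runs fn and converts a panic into an error.
func safely(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	fn()
	return nil
}

// dispatch calls the matching hook for a lifecycle frame, then forwards
// every frame, lifecycle or not.
func dispatch(h hooks, f Frame, forward func(Frame)) error {
	var err error
	switch fr := f.(type) {
	case *StartFrame:
		err = safely(func() { h.Start(fr) })
	case *EndFrame:
		err = safely(func() { h.Stop(fr) })
	case *CancelFrame:
		err = safely(func() { h.Cancel(fr) })
	}
	forward(f)
	return err
}

// runLifecycle pushes four frames through dispatch and reports what happened.
func runLifecycle() ([]string, int, error) {
	r := &recorder{}
	forwarded := 0
	var lastErr error
	in := []Frame{&StartFrame{}, &TextFrame{Text: "hi"}, &CancelFrame{}, &EndFrame{}}
	for _, f := range in {
		if err := dispatch(r, f, func(Frame) { forwarded++ }); err != nil {
			lastErr = err
		}
	}
	return r.events, forwarded, lastErr
}

func main() {
	events, forwarded, err := runLifecycle()
	fmt.Println(events, forwarded, err)
}
```

In this sketch all four frames are forwarded, including the CancelFrame whose hook panicked; the panic surfaces only as the returned error.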

func NewAIServiceBase

func NewAIServiceBase(name string, settings *ServiceSettings) *AIServiceBase

NewAIServiceBase returns a base with the given name and optional initial settings. If settings is nil, a zero-value ServiceSettings is used.

func (*AIServiceBase) ApplySettings

func (b *AIServiceBase) ApplySettings(s ServiceSettings)

ApplySettings updates the service settings. Subtypes may override to react to changes.

func (*AIServiceBase) Cancel

func (b *AIServiceBase) Cancel(ctx context.Context, frame *frames.CancelFrame)

Cancel is called when a CancelFrame is processed. Override for cancellation logic.

func (*AIServiceBase) ProcessFrame

func (b *AIServiceBase) ProcessFrame(ctx context.Context, f frames.Frame, dir Direction) error

ProcessFrame handles Start/End/Cancel by calling Start, Stop, Cancel (with recovery) then forwards the frame.

func (*AIServiceBase) Settings

func (b *AIServiceBase) Settings() ServiceSettings

Settings returns a copy of the current settings (caller may modify the copy).
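A short sketch of the copy semantics, using a local stand-in rather than the real AIServiceBase. The `service` type is hypothetical; the only point illustrated is that a struct returned by value can be modified without affecting the stored settings.

```go
package main

import "fmt"

// ServiceSettings repeats the two documented fields.
type ServiceSettings struct {
	Model string
	Voice string
}

// service is a hypothetical stand-in for AIServiceBase.
type service struct {
	settings ServiceSettings
}

// Settings returns the struct by value, so the caller receives a copy.
func (s *service) Settings() ServiceSettings { return s.settings }

// ApplySettings replaces the stored settings.
func (s *service) ApplySettings(n ServiceSettings) { s.settings = n }

func main() {
	svc := &service{}
	svc.ApplySettings(ServiceSettings{Model: "model-a", Voice: "voice-a"})

	local := svc.Settings()
	local.Model = "changed" // modifies the copy only

	fmt.Println(svc.Settings().Model, local.Model) // model-a changed
}
```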

func (*AIServiceBase) Start

func (b *AIServiceBase) Start(ctx context.Context, frame *frames.StartFrame)

Start is called when a StartFrame is processed. Override for service-specific initialization.

func (*AIServiceBase) Stop

func (b *AIServiceBase) Stop(ctx context.Context, frame *frames.EndFrame)

Stop is called when an EndFrame is processed. Override for cleanup.

type BaseProcessor

type BaseProcessor struct {
	// contains filtered or unexported fields
}

BaseProcessor provides next/prev linking and default forward behavior.

func NewBaseProcessor

func NewBaseProcessor(name string) *BaseProcessor

NewBaseProcessor returns a BaseProcessor with the given name.

func (*BaseProcessor) Cleanup

func (b *BaseProcessor) Cleanup(ctx context.Context) error

Cleanup is a no-op for BaseProcessor; override it in types that embed BaseProcessor.

func (*BaseProcessor) Name

func (b *BaseProcessor) Name() string

Name returns the processor name.

func (*BaseProcessor) Next

func (b *BaseProcessor) Next() Processor

Next returns the next processor.

func (*BaseProcessor) Prev

func (b *BaseProcessor) Prev() Processor

Prev returns the previous processor.

func (*BaseProcessor) ProcessFrame

func (b *BaseProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir Direction) error

ProcessFrame forwards the frame to next (downstream) or prev (upstream). Override it in types that embed BaseProcessor.

func (*BaseProcessor) PushDownstream

func (b *BaseProcessor) PushDownstream(ctx context.Context, f frames.Frame) error

PushDownstream forwards f to the next processor.

func (*BaseProcessor) PushUpstream

func (b *BaseProcessor) PushUpstream(ctx context.Context, f frames.Frame) error

PushUpstream forwards f to the previous processor.

func (*BaseProcessor) SetNext

func (b *BaseProcessor) SetNext(p Processor)

SetNext sets the next processor in the pipeline.

func (*BaseProcessor) SetPrev

func (b *BaseProcessor) SetPrev(p Processor)

SetPrev sets the previous processor in the pipeline.
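SetNext and SetPrev are typically called in pairs so that frames can travel in both directions. The sketch below shows one way to wire a chain; `linker`, `node`, `link`, and `walk` are hypothetical names introduced for illustration, and the package may provide its own pipeline builder that is not shown on this page.

```go
package main

import "fmt"

// linker is a hypothetical subset of Processor covering only linking.
type linker interface {
	SetNext(linker)
	SetPrev(linker)
	Name() string
}

// node is a stand-in processor that stores its neighbors.
type node struct {
	name       string
	next, prev linker
}

func (n *node) SetNext(p linker) { n.next = p }
func (n *node) SetPrev(p linker) { n.prev = p }
func (n *node) Name() string     { return n.name }

// link wires each processor to its neighbor in both directions.
func link(ps ...linker) {
	for i := 0; i+1 < len(ps); i++ {
		ps[i].SetNext(ps[i+1])
		ps[i+1].SetPrev(ps[i])
	}
}

// walk follows next (downstream) or prev (upstream) links from start.
func walk(start *node, downstream bool) []string {
	var names []string
	cur := start
	for cur != nil {
		names = append(names, cur.Name())
		if downstream {
			cur, _ = cur.next.(*node)
		} else {
			cur, _ = cur.prev.(*node)
		}
	}
	return names
}

func main() {
	turn, stt, llm, tts := &node{name: "turn"}, &node{name: "stt"}, &node{name: "llm"}, &node{name: "tts"}
	link(turn, stt, llm, tts)
	fmt.Println(walk(turn, true))  // [turn stt llm tts]
	fmt.Println(walk(tts, false)) // [tts llm stt turn]
}
```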

func (*BaseProcessor) Setup

func (b *BaseProcessor) Setup(ctx context.Context) error

Setup is a no-op for BaseProcessor; override it in types that embed BaseProcessor.

type Direction

type Direction int

Direction is the frame flow direction.

const (
	Downstream Direction = 1
	Upstream   Direction = 2
)

type FilterFunc

type FilterFunc func(ctx context.Context, f frames.Frame, dir Direction) bool

FilterFunc returns true if the frame should be forwarded in the given direction.

type FilterProcessor

type FilterProcessor struct {
	*BaseProcessor
	Filter FilterFunc
	// contains filtered or unexported fields
}

FilterProcessor forwards frames only when the filter function returns true. Used by ServiceSwitcher to gate frames so only the active service receives them.

func NewFilterProcessor

func NewFilterProcessor(name string, filter FilterFunc) *FilterProcessor

NewFilterProcessor returns a processor that forwards downstream frames when filter(ctx, f, Downstream) is true, and forwards upstream frames when filter(ctx, f, Upstream) is true. If filter is nil, all frames pass.
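A direction-aware filter could be written as follows. The `filter` type is a local stand-in that forwards through a callback instead of linked neighbors, and `textDownstreamOnly` is a hypothetical predicate; only the FilterFunc shape and the nil-passes-everything rule come from the documentation above.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the frames package.
type Frame interface{}
type TextFrame struct{ Text string }
type AudioFrame struct{}

type Direction int

const (
	Downstream Direction = 1
	Upstream   Direction = 2
)

// FilterFunc has the documented shape, using the local Frame type.
type FilterFunc func(ctx context.Context, f Frame, dir Direction) bool

// filter is a stand-in for FilterProcessor.
type filter struct {
	fn      FilterFunc
	forward func(Frame, Direction)
}

// ProcessFrame forwards when the filter is nil or returns true.
func (fp *filter) ProcessFrame(ctx context.Context, f Frame, dir Direction) error {
	if fp.fn == nil || fp.fn(ctx, f, dir) {
		fp.forward(f, dir)
	}
	return nil
}

// textDownstreamOnly lets only TextFrames go downstream and lets
// everything go upstream.
func textDownstreamOnly(_ context.Context, f Frame, dir Direction) bool {
	if dir == Upstream {
		return true
	}
	_, ok := f.(*TextFrame)
	return ok
}

// countForwarded sends three frames through a filter and counts survivors.
func countForwarded(fn FilterFunc) int {
	n := 0
	fp := &filter{fn: fn, forward: func(Frame, Direction) { n++ }}
	ctx := context.Background()
	_ = fp.ProcessFrame(ctx, &TextFrame{Text: "hi"}, Downstream)
	_ = fp.ProcessFrame(ctx, &AudioFrame{}, Downstream)
	_ = fp.ProcessFrame(ctx, &AudioFrame{}, Upstream)
	return n
}

func main() {
	fmt.Println(countForwarded(textDownstreamOnly)) // 2
	fmt.Println(countForwarded(nil))                // 3
}
```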

func (*FilterProcessor) Name

func (fp *FilterProcessor) Name() string

Name returns the processor name.

func (*FilterProcessor) ProcessFrame

func (fp *FilterProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir Direction) error

ProcessFrame implements Processor. Forwards the frame only if the filter returns true for this direction.

type FrameQueue

type FrameQueue struct {
	// contains filtered or unexported fields
}

FrameQueue is a queue that prioritizes system frames over data frames.

func NewFrameQueue

func NewFrameQueue() *FrameQueue

NewFrameQueue returns a new FrameQueue.

func (*FrameQueue) Close

func (q *FrameQueue) Close()

Close closes the queue; subsequent Get will return false.

func (*FrameQueue) Get

func (q *FrameQueue) Get(ctx context.Context) (QueuedItem, bool)

Get blocks until an item is available or the queue is closed. System frames are returned before data frames; the boolean result is false once the queue is closed.

func (*FrameQueue) Put

func (q *FrameQueue) Put(item QueuedItem)

Put adds an item; system frames go to system queue, others to data queue.
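One way to build a two-tier blocking queue is a mutex plus a condition variable. The sketch below is not the package's implementation: `item`, `queue`, and `drainOrder` are hypothetical, the `system` flag stands in for whatever frame-type check the real queue performs, the `ctx` parameter of the real Get is omitted, and draining leftover items after Close is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// item is a stand-in for QueuedItem; system marks high-priority frames.
type item struct {
	name   string
	system bool
}

// queue keeps system and data items in separate FIFO slices.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	system []item
	data   []item
	closed bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Put appends to the matching slice and wakes one waiting Get.
func (q *queue) Put(it item) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	if it.system {
		q.system = append(q.system, it)
	} else {
		q.data = append(q.data, it)
	}
	q.cond.Signal()
}

// Get blocks until an item exists or the queue is closed.
func (q *queue) Get() (item, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.system) == 0 && len(q.data) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.system) > 0 {
		it := q.system[0]
		q.system = q.system[1:]
		return it, true
	}
	if len(q.data) > 0 {
		it := q.data[0]
		q.data = q.data[1:]
		return it, true
	}
	return item{}, false // closed and empty
}

// Close marks the queue closed and wakes every waiting Get.
func (q *queue) Close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// drainOrder enqueues mixed items and returns the order Get yields them.
func drainOrder() []string {
	q := newQueue()
	q.Put(item{name: "audio-1"})
	q.Put(item{name: "start", system: true})
	q.Put(item{name: "audio-2"})
	q.Close()
	var order []string
	for it, ok := q.Get(); ok; it, ok = q.Get() {
		order = append(order, it.name)
	}
	return order
}

func main() {
	fmt.Println(drainOrder()) // [start audio-1 audio-2]
}
```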

type Processor

type Processor interface {
	ProcessFrame(ctx context.Context, f frames.Frame, dir Direction) error
	SetNext(p Processor)
	SetPrev(p Processor)
	Setup(ctx context.Context) error
	Cleanup(ctx context.Context) error
	Name() string
}

Processor processes frames and can be linked into a pipeline.

type QueuedItem

type QueuedItem struct {
	Frame frames.Frame
	Dir   Direction
}

QueuedItem holds a frame and the direction in which it is travelling.

type QueuedProcessor

type QueuedProcessor struct {
	*BaseProcessor
	Queue *FrameQueue
}

QueuedProcessor runs a processor with an input queue and a single goroutine.

func NewQueuedProcessor

func NewQueuedProcessor(base *BaseProcessor, queue *FrameQueue) *QueuedProcessor

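NewQueuedProcessor returns a QueuedProcessor that wraps base and reads from queue. It does not start a goroutine itself; the caller starts Run, which drains the queue and calls ProcessFrame.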

func (*QueuedProcessor) Run

func (q *QueuedProcessor) Run(ctx context.Context)

Run processes items from the queue until context is cancelled or queue is closed.

type ServiceSettings

type ServiceSettings struct {
	Model string `json:"model,omitempty"`
	Voice string `json:"voice,omitempty"`
}

ServiceSettings holds runtime settings for an AI service (model, voice, etc.).
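The `omitempty` tags mean that unset fields are left out of the encoded JSON. The sketch below redeclares the struct locally so it runs on its own; `encode` is a hypothetical helper and the model and voice values are placeholders.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ServiceSettings is redeclared here with the documented tags.
type ServiceSettings struct {
	Model string `json:"model,omitempty"`
	Voice string `json:"voice,omitempty"`
}

// encode marshals settings to a JSON string.
func encode(s ServiceSettings) string {
	b, err := json.Marshal(s)
	if err != nil {
		return "" // not expected for a struct of plain strings
	}
	return string(b)
}

func main() {
	fmt.Println(encode(ServiceSettings{Model: "example-model"})) // {"model":"example-model"}
	fmt.Println(encode(ServiceSettings{}))                       // {}

	var s ServiceSettings
	_ = json.Unmarshal([]byte(`{"voice":"example-voice"}`), &s)
	fmt.Printf("%q %q\n", s.Model, s.Voice) // "" "example-voice"
}
```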

Directories

| Path | Synopsis |
| --- | --- |
| aggregator | Package aggregator provides a processor that collects text frames and emits a single aggregated frame (e.g. |
| aggregators | |
| aggregators/dtmf | Package dtmf provides a DTMF aggregator that accumulates InputDTMFFrame digits and emits TranscriptionFrame on timeout, termination digit (#), or EndFrame/CancelFrame. |
| aggregators/gated | Package gated provides a gated aggregator that buffers frames when the gate is closed and releases them when the gate opens (custom open/close predicates). |
| aggregators/gatedcontext | Package gatedcontext provides a processor that holds LLMContextFrame until a notifier signals release. |
| aggregators/llmcontextsummarizer | Package llmcontextsummarizer provides a processor that monitors LLM context size and emits LLMContextSummaryRequestFrame when thresholds are exceeded; applies results from LLMContextSummaryResultFrame. |
| aggregators/llmfullresponse | Package llmfullresponse provides a processor that aggregates LLM text between LLMFullResponseStartFrame and LLMFullResponseEndFrame and invokes a callback on completion or interruption. |
| aggregators/llmtext | Package llmtext provides a processor that converts LLMTextFrame to AggregatedTextFrame using a configurable text aggregator (e.g. |
| aggregators/userresponse | Package userresponse provides a processor that aggregates TranscriptionFrame into a single TextFrame when the user turn ends (e.g. |
| audio | Package audio provides audio processors for the pipeline: VAD (voice activity detection) and an audio buffer processor that merges user and bot audio with optional turn-based and buffered callbacks. |
| filters | Package filters provides frame-filtering processors for the pipeline, ported from upstream processors/filters: frame_filter, function_filter, identity_filter, null_filter, stt_mute_filter, wake_check_filter, wake_notifier_filter. |
| frameworks | Package frameworks provides processor integrations for external runtimes and the RTVI protocol, ported from upstream processors/frameworks. |
| frameworks/rtvi | Package rtvi implements the RTVI (Real-Time Voice Interface) protocol processor and message types. |
| voice | Package voice provides processors that wire STT, LLM, and TTS into a pipeline. |
