Documentation
¶
Index ¶
- Constants
- Variables
- func CastOption[T any](options map[string]any) (val T)
- func ComputeSampleByteCount(sampleRate, bitDepth, channels int) int
- func IsAudioCodecSupported(codec AudioCodec) bool
- func IsVideoCodecSupported(codec VideoCodec) bool
- func NewDownsamplingLowPass(sourceRate, targetRate int) *lowPassFIR
- func NewUpsamplingAntiImagingLowPass(sourceRate, targetRate int) *lowPassFIR
- func NormalizeFramePeriod(s string) time.Duration
- func RMSPCM16LE(pcm []byte) float64
- func ResamplePCM(data []byte, inputRate, outputRate int) ([]byte, error)
- func SetDefaultResampler(factory ConverterFactory)
- type AsyncTaskRunner
- func (tr *AsyncTaskRunner[T]) CancelActiveTask()
- func (tr *AsyncTaskRunner[T]) HandleMediaData(h MediaHandler, data MediaData)
- func (tr *AsyncTaskRunner[T]) HandlePacket(h MediaHandler, packet MediaPacket)
- func (tr *AsyncTaskRunner[T]) HandleState(h MediaHandler, event StateChange)
- func (tr *AsyncTaskRunner[T]) ReleaseResources()
- type AudioCodec
- type AudioPacket
- type AudioProfile
- type BaseProcessor
- type ClosePacket
- type CodecConfig
- type CodecInfo
- type CodecLevel
- type CompletedData
- type ConverterFactory
- type DCBlockHPF
- type DTMFPacket
- type EncoderFunc
- type ErrorHandler
- type EventBus
- func (eb *EventBus) Close()
- func (eb *EventBus) Publish(event *MediaEvent)
- func (eb *EventBus) PublishError(sessionID string, err error, sender interface{})
- func (eb *EventBus) PublishPacket(sessionID string, packet MediaPacket, sender interface{})
- func (eb *EventBus) PublishState(sessionID string, state StateChange, sender interface{})
- func (eb *EventBus) Subscribe(eventType EventType, handler EventHandler)
- func (eb *EventBus) Unsubscribe(eventType EventType, handler EventHandler)
- type EventHandler
- type EventType
- type FuncProcessor
- type InterpolatingConverter
- type LocalMediaCache
- type MediaData
- type MediaEvent
- type MediaHandler
- type MediaHandlerFunc
- type MediaPacket
- type MediaSession
- func (s *MediaSession) AddInputTransport(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) AddMetric(key string, duration time.Duration)
- func (s *MediaSession) AddOutputTransport(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) CauseError(sender any, err error)
- func (s *MediaSession) Close() error
- func (s *MediaSession) Codec() CodecConfig
- func (s *MediaSession) Context(parent context.Context) *MediaSession
- func (s *MediaSession) Decode(dec EncoderFunc) *MediaSession
- func (s *MediaSession) Delete(key string)
- func (s *MediaSession) DrainOutputs() int
- func (s *MediaSession) EmitPacket(sender any, packet MediaPacket)
- func (s *MediaSession) EmitState(sender any, state string, params ...any)
- func (s *MediaSession) Encode(enc EncoderFunc) *MediaSession
- func (s *MediaSession) Error(handles ...ErrorHandler) *MediaSession
- func (s *MediaSession) Get(key string) (val any, ok bool)
- func (s *MediaSession) GetContext() context.Context
- func (s *MediaSession) GetSession() *MediaSession
- func (s *MediaSession) GetString(key string) string
- func (s *MediaSession) GetUint(key string) uint
- func (s *MediaSession) InjectPacket(f PacketFilter)
- func (s *MediaSession) Input(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) IsValid() error
- func (s *MediaSession) NotifyServeStarting()
- func (s *MediaSession) On(state string, handles ...StateChangeHandler) *MediaSession
- func (s *MediaSession) Output(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) Pipeline(handles ...MediaHandlerFunc) *MediaSession
- func (s *MediaSession) PostHook(hooks ...SessionHook) *MediaSession
- func (s *MediaSession) RegisterProcessor(processor Processor) *MediaSession
- func (s *MediaSession) SendToOutput(sender any, packet MediaPacket)
- func (s *MediaSession) Serve() error
- func (s *MediaSession) Set(key string, val any)
- func (s *MediaSession) SetSessionID(id string) *MediaSession
- func (s *MediaSession) String() string
- func (s *MediaSession) Trace(trace MediaHandlerFunc) *MediaSession
- func (s *MediaSession) UseMiddleware(handles ...MediaHandlerFunc) *MediaSession
- func (s *MediaSession) WaitServeShutdown(ctx context.Context) error
- func (s *MediaSession) WithQueueSize(n int) *MediaSession
- type MediaTransport
- type PacketFilter
- type PacketProcessor
- type PacketRequest
- type PipelineStage
- func (sp *PipelineStage) AddMetric(key string, duration time.Duration)
- func (sp *PipelineStage) CauseError(sender any, err error)
- func (sp *PipelineStage) EmitPacket(sender any, packet MediaPacket)
- func (sp *PipelineStage) EmitState(sender any, state string, params ...any)
- func (sp *PipelineStage) GetContext() context.Context
- func (sp *PipelineStage) GetSession() *MediaSession
- func (sp *PipelineStage) InjectPacket(f PacketFilter)
- func (sp *PipelineStage) SendToOutput(sender any, packet MediaPacket)
- func (sp *PipelineStage) String() string
- type Processor
- type ProcessorCondition
- type ProcessorPriority
- type ProcessorRegistry
- type RouteRule
- type Router
- type RoutingStrategy
- type SampleRateConverter
- type SessionHook
- type StateChange
- type StateChangeHandler
- type StreamFormat
- type TextPacket
- type TranscribingData
- type TransportConnector
- type TransportManager
- type TurnDetectionData
- type VideoCodec
- type VideoProfile
Constants ¶
const ( DirectionInput = "rx" DirectionOutput = "tx" )
Constants
const (
	// KeySIPSuppressUplinkEcho: when true, output-router does not send decoded uplink
	// microphone AudioPackets to RTP output; only synthesized (TTS) audio is sent.
	KeySIPSuppressUplinkEcho = "sip_suppress_uplink_echo"
)
Session value keys (sync.Map on MediaSession)
Variables ¶
var (
	AllStates     = "*"
	Begin         = "begin"
	End           = "end"
	Hangup        = "hangup"
	StartSpeaking = "speaking.start"
	StartSilence  = "silence.start"
	Transcribing  = "transcribing" // params: sentence string
	Synthesizing  = "synthesizing" // params: result string
	StartPlay     = "play.start"
	StopPlay      = "play.stop"
	Completed     = "completed"
	// interrupt
	Interruption = "interruption"
)
State types
var ( MediaDataTypeState = "state" MediaDataTypePacket = "packet" MediaDataTypeMetric = "metric" )
var ( ErrNotInputTransport = errors.New("not input transport") ErrNotOutputTransport = errors.New("not output transport") ErrCodecNotSupported = errors.New("codec not supported") )
var ( AgentRunning = "_agent_running" WorkingState = "_working_state" UpstreamRunning = "_upstream_running" )
Functions ¶
func CastOption ¶
func ComputeSampleByteCount ¶
ComputeSampleByteCount returns the number of bytes produced by one millisecond of linear-PCM audio at the given configuration. It is used by both the recognizer (to compute byte budgets per second) and the synthesizer (to size frame slices).
Formula: bytes_per_ms = sampleRate * (bitDepth/8) * channels / 1000.
Inputs <= 0 are treated as zero so the function never panics; callers can guard against zero results to detect misconfiguration.
func IsAudioCodecSupported ¶
func IsAudioCodecSupported(codec AudioCodec) bool
IsAudioCodecSupported checks if an audio codec is supported
func IsVideoCodecSupported ¶
func IsVideoCodecSupported(codec VideoCodec) bool
IsVideoCodecSupported checks if a video codec is supported
func NewDownsamplingLowPass ¶ added in v1.4.0
func NewDownsamplingLowPass(sourceRate, targetRate int) *lowPassFIR
NewDownsamplingLowPass returns a filter whose passband ends at roughly 0.45 × targetRate (normalised to sourceRate). Returns nil when no filtering is needed (sourceRate <= targetRate, i.e. upsampling or no-op — there's no aliasing to worry about there).
func NewUpsamplingAntiImagingLowPass ¶ added in v1.4.0
func NewUpsamplingAntiImagingLowPass(sourceRate, targetRate int) *lowPassFIR
NewUpsamplingAntiImagingLowPass returns the *post*-interpolation FIR that suppresses spectral images produced by upsampling.
Math note: when you upsample from sourceRate X to targetRate Y (Y > X), linear / cubic interpolation is mathematically equivalent to zero-stuffing followed by a (mediocre) low-pass filter. Anything above the ORIGINAL Nyquist frequency (X/2) in the new spectrum is a spectral image of the source band (an image, not an alias); a proper LPF with cutoff = X/2 (expressed at the target rate) removes those images.
Practical note: in our common path (8 kHz PSTN → 16 kHz AI), the source is already band-limited to ~3.4 kHz by the telephony network, so this filter is mostly a no-op. We still apply it for correctness and to handle the 16 kHz → 48 kHz Opus path where images are real.
Returns nil for downsampling / no-op cases.
func NormalizeFramePeriod ¶
NormalizeFramePeriod parses a frame-period string (e.g. "20ms", "60ms") and clamps it to the supported range [10ms, 300ms]. Empty / unparseable / zero / out-of-range values are coerced to a safe default of 20ms so downstream RTP/codec packetizers always receive a usable cadence.
The string form is what synthesizer/recognizer Options carry over the wire (JSON / form encoding). Callers that already hold a time.Duration can simply pass d.String().
func RMSPCM16LE ¶ added in v1.3.0
RMSPCM16LE returns the RMS (Root Mean Square) level of signed 16-bit little-endian PCM samples. RMS is a measure of the audio signal's energy or loudness.
The function calculates: sqrt(sum(sample^2) / count)
Returns 0 when the buffer is too short to contain a full sample (fewer than 2 bytes). RMS is useful for:
- Voice Activity Detection (VAD): determining if audio contains speech
- Gain Control: adjusting audio levels automatically
- Audio Analysis: measuring signal strength
Example:
pcm := []byte{0x00, 0x10, 0x00, 0x10} // two 16-bit samples
rms := RMSPCM16LE(pcm) // returns the RMS value
func ResamplePCM ¶
ResamplePCM converts audio data from one sample rate to another
func SetDefaultResampler ¶
func SetDefaultResampler(factory ConverterFactory)
SetDefaultResampler sets the default converter factory
Types ¶
type AsyncTaskRunner ¶
type AsyncTaskRunner[T any] struct {
	InitCallback      func(h MediaHandler) error
	TerminateCallback func(h MediaHandler) error
	StateCallback     func(h MediaHandler, event StateChange) error
	RequestBuilder    func(h MediaHandler, packet MediaPacket) (*PacketRequest[T], error)
	TaskExecutor      func(ctx context.Context, h MediaHandler, req PacketRequest[T]) error
	WorkerPoolSize    int
	TaskTimeout       time.Duration
	MaxTaskTimeout    time.Duration
	ConcurrentMode    bool
	// contains filtered or unexported fields
}
AsyncTaskRunner handles asynchronous task execution using a true multi-worker pool pattern.
func NewAsyncTaskRunner ¶
func NewAsyncTaskRunner[T any](queueSize int) AsyncTaskRunner[T]
NewAsyncTaskRunner creates a new task runner with worker pool configuration
func (*AsyncTaskRunner[T]) CancelActiveTask ¶
func (tr *AsyncTaskRunner[T]) CancelActiveTask()
CancelActiveTask stops all worker tasks (for multi-worker pool)
func (*AsyncTaskRunner[T]) HandleMediaData ¶
func (tr *AsyncTaskRunner[T]) HandleMediaData(h MediaHandler, data MediaData)
HandleMediaData routes media data to appropriate handlers
func (*AsyncTaskRunner[T]) HandlePacket ¶
func (tr *AsyncTaskRunner[T]) HandlePacket(h MediaHandler, packet MediaPacket)
HandlePacket processes packet data through task queue
func (*AsyncTaskRunner[T]) HandleState ¶
func (tr *AsyncTaskRunner[T]) HandleState(h MediaHandler, event StateChange)
HandleState processes state change events
func (*AsyncTaskRunner[T]) ReleaseResources ¶
func (tr *AsyncTaskRunner[T]) ReleaseResources()
ReleaseResources frees allocated resources and stops all workers
type AudioCodec ¶
type AudioCodec string
AudioCodec represents supported audio codecs
const (
	// Uncompressed
	AudioCodecPCM    AudioCodec = "pcm"
	AudioCodecPCMU   AudioCodec = "pcmu"
	AudioCodecPCMA   AudioCodec = "pcma"
	// Lossless compression
	AudioCodecFLAC   AudioCodec = "flac"
	AudioCodecAPE    AudioCodec = "ape"
	AudioCodecWAV    AudioCodec = "wav"
	// Lossy compression (Low bitrate)
	AudioCodecMP3    AudioCodec = "mp3"
	AudioCodecAAC    AudioCodec = "aac"
	AudioCodecOPUS   AudioCodec = "opus"
	AudioCodecVORBIS AudioCodec = "vorbis"
	// Lossy compression (High quality)
	AudioCodecFDAC   AudioCodec = "fdac"
	AudioCodecALAC   AudioCodec = "alac" // NOTE: ALAC (Apple Lossless) is actually lossless, despite this group's heading
	// Telephony
	AudioCodecGSM    AudioCodec = "gsm"
	AudioCodecAMR    AudioCodec = "amr"
	AudioCodecSILK   AudioCodec = "silk"
	// Proprietary
	AudioCodecWMA    AudioCodec = "wma"
	AudioCodecAC3    AudioCodec = "ac3"
	AudioCodecDTS    AudioCodec = "dts"
)
type AudioPacket ¶
type AudioPacket struct {
PlayID string `json:"id,omitempty"`
Sequence int `json:"sequence"`
Payload []byte `json:"payload"`
// RTPSamples, when > 0, is the RTP clock increment for this frame (RFC 3550 timestamp delta).
// Encoders with variable frame sizes (e.g. OPUS) should set this so the RTP layer does not guess from duration.
RTPSamples uint32 `json:"rtpSamples,omitempty"`
IsFirstPacket bool `json:"isFirstPacket,omitempty"`
IsEndPacket bool `json:"isEndPacket,omitempty"`
IsSynthesized bool `json:"isSynthesized,omitempty"`
IsSilence bool `json:"isSilence,omitempty"`
SourceText string `json:"sourceText,omitempty"`
}
func (*AudioPacket) Body ¶
func (d *AudioPacket) Body() []byte
func (*AudioPacket) String ¶
func (d *AudioPacket) String() string
type AudioProfile ¶
type AudioProfile string
AudioProfile represents audio codec profiles
const (
	// AAC profiles
	AudioProfileAAC_LC     AudioProfile = "aac_lc"
	AudioProfileAAC_HE     AudioProfile = "aac_he"
	AudioProfileAAC_HE_V2  AudioProfile = "aac_he_v2"
	AudioProfileAAC_LD     AudioProfile = "aac_ld"
	AudioProfileAAC_ELD    AudioProfile = "aac_eld"
	// Opus profiles
	AudioProfileOpusNarrow AudioProfile = "opus_narrow"
	AudioProfileOpusWide   AudioProfile = "opus_wide"
	// MP3 profiles
	AudioProfileMP3_MPEG1  AudioProfile = "mp3_mpeg1"
	AudioProfileMP3_MPEG2  AudioProfile = "mp3_mpeg2"
	AudioProfileMP3_MPEG25 AudioProfile = "mp3_mpeg25"
)
type BaseProcessor ¶
type BaseProcessor struct {
// contains filtered or unexported fields
}
BaseProcessor provides default implementation for common processor functionality
func NewBaseProcessor ¶
func NewBaseProcessor(name string, priority ProcessorPriority) *BaseProcessor
NewBaseProcessor creates a base processor
func (*BaseProcessor) CanHandle ¶
func (bp *BaseProcessor) CanHandle(ctx context.Context, event *MediaEvent) bool
CanHandle checks if the processor can handle the event
func (*BaseProcessor) Priority ¶
func (bp *BaseProcessor) Priority() ProcessorPriority
Priority returns the processor priority
func (*BaseProcessor) WithCondition ¶
func (bp *BaseProcessor) WithCondition(condition ProcessorCondition) *BaseProcessor
WithCondition sets a condition for the processor
type ClosePacket ¶
type ClosePacket struct {
Reason string `json:"reason"`
}
func (*ClosePacket) Body ¶
func (f *ClosePacket) Body() []byte
func (*ClosePacket) String ¶
func (f *ClosePacket) String() string
type CodecConfig ¶
type CodecConfig struct {
Codec string `json:"codec" form:"codec" default:"pcm"`
SampleRate int `json:"sampleRate" form:"sample_rate" default:"16000"`
Channels int `json:"channels" form:"channels" default:"1"`
BitDepth int `json:"bitDepth" form:"bit_depth" default:"16"`
FrameDuration string `json:"frameDuration" form:"frame_duration"`
PayloadType uint8 `json:"payloadType" form:"payload_type"`
// OpusDecodeChannels: if 1 or 2, Opus inbound decode uses this many channels (from remote SDP offer),
// then downmixes to mono for PCM. Encoder/answer Channels stay 1.
OpusDecodeChannels int `json:"opusDecodeChannels,omitempty" form:"opus_decode_channels"`
// OpusPCMBridgeDecodeStereo: SIP two-leg PCM bridge only — SDP OPUS/48000/2 payloads use a stereo
// TOC; force a 2-channel libopus decoder then downmix to mono PCM. CallSession/ASR leaves this false.
OpusPCMBridgeDecodeStereo bool `json:"-"`
}
CodecConfig defines codec configuration
func DefaultCodecConfig ¶
func DefaultCodecConfig() CodecConfig
func (CodecConfig) String ¶
func (c CodecConfig) String() string
type CodecInfo ¶
type CodecInfo struct {
// Codec identifier (can be audio or video codec name)
Codec string
// Human-readable name
Name string
// Description
Description string
// Whether the codec is lossy
IsLossy bool
// Supported profiles
Profiles []string
// Supported levels
Levels []string
// Typical bitrate range (kbps)
BitrateMin int
BitrateMax int
// Whether hardware acceleration is available
HardwareAcceleration bool
// Supported container formats
Containers []string
}
CodecInfo contains detailed information about a codec
func AudioCodecInfo ¶
func AudioCodecInfo(codec AudioCodec) *CodecInfo
AudioCodecInfo returns information about an audio codec
func VideoCodecInfo ¶
func VideoCodecInfo(codec VideoCodec) *CodecInfo
VideoCodecInfo returns information about a video codec
type CodecLevel ¶
type CodecLevel string
CodecLevel represents codec level/tier
const (
	// H.264 levels
	CodecLevelH264_1  CodecLevel = "h264_1"
	CodecLevelH264_1b CodecLevel = "h264_1b"
	CodecLevelH264_11 CodecLevel = "h264_11"
	CodecLevelH264_12 CodecLevel = "h264_12"
	CodecLevelH264_13 CodecLevel = "h264_13"
	CodecLevelH264_2  CodecLevel = "h264_2"
	CodecLevelH264_21 CodecLevel = "h264_21"
	CodecLevelH264_22 CodecLevel = "h264_22"
	CodecLevelH264_3  CodecLevel = "h264_3"
	CodecLevelH264_31 CodecLevel = "h264_31"
	CodecLevelH264_32 CodecLevel = "h264_32"
	CodecLevelH264_4  CodecLevel = "h264_4"
	CodecLevelH264_41 CodecLevel = "h264_41"
	CodecLevelH264_42 CodecLevel = "h264_42"
	CodecLevelH264_5  CodecLevel = "h264_5"
	CodecLevelH264_51 CodecLevel = "h264_51"
	CodecLevelH264_52 CodecLevel = "h264_52"
	// H.265 levels
	CodecLevelH265_1  CodecLevel = "h265_1"
	CodecLevelH265_2  CodecLevel = "h265_2"
	CodecLevelH265_21 CodecLevel = "h265_21"
	CodecLevelH265_3  CodecLevel = "h265_3"
	CodecLevelH265_31 CodecLevel = "h265_31"
	CodecLevelH265_4  CodecLevel = "h265_4"
	CodecLevelH265_41 CodecLevel = "h265_41"
	CodecLevelH265_5  CodecLevel = "h265_5"
	CodecLevelH265_51 CodecLevel = "h265_51"
	CodecLevelH265_52 CodecLevel = "h265_52"
)
type CompletedData ¶
type CompletedData struct {
SenderName string `json:"senderName"` // eg: tts.aws, asr.qcloud
Duration time.Duration `json:"duration"` // total duration
Source MediaPacket `json:"-"` // last packet
Result any `json:"result"` // result
AssistantId uint `json:"assistantId"`
AssistantVid uint `json:"assistantVid"`
DialogID string `json:"dialogID"`
}
func (*CompletedData) MarshalJSON ¶
func (d *CompletedData) MarshalJSON() ([]byte, error)
func (CompletedData) String ¶
func (d CompletedData) String() string
type ConverterFactory ¶
type ConverterFactory func(inputRate, outputRate int) SampleRateConverter
ConverterFactory creates a sample rate converter
type DCBlockHPF ¶ added in v1.4.0
type DCBlockHPF struct {
// contains filtered or unexported fields
}
DCBlockHPF is a one-pole IIR high-pass that removes DC offset and sub-audible (< ~30 Hz at 8 kHz) rumble that PSTN trunks often inject. NOT related to resampling: this is a separate quality stage that should run on any inbound telephony audio.
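Difference equation (classic one-pole DC blocker, not taken from any RFC):
y[n] = x[n] - x[n-1] + R * y[n-1], i.e. H(z) = (1 - z^-1) / (1 - R * z^-1), with R slightly below 1
The zero at z = 1 removes DC, and the pole at z = R places the -3 dB corner at roughly fc ≈ (1-R) * sampleRate / (2π). Note the numbers: R = 0.995 at sampleRate = 8000 gives fc ≈ 0.005 * 8000 / (2π) ≈ 6.4 Hz, not 30 Hz; a ~30 Hz corner at 8 kHz requires R ≈ 1 - 2π * 30 / 8000 ≈ 0.976. Either corner is well below the 300 Hz lower edge of the telephony band, so the filter won't audibly thin the speech.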
State (xPrev, yPrev) persists across Process calls so chunk boundaries don't introduce clicks.
func NewDCBlockHPF ¶ added in v1.4.0
func NewDCBlockHPF(sampleRate int) *DCBlockHPF
NewDCBlockHPF builds a DC blocker tuned for the given sample rate. The pole coefficient is computed so the -3 dB corner sits at ~30 Hz regardless of rate (8k / 16k / 48k all OK).
func (*DCBlockHPF) Process ¶ added in v1.4.0
func (h *DCBlockHPF) Process(in []int16) []int16
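Process applies the filter in place and returns the same slice for chaining. Output is saturated to the int16 range on overshoot. Overshoot is uncommon but not impossible: the filter's gain at Nyquist is 2/(1+R), slightly above 1, and a large sample-to-sample jump (x[n] - x[n-1] can reach 65535) can briefly push y[n] past the int16 limits. The saturation is therefore a real, if rarely triggered, safeguard.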
func (*DCBlockHPF) Reset ¶ added in v1.4.0
func (h *DCBlockHPF) Reset()
Reset clears filter state. Call on dialog reset (re-INVITE, transfer) to avoid carrying the previous leg's offset bias.
type DTMFPacket ¶
type DTMFPacket struct {
Digit string `json:"digit"` // "0"–"9", "*", "#", "A"–"D"
End bool `json:"end"` // RFC 2833 E (end) bit — prefer handling when true to avoid duplicate keys
}
DTMFPacket carries one RFC 2833 / RFC 4733 telephone-event (RTP payload type typically 101). Emitted by SIP RTP input when the peer sends out-of-band DTMF; not passed through audio codecs.
func (*DTMFPacket) Body ¶
func (d *DTMFPacket) Body() []byte
func (*DTMFPacket) String ¶
func (d *DTMFPacket) String() string
type EncoderFunc ¶
type EncoderFunc func(packet MediaPacket) ([]MediaPacket, error)
type ErrorHandler ¶
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus manages event distribution using pub/sub pattern
func NewEventBus ¶
NewEventBus creates a new event bus
func (*EventBus) Publish ¶
func (eb *EventBus) Publish(event *MediaEvent)
Publish sends an event to all subscribers
func (*EventBus) PublishError ¶
PublishError publishes an error event
func (*EventBus) PublishPacket ¶
func (eb *EventBus) PublishPacket(sessionID string, packet MediaPacket, sender interface{})
PublishPacket publishes a packet event
func (*EventBus) PublishState ¶
func (eb *EventBus) PublishState(sessionID string, state StateChange, sender interface{})
PublishState publishes a state change event
func (*EventBus) Subscribe ¶
func (eb *EventBus) Subscribe(eventType EventType, handler EventHandler)
Subscribe registers an event handler for a specific event type
func (*EventBus) Unsubscribe ¶
func (eb *EventBus) Unsubscribe(eventType EventType, handler EventHandler)
Unsubscribe removes an event handler
type EventHandler ¶
type EventHandler func(ctx context.Context, event *MediaEvent) error
EventHandler processes events from the event bus
type FuncProcessor ¶
type FuncProcessor struct {
*BaseProcessor
// contains filtered or unexported fields
}
FuncProcessor is a processor implemented as a function
func NewFuncProcessor ¶
func NewFuncProcessor(name string, priority ProcessorPriority, fn func(ctx context.Context, session *MediaSession, event *MediaEvent) error) *FuncProcessor
NewFuncProcessor creates a function-based processor
func (*FuncProcessor) Process ¶
func (fp *FuncProcessor) Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
Process executes the processor function
type InterpolatingConverter ¶
type InterpolatingConverter struct {
// contains filtered or unexported fields
}
InterpolatingConverter performs optimized interpolation for sample rate conversion
func (*InterpolatingConverter) Close ¶
func (ic *InterpolatingConverter) Close() error
func (*InterpolatingConverter) ConvertSamples ¶
func (ic *InterpolatingConverter) ConvertSamples(samples []byte) []byte
ConvertSamples performs interpolation (linear by default, cubic if enabled)
func (*InterpolatingConverter) Samples ¶
func (ic *InterpolatingConverter) Samples() []byte
type LocalMediaCache ¶
func MediaCache ¶
func MediaCache() *LocalMediaCache
func (*LocalMediaCache) BuildKey ¶
func (c *LocalMediaCache) BuildKey(params ...string) string
type MediaData ¶
type MediaData struct {
CreatedAt time.Time
Sender any
Type string
State StateChange
Packet MediaPacket
Duration *time.Duration
}
type MediaEvent ¶
type MediaEvent struct {
Type EventType
Timestamp time.Time
SessionID string
Payload interface{}
Metadata map[string]interface{}
}
MediaEvent represents an event in the event bus
type MediaHandler ¶
type MediaHandler interface {
GetContext() context.Context
GetSession() *MediaSession
CauseError(sender any, err error)
EmitState(sender any, state string, params ...any)
EmitPacket(sender any, packet MediaPacket)
SendToOutput(sender any, packet MediaPacket)
AddMetric(key string, duration time.Duration)
InjectPacket(f PacketFilter)
}
type MediaHandlerFunc ¶
type MediaHandlerFunc func(h MediaHandler, data MediaData)
type MediaPacket ¶
MediaPacket types - represents media data packets
type MediaSession ¶
type MediaSession struct {
ID string `json:"id"`
Running bool `json:"running"`
QueueSize int `json:"queueSize"`
SampleRate int // sample rate of the session
MaxSessionDuration int `json:"maxSessionDuration"` // Set the maximum session duration in seconds.
EffectAudios map[string]*[]byte `json:"-"`
StartAt time.Time `json:"startAt"`
// contains filtered or unexported fields
}
func NewDefaultSession ¶
func NewDefaultSession() *MediaSession
func (*MediaSession) AddInputTransport ¶
func (s *MediaSession) AddInputTransport(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
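AddInputTransport registers an input (rx) transport on the session, together with optional packet filters. Input is the older name for the same operation, kept for backward compatibility.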
func (*MediaSession) AddMetric ¶
func (s *MediaSession) AddMetric(key string, duration time.Duration)
func (*MediaSession) AddOutputTransport ¶
func (s *MediaSession) AddOutputTransport(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
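AddOutputTransport registers an output (tx) transport on the session, together with optional packet filters. Output is the older name for the same operation, kept for backward compatibility.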
func (*MediaSession) CauseError ¶
func (s *MediaSession) CauseError(sender any, err error)
func (*MediaSession) Close ¶
func (s *MediaSession) Close() error
func (*MediaSession) Codec ¶
func (s *MediaSession) Codec() CodecConfig
func (*MediaSession) Context ¶
func (s *MediaSession) Context(parent context.Context) *MediaSession
chainable methods
func (*MediaSession) Decode ¶
func (s *MediaSession) Decode(dec EncoderFunc) *MediaSession
func (*MediaSession) Delete ¶
func (s *MediaSession) Delete(key string)
func (*MediaSession) DrainOutputs ¶ added in v1.4.1
func (s *MediaSession) DrainOutputs() int
DrainOutputs non-blockingly drops every pending packet from each output transport's send queue. Used for barge-in: when the caller starts speaking, queued AI audio must be cleared immediately. Safe from any goroutine.
func (*MediaSession) EmitPacket ¶
func (s *MediaSession) EmitPacket(sender any, packet MediaPacket)
func (*MediaSession) EmitState ¶
func (s *MediaSession) EmitState(sender any, state string, params ...any)
func (*MediaSession) Encode ¶
func (s *MediaSession) Encode(enc EncoderFunc) *MediaSession
func (*MediaSession) Error ¶
func (s *MediaSession) Error(handles ...ErrorHandler) *MediaSession
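Error registers one or more ErrorHandlers that are invoked when an error is caused on the session. It returns the session so calls can be chained.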
func (*MediaSession) GetContext ¶
func (s *MediaSession) GetContext() context.Context
func (*MediaSession) GetSession ¶
func (s *MediaSession) GetSession() *MediaSession
func (*MediaSession) GetString ¶
func (s *MediaSession) GetString(key string) string
func (*MediaSession) GetUint ¶
func (s *MediaSession) GetUint(key string) uint
func (*MediaSession) Input ¶
func (s *MediaSession) Input(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
Input is an alias for backward compatibility
func (*MediaSession) IsValid ¶
func (s *MediaSession) IsValid() error
func (*MediaSession) NotifyServeStarting ¶
func (s *MediaSession) NotifyServeStarting()
NotifyServeStarting records that Serve() is about to run. Call synchronously before starting the Serve goroutine so WaitServeShutdown can block until that Serve instance has torn down.
func (*MediaSession) On ¶
func (s *MediaSession) On(state string, handles ...StateChangeHandler) *MediaSession
func (*MediaSession) Output ¶
func (s *MediaSession) Output(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
Output is an alias for backward compatibility
func (*MediaSession) Pipeline ¶
func (s *MediaSession) Pipeline(handles ...MediaHandlerFunc) *MediaSession
Pipeline is an alias for backward compatibility
func (*MediaSession) PostHook ¶
func (s *MediaSession) PostHook(hooks ...SessionHook) *MediaSession
func (*MediaSession) RegisterProcessor ¶
func (s *MediaSession) RegisterProcessor(processor Processor) *MediaSession
RegisterProcessor registers a processor in the registry
func (*MediaSession) SendToOutput ¶
func (s *MediaSession) SendToOutput(sender any, packet MediaPacket)
func (*MediaSession) Serve ¶
func (s *MediaSession) Serve() error
Serve starts the session and blocks the calling goroutine until the session finishes.
func (*MediaSession) Set ¶
func (s *MediaSession) Set(key string, val any)
func (*MediaSession) SetSessionID ¶
func (s *MediaSession) SetSessionID(id string) *MediaSession
func (*MediaSession) String ¶
func (s *MediaSession) String() string
func (*MediaSession) Trace ¶
func (s *MediaSession) Trace(trace MediaHandlerFunc) *MediaSession
func (*MediaSession) UseMiddleware ¶
func (s *MediaSession) UseMiddleware(handles ...MediaHandlerFunc) *MediaSession
Deprecated: UseMiddleware is kept only for backward compatibility; use RegisterProcessor instead.
func (*MediaSession) WaitServeShutdown ¶
func (s *MediaSession) WaitServeShutdown(ctx context.Context) error
WaitServeShutdown blocks until Serve has finished and released transport readers/writers, or until ctx ends. If Serve was never scheduled for this session, returns nil immediately.
func (*MediaSession) WithQueueSize ¶ added in v1.4.1
func (s *MediaSession) WithQueueSize(n int) *MediaSession
WithQueueSize sets TX/output queue depth and rebuilds the per-session EventBus before Serve(). Call this after NewDefaultSession when a larger buffer is needed; otherwise the bus stays at the default QueueSize (256) and may drop packets under heavy TTS/realtime output load.
type MediaTransport ¶
type MediaTransport interface {
io.Closer
String() string
Attach(s *MediaSession)
Next(ctx context.Context) (MediaPacket, error)
Send(ctx context.Context, packet MediaPacket) (int, error)
Codec() CodecConfig
Close() error
}
MediaTransport interface for media transport
type PacketFilter ¶
type PacketFilter func(packet MediaPacket) (bool, error)
type PacketProcessor ¶
type PacketProcessor struct {
*BaseProcessor
// contains filtered or unexported fields
}
PacketProcessor is a specialized processor for packet events
func NewPacketProcessor ¶
func NewPacketProcessor(name string, priority ProcessorPriority, fn func(ctx context.Context, session *MediaSession, packet MediaPacket) error) *PacketProcessor
NewPacketProcessor creates a packet processor
func (*PacketProcessor) CanHandle ¶
func (pp *PacketProcessor) CanHandle(ctx context.Context, event *MediaEvent) bool
CanHandle checks if this is a packet event
func (*PacketProcessor) Process ¶
func (pp *PacketProcessor) Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
Process handles packet events
type PacketRequest ¶
type PacketRequest[R any] struct { H MediaHandler Interrupt bool Req R }
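PacketRequest bundles a request value Req of type R with a MediaHandler H and an Interrupt flag. It is kept for backward compatibility; Interrupt maps to the runner's internal shouldStop flag.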
type PipelineStage ¶
type PipelineStage struct {
// contains filtered or unexported fields
}
PipelineStage represents a single stage in the processing pipeline. It uses an asynchronous, event-driven architecture instead of a synchronous chain.
func (*PipelineStage) AddMetric ¶
func (sp *PipelineStage) AddMetric(key string, duration time.Duration)
func (*PipelineStage) CauseError ¶
func (sp *PipelineStage) CauseError(sender any, err error)
func (*PipelineStage) EmitPacket ¶
func (sp *PipelineStage) EmitPacket(sender any, packet MediaPacket)
EmitPacket enqueues packet for asynchronous processing
func (*PipelineStage) EmitState ¶
func (sp *PipelineStage) EmitState(sender any, state string, params ...any)
func (*PipelineStage) GetContext ¶
func (sp *PipelineStage) GetContext() context.Context
func (*PipelineStage) GetSession ¶
func (sp *PipelineStage) GetSession() *MediaSession
func (*PipelineStage) InjectPacket ¶
func (sp *PipelineStage) InjectPacket(f PacketFilter)
InjectPacket sets pre-processing filter function
func (*PipelineStage) SendToOutput ¶
func (sp *PipelineStage) SendToOutput(sender any, packet MediaPacket)
func (*PipelineStage) String ¶
func (sp *PipelineStage) String() string
type Processor ¶
type Processor interface {
// Name returns the processor name
Name() string
// Priority returns processing priority (higher = processed first)
Priority() ProcessorPriority
// CanHandle checks if this processor can handle the event
CanHandle(ctx context.Context, event *MediaEvent) bool
// Process handles the event
Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
}
Processor handles media events
type ProcessorCondition ¶
type ProcessorCondition func(ctx context.Context, event *MediaEvent) bool
ProcessorCondition determines if a processor should handle an event
type ProcessorPriority ¶
type ProcessorPriority int
ProcessorPriority defines processing priority
const ( PriorityLow ProcessorPriority = 0 PriorityNormal ProcessorPriority = 50 PriorityHigh ProcessorPriority = 100 )
type ProcessorRegistry ¶
type ProcessorRegistry struct {
// contains filtered or unexported fields
}
ProcessorRegistry manages registered processors
func NewProcessorRegistry ¶
func NewProcessorRegistry() *ProcessorRegistry
NewProcessorRegistry creates a new processor registry
func (*ProcessorRegistry) GetAllProcessors ¶
func (pr *ProcessorRegistry) GetAllProcessors() []Processor
GetAllProcessors returns all registered processors
func (*ProcessorRegistry) GetProcessors ¶
func (pr *ProcessorRegistry) GetProcessors(ctx context.Context, event *MediaEvent) []Processor
GetProcessors returns all processors that can handle the event, in priority order (highest priority first).
func (*ProcessorRegistry) Register ¶
func (pr *ProcessorRegistry) Register(processor Processor)
Register adds a processor to the registry
func (*ProcessorRegistry) Unregister ¶
func (pr *ProcessorRegistry) Unregister(name string)
Unregister removes the processor registered under the given name.
type RouteRule ¶
type RouteRule struct {
Condition func(packet MediaPacket) bool
Targets []string // Transport IDs
Strategy RoutingStrategy
}
RouteRule defines a single routing rule: a packet condition, the target transport IDs, and the routing strategy to apply.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router manages packet routing
func NewRouter ¶
func NewRouter(defaultStrategy RoutingStrategy) *Router
NewRouter creates a new router
func (*Router) Route ¶
func (r *Router) Route(packet MediaPacket, availableTransports []*TransportConnector) []*TransportConnector
Route determines which of the available transports a packet should be sent to.
type RoutingStrategy ¶
type RoutingStrategy int
RoutingStrategy defines how packets are routed
const ( // StrategyBroadcast sends to all outputs StrategyBroadcast RoutingStrategy = iota // StrategyRoundRobin distributes across outputs StrategyRoundRobin // StrategyFirstAvailable uses first available output StrategyFirstAvailable )
type SampleRateConverter ¶
type SampleRateConverter interface {
io.WriteCloser
Samples() []byte
}
SampleRateConverter defines the interface for converting audio between sample rates.
func DefaultResampler ¶
func DefaultResampler(inputRate, outputRate int) SampleRateConverter
DefaultResampler creates the standard PCM16 rate converter (cubic interpolation).
func NewCubicInterpolatingConverter ¶
func NewCubicInterpolatingConverter(sourceRate, targetRate int) SampleRateConverter
NewCubicInterpolatingConverter creates a converter that uses cubic interpolation (higher quality than linear interpolation).
func NewInterpolatingConverter ¶
func NewInterpolatingConverter(sourceRate, targetRate int) SampleRateConverter
NewInterpolatingConverter creates a converter that uses linear interpolation (faster than cubic interpolation).
type SessionHook ¶
type SessionHook func(session *MediaSession)
type StateChange ¶
func (*StateChange) SafeGetStr ¶
func (s *StateChange) SafeGetStr(idx int) string
type StateChangeHandler ¶
type StateChangeHandler func(event StateChange)
type StreamFormat ¶
type TextPacket ¶
type TextPacket struct {
PlayID string `json:"id,omitempty"`
Text string `json:"text"`
IsTranscribed bool `json:"isTranscribed"`
IsLLMGenerated bool `json:"isLLMGenerated"`
IsPartial bool `json:"isPartial"`
IsEnd bool `json:"isEnd"`
Sequence int `json:"sequence"`
StartAt time.Time `json:"startAt"`
}
func (*TextPacket) Body ¶
func (t *TextPacket) Body() []byte
func (*TextPacket) String ¶
func (t *TextPacket) String() string
type TranscribingData ¶
type TranscribingData struct {
SenderName string `json:"senderName"` // eg: tts.aws, asr.qcloud
Duration time.Duration `json:"duration"` // total duration
Source MediaPacket `json:"-"` // last packet
Result any `json:"result"` // result
Direction string `json:"direction"` // direction
DialogID string `json:"dialogID"`
}
func (*TranscribingData) MarshalJSON ¶
func (d *TranscribingData) MarshalJSON() ([]byte, error)
func (TranscribingData) String ¶
func (d TranscribingData) String() string
type TransportConnector ¶
type TransportConnector struct {
ID string
Transport MediaTransport
Direction string // "input" or "output"
Active bool
// contains filtered or unexported fields
}
TransportConnector represents a connection to a transport
func NewTransportConnector ¶
func NewTransportConnector(id string, transport MediaTransport, direction string) *TransportConnector
NewTransportConnector creates a new transport connector
func (*TransportConnector) IsActive ¶
func (tc *TransportConnector) IsActive() bool
IsActive reports whether the connector is active.
func (*TransportConnector) SetActive ¶
func (tc *TransportConnector) SetActive(active bool)
SetActive sets the active state
func (*TransportConnector) String ¶
func (tc *TransportConnector) String() string
String returns a string representation of the connector.
type TransportManager ¶
type TransportManager struct {
// contains filtered or unexported fields
}
TransportManager manages transport connections
func (*TransportManager) String ¶
func (tl *TransportManager) String() string
type TurnDetectionData ¶
type VideoCodec ¶
type VideoCodec string
VideoCodec represents supported video codecs
const ( // Uncompressed VideoCodecRaw VideoCodec = "raw" // H.26x family VideoCodecH261 VideoCodec = "h261" VideoCodecH263 VideoCodec = "h263" VideoCodecH264 VideoCodec = "h264" VideoCodecH265 VideoCodec = "h265" VideoCodecH266 VideoCodec = "h266" // VP family VideoCodecVP8 VideoCodec = "vp8" VideoCodecVP9 VideoCodec = "vp9" // AV family VideoCodecAV1 VideoCodec = "av1" // MPEG family VideoCodecMPEG1 VideoCodec = "mpeg1" VideoCodecMPEG2 VideoCodec = "mpeg2" VideoCodecMPEG4 VideoCodec = "mpeg4" // Proprietary VideoCodecWMV VideoCodec = "wmv" VideoCodecRV VideoCodec = "rv" VideoCodecProRes VideoCodec = "prores" VideoCodecDNxHD VideoCodec = "dnxhd" // Older/Legacy VideoCodecSorenson VideoCodec = "sorenson" VideoCodecCinepak VideoCodec = "cinepak" )
type VideoProfile ¶
type VideoProfile string
VideoProfile represents video codec profiles
const ( // H.264 profiles VideoProfileH264_Baseline VideoProfile = "h264_baseline" VideoProfileH264_Main VideoProfile = "h264_main" VideoProfileH264_High VideoProfile = "h264_high" // H.265 profiles VideoProfileH265_Main VideoProfile = "h265_main" VideoProfileH265_Main10 VideoProfile = "h265_main10" VideoProfileH265_MainStill VideoProfile = "h265_main_still" // VP9 profiles VideoProfileVP9_Profile0 VideoProfile = "vp9_profile0" VideoProfileVP9_Profile1 VideoProfile = "vp9_profile1" VideoProfileVP9_Profile2 VideoProfile = "vp9_profile2" VideoProfileVP9_Profile3 VideoProfile = "vp9_profile3" // AV1 profiles VideoProfileAV1_Main VideoProfile = "av1_main" VideoProfileAV1_High VideoProfile = "av1_high" VideoProfileAV1_Pro VideoProfile = "av1_pro" )