Documentation ¶
Index ¶
- Constants
- Variables
- func CastOption[T any](options map[string]any) (val T)
- func ResamplePCM(data []byte, inputRate, outputRate int) ([]byte, error)
- func SetDefaultResampler(factory ConverterFactory)
- type AsyncTaskRunner
- func (tr *AsyncTaskRunner[T]) CancelActiveTask()
- func (tr *AsyncTaskRunner[T]) HandleMediaData(h MediaHandler, data MediaData)
- func (tr *AsyncTaskRunner[T]) HandlePacket(h MediaHandler, packet MediaPacket)
- func (tr *AsyncTaskRunner[T]) HandleState(h MediaHandler, event StateChange)
- func (tr *AsyncTaskRunner[T]) ReleaseResources()
- type AudioPacket
- type BaseProcessor
- type ClosePacket
- type CodecConfig
- type CompletedData
- type ConverterFactory
- type EncoderFunc
- type ErrorHandler
- type EventBus
- func (eb *EventBus) Close()
- func (eb *EventBus) Publish(event *MediaEvent)
- func (eb *EventBus) PublishError(sessionID string, err error, sender interface{})
- func (eb *EventBus) PublishPacket(sessionID string, packet MediaPacket, sender interface{})
- func (eb *EventBus) PublishState(sessionID string, state StateChange, sender interface{})
- func (eb *EventBus) Subscribe(eventType EventType, handler EventHandler)
- func (eb *EventBus) Unsubscribe(eventType EventType, handler EventHandler)
- type EventHandler
- type EventType
- type FuncProcessor
- type InterpolatingConverter
- type LocalMediaCache
- type MediaData
- type MediaEvent
- type MediaHandler
- type MediaHandlerFunc
- type MediaPacket
- type MediaSession
- func (s *MediaSession) AddInputTransport(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) AddMetric(key string, duration time.Duration)
- func (s *MediaSession) AddOutputTransport(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) CauseError(sender any, err error)
- func (s *MediaSession) Close() error
- func (s *MediaSession) Codec() CodecConfig
- func (s *MediaSession) Context(parent context.Context) *MediaSession
- func (s *MediaSession) Decode(dec EncoderFunc) *MediaSession
- func (s *MediaSession) Delete(key string)
- func (s *MediaSession) EmitPacket(sender any, packet MediaPacket)
- func (s *MediaSession) EmitState(sender any, state string, params ...any)
- func (s *MediaSession) Encode(enc EncoderFunc) *MediaSession
- func (s *MediaSession) Error(handles ...ErrorHandler) *MediaSession
- func (s *MediaSession) Get(key string) (val any, ok bool)
- func (s *MediaSession) GetAllMetrics() map[string]interface{}
- func (s *MediaSession) GetAudioPacketCount() uint64
- func (s *MediaSession) GetAveragePacketSize() uint64
- func (s *MediaSession) GetContext() context.Context
- func (s *MediaSession) GetErrorCount() uint64
- func (s *MediaSession) GetMetrics() (packetCount, totalBytes uint64)
- func (s *MediaSession) GetPacketCount() uint64
- func (s *MediaSession) GetSession() *MediaSession
- func (s *MediaSession) GetSessionDuration() time.Duration
- func (s *MediaSession) GetStateChangeCount() uint64
- func (s *MediaSession) GetString(key string) string
- func (s *MediaSession) GetTextPacketCount() uint64
- func (s *MediaSession) GetTotalBytes() uint64
- func (s *MediaSession) GetUint(key string) uint
- func (s *MediaSession) InjectPacket(f PacketFilter)
- func (s *MediaSession) Input(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) IsValid() error
- func (s *MediaSession) On(state string, handles ...StateChangeHandler) *MediaSession
- func (s *MediaSession) Output(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
- func (s *MediaSession) Pipeline(handles ...MediaHandlerFunc) *MediaSession
- func (s *MediaSession) PostHook(hooks ...SessionHook) *MediaSession
- func (s *MediaSession) RegisterProcessor(processor Processor) *MediaSession
- func (s *MediaSession) SendToOutput(sender any, packet MediaPacket)
- func (s *MediaSession) Serve() error
- func (s *MediaSession) Set(key string, val any)
- func (s *MediaSession) SetSessionID(id string) *MediaSession
- func (s *MediaSession) String() string
- func (s *MediaSession) Trace(trace MediaHandlerFunc) *MediaSession
- func (s *MediaSession) UseMiddleware(handles ...MediaHandlerFunc) *MediaSession
- type MediaTransport
- type PacketFilter
- type PacketProcessor
- type PacketRequest
- type PipelineStage
- func (sp *PipelineStage) AddMetric(key string, duration time.Duration)
- func (sp *PipelineStage) CauseError(sender any, err error)
- func (sp *PipelineStage) EmitPacket(sender any, packet MediaPacket)
- func (sp *PipelineStage) EmitState(sender any, state string, params ...any)
- func (sp *PipelineStage) GetContext() context.Context
- func (sp *PipelineStage) GetSession() *MediaSession
- func (sp *PipelineStage) InjectPacket(f PacketFilter)
- func (sp *PipelineStage) SendToOutput(sender any, packet MediaPacket)
- func (sp *PipelineStage) String() string
- type Processor
- type ProcessorCondition
- type ProcessorPriority
- type ProcessorRegistry
- type RouteRule
- type Router
- type RoutingStrategy
- type SampleRateConverter
- type SessionHook
- type SessionMetrics
- func (sm *SessionMetrics) GetAllMetrics() map[string]interface{}
- func (sm *SessionMetrics) GetAudioPacketCount() uint64
- func (sm *SessionMetrics) GetAveragePacketSize() uint64
- func (sm *SessionMetrics) GetErrorCount() uint64
- func (sm *SessionMetrics) GetMaxPacketSize() uint64
- func (sm *SessionMetrics) GetMetrics() (packetCount, totalBytes uint64)
- func (sm *SessionMetrics) GetMinPacketSize() uint64
- func (sm *SessionMetrics) GetPacketCount() uint64
- func (sm *SessionMetrics) GetSessionDuration() time.Duration
- func (sm *SessionMetrics) GetStateChangeCount() uint64
- func (sm *SessionMetrics) GetTextPacketCount() uint64
- func (sm *SessionMetrics) GetTotalBytes() uint64
- type StateChange
- type StateChangeHandler
- type StreamFormat
- type TextPacket
- type TranscribingData
- type TransportConnector
- type TransportManager
- type TurnDetectionData
Constants ¶
const ( DirectionInput = "rx" DirectionOutput = "tx" )
Constants
Variables ¶
var ( AllStates = "*" Begin = "begin" End = "end" Hangup = "hangup" StartSpeaking = "speaking.start" StartSilence = "silence.start" Transcribing = "transcribing" // params: sentence string Synthesizing = "synthesizing" // params: result string StartPlay = "play.start" StopPlay = "play.stop" Completed = "completed" // interrupt Interruption = "interruption" )
State types
var ( MediaDataTypeState = "state" MediaDataTypePacket = "packet" MediaDataTypeMetric = "metric" )
var ( ErrNotInputTransport = errors.New("not input transport") ErrNotOutputTransport = errors.New("not output transport") ErrCodecNotSupported = errors.New("codec not supported") )
var ( AgentRunning = "_agent_running" WorkingState = "_working_state" UpstreamRunning = "_upstream_running" )
Functions ¶
func CastOption ¶
func ResamplePCM ¶
ResamplePCM converts audio data from one sample rate to another
func SetDefaultResampler ¶
func SetDefaultResampler(factory ConverterFactory)
SetDefaultResampler sets the default converter factory
Types ¶
type AsyncTaskRunner ¶
type AsyncTaskRunner[T any] struct { InitCallback func(h MediaHandler) error TerminateCallback func(h MediaHandler) error StateCallback func(h MediaHandler, event StateChange) error RequestBuilder func(h MediaHandler, packet MediaPacket) (*PacketRequest[T], error) TaskExecutor func(ctx context.Context, h MediaHandler, req PacketRequest[T]) error WorkerPoolSize int TaskTimeout time.Duration MaxTaskTimeout time.Duration ConcurrentMode bool // contains filtered or unexported fields }
AsyncTaskRunner handles asynchronous task execution using a true multi-worker pool pattern
func NewAsyncTaskRunner ¶
func NewAsyncTaskRunner[T any](queueSize int) AsyncTaskRunner[T]
NewAsyncTaskRunner creates a new task runner with worker pool configuration
func (*AsyncTaskRunner[T]) CancelActiveTask ¶
func (tr *AsyncTaskRunner[T]) CancelActiveTask()
CancelActiveTask stops all worker tasks (for multi-worker pool)
func (*AsyncTaskRunner[T]) HandleMediaData ¶
func (tr *AsyncTaskRunner[T]) HandleMediaData(h MediaHandler, data MediaData)
HandleMediaData routes media data to appropriate handlers
func (*AsyncTaskRunner[T]) HandlePacket ¶
func (tr *AsyncTaskRunner[T]) HandlePacket(h MediaHandler, packet MediaPacket)
HandlePacket processes packet data through task queue
func (*AsyncTaskRunner[T]) HandleState ¶
func (tr *AsyncTaskRunner[T]) HandleState(h MediaHandler, event StateChange)
HandleState processes state change events
func (*AsyncTaskRunner[T]) ReleaseResources ¶
func (tr *AsyncTaskRunner[T]) ReleaseResources()
ReleaseResources frees allocated resources and stops all workers
type AudioPacket ¶
type AudioPacket struct {
PlayID string `json:"id,omitempty"`
Sequence int `json:"sequence"`
Payload []byte `json:"payload"`
IsFirstPacket bool `json:"isFirstPacket,omitempty"`
IsEndPacket bool `json:"isEndPacket,omitempty"`
IsSynthesized bool `json:"isSynthesized,omitempty"`
IsSilence bool `json:"isSilence,omitempty"`
SourceText string `json:"sourceText,omitempty"`
}
func (*AudioPacket) Body ¶
func (d *AudioPacket) Body() []byte
func (*AudioPacket) String ¶
func (d *AudioPacket) String() string
type BaseProcessor ¶
type BaseProcessor struct {
// contains filtered or unexported fields
}
BaseProcessor provides default implementation for common processor functionality
func NewBaseProcessor ¶
func NewBaseProcessor(name string, priority ProcessorPriority) *BaseProcessor
NewBaseProcessor creates a base processor
func (*BaseProcessor) CanHandle ¶
func (bp *BaseProcessor) CanHandle(ctx context.Context, event *MediaEvent) bool
CanHandle checks if the processor can handle the event
func (*BaseProcessor) Priority ¶
func (bp *BaseProcessor) Priority() ProcessorPriority
Priority returns the processor priority
func (*BaseProcessor) WithCondition ¶
func (bp *BaseProcessor) WithCondition(condition ProcessorCondition) *BaseProcessor
WithCondition sets a condition for the processor
type ClosePacket ¶
type ClosePacket struct {
Reason string `json:"reason"`
}
func (*ClosePacket) Body ¶
func (f *ClosePacket) Body() []byte
func (*ClosePacket) String ¶
func (f *ClosePacket) String() string
type CodecConfig ¶
type CodecConfig struct {
Codec string `json:"codec" form:"codec" default:"pcm"`
SampleRate int `json:"sampleRate" form:"sample_rate" default:"16000"`
Channels int `json:"channels" form:"channels" default:"1"`
BitDepth int `json:"bitDepth" form:"bit_depth" default:"16"`
FrameDuration string `json:"frameDuration" form:"frame_duration"`
PayloadType uint8 `json:"payloadType" form:"payload_type"`
}
CodecConfig defines codec configuration
func DefaultCodecConfig ¶
func DefaultCodecConfig() CodecConfig
func (CodecConfig) String ¶
func (c CodecConfig) String() string
type CompletedData ¶
type CompletedData struct {
SenderName string `json:"senderName"` // eg: tts.aws, asr.qcloud
Duration time.Duration `json:"duration"` // total duration
Source MediaPacket `json:"-"` // last packet
Result any `json:"result"` // result
AssistantId uint `json:"assistantId"`
AssistantVid uint `json:"assistantVid"`
DialogID string `json:"dialogID"`
}
func (*CompletedData) MarshalJSON ¶
func (d *CompletedData) MarshalJSON() ([]byte, error)
func (CompletedData) String ¶
func (d CompletedData) String() string
type ConverterFactory ¶
type ConverterFactory func(inputRate, outputRate int) SampleRateConverter
ConverterFactory creates a sample rate converter
type EncoderFunc ¶
type EncoderFunc func(packet MediaPacket) ([]MediaPacket, error)
type ErrorHandler ¶
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus manages event distribution using pub/sub pattern
func NewEventBus ¶
NewEventBus creates a new event bus
func (*EventBus) Publish ¶
func (eb *EventBus) Publish(event *MediaEvent)
Publish sends an event to all subscribers
func (*EventBus) PublishError ¶
PublishError publishes an error event
func (*EventBus) PublishPacket ¶
func (eb *EventBus) PublishPacket(sessionID string, packet MediaPacket, sender interface{})
PublishPacket publishes a packet event
func (*EventBus) PublishState ¶
func (eb *EventBus) PublishState(sessionID string, state StateChange, sender interface{})
PublishState publishes a state change event
func (*EventBus) Subscribe ¶
func (eb *EventBus) Subscribe(eventType EventType, handler EventHandler)
Subscribe registers an event handler for a specific event type
func (*EventBus) Unsubscribe ¶
func (eb *EventBus) Unsubscribe(eventType EventType, handler EventHandler)
Unsubscribe removes an event handler
type EventHandler ¶
type EventHandler func(ctx context.Context, event *MediaEvent) error
EventHandler processes events from the event bus
type FuncProcessor ¶
type FuncProcessor struct {
*BaseProcessor
// contains filtered or unexported fields
}
FuncProcessor is a processor implemented as a function
func NewFuncProcessor ¶
func NewFuncProcessor(name string, priority ProcessorPriority, fn func(ctx context.Context, session *MediaSession, event *MediaEvent) error) *FuncProcessor
NewFuncProcessor creates a function-based processor
func NewHighPriorityProcessor ¶
func NewHighPriorityProcessor(name string, fn func(ctx context.Context, session *MediaSession, event *MediaEvent) error) *FuncProcessor
NewHighPriorityProcessor creates a processor with PriorityHigh. Use this for processors that must run first (e.g., validation, security, monitoring).
func (*FuncProcessor) Process ¶
func (fp *FuncProcessor) Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
Process executes the processor function
type InterpolatingConverter ¶
type InterpolatingConverter struct {
// contains filtered or unexported fields
}
InterpolatingConverter performs optimized interpolation for sample rate conversion
func (*InterpolatingConverter) Close ¶
func (ic *InterpolatingConverter) Close() error
func (*InterpolatingConverter) ConvertSamples ¶
func (ic *InterpolatingConverter) ConvertSamples(samples []byte) []byte
ConvertSamples performs interpolation (linear by default, cubic if enabled)
func (*InterpolatingConverter) Samples ¶
func (ic *InterpolatingConverter) Samples() []byte
type LocalMediaCache ¶
func MediaCache ¶
func MediaCache() *LocalMediaCache
func (*LocalMediaCache) BuildKey ¶
func (c *LocalMediaCache) BuildKey(params ...string) string
type MediaData ¶
type MediaData struct {
CreatedAt time.Time
Sender any
Type string
State StateChange
Packet MediaPacket
Duration *time.Duration
}
type MediaEvent ¶
type MediaEvent struct {
Type EventType
Timestamp time.Time
SessionID string
Payload interface{}
Metadata map[string]interface{}
}
MediaEvent represents an event in the event bus
type MediaHandler ¶
type MediaHandler interface {
GetContext() context.Context
GetSession() *MediaSession
CauseError(sender any, err error)
EmitState(sender any, state string, params ...any)
EmitPacket(sender any, packet MediaPacket)
SendToOutput(sender any, packet MediaPacket)
AddMetric(key string, duration time.Duration)
InjectPacket(f PacketFilter)
}
type MediaHandlerFunc ¶
type MediaHandlerFunc func(h MediaHandler, data MediaData)
type MediaPacket ¶
MediaPacket types - represents media data packets
type MediaSession ¶
type MediaSession struct {
ID string `json:"id"`
Running bool `json:"running"`
QueueSize int `json:"queueSize"`
SampleRate int // sample rate of the session
MaxSessionDuration int `json:"maxSessionDuration"` // Set the maximum session duration in seconds.
EffectAudios map[string]*[]byte `json:"-"`
StartAt time.Time `json:"startAt"`
// contains filtered or unexported fields
}
func NewDefaultSession ¶
func NewDefaultSession() *MediaSession
func (*MediaSession) AddInputTransport ¶
func (s *MediaSession) AddInputTransport(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
AddInputTransport registers an input transport with a different method name
func (*MediaSession) AddMetric ¶
func (s *MediaSession) AddMetric(key string, duration time.Duration)
func (*MediaSession) AddOutputTransport ¶
func (s *MediaSession) AddOutputTransport(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
AddOutputTransport registers an output transport with a different method name
func (*MediaSession) CauseError ¶
func (s *MediaSession) CauseError(sender any, err error)
func (*MediaSession) Close ¶
func (s *MediaSession) Close() error
func (*MediaSession) Codec ¶
func (s *MediaSession) Codec() CodecConfig
func (*MediaSession) Context ¶
func (s *MediaSession) Context(parent context.Context) *MediaSession
chainable methods
func (*MediaSession) Decode ¶
func (s *MediaSession) Decode(dec EncoderFunc) *MediaSession
func (*MediaSession) Delete ¶
func (s *MediaSession) Delete(key string)
func (*MediaSession) EmitPacket ¶
func (s *MediaSession) EmitPacket(sender any, packet MediaPacket)
func (*MediaSession) EmitState ¶
func (s *MediaSession) EmitState(sender any, state string, params ...any)
func (*MediaSession) Encode ¶
func (s *MediaSession) Encode(enc EncoderFunc) *MediaSession
func (*MediaSession) Error ¶
func (s *MediaSession) Error(handles ...ErrorHandler) *MediaSession
Error registers handlers for errors that are caused (see CauseError)
func (*MediaSession) GetAllMetrics ¶
func (s *MediaSession) GetAllMetrics() map[string]interface{}
GetAllMetrics returns all metrics in a structured format
func (*MediaSession) GetAudioPacketCount ¶
func (s *MediaSession) GetAudioPacketCount() uint64
GetAudioPacketCount returns the number of audio packets
func (*MediaSession) GetAveragePacketSize ¶
func (s *MediaSession) GetAveragePacketSize() uint64
GetAveragePacketSize returns the average packet size
func (*MediaSession) GetContext ¶
func (s *MediaSession) GetContext() context.Context
func (*MediaSession) GetErrorCount ¶
func (s *MediaSession) GetErrorCount() uint64
GetErrorCount returns the total error count
func (*MediaSession) GetMetrics ¶
func (s *MediaSession) GetMetrics() (packetCount, totalBytes uint64)
GetMetrics returns session metrics (backward compatible)
func (*MediaSession) GetPacketCount ¶
func (s *MediaSession) GetPacketCount() uint64
GetPacketCount returns the total number of packets processed
func (*MediaSession) GetSession ¶
func (s *MediaSession) GetSession() *MediaSession
func (*MediaSession) GetSessionDuration ¶
func (s *MediaSession) GetSessionDuration() time.Duration
GetSessionDuration returns the session duration based on packet timestamps
func (*MediaSession) GetStateChangeCount ¶
func (s *MediaSession) GetStateChangeCount() uint64
GetStateChangeCount returns the number of state changes
func (*MediaSession) GetString ¶
func (s *MediaSession) GetString(key string) string
func (*MediaSession) GetTextPacketCount ¶
func (s *MediaSession) GetTextPacketCount() uint64
GetTextPacketCount returns the number of text packets
func (*MediaSession) GetTotalBytes ¶
func (s *MediaSession) GetTotalBytes() uint64
GetTotalBytes returns the total bytes processed
func (*MediaSession) GetUint ¶
func (s *MediaSession) GetUint(key string) uint
func (*MediaSession) Input ¶
func (s *MediaSession) Input(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
Input is an alias for backward compatibility
func (*MediaSession) IsValid ¶
func (s *MediaSession) IsValid() error
func (*MediaSession) On ¶
func (s *MediaSession) On(state string, handles ...StateChangeHandler) *MediaSession
func (*MediaSession) Output ¶
func (s *MediaSession) Output(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession
Output is an alias for backward compatibility
func (*MediaSession) Pipeline ¶
func (s *MediaSession) Pipeline(handles ...MediaHandlerFunc) *MediaSession
Pipeline is an alias for backward compatibility
func (*MediaSession) PostHook ¶
func (s *MediaSession) PostHook(hooks ...SessionHook) *MediaSession
func (*MediaSession) RegisterProcessor ¶
func (s *MediaSession) RegisterProcessor(processor Processor) *MediaSession
RegisterProcessor registers a processor in the registry
func (*MediaSession) SendToOutput ¶
func (s *MediaSession) SendToOutput(sender any, packet MediaPacket)
func (*MediaSession) Serve ¶
func (s *MediaSession) Serve() error
Serve starts the session; this will block the current goroutine
func (*MediaSession) Set ¶
func (s *MediaSession) Set(key string, val any)
func (*MediaSession) SetSessionID ¶
func (s *MediaSession) SetSessionID(id string) *MediaSession
func (*MediaSession) String ¶
func (s *MediaSession) String() string
func (*MediaSession) Trace ¶
func (s *MediaSession) Trace(trace MediaHandlerFunc) *MediaSession
func (*MediaSession) UseMiddleware ¶
func (s *MediaSession) UseMiddleware(handles ...MediaHandlerFunc) *MediaSession
UseMiddleware is deprecated, use RegisterProcessor instead
type MediaTransport ¶
type MediaTransport interface {
io.Closer
String() string
Attach(s *MediaSession)
Next(ctx context.Context) (MediaPacket, error)
Send(ctx context.Context, packet MediaPacket) (int, error)
Codec() CodecConfig
Close() error
}
MediaTransport interface for media transport
type PacketFilter ¶
type PacketFilter func(packet MediaPacket) (bool, error)
type PacketProcessor ¶
type PacketProcessor struct {
*BaseProcessor
// contains filtered or unexported fields
}
PacketProcessor is a specialized processor for packet events
func NewHighPriorityPacketProcessor ¶
func NewHighPriorityPacketProcessor(name string, fn func(ctx context.Context, session *MediaSession, packet MediaPacket) error) *PacketProcessor
NewHighPriorityPacketProcessor creates a packet processor with PriorityHigh. Use this for packet processors that must run first (e.g., packet validation, monitoring).
func NewPacketProcessor ¶
func NewPacketProcessor(name string, priority ProcessorPriority, fn func(ctx context.Context, session *MediaSession, packet MediaPacket) error) *PacketProcessor
NewPacketProcessor creates a packet processor
func (*PacketProcessor) CanHandle ¶
func (pp *PacketProcessor) CanHandle(ctx context.Context, event *MediaEvent) bool
CanHandle checks if this is a packet event
func (*PacketProcessor) Process ¶
func (pp *PacketProcessor) Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
Process handles packet events
type PacketRequest ¶
type PacketRequest[R any] struct { H MediaHandler Interrupt bool Req R }
PacketRequest is an alias for backward compatibility - maps Interrupt to shouldStop
type PipelineStage ¶
type PipelineStage struct {
// contains filtered or unexported fields
}
PipelineStage represents a single stage in the processing pipeline. Uses asynchronous event-driven architecture instead of synchronous chain.
func (*PipelineStage) AddMetric ¶
func (sp *PipelineStage) AddMetric(key string, duration time.Duration)
func (*PipelineStage) CauseError ¶
func (sp *PipelineStage) CauseError(sender any, err error)
func (*PipelineStage) EmitPacket ¶
func (sp *PipelineStage) EmitPacket(sender any, packet MediaPacket)
EmitPacket enqueues packet for asynchronous processing
func (*PipelineStage) EmitState ¶
func (sp *PipelineStage) EmitState(sender any, state string, params ...any)
func (*PipelineStage) GetContext ¶
func (sp *PipelineStage) GetContext() context.Context
func (*PipelineStage) GetSession ¶
func (sp *PipelineStage) GetSession() *MediaSession
func (*PipelineStage) InjectPacket ¶
func (sp *PipelineStage) InjectPacket(f PacketFilter)
InjectPacket sets pre-processing filter function
func (*PipelineStage) SendToOutput ¶
func (sp *PipelineStage) SendToOutput(sender any, packet MediaPacket)
func (*PipelineStage) String ¶
func (sp *PipelineStage) String() string
type Processor ¶
type Processor interface {
// Name returns the processor name
Name() string
// Priority returns processing priority (higher = processed first)
Priority() ProcessorPriority
// CanHandle checks if this processor can handle the event
CanHandle(ctx context.Context, event *MediaEvent) bool
// Process handles the event
Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
}
Processor handles media events
type ProcessorCondition ¶
type ProcessorCondition func(ctx context.Context, event *MediaEvent) bool
ProcessorCondition determines if a processor should handle an event
type ProcessorPriority ¶
type ProcessorPriority int
ProcessorPriority defines processing priority
const ( // PriorityLow is used for processors that should run last (e.g., output routing) PriorityLow ProcessorPriority = 0 // PriorityNormal is used for general-purpose processors (e.g., middleware, transformations) PriorityNormal ProcessorPriority = 50 // PriorityHigh is used for processors that must run first (e.g., validation, security, monitoring) PriorityHigh ProcessorPriority = 100 )
type ProcessorRegistry ¶
type ProcessorRegistry struct {
// contains filtered or unexported fields
}
ProcessorRegistry manages registered processors
func NewProcessorRegistry ¶
func NewProcessorRegistry() *ProcessorRegistry
NewProcessorRegistry creates a new processor registry
func (*ProcessorRegistry) GetAllProcessors ¶
func (pr *ProcessorRegistry) GetAllProcessors() []Processor
GetAllProcessors returns all registered processors
func (*ProcessorRegistry) GetProcessors ¶
func (pr *ProcessorRegistry) GetProcessors(ctx context.Context, event *MediaEvent) []Processor
GetProcessors returns all processors that can handle the event, in priority order
func (*ProcessorRegistry) Register ¶
func (pr *ProcessorRegistry) Register(processor Processor)
Register adds a processor to the registry
func (*ProcessorRegistry) Unregister ¶
func (pr *ProcessorRegistry) Unregister(name string)
Unregister removes a processor
type RouteRule ¶
type RouteRule struct {
Condition func(packet MediaPacket) bool
Targets []string // Transport IDs
Strategy RoutingStrategy
}
RouteRule defines routing rules
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router manages packet routing
func NewRouter ¶
func NewRouter(defaultStrategy RoutingStrategy) *Router
NewRouter creates a new router
func (*Router) Route ¶
func (r *Router) Route(packet MediaPacket, availableTransports []*TransportConnector) []*TransportConnector
Route determines where to send a packet
type RoutingStrategy ¶
type RoutingStrategy int
RoutingStrategy defines how packets are routed
const ( // StrategyBroadcast sends to all outputs StrategyBroadcast RoutingStrategy = iota // StrategyRoundRobin distributes across outputs StrategyRoundRobin // StrategyFirstAvailable uses first available output StrategyFirstAvailable )
type SampleRateConverter ¶
type SampleRateConverter interface {
io.WriteCloser
Samples() []byte
}
SampleRateConverter defines interface for converting sample rates
func DefaultResampler ¶
func DefaultResampler(inputRate, outputRate int) SampleRateConverter
DefaultResampler creates a default sample rate converter
func NewCubicInterpolatingConverter ¶
func NewCubicInterpolatingConverter(sourceRate, targetRate int) SampleRateConverter
NewCubicInterpolatingConverter creates a converter with cubic interpolation (better quality)
func NewInterpolatingConverter ¶
func NewInterpolatingConverter(sourceRate, targetRate int) SampleRateConverter
NewInterpolatingConverter creates a new interpolating converter with linear interpolation (fast)
type SessionHook ¶
type SessionHook func(session *MediaSession)
type SessionMetrics ¶
type SessionMetrics struct {
// contains filtered or unexported fields
}
SessionMetrics tracks comprehensive session statistics
func (*SessionMetrics) GetAllMetrics ¶
func (sm *SessionMetrics) GetAllMetrics() map[string]interface{}
GetAllMetrics returns all metrics in a structured format
func (*SessionMetrics) GetAudioPacketCount ¶
func (sm *SessionMetrics) GetAudioPacketCount() uint64
GetAudioPacketCount returns the number of audio packets
func (*SessionMetrics) GetAveragePacketSize ¶
func (sm *SessionMetrics) GetAveragePacketSize() uint64
GetAveragePacketSize returns the average packet size
func (*SessionMetrics) GetErrorCount ¶
func (sm *SessionMetrics) GetErrorCount() uint64
GetErrorCount returns the total error count
func (*SessionMetrics) GetMaxPacketSize ¶
func (sm *SessionMetrics) GetMaxPacketSize() uint64
GetMaxPacketSize returns the maximum packet size
func (*SessionMetrics) GetMetrics ¶
func (sm *SessionMetrics) GetMetrics() (packetCount, totalBytes uint64)
GetMetrics returns a copy of current metrics
func (*SessionMetrics) GetMinPacketSize ¶
func (sm *SessionMetrics) GetMinPacketSize() uint64
GetMinPacketSize returns the minimum packet size
func (*SessionMetrics) GetPacketCount ¶
func (sm *SessionMetrics) GetPacketCount() uint64
GetPacketCount returns the total number of packets processed
func (*SessionMetrics) GetSessionDuration ¶
func (sm *SessionMetrics) GetSessionDuration() time.Duration
GetSessionDuration returns the session duration based on packet timestamps
func (*SessionMetrics) GetStateChangeCount ¶
func (sm *SessionMetrics) GetStateChangeCount() uint64
GetStateChangeCount returns the number of state changes
func (*SessionMetrics) GetTextPacketCount ¶
func (sm *SessionMetrics) GetTextPacketCount() uint64
GetTextPacketCount returns the number of text packets
func (*SessionMetrics) GetTotalBytes ¶
func (sm *SessionMetrics) GetTotalBytes() uint64
GetTotalBytes returns the total bytes processed
type StateChange ¶
func (*StateChange) SafeGetStr ¶
func (s *StateChange) SafeGetStr(idx int) string
type StateChangeHandler ¶
type StateChangeHandler func(event StateChange)
type StreamFormat ¶
type TextPacket ¶
type TextPacket struct {
PlayID string `json:"id,omitempty"`
Text string `json:"text"`
IsTranscribed bool `json:"isTranscribed"`
IsLLMGenerated bool `json:"isLLMGenerated"`
IsPartial bool `json:"isPartial"`
IsEnd bool `json:"isEnd"`
Sequence int `json:"sequence"`
StartAt time.Time `json:"startAt"`
}
func (*TextPacket) Body ¶
func (t *TextPacket) Body() []byte
func (*TextPacket) String ¶
func (t *TextPacket) String() string
type TranscribingData ¶
type TranscribingData struct {
SenderName string `json:"senderName"` // eg: tts.aws, asr.qcloud
Duration time.Duration `json:"duration"` // total duration
Source MediaPacket `json:"-"` // last packet
Result any `json:"result"` // result
Direction string `json:"direction"` // direction
DialogID string `json:"dialogID"`
}
func (*TranscribingData) MarshalJSON ¶
func (d *TranscribingData) MarshalJSON() ([]byte, error)
func (TranscribingData) String ¶
func (d TranscribingData) String() string
type TransportConnector ¶
type TransportConnector struct {
ID string
Transport MediaTransport
Direction string // "input" or "output"
Active bool
// contains filtered or unexported fields
}
TransportConnector represents a connection to a transport
func NewTransportConnector ¶
func NewTransportConnector(id string, transport MediaTransport, direction string) *TransportConnector
NewTransportConnector creates a new transport connector
func (*TransportConnector) IsActive ¶
func (tc *TransportConnector) IsActive() bool
IsActive checks if connector is active
func (*TransportConnector) SetActive ¶
func (tc *TransportConnector) SetActive(active bool)
SetActive sets the active state
func (*TransportConnector) String ¶
func (tc *TransportConnector) String() string
String returns string representation
type TransportManager ¶
type TransportManager struct {
// contains filtered or unexported fields
}
TransportManager manages transport connections
func (*TransportManager) String ¶
func (tl *TransportManager) String() string