Documentation ¶
Overview ¶
Package observers provides optional frame processing observers for metrics and logging. It includes metrics collection (latency, token usage) with an OpenTelemetry export stub, a turn tracking observer for conversation flow monitoring, and a user-to-bot latency observer.
Index ¶
- Variables
- func WrapWithObserver(p processors.Processor, observer Observer) processors.Processor
- type CompositeObserver
- type LLMTokenUsage
- type Metrics
- func (m *Metrics) AddChars(n int64)
- func (m *Metrics) AddTokens(n int64)
- func (m *Metrics) IncFrames()
- func (m *Metrics) RecordLLMUsage(u LLMTokenUsage)
- func (m *Metrics) RecordLatency(d time.Duration)
- func (m *Metrics) RecordTTFB(d time.Duration)
- func (m *Metrics) RecordTurnMetrics(t TurnMetrics)
- func (m *Metrics) Snapshot() (latency time.Duration, tokens, chars, frames int64, last time.Time)
- func (m *Metrics) SnapshotFull() (inputLatency, ttfb time.Duration, tokens, chars, frames int64, last time.Time, ...)
- type NoopObserver
- type Observer
- type ObserverWithMetrics
- type ObservingProcessor
- func (o *ObservingProcessor) Cleanup(ctx context.Context) error
- func (o *ObservingProcessor) Name() string
- func (o *ObservingProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
- func (o *ObservingProcessor) SetNext(p processors.Processor)
- func (o *ObservingProcessor) SetPrev(p processors.Processor)
- func (o *ObservingProcessor) Setup(ctx context.Context) error
- type TranscriptObserver
- type TurnMetrics
- type TurnTrackingObserver
- type TurnTrackingOption
- type UserBotLatencyObserver
- type UserBotLatencyOption
Constants ¶
This section is empty.
Variables ¶
var OTELExport = func(metrics *Metrics) {
	_ = metrics
}
OTELExport is an OpenTelemetry stub that performs a no-op export. Replace it with a real OTel export when integrating.
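Because OTELExport is a package-level function variable, an integration can reassign it at startup. The sketch below illustrates only that reassignment pattern. It uses a local stand-in Metrics type, and the names installExporter and lastExported are hypothetical; a real integration would call an OpenTelemetry SDK where this sketch writes to a variable.

```go
package main

import "fmt"

// Metrics is a local stand-in for observers.Metrics (only one field shown).
type Metrics struct {
	FrameCount int64
}

// OTELExport mirrors the package variable: a no-op by default.
var OTELExport = func(metrics *Metrics) {
	_ = metrics
}

// lastExported is a hypothetical sink standing in for a real OTel exporter.
var lastExported int64

// installExporter replaces the no-op stub with a function that records
// the frame count. A real exporter would publish to an OTel instrument here.
func installExporter() {
	OTELExport = func(m *Metrics) {
		if m == nil {
			return
		}
		lastExported = m.FrameCount
	}
}

func main() {
	installExporter()
	OTELExport(&Metrics{FrameCount: 42})
	fmt.Println("exported frames:", lastExported)
}
```

Keeping the hook as a variable means callers do not need to change when the exporter is swapped in.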
Functions ¶
func WrapWithObserver ¶
func WrapWithObserver(p processors.Processor, observer Observer) processors.Processor
WrapWithObserver returns a processor that forwards each frame to p and then notifies observer, after the inner processor has run.
Types ¶
type CompositeObserver ¶
type CompositeObserver struct {
	Observers []Observer
}
CompositeObserver forwards OnFrameProcessed to multiple observers.
func NewCompositeObserver ¶
func NewCompositeObserver(observers ...Observer) *CompositeObserver
NewCompositeObserver returns an observer that delegates to all observers in order.
func (*CompositeObserver) OnFrameProcessed ¶
func (c *CompositeObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
OnFrameProcessed implements Observer.
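The fan-out described above can be sketched as follows. Frame and Direction are local stand-ins for frames.Frame and processors.Direction, tagObserver and notifyOnce are hypothetical helpers, and skipping nil entries is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// Frame and Direction are local stand-ins for frames.Frame and processors.Direction.
type Frame interface{}
type Direction int

// Observer mirrors the single-method interface documented in this package.
type Observer interface {
	OnFrameProcessed(processorName string, f Frame, dir Direction)
}

// CompositeObserver fans each notification out to its Observers in slice order.
type CompositeObserver struct {
	Observers []Observer
}

func NewCompositeObserver(observers ...Observer) *CompositeObserver {
	return &CompositeObserver{Observers: observers}
}

func (c *CompositeObserver) OnFrameProcessed(name string, f Frame, dir Direction) {
	for _, o := range c.Observers {
		if o != nil { // skipping nil entries is an assumption of this sketch
			o.OnFrameProcessed(name, f, dir)
		}
	}
}

// tagObserver is a hypothetical observer that appends its tag to a shared log.
type tagObserver struct {
	tag string
	log *[]string
}

func (t tagObserver) OnFrameProcessed(name string, _ Frame, _ Direction) {
	*t.log = append(*t.log, t.tag+":"+name)
}

// notifyOnce builds a composite of two observers and returns the call log.
func notifyOnce() []string {
	var log []string
	c := NewCompositeObserver(tagObserver{"a", &log}, tagObserver{"b", &log})
	c.OnFrameProcessed("stt", "hello", 0)
	return log
}

func main() {
	fmt.Println(notifyOnce())
}
```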
type LLMTokenUsage ¶
type LLMTokenUsage struct {
	PromptTokens             int
	CompletionTokens         int
	TotalTokens              int
	CacheReadInputTokens     int // optional
	CacheCreationInputTokens int // optional
	ReasoningTokens          int // optional
}
LLMTokenUsage holds token usage for an LLM call.
type Metrics ¶
type Metrics struct {
	InputLatency  time.Duration // time from input to first output
	TTFBDuration  time.Duration // time to first byte (e.g. first LLM/TTS output)
	TokenCount    int64         // LLM tokens (approximate; legacy)
	CharCount     int64         // TTS characters
	FrameCount    int64         // frames processed
	LastProcessed time.Time
	// Structured usage (last recorded)
	LastLLMUsage    LLMTokenUsage
	LastTurnMetrics TurnMetrics
	// contains filtered or unexported fields
}
Metrics holds simple counters and latencies for a pipeline and its processors. Thread safety: the unexported mutex mu guards all fields, so Metrics is safe for concurrent use from observer callbacks.
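A minimal sketch of the mutex-guarded pattern that the thread-safety note describes, using a reduced stand-in type rather than the package's actual implementation. Updating lastProcessed inside IncFrames is an assumption, and countConcurrently is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// metrics is a reduced stand-in for observers.Metrics.
type metrics struct {
	mu            sync.Mutex
	frameCount    int64
	lastProcessed time.Time
}

// IncFrames increments the counter under the lock.
// Touching lastProcessed here is an assumption of this sketch.
func (m *metrics) IncFrames() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.frameCount++
	m.lastProcessed = time.Now()
}

// Snapshot copies the fields under the lock so callers get a consistent view.
func (m *metrics) Snapshot() (frames int64, last time.Time) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.frameCount, m.lastProcessed
}

// countConcurrently calls IncFrames from n goroutines and returns the final count.
func countConcurrently(n int) int64 {
	var m metrics
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.IncFrames()
		}()
	}
	wg.Wait()
	frames, _ := m.Snapshot()
	return frames
}

func main() {
	fmt.Println(countConcurrently(100))
}
```

Reading through a snapshot method rather than the fields directly keeps every read under the same lock as the writes.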
func (*Metrics) RecordLLMUsage ¶
func (m *Metrics) RecordLLMUsage(u LLMTokenUsage)
RecordLLMUsage records LLM token usage (and updates legacy TokenCount).
func (*Metrics) RecordLatency ¶
func (m *Metrics) RecordLatency(d time.Duration)
RecordLatency sets the input-to-output latency.
func (*Metrics) RecordTTFB ¶
func (m *Metrics) RecordTTFB(d time.Duration)
RecordTTFB sets the time-to-first-byte duration.
func (*Metrics) RecordTurnMetrics ¶
func (m *Metrics) RecordTurnMetrics(t TurnMetrics)
RecordTurnMetrics records turn detection metrics.
func (*Metrics) SnapshotFull ¶
func (m *Metrics) SnapshotFull() (inputLatency, ttfb time.Duration, tokens, chars, frames int64, last time.Time, llm LLMTokenUsage, turn TurnMetrics)
SnapshotFull returns latency, TTFB, tokens, chars, frames, last time, last LLM usage, and last turn metrics.
type NoopObserver ¶
type NoopObserver struct{}
NoopObserver does nothing.
func (NoopObserver) OnFrameProcessed ¶
func (NoopObserver) OnFrameProcessed(string, frames.Frame, processors.Direction)
type Observer ¶
type Observer interface {
	OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
}
Observer is notified when frames are processed or pushed.
type ObserverWithMetrics ¶
type ObserverWithMetrics struct {
	Observer Observer
	Metrics  *Metrics
	// contains filtered or unexported fields
}
ObserverWithMetrics wraps an observer and updates metrics (latency, token/char count).
func NewObserverWithMetrics ¶
func NewObserverWithMetrics(o Observer, m *Metrics) *ObserverWithMetrics
NewObserverWithMetrics returns an observer that delegates and updates Metrics.
func (*ObserverWithMetrics) OnFrameProcessed ¶
func (ob *ObserverWithMetrics) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
OnFrameProcessed records the frame and, optionally, latency, and delegates to Observer.
type ObservingProcessor ¶
type ObservingProcessor struct {
	Inner    processors.Processor
	Observer Observer
}
ObservingProcessor wraps a processor and notifies an observer for each frame (for metrics/logging).
func (*ObservingProcessor) Cleanup ¶
func (o *ObservingProcessor) Cleanup(ctx context.Context) error
func (*ObservingProcessor) Name ¶
func (o *ObservingProcessor) Name() string
func (*ObservingProcessor) ProcessFrame ¶
func (o *ObservingProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame forwards the frame to the inner processor, then notifies the observer. Ordering: the observer is notified after Inner has run and receives the same frame reference, so it sees post-process state. Observers must not assume the frame is unmutated. If pre-process state is ever required, the contract would need to forbid in-place mutation or add a snapshot.
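The ordering rule can be illustrated with a small sketch in which the inner processor mutates the frame in place. All types here (textFrame, processor, upper, observing) are hypothetical stand-ins for the package's real types, and notifying the observer even when the inner processor returns an error is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// textFrame is a hypothetical mutable frame; the real frames.Frame type is not shown here.
type textFrame struct{ Text string }

// processor is a reduced stand-in for processors.Processor.
type processor interface {
	Name() string
	ProcessFrame(ctx context.Context, f *textFrame) error
}

// upper mutates the frame in place.
type upper struct{}

func (upper) Name() string { return "upper" }

func (upper) ProcessFrame(_ context.Context, f *textFrame) error {
	f.Text = strings.ToUpper(f.Text)
	return nil
}

// observing runs inner first, then notifies with the same frame pointer.
type observing struct {
	inner  processor
	notify func(processorName string, f *textFrame)
}

func (o observing) ProcessFrame(ctx context.Context, f *textFrame) error {
	err := o.inner.ProcessFrame(ctx, f)
	// Notifying even when err != nil is an assumption of this sketch.
	o.notify(o.inner.Name(), f)
	return err
}

// observedText returns the text the observer sees for a given input.
func observedText(in string) string {
	var seen string
	o := observing{
		inner:  upper{},
		notify: func(_ string, f *textFrame) { seen = f.Text },
	}
	_ = o.ProcessFrame(context.Background(), &textFrame{Text: in})
	return seen
}

func main() {
	// The observer sees the text after the inner processor has mutated it.
	fmt.Println(observedText("hello"))
}
```

An observer that needs the original text would have to copy it before the inner processor runs, which is the snapshot option mentioned above.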
func (*ObservingProcessor) SetNext ¶
func (o *ObservingProcessor) SetNext(p processors.Processor)
func (*ObservingProcessor) SetPrev ¶
func (o *ObservingProcessor) SetPrev(p processors.Processor)
type TranscriptObserver ¶
type TranscriptObserver struct {
	// contains filtered or unexported fields
}
TranscriptObserver records user and assistant messages for a session.
func NewTranscriptObserver ¶
func NewTranscriptObserver(store transcripts.Store, sessionID string) *TranscriptObserver
NewTranscriptObserver creates a new TranscriptObserver for a session.
func (*TranscriptObserver) OnFrameProcessed ¶
func (o *TranscriptObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
OnFrameProcessed implements Observer.
type TurnMetrics ¶
TurnMetrics holds turn detection metrics.
type TurnTrackingObserver ¶
type TurnTrackingObserver struct {
	// contains filtered or unexported fields
}
TurnTrackingObserver tracks conversation turns. A turn starts on a StartFrame or when the user starts speaking. A turn ends when the bot stops speaking and the timeout elapses, when the user starts speaking again, or on a CancelFrame. It implements Observer and is safe for concurrent use.
func NewTurnTrackingObserver ¶
func NewTurnTrackingObserver(opts ...TurnTrackingOption) *TurnTrackingObserver
NewTurnTrackingObserver returns a new TurnTrackingObserver.
func (*TurnTrackingObserver) OnFrameProcessed ¶
func (o *TurnTrackingObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
OnFrameProcessed implements Observer.
type TurnTrackingOption ¶
type TurnTrackingOption func(*TurnTrackingObserver)
TurnTrackingOption configures TurnTrackingObserver.
func MaxFrames ¶
func MaxFrames(n int) TurnTrackingOption
MaxFrames sets the max frame IDs to keep for duplicate detection (default 100).
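One way a bounded window of frame IDs for duplicate detection could work is sketched below. This is not the package's implementation: the uint64 ID type, the oldest-first eviction policy, and the names seenSet and firstTime are all assumptions made for illustration.

```go
package main

import "fmt"

// seenSet remembers at most max frame IDs, evicting the oldest first.
type seenSet struct {
	max   int
	order []uint64
	ids   map[uint64]struct{}
}

func newSeenSet(max int) *seenSet {
	return &seenSet{max: max, ids: make(map[uint64]struct{})}
}

// firstTime reports whether id is absent from the retained window,
// and records it if so.
func (s *seenSet) firstTime(id uint64) bool {
	if _, dup := s.ids[id]; dup {
		return false
	}
	if s.max > 0 && len(s.order) >= s.max {
		oldest := s.order[0]
		s.order = s.order[1:]
		delete(s.ids, oldest)
	}
	s.order = append(s.order, id)
	s.ids[id] = struct{}{}
	return true
}

func main() {
	s := newSeenSet(2)
	// ID 1 is new, then a duplicate; after 2 and 3 arrive it has been
	// evicted from the window and counts as new again.
	fmt.Println(s.firstTime(1), s.firstTime(1), s.firstTime(2), s.firstTime(3), s.firstTime(1))
}
```

The trade-off is that a larger window catches duplicates that arrive further apart, at the cost of more retained IDs.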
func OnTurnEnded ¶
func OnTurnEnded(f func(turnCount int, durationSecs float64, interrupted bool)) TurnTrackingOption
OnTurnEnded sets the callback invoked when a turn ends. The callback receives the turn count, the turn duration in seconds, and whether the turn was interrupted.
func OnTurnStarted ¶
func OnTurnStarted(f func(turnCount int)) TurnTrackingOption
OnTurnStarted sets the callback when a new turn starts.
func TurnEndTimeout ¶
func TurnEndTimeout(d time.Duration) TurnTrackingOption
TurnEndTimeout sets the timeout after bot stops speaking before ending the turn (default 2.5s).
type UserBotLatencyObserver ¶
type UserBotLatencyObserver struct {
	// contains filtered or unexported fields
}
UserBotLatencyObserver measures time from user stopped speaking to bot started speaking. Implements Observer; only considers downstream frames. Safe for concurrent use.
func NewUserBotLatencyObserver ¶
func NewUserBotLatencyObserver(opts ...UserBotLatencyOption) *UserBotLatencyObserver
NewUserBotLatencyObserver returns a new UserBotLatencyObserver.
func (*UserBotLatencyObserver) OnFrameProcessed ¶
func (o *UserBotLatencyObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
OnFrameProcessed implements Observer. Only downstream frames are considered.
type UserBotLatencyOption ¶
type UserBotLatencyOption func(*UserBotLatencyObserver)
UserBotLatencyOption configures UserBotLatencyObserver.
func OnLatencyMeasured ¶
func OnLatencyMeasured(f func(latencySecs float64)) UserBotLatencyOption
OnLatencyMeasured sets the callback when user-to-bot latency is measured.