observers

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 16, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package observers provides optional metrics (latency, token usage) and an OpenTelemetry stub.

Package observers provides optional frame processing observers for metrics and logging.

Package observers provides turn tracking observer for conversation flow monitoring.

Package observers provides user-to-bot latency observer.

Index

Constants

This section is empty.

Variables

var OTELExport = func(metrics *Metrics) {

	_ = metrics
}

OTELExport is an OpenTelemetry stub: by default it is a no-op export. Replace it with a real OpenTelemetry export when integrating.
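Because OTELExport is a package-level function variable, it can be reassigned at startup. The sketch below illustrates that pattern only. Metrics here is a trimmed local stand-in, and installExporter and exported are hypothetical names; the stand-in records formatted lines instead of calling any OpenTelemetry API.

```go
package main

import (
	"fmt"
	"time"
)

// Metrics is a trimmed local stand-in for observers.Metrics (assumption:
// only the fields used below are reproduced).
type Metrics struct {
	InputLatency time.Duration
	TokenCount   int64
	FrameCount   int64
}

// OTELExport mirrors the documented hook: a function variable that does
// nothing until it is replaced.
var OTELExport = func(metrics *Metrics) {
	_ = metrics
}

// exported collects one line per export so the example can be inspected.
// A real integration would pass the values to an exporter instead.
var exported []string

// installExporter swaps the no-op hook for one that records the metrics.
func installExporter() {
	OTELExport = func(m *Metrics) {
		if m == nil {
			return
		}
		exported = append(exported, fmt.Sprintf("latency_ms=%d tokens=%d frames=%d",
			m.InputLatency.Milliseconds(), m.TokenCount, m.FrameCount))
	}
}

func main() {
	installExporter()
	OTELExport(&Metrics{InputLatency: 120 * time.Millisecond, TokenCount: 42, FrameCount: 7})
	fmt.Println(exported[0]) // latency_ms=120 tokens=42 frames=7
}
```

With the real Metrics type, reading values through Snapshot or SnapshotFull rather than touching the fields directly is probably the safer choice, since the fields are documented as mutex-guarded.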

Functions

func WrapWithObserver

func WrapWithObserver(p processors.Processor, observer Observer) processors.Processor

WrapWithObserver returns a processor that forwards each frame to p and then notifies observer, after p has run.

Types

type CompositeObserver

type CompositeObserver struct {
	Observers []Observer
}

CompositeObserver forwards OnFrameProcessed to multiple observers.

func NewCompositeObserver

func NewCompositeObserver(observers ...Observer) *CompositeObserver

NewCompositeObserver returns an observer that delegates to all observers in order.

func (*CompositeObserver) OnFrameProcessed

func (c *CompositeObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)

OnFrameProcessed implements Observer.
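The in-order delegation described for CompositeObserver can be sketched as a simple loop. Frame and Direction are local stand-ins for frames.Frame and processors.Direction, and ObserverFunc, composite and fanOut are hypothetical names used only for this illustration; skipping nil entries is an assumption, not documented behavior.

```go
package main

import "fmt"

// Frame and Direction are local stand-ins for frames.Frame and
// processors.Direction; the real definitions are not shown here.
type Frame interface{}
type Direction int

// Observer mirrors the documented interface shape.
type Observer interface {
	OnFrameProcessed(processorName string, f Frame, dir Direction)
}

// ObserverFunc is a hypothetical adapter that lets a plain function act as an Observer.
type ObserverFunc func(processorName string, f Frame, dir Direction)

func (fn ObserverFunc) OnFrameProcessed(processorName string, f Frame, dir Direction) {
	fn(processorName, f, dir)
}

// composite forwards every notification to each observer in slice order.
type composite struct {
	observers []Observer
}

func (c *composite) OnFrameProcessed(processorName string, f Frame, dir Direction) {
	for _, o := range c.observers {
		if o != nil { // assumption: nil entries are skipped
			o.OnFrameProcessed(processorName, f, dir)
		}
	}
}

// fanOut sends one notification through two observers and returns the call order.
func fanOut() []string {
	var calls []string
	tag := func(label string) Observer {
		return ObserverFunc(func(name string, _ Frame, _ Direction) {
			calls = append(calls, label+"<-"+name)
		})
	}
	c := &composite{observers: []Observer{tag("log"), tag("metrics")}}
	c.OnFrameProcessed("tts", "frame", 0)
	return calls
}

func main() {
	fmt.Println(fanOut()) // [log<-tts metrics<-tts]
}
```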

type LLMTokenUsage

type LLMTokenUsage struct {
	PromptTokens             int
	CompletionTokens         int
	TotalTokens              int
	CacheReadInputTokens     int // optional
	CacheCreationInputTokens int // optional
	ReasoningTokens          int // optional
}

LLMTokenUsage holds token usage for an LLM call.

type Metrics

type Metrics struct {
	InputLatency  time.Duration // time from input to first output
	TTFBDuration  time.Duration // time to first byte (e.g. first LLM/TTS output)
	TokenCount    int64         // LLM tokens (approximate; legacy)
	CharCount     int64         // TTS characters
	FrameCount    int64         // frames processed
	LastProcessed time.Time
	// Structured usage (last recorded)
	LastLLMUsage    LLMTokenUsage
	LastTurnMetrics TurnMetrics
	// contains filtered or unexported fields
}

Metrics holds simple counters and latencies for the pipeline and its processors. Thread safety: an unexported mutex (mu) guards all fields, so the methods are safe for concurrent use from observer callbacks.
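The mutex-guarded counter pattern described here can be illustrated with a small stand-in. The counters type and the hammer helper are hypothetical and reproduce only two of the documented fields; this is a sketch of the locking discipline, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// counters is a local stand-in for the mutex-guarded part of Metrics.
type counters struct {
	mu     sync.Mutex
	tokens int64
	frames int64
}

// AddTokens adds n to the token count under the lock.
func (c *counters) AddTokens(n int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tokens += n
}

// IncFrames increments the frame count under the lock.
func (c *counters) IncFrames() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.frames++
}

// Snapshot copies both values while holding the lock, so callers always
// see a consistent pair.
func (c *counters) Snapshot() (tokens, frames int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.tokens, c.frames
}

// hammer updates the counters from several goroutines and returns the totals.
func hammer(workers, perWorker int) (tokens, frames int64) {
	var c counters
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				c.IncFrames()
				c.AddTokens(2)
			}
		}()
	}
	wg.Wait()
	return c.Snapshot()
}

func main() {
	tokens, frames := hammer(8, 1000)
	fmt.Println(tokens, frames) // 16000 8000
}
```

Reading through a snapshot method rather than the struct fields keeps readers under the same lock as writers, which is presumably why Metrics exposes Snapshot and SnapshotFull.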

func NewMetrics

func NewMetrics() *Metrics

NewMetrics returns a new Metrics.

func (*Metrics) AddChars

func (m *Metrics) AddChars(n int64)

AddChars adds to the character count (TTS).

func (*Metrics) AddTokens

func (m *Metrics) AddTokens(n int64)

AddTokens adds to the token count (LLM).

func (*Metrics) IncFrames

func (m *Metrics) IncFrames()

IncFrames increments the frame count.

func (*Metrics) RecordLLMUsage

func (m *Metrics) RecordLLMUsage(u LLMTokenUsage)

RecordLLMUsage records LLM token usage (and updates legacy TokenCount).

func (*Metrics) RecordLatency

func (m *Metrics) RecordLatency(d time.Duration)

RecordLatency sets the input-to-output latency.

func (*Metrics) RecordTTFB

func (m *Metrics) RecordTTFB(d time.Duration)

RecordTTFB sets the time-to-first-byte duration.

func (*Metrics) RecordTurnMetrics

func (m *Metrics) RecordTurnMetrics(t TurnMetrics)

RecordTurnMetrics records turn detection metrics.

func (*Metrics) Snapshot

func (m *Metrics) Snapshot() (latency time.Duration, tokens, chars, frames int64, last time.Time)

Snapshot returns a copy of the current latency, token count, character count, frame count, and last-processed time.

func (*Metrics) SnapshotFull

func (m *Metrics) SnapshotFull() (inputLatency, ttfb time.Duration, tokens, chars, frames int64, last time.Time, llm LLMTokenUsage, turn TurnMetrics)

SnapshotFull returns latency, TTFB, tokens, chars, frames, last time, last LLM usage, and last turn metrics.

type NoopObserver

type NoopObserver struct{}

NoopObserver does nothing.

func (NoopObserver) OnFrameProcessed

func (NoopObserver) OnFrameProcessed(string, frames.Frame, processors.Direction)

type Observer

type Observer interface {
	OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
}

Observer is notified when frames are processed or pushed.

type ObserverWithMetrics

type ObserverWithMetrics struct {
	Observer Observer
	Metrics  *Metrics
	// contains filtered or unexported fields
}

ObserverWithMetrics wraps an observer and updates metrics (latency, token/char count).

func NewObserverWithMetrics

func NewObserverWithMetrics(o Observer, m *Metrics) *ObserverWithMetrics

NewObserverWithMetrics returns an observer that delegates and updates Metrics.

func (*ObserverWithMetrics) OnFrameProcessed

func (ob *ObserverWithMetrics) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)

OnFrameProcessed records frame and optionally latency; delegates to Observer.

type ObservingProcessor

type ObservingProcessor struct {
	Inner    processors.Processor
	Observer Observer
}

ObservingProcessor wraps a processor and notifies an observer for each frame (for metrics/logging).

func (*ObservingProcessor) Cleanup

func (o *ObservingProcessor) Cleanup(ctx context.Context) error

func (*ObservingProcessor) Name

func (o *ObservingProcessor) Name() string

func (*ObservingProcessor) ProcessFrame

func (o *ObservingProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame forwards to the inner processor and notifies the observer. ORDERING: observer is notified after Inner has run; it receives the same frame reference (post-process state). Observers must not assume the frame is unmutated; if pre-process state is ever required, the contract would need to forbid in-place mutation or add a snapshot.
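The ordering note can be made concrete with a small sketch in which the inner processor mutates the frame in place. Direction, TextFrame, Processor and Observer are local stand-ins with simplified signatures, and upper, observing, recorder and run are hypothetical names. Skipping the notification when the inner processor returns an error is an assumption; the documentation above does not say what happens in that case.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins for the frames/processors/observers types.
type Direction int

type TextFrame struct{ Text string }

type Processor interface {
	Name() string
	ProcessFrame(ctx context.Context, f *TextFrame, dir Direction) error
}

type Observer interface {
	OnFrameProcessed(processorName string, f *TextFrame, dir Direction)
}

// upper is a hypothetical processor that mutates the frame in place.
type upper struct{}

func (upper) Name() string { return "upper" }

func (upper) ProcessFrame(_ context.Context, f *TextFrame, _ Direction) error {
	f.Text = strings.ToUpper(f.Text)
	return nil
}

// observing runs inner first, then hands the same frame pointer to the observer.
type observing struct {
	inner    Processor
	observer Observer
}

func (o *observing) ProcessFrame(ctx context.Context, f *TextFrame, dir Direction) error {
	if err := o.inner.ProcessFrame(ctx, f, dir); err != nil {
		return err // assumption: no notification when inner fails
	}
	o.observer.OnFrameProcessed(o.inner.Name(), f, dir)
	return nil
}

// recorder stores what it was shown at notification time.
type recorder struct{ seen []string }

func (r *recorder) OnFrameProcessed(name string, f *TextFrame, _ Direction) {
	r.seen = append(r.seen, name+":"+f.Text)
}

// run returns the text copied before processing and what the observer saw.
func run(text string) (before, observed string) {
	rec := &recorder{}
	p := &observing{inner: upper{}, observer: rec}
	f := &TextFrame{Text: text}
	before = f.Text // explicit copy taken before the inner processor runs
	_ = p.ProcessFrame(context.Background(), f, 0)
	return before, rec.seen[0]
}

func main() {
	before, observed := run("hello")
	fmt.Println(before, observed) // hello upper:HELLO
}
```

The observer only ever sees the uppercased text; the pre-process value survives solely because the caller copied it first, which is the kind of snapshot the note alludes to.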

func (*ObservingProcessor) SetNext

func (o *ObservingProcessor) SetNext(p processors.Processor)

func (*ObservingProcessor) SetPrev

func (o *ObservingProcessor) SetPrev(p processors.Processor)

func (*ObservingProcessor) Setup

func (o *ObservingProcessor) Setup(ctx context.Context) error

type TranscriptObserver

type TranscriptObserver struct {
	// contains filtered or unexported fields
}

TranscriptObserver records user and assistant messages for a session.

func NewTranscriptObserver

func NewTranscriptObserver(store transcripts.Store, sessionID string) *TranscriptObserver

NewTranscriptObserver creates a new TranscriptObserver for a session.

func (*TranscriptObserver) OnFrameProcessed

func (o *TranscriptObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)

OnFrameProcessed implements Observer.

type TurnMetrics

type TurnMetrics struct {
	IsComplete          bool
	Probability         float64
	E2EProcessingTimeMs float64
}

TurnMetrics holds turn detection metrics.

type TurnTrackingObserver

type TurnTrackingObserver struct {
	// contains filtered or unexported fields
}

TurnTrackingObserver tracks conversation turns. A turn starts on a StartFrame or when the user starts speaking. A turn ends when the bot stops speaking and the timeout elapses, when the user starts speaking again, or on a CancelFrame. Implements Observer; safe for concurrent use.

func NewTurnTrackingObserver

func NewTurnTrackingObserver(opts ...TurnTrackingOption) *TurnTrackingObserver

NewTurnTrackingObserver returns a new TurnTrackingObserver.

func (*TurnTrackingObserver) OnFrameProcessed

func (o *TurnTrackingObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)

OnFrameProcessed implements Observer.

type TurnTrackingOption

type TurnTrackingOption func(*TurnTrackingObserver)

TurnTrackingOption configures TurnTrackingObserver.

func MaxFrames

func MaxFrames(n int) TurnTrackingOption

MaxFrames sets the max frame IDs to keep for duplicate detection (default 100).
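One way to keep a bounded set of frame IDs for duplicate detection is a map plus an insertion-ordered slice. The sketch below assumes uint64 IDs and oldest-first eviction; neither is confirmed by the documentation above, and seenIDs, newSeenIDs and FirstTime are hypothetical names.

```go
package main

import "fmt"

// seenIDs remembers at most max frame IDs (assumption: IDs are uint64).
type seenIDs struct {
	max   int
	order []uint64            // insertion order, oldest first
	set   map[uint64]struct{} // membership lookup
}

func newSeenIDs(max int) *seenIDs {
	return &seenIDs{max: max, set: make(map[uint64]struct{})}
}

// FirstTime reports whether id has not been seen recently and remembers it.
// When the capacity is exceeded, the oldest remembered ID is evicted
// (assumption: oldest-first eviction).
func (s *seenIDs) FirstTime(id uint64) bool {
	if _, dup := s.set[id]; dup {
		return false
	}
	s.set[id] = struct{}{}
	s.order = append(s.order, id)
	if len(s.order) > s.max {
		oldest := s.order[0]
		s.order = s.order[1:]
		delete(s.set, oldest)
	}
	return true
}

func main() {
	s := newSeenIDs(2)
	fmt.Println(s.FirstTime(1)) // true
	fmt.Println(s.FirstTime(1)) // false: duplicate
	fmt.Println(s.FirstTime(2)) // true
	fmt.Println(s.FirstTime(3)) // true: evicts 1
	fmt.Println(s.FirstTime(1)) // true: 1 was forgotten
}
```

A small bound keeps memory constant at the cost of forgetting old IDs, so a duplicate arriving after more than max newer frames would be treated as new.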

func OnTurnEnded

func OnTurnEnded(f func(turnCount int, durationSecs float64, interrupted bool)) TurnTrackingOption

OnTurnEnded sets the callback invoked when a turn ends. The callback receives the turn count, the turn duration in seconds, and whether the turn was interrupted.

func OnTurnStarted

func OnTurnStarted(f func(turnCount int)) TurnTrackingOption

OnTurnStarted sets the callback invoked when a new turn starts. The callback receives the turn count.

func TurnEndTimeout

func TurnEndTimeout(d time.Duration) TurnTrackingOption

TurnEndTimeout sets the timeout after bot stops speaking before ending the turn (default 2.5s).

type UserBotLatencyObserver

type UserBotLatencyObserver struct {
	// contains filtered or unexported fields
}

UserBotLatencyObserver measures time from user stopped speaking to bot started speaking. Implements Observer; only considers downstream frames. Safe for concurrent use.

func NewUserBotLatencyObserver

func NewUserBotLatencyObserver(opts ...UserBotLatencyOption) *UserBotLatencyObserver

NewUserBotLatencyObserver returns a new UserBotLatencyObserver.

func (*UserBotLatencyObserver) OnFrameProcessed

func (o *UserBotLatencyObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)

OnFrameProcessed implements Observer. Only downstream frames are considered.

type UserBotLatencyOption

type UserBotLatencyOption func(*UserBotLatencyObserver)

UserBotLatencyOption configures UserBotLatencyObserver.

func OnLatencyMeasured

func OnLatencyMeasured(f func(latencySecs float64)) UserBotLatencyOption

OnLatencyMeasured sets the callback invoked when a user-to-bot latency is measured. The callback receives the latency in seconds.
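The measurement itself can be sketched as: remember when the user stopped speaking, then report the elapsed time when the bot starts speaking, ignoring upstream frames. UserStoppedSpeaking and BotStartedSpeaking are hypothetical frame types (the real frame names are not shown on this page), Direction is a local stand-in, and the injectable clock exists only to make the example deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Direction is a local stand-in for processors.Direction.
type Direction int

const (
	Downstream Direction = iota
	Upstream
)

// Hypothetical frame types marking the two events of interest.
type UserStoppedSpeaking struct{}
type BotStartedSpeaking struct{}

type latencyObserver struct {
	mu         sync.Mutex
	now        func() time.Time // injectable clock
	stoppedAt  time.Time        // zero when no measurement is pending
	onMeasured func(latencySecs float64)
}

func (o *latencyObserver) OnFrameProcessed(_ string, f any, dir Direction) {
	if dir != Downstream {
		return // only downstream frames are considered
	}
	var latency float64
	measured := false
	o.mu.Lock()
	switch f.(type) {
	case UserStoppedSpeaking:
		o.stoppedAt = o.now()
	case BotStartedSpeaking:
		if !o.stoppedAt.IsZero() {
			latency = o.now().Sub(o.stoppedAt).Seconds()
			o.stoppedAt = time.Time{} // one measurement per user stop
			measured = true
		}
	}
	o.mu.Unlock()
	// Invoke the callback outside the lock so it cannot block other callers.
	if measured && o.onMeasured != nil {
		o.onMeasured(latency)
	}
}

// measure replays a short frame sequence against a fake clock.
func measure() []float64 {
	clock := time.Unix(0, 0)
	var got []float64
	o := &latencyObserver{
		now:        func() time.Time { return clock },
		onMeasured: func(s float64) { got = append(got, s) },
	}
	o.OnFrameProcessed("stt", UserStoppedSpeaking{}, Downstream)
	clock = clock.Add(750 * time.Millisecond)
	o.OnFrameProcessed("tts", BotStartedSpeaking{}, Upstream)   // ignored: upstream
	o.OnFrameProcessed("tts", BotStartedSpeaking{}, Downstream) // measured
	o.OnFrameProcessed("tts", BotStartedSpeaking{}, Downstream) // ignored: nothing pending
	return got
}

func main() {
	fmt.Println(measure()) // [0.75]
}
```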
