Documentation
¶
Index ¶
- Variables
- type AudioChunk
- type ContextUsage
- type ContextUsageStreamer
- type EventPublisher
- type InboundContext
- type InboundMessage
- type MediaPart
- type MessageBus
- func (mb *MessageBus) AudioChunksChan() <-chan AudioChunk
- func (mb *MessageBus) Close()
- func (mb *MessageBus) GetStreamer(ctx context.Context, channel, chatID, sessionKey string) (Streamer, bool)
- func (mb *MessageBus) HealthCheck() (bool, string)
- func (mb *MessageBus) InboundChan() <-chan InboundMessage
- func (mb *MessageBus) OutboundChan() <-chan OutboundMessage
- func (mb *MessageBus) OutboundMediaChan() <-chan OutboundMediaMessage
- func (mb *MessageBus) PublishAudioChunk(ctx context.Context, chunk AudioChunk) error
- func (mb *MessageBus) PublishInbound(ctx context.Context, msg InboundMessage) error
- func (mb *MessageBus) PublishOutbound(ctx context.Context, msg OutboundMessage) error
- func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error
- func (mb *MessageBus) PublishVoiceControl(ctx context.Context, ctrl VoiceControl) error
- func (mb *MessageBus) SetEventPublisher(p EventPublisher)
- func (mb *MessageBus) SetStreamDelegate(d StreamDelegate)
- func (mb *MessageBus) Stats() MessageBusStats
- func (mb *MessageBus) VoiceControlsChan() <-chan VoiceControl
- type MessageBusStats
- type OutboundMediaMessage
- type OutboundMessage
- type OutboundScope
- type ReasoningStreamer
- type SenderInfo
- type StreamDelegate
- type StreamStats
- type Streamer
- type VoiceControl
Constants ¶
This section is empty.
Variables ¶
// Sentinel errors for messages that lack their required addressing context.
// Each one is a distinct errors.New value; match them with errors.Is so that
// wrapped errors still compare equal.
var (
	// ErrMissingInboundContext reports an inbound message without a context.
	ErrMissingInboundContext = errors.New("inbound message context is required")
	// ErrMissingOutboundContext reports an outbound message without a context.
	ErrMissingOutboundContext = errors.New("outbound message context is required")
	// ErrMissingOutboundMediaContext reports an outbound media message without a context.
	ErrMissingOutboundMediaContext = errors.New("outbound media context is required")
)
var ErrBusBackpressure = errors.New("message bus backpressure") // sentinel value; match with errors.Is so wrapped errors still compare equal
ErrBusBackpressure is returned when a publish attempt exceeds the configured backpressure wait budget and the message is dropped.
var ErrBusClosed = errors.New("message bus closed") // sentinel value; match with errors.Is so wrapped errors still compare equal
ErrBusClosed is returned when publishing to a closed MessageBus.
Functions ¶
This section is empty.
Types ¶
type AudioChunk ¶ added in v0.2.5
type AudioChunk struct {
SessionID string `json:"session_id"` // presumably the voice session this chunk belongs to (cf. VoiceControl.SessionID) — confirm
SpeakerID string `json:"speaker_id"` // User ID or SSRC
ChatID string `json:"chat_id"` // Where to respond
Channel string `json:"channel"` // Source channel type (e.g. "discord")
Sequence uint64 `json:"sequence"` // presumably a per-stream ordering counter — confirm with the producer
Timestamp uint32 `json:"timestamp"` // NOTE(review): unit and clock are not documented here — confirm
SampleRate int `json:"sample_rate"` // presumably in Hz — confirm
Channels int `json:"channels"` // presumably the audio channel count (1 = mono, 2 = stereo), not the source Channel above — confirm
Format string `json:"format"` // "opus", "pcm", etc
Data []byte `json:"data"` // presumably the payload encoded as named by Format; encoding/json emits []byte as a base64 string
}
AudioChunk represents a chunk of streaming voice data.
type ContextUsage ¶ added in v0.2.7
type ContextUsage struct {
UsedTokens int `json:"used_tokens"` // presumably the tokens the session currently consumes (numerator of UsedPercent) — confirm
TotalTokens int `json:"total_tokens"` // model context window
HistoryTokens int `json:"history_tokens"` // history-message tokens only (what maybeSummarize checks)
CompressAtTokens int `json:"compress_at_tokens"` // hard budget compression threshold (contextWindow - maxTokens)
SummarizeAtTokens int `json:"summarize_at_tokens"` // soft summarization trigger (vs history tokens)
UsedPercent int `json:"used_percent"` // 0-100, relative to compressAt (presumably CompressAtTokens, not TotalTokens — confirm)
}
ContextUsage describes how much of the model's context window the current session consumes, and how far it is from triggering compression.
type ContextUsageStreamer ¶ added in v0.2.9
type ContextUsageStreamer interface {
Streamer // embeds Update, Finalize and Cancel
// FinalizeWithContext takes the final content together with a ContextUsage.
// NOTE(review): presumably the counterpart of Finalize that also attaches
// usage; whether a nil usage is accepted is not visible here — confirm with
// the implementations.
FinalizeWithContext(ctx context.Context, content string, usage *ContextUsage) error
}
ContextUsageStreamer can attach final context-window usage metadata when a streaming channel's final message replaces the normal outbound response.
type EventPublisher ¶ added in v0.2.9
type EventPublisher interface {
// Publish publishes evt and returns the publish result. It accepts a ctx,
// so it presumably may wait and honor cancellation — confirm.
Publish(ctx context.Context, evt runtimeevents.Event) runtimeevents.PublishResult
// PublishNonBlocking publishes evt without a ctx. By its name it must not
// block the caller. NOTE(review): what the PublishResult reports when the
// event cannot be accepted is not visible here — confirm.
PublishNonBlocking(evt runtimeevents.Event) runtimeevents.PublishResult
}
EventPublisher is the minimal runtime event publisher used by MessageBus.
type InboundContext ¶ added in v0.2.7
// Despite its name, this type is also used for outbound addressing: it is the
// return type of NewOutboundContext and the Context field of OutboundMessage
// and OutboundMediaMessage.
type InboundContext struct {
Channel string `json:"channel"`
Account string `json:"account,omitempty"` // presumably the account/bot identity on the channel — confirm
ChatID string `json:"chat_id"`
ChatType string `json:"chat_type,omitempty"` // direct / group / channel
TopicID string `json:"topic_id,omitempty"`
SpaceID string `json:"space_id,omitempty"`
SpaceType string `json:"space_type,omitempty"` // guild / team / workspace / tenant
SenderID string `json:"sender_id"`
MessageID string `json:"message_id,omitempty"`
Mentioned bool `json:"mentioned,omitempty"` // presumably whether the bot was mentioned; false is omitted from JSON — confirm meaning
ReplyToMessageID string `json:"reply_to_message_id,omitempty"`
ReplyToSenderID string `json:"reply_to_sender_id,omitempty"`
ReplyHandles map[string]string `json:"reply_handles,omitempty"` // NOTE(review): key/value schema is not documented here
Raw map[string]string `json:"raw,omitempty"` // NOTE(review): presumably platform-specific extras; schema is not documented here
}
InboundContext captures the normalized, platform-agnostic facts about an inbound message. This is the source of truth for routing and session allocation.
func NewOutboundContext ¶ added in v0.2.7
func NewOutboundContext(channel, chatID, replyToMessageID string) InboundContext
NewOutboundContext builds the minimal normalized addressing context required to deliver an outbound text message or reply.
type InboundMessage ¶
type InboundMessage struct {
Context InboundContext `json:"context"`
Sender SenderInfo `json:"sender"`
Content string `json:"content"`
Media []string `json:"media,omitempty"` // NOTE(review): element format is not shown here; MediaPart.Ref uses "media://..." refs — confirm whether these match
MediaScope string `json:"media_scope,omitempty"` // media lifecycle scope
SessionKey string `json:"session_key"`
// Convenience mirrors derived from Context for runtime consumers.
// NormalizeInboundMessage keeps them in sync with Context.
Channel string `json:"channel"`
SenderID string `json:"sender_id"`
ChatID string `json:"chat_id"`
MessageID string `json:"message_id,omitempty"` // platform message ID
}
func NormalizeInboundMessage ¶ added in v0.2.7
func NormalizeInboundMessage(msg InboundMessage) InboundMessage
NormalizeInboundMessage ensures the inbound context is normalized and keeps convenience mirrors in sync for runtime consumers.
type MediaPart ¶ added in v0.2.0
// Only Type and Ref are always serialized; the remaining fields are omitted
// from JSON when empty.
type MediaPart struct {
Type string `json:"type"` // "image" | "audio" | "video" | "file"
Ref string `json:"ref"` // media store ref, e.g. "media://abc123"
Caption string `json:"caption,omitempty"` // optional caption text
Filename string `json:"filename,omitempty"` // original filename hint
ContentType string `json:"content_type,omitempty"` // MIME type hint
}
MediaPart describes a single media attachment to send.
type MessageBus ¶
// MessageBus carries five streams — inbound, outbound, outbound media, audio
// chunks and voice controls — each with a Publish* method and a receive-only
// channel accessor. Construct it with NewMessageBus; all fields are unexported.
type MessageBus struct {
// contains filtered or unexported fields
}
func NewMessageBus ¶
func NewMessageBus() *MessageBus
func (*MessageBus) AudioChunksChan ¶ added in v0.2.5
func (mb *MessageBus) AudioChunksChan() <-chan AudioChunk
func (*MessageBus) Close ¶
func (mb *MessageBus) Close()
func (*MessageBus) GetStreamer ¶ added in v0.2.4
func (mb *MessageBus) GetStreamer(ctx context.Context, channel, chatID, sessionKey string) (Streamer, bool)
GetStreamer returns a Streamer for the given channel, chatID and sessionKey via the StreamDelegate registered with SetStreamDelegate.
func (*MessageBus) HealthCheck ¶ added in v0.3.0
func (mb *MessageBus) HealthCheck() (bool, string)
HealthCheck returns a snapshot of queue depths and cumulative drop counts across all streams. It always reports ok=true: backpressure-induced drops are reflected in the message string for telemetry but do not affect the boolean. Callers that need readiness semantics (e.g. returning 503 when drops occur) should inspect Stats() directly and apply their own threshold logic.
func (*MessageBus) InboundChan ¶ added in v0.2.4
func (mb *MessageBus) InboundChan() <-chan InboundMessage
func (*MessageBus) OutboundChan ¶ added in v0.2.4
func (mb *MessageBus) OutboundChan() <-chan OutboundMessage
func (*MessageBus) OutboundMediaChan ¶ added in v0.2.4
func (mb *MessageBus) OutboundMediaChan() <-chan OutboundMediaMessage
func (*MessageBus) PublishAudioChunk ¶ added in v0.2.5
func (mb *MessageBus) PublishAudioChunk(ctx context.Context, chunk AudioChunk) error
func (*MessageBus) PublishInbound ¶
func (mb *MessageBus) PublishInbound(ctx context.Context, msg InboundMessage) error
func (*MessageBus) PublishOutbound ¶
func (mb *MessageBus) PublishOutbound(ctx context.Context, msg OutboundMessage) error
func (*MessageBus) PublishOutboundMedia ¶ added in v0.2.0
func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error
func (*MessageBus) PublishVoiceControl ¶ added in v0.2.5
func (mb *MessageBus) PublishVoiceControl(ctx context.Context, ctrl VoiceControl) error
func (*MessageBus) SetEventPublisher ¶ added in v0.2.9
func (mb *MessageBus) SetEventPublisher(p EventPublisher)
SetEventPublisher registers a runtime event publisher for bus errors and lifecycle events.
func (*MessageBus) SetStreamDelegate ¶ added in v0.2.4
func (mb *MessageBus) SetStreamDelegate(d StreamDelegate)
SetStreamDelegate registers a StreamDelegate (typically the channel Manager).
func (*MessageBus) Stats ¶ added in v0.3.0
func (mb *MessageBus) Stats() MessageBusStats
func (*MessageBus) VoiceControlsChan ¶ added in v0.2.5
func (mb *MessageBus) VoiceControlsChan() <-chan VoiceControl
type MessageBusStats ¶ added in v0.3.0
// MessageBusStats is the value returned by MessageBus.Stats: one StreamStats
// per bus stream, matching the five channel accessors on MessageBus.
type MessageBusStats struct {
Inbound StreamStats `json:"inbound"`
Outbound StreamStats `json:"outbound"`
OutboundMedia StreamStats `json:"outbound_media"`
AudioChunks StreamStats `json:"audio_chunks"`
VoiceControls StreamStats `json:"voice_controls"`
}
type OutboundMediaMessage ¶ added in v0.2.0
type OutboundMediaMessage struct {
// Channel and ChatID are presumably the convenience mirrors of Context that
// NormalizeOutboundMediaMessage keeps in sync — confirm.
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Context InboundContext `json:"context"`
AgentID string `json:"agent_id,omitempty"`
SessionKey string `json:"session_key,omitempty"`
Scope *OutboundScope `json:"scope,omitempty"` // optional; a nil pointer is omitted from JSON
Parts []MediaPart `json:"parts"` // no omitempty: a nil slice is serialized as JSON null
}
OutboundMediaMessage carries media attachments from Agent to channels via the bus.
func NormalizeOutboundMediaMessage ¶ added in v0.2.7
func NormalizeOutboundMediaMessage(msg OutboundMediaMessage) OutboundMediaMessage
NormalizeOutboundMediaMessage ensures media outbound messages also carry a normalized context while keeping convenience mirrors in sync.
type OutboundMessage ¶
type OutboundMessage struct {
// Channel and ChatID are presumably the convenience mirrors of Context that
// NormalizeOutboundMessage keeps in sync — confirm.
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Context InboundContext `json:"context"`
AgentID string `json:"agent_id,omitempty"`
SessionKey string `json:"session_key,omitempty"`
Scope *OutboundScope `json:"scope,omitempty"` // optional; a nil pointer is omitted from JSON
Content string `json:"content"`
ReplyToMessageID string `json:"reply_to_message_id,omitempty"` // presumably mirrors Context.ReplyToMessageID — confirm
ContextUsage *ContextUsage `json:"context_usage,omitempty"` // optional; a nil pointer is omitted from JSON
}
func NormalizeOutboundMessage ¶ added in v0.2.7
func NormalizeOutboundMessage(msg OutboundMessage) OutboundMessage
NormalizeOutboundMessage ensures Context is normalized and keeps convenience mirrors in sync for runtime consumers.
type OutboundScope ¶ added in v0.2.7
// Every field carries omitempty, so a zero-value OutboundScope marshals to {}.
type OutboundScope struct {
Version int `json:"version,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Channel string `json:"channel,omitempty"`
Account string `json:"account,omitempty"`
Dimensions []string `json:"dimensions,omitempty"`
Values map[string]string `json:"values,omitempty"` // NOTE(review): presumably keyed by the names listed in Dimensions — confirm
}
OutboundScope captures the structured session scope associated with an outbound turn result without depending on the session package.
type ReasoningStreamer ¶ added in v0.2.9
// Unlike ContextUsageStreamer, this interface does not embed Streamer.
// NOTE(review): callers presumably discover it with a type assertion on a
// Streamer value — confirm.
type ReasoningStreamer interface {
// UpdateReasoning takes incremental reasoning content.
// NOTE(review): whether content is cumulative or a delta is not visible here.
UpdateReasoning(ctx context.Context, content string) error
// FinalizeReasoning presumably ends the reasoning stream with its final content — confirm.
FinalizeReasoning(ctx context.Context, content string) error
}
ReasoningStreamer can show incremental model reasoning/thought content separately from the final user-visible answer stream.
type SenderInfo ¶ added in v0.2.0
// Every field carries omitempty, so a zero-value SenderInfo marshals to {}.
type SenderInfo struct {
Platform string `json:"platform,omitempty"` // "telegram", "discord", "slack", ...
PlatformID string `json:"platform_id,omitempty"` // raw platform ID, e.g. "123456"
CanonicalID string `json:"canonical_id,omitempty"` // "platform:id" format (presumably Platform + ":" + PlatformID — confirm)
Username string `json:"username,omitempty"` // username (e.g. @alice)
DisplayName string `json:"display_name,omitempty"` // display name
}
SenderInfo provides structured sender identity information.
type StreamDelegate ¶ added in v0.2.4
type StreamDelegate interface {
// GetStreamer returns a Streamer for the given channel+chatID if the channel
// supports streaming. Returns nil, false if streaming is unavailable.
// NOTE(review): sessionKey is part of the signature but its role is not
// described here — presumably it scopes the stream to one session; confirm.
GetStreamer(ctx context.Context, channel, chatID, sessionKey string) (Streamer, bool)
}
StreamDelegate is implemented by the channel Manager to provide streaming capabilities to the agent loop without tight coupling.
type StreamStats ¶ added in v0.3.0
// StreamStats is a snapshot of a single bus stream: its queue depth and
// capacity plus its drop history. MessageBusStats holds one per stream.
type StreamStats struct {
	Depth        int    `json:"depth"`
	Capacity     int    `json:"capacity"`
	DroppedTotal uint64 `json:"dropped_total"` // cumulative drop count
	// LastDroppedAt uses omitzero, not omitempty: encoding/json never treats a
	// struct value such as time.Time as "empty", so omitempty was a no-op and
	// a stream with no drops serialized "0001-01-01T00:00:00Z". omitzero
	// (Go 1.24+) drops the field when the time is zero; older toolchains
	// ignore the unknown option and keep the previous output.
	LastDroppedAt time.Time `json:"last_dropped_at,omitzero"`
	// NOTE(review): LastDropWait is presumably a formatted duration string and
	// LastDropWaitMillis the same wait in milliseconds — confirm.
	LastDropWait       string `json:"last_drop_wait,omitempty"`
	LastDropWaitMillis int64  `json:"last_drop_wait_ms,omitempty"`
}
type Streamer ¶ added in v0.2.4
type Streamer interface {
// Update pushes incremental content to the channel.
// NOTE(review): whether content is the full text so far or only a delta is
// not visible here — confirm with the implementations.
Update(ctx context.Context, content string) error
// Finalize presumably completes the stream with its final content — confirm.
Finalize(ctx context.Context, content string) error
// Cancel has no error return, so a failure to cancel cannot be reported to
// the caller.
Cancel(ctx context.Context)
}
Streamer pushes incremental content to a streaming-capable channel. Defined here so the agent loop can use it without importing pkg/channels.
type VoiceControl ¶ added in v0.2.5
type VoiceControl struct {
SessionID string `json:"session_id"`
ChatID string `json:"chat_id"`
Type string `json:"type"` // "state", "command"
// NOTE(review): presumably "idle"/"listening" go with Type "state" and
// "start"/"stop"/"leave" with Type "command" — confirm.
Action string `json:"action"` // "idle", "listening", "start", "stop", "leave"
}
VoiceControl represents state or commands for voice sessions.