Documentation
¶
Index ¶
- Variables
- type BusError
- type InboundMessage
- type Media
- type MessageBus
- func (b *MessageBus) Close() error
- func (b *MessageBus) ConsumeInbound(ctx context.Context) (*InboundMessage, error)
- func (b *MessageBus) ConsumeOutbound(ctx context.Context) (*OutboundMessage, error)
- func (b *MessageBus) InboundCount() int
- func (b *MessageBus) IsClosed() bool
- func (b *MessageBus) OutboundChan() <-chan *OutboundMessage
- func (b *MessageBus) OutboundCount() int
- func (b *MessageBus) PublishInbound(ctx context.Context, msg *InboundMessage) error
- func (b *MessageBus) PublishOutbound(ctx context.Context, msg *OutboundMessage) error
- func (b *MessageBus) SubscribeOutbound() *OutboundSubscription
- func (b *MessageBus) UnsubscribeOutbound(subID string)
- type OutboundMessage
- type OutboundSubscription
- type StreamHandler
- func (h *StreamHandler) Close()
- func (h *StreamHandler) GetChunkIndex() int
- func (h *StreamHandler) GetContent() string
- func (h *StreamHandler) GetFinal() string
- func (h *StreamHandler) GetThinking() string
- func (h *StreamHandler) OnChunk(callback func(*StreamMessage)) *StreamHandler
- func (h *StreamHandler) OnComplete(callback func(string)) *StreamHandler
- func (h *StreamHandler) OnError(callback func(error)) *StreamHandler
- func (h *StreamHandler) Reset()
- func (h *StreamHandler) Start(ctx context.Context)
- type StreamMessage
- type StreamingMessageBus
- func (b *StreamingMessageBus) CloseStream(chatID string)
- func (b *StreamingMessageBus) CreateStream(chatID string) chan *StreamMessage
- func (b *StreamingMessageBus) GetStream(chatID string) (chan *StreamMessage, bool)
- func (b *StreamingMessageBus) PublishStream(ctx context.Context, msg *StreamMessage) error
- type SystemMessage
Constants ¶
This section is empty.
Variables ¶
var (
ErrBusClosed = &BusError{Message: "message bus is closed"}
)
Errors
Functions ¶
This section is empty.
Types ¶
type InboundMessage ¶
type InboundMessage struct {
ID string `json:"id"`
Channel string `json:"channel"` // telegram, whatsapp, feishu, cli, system
AccountID string `json:"account_id"` // 账号ID(用于多账号场景)
SenderID string `json:"sender_id"` // 发送者ID
ChatID string `json:"chat_id"` // 聊天ID
Content string `json:"content"` // 消息内容
Media []Media `json:"media"` // 媒体文件
Metadata map[string]interface{} `json:"metadata"` // 元数据
Timestamp time.Time `json:"timestamp"`
}
InboundMessage 入站消息
func (*InboundMessage) IsSystemMessage ¶
func (m *InboundMessage) IsSystemMessage() bool
IsSystemMessage 判断是否为系统消息
type Media ¶
type Media struct {
Type string `json:"type"` // image, video, audio, document
URL string `json:"url"` // 文件URL
Base64 string `json:"base64"` // Base64编码内容
MimeType string `json:"mimetype"` // MIME类型
}
Media 媒体文件
type MessageBus ¶
type MessageBus struct {
// contains filtered or unexported fields
}
MessageBus 消息总线
func (*MessageBus) ConsumeInbound ¶
func (b *MessageBus) ConsumeInbound(ctx context.Context) (*InboundMessage, error)
ConsumeInbound 消费入站消息
func (*MessageBus) ConsumeOutbound ¶
func (b *MessageBus) ConsumeOutbound(ctx context.Context) (*OutboundMessage, error)
ConsumeOutbound 消费出站消息 使用订阅机制,确保消息能够被正确接收
func (*MessageBus) OutboundChan ¶
func (b *MessageBus) OutboundChan() <-chan *OutboundMessage
OutboundChan 获取出站消息通道(已废弃) 此方法已废弃,请使用 SubscribeOutbound 代替
func (*MessageBus) PublishInbound ¶
func (b *MessageBus) PublishInbound(ctx context.Context, msg *InboundMessage) error
PublishInbound 发布入站消息
func (*MessageBus) PublishOutbound ¶
func (b *MessageBus) PublishOutbound(ctx context.Context, msg *OutboundMessage) error
PublishOutbound 发布出站消息
func (*MessageBus) SubscribeOutbound ¶
func (b *MessageBus) SubscribeOutbound() *OutboundSubscription
SubscribeOutbound 订阅出站消息(支持多个消费者) 使用内部订阅机制,每个订阅者有独立的 channel 返回一个 OutboundSubscription 对象,包含只读 channel 和取消订阅方法
func (*MessageBus) UnsubscribeOutbound ¶
func (b *MessageBus) UnsubscribeOutbound(subID string)
UnsubscribeOutbound 取消订阅出站消息
type OutboundMessage ¶
type OutboundMessage struct {
ID string `json:"id"`
Channel string `json:"channel"` // telegram, whatsapp, feishu, cli
ChatID string `json:"chat_id"` // 聊天ID
Content string `json:"content"` // 消息内容
Media []Media `json:"media"` // 媒体文件
ReplyTo string `json:"reply_to"` // 回复的消息ID
Metadata map[string]interface{} `json:"metadata"` // 元数据
Timestamp time.Time `json:"timestamp"`
}
OutboundMessage 出站消息
type OutboundSubscription ¶
type OutboundSubscription struct {
ID string
Channel <-chan *OutboundMessage
// contains filtered or unexported fields
}
OutboundSubscription 出站消息订阅
func (*OutboundSubscription) Unsubscribe ¶
func (s *OutboundSubscription) Unsubscribe()
Unsubscribe 取消订阅
type StreamHandler ¶
type StreamHandler struct {
// contains filtered or unexported fields
}
StreamHandler handles streaming messages
func NewStreamHandler ¶
func NewStreamHandler(bus *StreamingMessageBus, chatID string) *StreamHandler
NewStreamHandler creates a new stream handler
func (*StreamHandler) GetChunkIndex ¶
func (h *StreamHandler) GetChunkIndex() int
GetChunkIndex returns the current chunk index
func (*StreamHandler) GetContent ¶
func (h *StreamHandler) GetContent() string
GetContent returns the accumulated content
func (*StreamHandler) GetFinal ¶
func (h *StreamHandler) GetFinal() string
GetFinal returns the accumulated final content
func (*StreamHandler) GetThinking ¶
func (h *StreamHandler) GetThinking() string
GetThinking returns the accumulated thinking content
func (*StreamHandler) OnChunk ¶
func (h *StreamHandler) OnChunk(callback func(*StreamMessage)) *StreamHandler
OnChunk sets the chunk callback
func (*StreamHandler) OnComplete ¶
func (h *StreamHandler) OnComplete(callback func(string)) *StreamHandler
OnComplete sets the complete callback
func (*StreamHandler) OnError ¶
func (h *StreamHandler) OnError(callback func(error)) *StreamHandler
OnError sets the error callback
func (*StreamHandler) Start ¶
func (h *StreamHandler) Start(ctx context.Context)
Start starts handling streaming messages
type StreamMessage ¶
type StreamMessage struct {
ID string `json:"id"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
ChunkIndex int `json:"chunk_index"`
IsComplete bool `json:"is_complete"`
IsThinking bool `json:"is_thinking"`
IsFinal bool `json:"is_final"`
Metadata map[string]interface{} `json:"metadata"`
Error string `json:"error,omitempty"`
}
StreamMessage represents a streaming message update
type StreamingMessageBus ¶
type StreamingMessageBus struct {
*MessageBus
// contains filtered or unexported fields
}
StreamingMessageBus extends MessageBus with streaming support
func NewStreamingMessageBus ¶
func NewStreamingMessageBus(bufferSize int) *StreamingMessageBus
NewStreamingMessageBus creates a new streaming message bus
func (*StreamingMessageBus) CloseStream ¶
func (b *StreamingMessageBus) CloseStream(chatID string)
CloseStream closes a stream for a chat
func (*StreamingMessageBus) CreateStream ¶
func (b *StreamingMessageBus) CreateStream(chatID string) chan *StreamMessage
CreateStream creates a new stream for a chat
func (*StreamingMessageBus) GetStream ¶
func (b *StreamingMessageBus) GetStream(chatID string) (chan *StreamMessage, bool)
GetStream gets an existing stream for a chat
func (*StreamingMessageBus) PublishStream ¶
func (b *StreamingMessageBus) PublishStream(ctx context.Context, msg *StreamMessage) error
PublishStream publishes a streaming message
type SystemMessage ¶
type SystemMessage struct {
InboundMessage
TaskID string `json:"task_id"` // 任务ID
TaskLabel string `json:"task_label"` // 任务标签
Status string `json:"status"` // completed, failed
}
SystemMessage 系统消息(用于子代理结果通知)