Documentation
¶
Index ¶
- Constants
- Variables
- type BusError
- type ChatEvent
- type ChatEventSubscription
- type InboundMessage
- type Media
- type MessageBus
- func (b *MessageBus) Close() error
- func (b *MessageBus) ConsumeInbound(ctx context.Context) (*InboundMessage, error)
- func (b *MessageBus) ConsumeOutbound(ctx context.Context) (*OutboundMessage, error)
- func (b *MessageBus) InboundCount() int
- func (b *MessageBus) IsClosed() bool
- func (b *MessageBus) OutboundChan() <-chan *OutboundMessage
- func (b *MessageBus) OutboundCount() int
- func (b *MessageBus) PublishChatEvent(ctx context.Context, event *ChatEvent) error
- func (b *MessageBus) PublishInbound(ctx context.Context, msg *InboundMessage) error
- func (b *MessageBus) PublishOutbound(ctx context.Context, msg *OutboundMessage) error
- func (b *MessageBus) SubscribeChatEvent() *ChatEventSubscription
- func (b *MessageBus) SubscribeOutbound() *OutboundSubscription
- func (b *MessageBus) UnsubscribeChatEvent(subID string)
- func (b *MessageBus) UnsubscribeOutbound(subID string)
- type OutboundMessage
- type OutboundSubscription
- type StreamHandler
- func (h *StreamHandler) Close()
- func (h *StreamHandler) GetChunkIndex() int
- func (h *StreamHandler) GetContent() string
- func (h *StreamHandler) GetFinal() string
- func (h *StreamHandler) GetThinking() string
- func (h *StreamHandler) OnChunk(callback func(*StreamMessage)) *StreamHandler
- func (h *StreamHandler) OnComplete(callback func(string)) *StreamHandler
- func (h *StreamHandler) OnError(callback func(error)) *StreamHandler
- func (h *StreamHandler) Reset()
- func (h *StreamHandler) Start(ctx context.Context)
- type StreamMessage
- type StreamingMessageBus
- func (b *StreamingMessageBus) CloseStream(chatID string)
- func (b *StreamingMessageBus) CreateStream(chatID string) chan *StreamMessage
- func (b *StreamingMessageBus) GetStream(chatID string) (chan *StreamMessage, bool)
- func (b *StreamingMessageBus) PublishStream(ctx context.Context, msg *StreamMessage) error
- type SystemMessage
Constants ¶
const ( ChatEventStateDelta = "delta" // 增量文本 ChatEventStateThinking = "thinking" // 思考过程 ChatEventStateTool = "tool" // 工具调用 ChatEventStateFinal = "final" // 最终完成 ChatEventStateError = "error" // 错误 )
ChatEvent states
Variables ¶
var (
ErrBusClosed = &BusError{Message: "message bus is closed"}
)
Errors
Functions ¶
This section is empty.
Types ¶
type ChatEvent ¶ added in v0.4.0
type ChatEvent struct {
ID string `json:"id"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
RunID string `json:"run_id"`
Seq int `json:"seq"`
State string `json:"state"` // "delta", "thinking", "tool", "final", "error"
Content string `json:"content"` // 增量内容
Message string `json:"message"` // 完整消息(final 时)
Error string `json:"error"` // 错误信息
Timestamp time.Time `json:"timestamp"`
Metadata interface{} `json:"metadata,omitempty"`
}
ChatEvent 聊天事件(用于流式响应)
type ChatEventSubscription ¶ added in v0.4.0
type ChatEventSubscription struct {
ID string
Channel <-chan *ChatEvent
// contains filtered or unexported fields
}
ChatEventSubscription 聊天事件订阅
func (*ChatEventSubscription) Unsubscribe ¶ added in v0.4.0
func (s *ChatEventSubscription) Unsubscribe()
Unsubscribe 取消订阅
type InboundMessage ¶
type InboundMessage struct {
ID string `json:"id"`
Channel string `json:"channel"` // telegram, whatsapp, feishu, cli, system
AccountID string `json:"account_id"` // 账号ID(用于多账号场景)
SenderID string `json:"sender_id"` // 发送者ID
ChatID string `json:"chat_id"` // 聊天ID
Content string `json:"content"` // 消息内容
Media []Media `json:"media"` // 媒体文件
Metadata map[string]interface{} `json:"metadata"` // 元数据
Timestamp time.Time `json:"timestamp"`
}
InboundMessage 入站消息
func (*InboundMessage) IsSystemMessage ¶
func (m *InboundMessage) IsSystemMessage() bool
IsSystemMessage 判断是否为系统消息
type Media ¶
type Media struct {
Type string `json:"type"` // image, video, audio, document
URL string `json:"url"` // 文件URL
Base64 string `json:"base64"` // Base64编码内容
MimeType string `json:"mimetype"` // MIME类型
}
Media 媒体文件
type MessageBus ¶
type MessageBus struct {
// contains filtered or unexported fields
}
MessageBus 消息总线
func (*MessageBus) ConsumeInbound ¶
func (b *MessageBus) ConsumeInbound(ctx context.Context) (*InboundMessage, error)
ConsumeInbound 消费入站消息
func (*MessageBus) ConsumeOutbound ¶
func (b *MessageBus) ConsumeOutbound(ctx context.Context) (*OutboundMessage, error)
ConsumeOutbound 消费出站消息 使用订阅机制,确保消息能够被正确接收
func (*MessageBus) OutboundChan ¶
func (b *MessageBus) OutboundChan() <-chan *OutboundMessage
OutboundChan 获取出站消息通道(已废弃) 此方法已废弃,请使用 SubscribeOutbound 代替
func (*MessageBus) PublishChatEvent ¶ added in v0.4.0
func (b *MessageBus) PublishChatEvent(ctx context.Context, event *ChatEvent) error
PublishChatEvent 发布聊天事件
func (*MessageBus) PublishInbound ¶
func (b *MessageBus) PublishInbound(ctx context.Context, msg *InboundMessage) error
PublishInbound 发布入站消息
func (*MessageBus) PublishOutbound ¶
func (b *MessageBus) PublishOutbound(ctx context.Context, msg *OutboundMessage) error
PublishOutbound 发布出站消息
func (*MessageBus) SubscribeChatEvent ¶ added in v0.4.0
func (b *MessageBus) SubscribeChatEvent() *ChatEventSubscription
SubscribeChatEvent 订阅聊天事件
func (*MessageBus) SubscribeOutbound ¶
func (b *MessageBus) SubscribeOutbound() *OutboundSubscription
SubscribeOutbound 订阅出站消息(支持多个消费者) 使用内部订阅机制,每个订阅者有独立的 channel 返回一个 OutboundSubscription 对象,包含只读 channel 和取消订阅方法
func (*MessageBus) UnsubscribeChatEvent ¶ added in v0.4.0
func (b *MessageBus) UnsubscribeChatEvent(subID string)
UnsubscribeChatEvent 取消订阅聊天事件
func (*MessageBus) UnsubscribeOutbound ¶
func (b *MessageBus) UnsubscribeOutbound(subID string)
UnsubscribeOutbound 取消订阅出站消息
type OutboundMessage ¶
type OutboundMessage struct {
ID string `json:"id"`
Channel string `json:"channel"` // telegram, whatsapp, feishu, cli
ChatID string `json:"chat_id"` // 聊天ID
Content string `json:"content"` // 消息内容
Media []Media `json:"media"` // 媒体文件
ReplyTo string `json:"reply_to"` // 回复的消息ID
Metadata map[string]interface{} `json:"metadata"` // 元数据
Timestamp time.Time `json:"timestamp"`
}
OutboundMessage 出站消息
type OutboundSubscription ¶
type OutboundSubscription struct {
ID string
Channel <-chan *OutboundMessage
// contains filtered or unexported fields
}
OutboundSubscription 出站消息订阅
func (*OutboundSubscription) Unsubscribe ¶
func (s *OutboundSubscription) Unsubscribe()
Unsubscribe 取消订阅
type StreamHandler ¶
type StreamHandler struct {
// contains filtered or unexported fields
}
StreamHandler handles streaming messages
func NewStreamHandler ¶
func NewStreamHandler(bus *StreamingMessageBus, chatID string) *StreamHandler
NewStreamHandler creates a new stream handler
func (*StreamHandler) GetChunkIndex ¶
func (h *StreamHandler) GetChunkIndex() int
GetChunkIndex returns the current chunk index
func (*StreamHandler) GetContent ¶
func (h *StreamHandler) GetContent() string
GetContent returns the accumulated content
func (*StreamHandler) GetFinal ¶
func (h *StreamHandler) GetFinal() string
GetFinal returns the accumulated final content
func (*StreamHandler) GetThinking ¶
func (h *StreamHandler) GetThinking() string
GetThinking returns the accumulated thinking content
func (*StreamHandler) OnChunk ¶
func (h *StreamHandler) OnChunk(callback func(*StreamMessage)) *StreamHandler
OnChunk sets the chunk callback
func (*StreamHandler) OnComplete ¶
func (h *StreamHandler) OnComplete(callback func(string)) *StreamHandler
OnComplete sets the complete callback
func (*StreamHandler) OnError ¶
func (h *StreamHandler) OnError(callback func(error)) *StreamHandler
OnError sets the error callback
func (*StreamHandler) Start ¶
func (h *StreamHandler) Start(ctx context.Context)
Start starts handling streaming messages
type StreamMessage ¶
type StreamMessage struct {
ID string `json:"id"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
ChunkIndex int `json:"chunk_index"`
IsComplete bool `json:"is_complete"`
IsThinking bool `json:"is_thinking"`
IsFinal bool `json:"is_final"`
Metadata map[string]interface{} `json:"metadata"`
Error string `json:"error,omitempty"`
}
StreamMessage represents a streaming message update
type StreamingMessageBus ¶
type StreamingMessageBus struct {
*MessageBus
// contains filtered or unexported fields
}
StreamingMessageBus extends MessageBus with streaming support
func NewStreamingMessageBus ¶
func NewStreamingMessageBus(bufferSize int) *StreamingMessageBus
NewStreamingMessageBus creates a new streaming message bus
func (*StreamingMessageBus) CloseStream ¶
func (b *StreamingMessageBus) CloseStream(chatID string)
CloseStream closes a stream for a chat
func (*StreamingMessageBus) CreateStream ¶
func (b *StreamingMessageBus) CreateStream(chatID string) chan *StreamMessage
CreateStream creates a new stream for a chat
func (*StreamingMessageBus) GetStream ¶
func (b *StreamingMessageBus) GetStream(chatID string) (chan *StreamMessage, bool)
GetStream gets an existing stream for a chat
func (*StreamingMessageBus) PublishStream ¶
func (b *StreamingMessageBus) PublishStream(ctx context.Context, msg *StreamMessage) error
PublishStream publishes a streaming message
type SystemMessage ¶
type SystemMessage struct {
InboundMessage
TaskID string `json:"task_id"` // 任务ID
TaskLabel string `json:"task_label"` // 任务标签
Status string `json:"status"` // completed, failed
}
SystemMessage 系统消息(用于子代理结果通知)