bus

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrBusClosed is a *BusError carrying the message "message bus is closed".
	// NOTE(review): presumably returned by bus operations once the bus has
	// been closed — confirm against the implementation.
	ErrBusClosed = &BusError{Message: "message bus is closed"}
)

Errors

Functions

This section is empty.

Types

type BusError

// BusError is the message bus error type; it exposes an Error() string method.
type BusError struct {
	// Message is the error text stored in this error.
	Message string
}

BusError 总线错误

func (*BusError) Error

func (e *BusError) Error() string

type InboundMessage

// InboundMessage is an inbound message.
type InboundMessage struct {
	ID        string                 `json:"id"`
	Channel   string                 `json:"channel"`    // telegram, whatsapp, feishu, cli, system
	AccountID string                 `json:"account_id"` // account ID (used in multi-account scenarios)
	SenderID  string                 `json:"sender_id"`  // sender ID
	ChatID    string                 `json:"chat_id"`    // chat ID
	Content   string                 `json:"content"`    // message content
	Media     []Media                `json:"media"`      // media files
	Metadata  map[string]interface{} `json:"metadata"`   // metadata
	Timestamp time.Time              `json:"timestamp"`
}

InboundMessage 入站消息

func (*InboundMessage) IsSystemMessage

func (m *InboundMessage) IsSystemMessage() bool

IsSystemMessage 判断是否为系统消息

func (*InboundMessage) SessionKey

func (m *InboundMessage) SessionKey() string

SessionKey 返回会话键

type Media

// Media is a media file attached to a message.
type Media struct {
	Type     string `json:"type"`     // image, video, audio, document
	URL      string `json:"url"`      // file URL
	Base64   string `json:"base64"`   // Base64-encoded content
	MimeType string `json:"mimetype"` // MIME type
}

Media 媒体文件

type MessageBus

// MessageBus is the message bus.
type MessageBus struct {
	// contains filtered or unexported fields
}

MessageBus 消息总线

func NewMessageBus

func NewMessageBus(bufferSize int) *MessageBus

NewMessageBus 创建消息总线

func (*MessageBus) Close

func (b *MessageBus) Close() error

Close 关闭消息总线

func (*MessageBus) ConsumeInbound

func (b *MessageBus) ConsumeInbound(ctx context.Context) (*InboundMessage, error)

ConsumeInbound 消费入站消息

func (*MessageBus) ConsumeOutbound

func (b *MessageBus) ConsumeOutbound(ctx context.Context) (*OutboundMessage, error)

ConsumeOutbound 消费出站消息。使用订阅机制,确保消息能够被正确接收。

func (*MessageBus) InboundCount

func (b *MessageBus) InboundCount() int

InboundCount 获取入站消息数量

func (*MessageBus) IsClosed

func (b *MessageBus) IsClosed() bool

IsClosed 检查是否已关闭

func (*MessageBus) OutboundChan

func (b *MessageBus) OutboundChan() <-chan *OutboundMessage

OutboundChan 获取出站消息通道(已废弃)。此方法已废弃,请使用 SubscribeOutbound 代替。

func (*MessageBus) OutboundCount

func (b *MessageBus) OutboundCount() int

OutboundCount 获取出站消息数量

func (*MessageBus) PublishInbound

func (b *MessageBus) PublishInbound(ctx context.Context, msg *InboundMessage) error

PublishInbound 发布入站消息

func (*MessageBus) PublishOutbound

func (b *MessageBus) PublishOutbound(ctx context.Context, msg *OutboundMessage) error

PublishOutbound 发布出站消息

func (*MessageBus) SubscribeOutbound

func (b *MessageBus) SubscribeOutbound() *OutboundSubscription

SubscribeOutbound 订阅出站消息(支持多个消费者)。使用内部订阅机制,每个订阅者有独立的 channel。返回一个 OutboundSubscription 对象,包含只读 channel 和取消订阅方法。

func (*MessageBus) UnsubscribeOutbound

func (b *MessageBus) UnsubscribeOutbound(subID string)

UnsubscribeOutbound 取消订阅出站消息

type OutboundMessage

// OutboundMessage is an outbound message.
type OutboundMessage struct {
	ID        string                 `json:"id"`
	Channel   string                 `json:"channel"`  // telegram, whatsapp, feishu, cli
	ChatID    string                 `json:"chat_id"`  // chat ID
	Content   string                 `json:"content"`  // message content
	Media     []Media                `json:"media"`    // media files
	ReplyTo   string                 `json:"reply_to"` // ID of the message being replied to
	Metadata  map[string]interface{} `json:"metadata"` // metadata
	Timestamp time.Time              `json:"timestamp"`
}

OutboundMessage 出站消息

type OutboundSubscription

// OutboundSubscription is a subscription to outbound messages.
type OutboundSubscription struct {
	// ID identifies the subscription.
	// NOTE(review): presumably the subID accepted by
	// MessageBus.UnsubscribeOutbound — confirm.
	ID string
	// Channel is a receive-only channel of outbound messages.
	Channel <-chan *OutboundMessage
	// contains filtered or unexported fields
}

OutboundSubscription 出站消息订阅

func (*OutboundSubscription) Unsubscribe

func (s *OutboundSubscription) Unsubscribe()

Unsubscribe 取消订阅

type StreamHandler

// StreamHandler handles streaming messages.
type StreamHandler struct {
	// contains filtered or unexported fields
}

StreamHandler handles streaming messages

func NewStreamHandler

func NewStreamHandler(bus *StreamingMessageBus, chatID string) *StreamHandler

NewStreamHandler creates a new stream handler

func (*StreamHandler) Close

func (h *StreamHandler) Close()

Close closes the stream handler

func (*StreamHandler) GetChunkIndex

func (h *StreamHandler) GetChunkIndex() int

GetChunkIndex returns the current chunk index

func (*StreamHandler) GetContent

func (h *StreamHandler) GetContent() string

GetContent returns the accumulated content

func (*StreamHandler) GetFinal

func (h *StreamHandler) GetFinal() string

GetFinal returns the accumulated final content

func (*StreamHandler) GetThinking

func (h *StreamHandler) GetThinking() string

GetThinking returns the accumulated thinking content

func (*StreamHandler) OnChunk

func (h *StreamHandler) OnChunk(callback func(*StreamMessage)) *StreamHandler

OnChunk sets the chunk callback

func (*StreamHandler) OnComplete

func (h *StreamHandler) OnComplete(callback func(string)) *StreamHandler

OnComplete sets the complete callback

func (*StreamHandler) OnError

func (h *StreamHandler) OnError(callback func(error)) *StreamHandler

OnError sets the error callback

func (*StreamHandler) Reset

func (h *StreamHandler) Reset()

Reset resets the handler state

func (*StreamHandler) Start

func (h *StreamHandler) Start(ctx context.Context)

Start starts handling streaming messages

type StreamMessage

// StreamMessage represents a streaming message update.
type StreamMessage struct {
	ID         string                 `json:"id"`
	Channel    string                 `json:"channel"`
	ChatID     string                 `json:"chat_id"`
	Content    string                 `json:"content"`
	ChunkIndex int                    `json:"chunk_index"`
	IsComplete bool                   `json:"is_complete"`
	// NOTE(review): IsThinking and IsFinal presumably classify the chunk as
	// "thinking" or "final" content (StreamHandler exposes GetThinking and
	// GetFinal accumulators) — confirm against the implementation.
	IsThinking bool                   `json:"is_thinking"`
	IsFinal    bool                   `json:"is_final"`
	Metadata   map[string]interface{} `json:"metadata"`
	// Error is omitted from the JSON encoding when empty (omitempty).
	Error      string                 `json:"error,omitempty"`
}

StreamMessage represents a streaming message update

type StreamingMessageBus

// StreamingMessageBus extends MessageBus with streaming support.
type StreamingMessageBus struct {
	// The embedded *MessageBus promotes all MessageBus methods onto this type.
	*MessageBus
	// contains filtered or unexported fields
}

StreamingMessageBus extends MessageBus with streaming support

func NewStreamingMessageBus

func NewStreamingMessageBus(bufferSize int) *StreamingMessageBus

NewStreamingMessageBus creates a new streaming message bus

func (*StreamingMessageBus) CloseStream

func (b *StreamingMessageBus) CloseStream(chatID string)

CloseStream closes a stream for a chat

func (*StreamingMessageBus) CreateStream

func (b *StreamingMessageBus) CreateStream(chatID string) chan *StreamMessage

CreateStream creates a new stream for a chat

func (*StreamingMessageBus) GetStream

func (b *StreamingMessageBus) GetStream(chatID string) (chan *StreamMessage, bool)

GetStream gets an existing stream for a chat

func (*StreamingMessageBus) PublishStream

func (b *StreamingMessageBus) PublishStream(ctx context.Context, msg *StreamMessage) error

PublishStream publishes a streaming message

type SystemMessage

// SystemMessage is a system message (used to notify sub-agent results).
// It embeds InboundMessage and adds task information.
type SystemMessage struct {
	InboundMessage
	TaskID    string `json:"task_id"`    // task ID
	TaskLabel string `json:"task_label"` // task label
	Status    string `json:"status"`     // completed, failed
}

SystemMessage 系统消息(用于子代理结果通知)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL