Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateInteractionID(sessionID string, timestamp time.Time) string
- func GetExtraChunks(msg *base.ChatMessage) []string
- func IsEnabled(configDir string) bool
- func IsRetryableError(err error) bool
- func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error
- type AdapterManager
- func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)
- func (m *AdapterManager) GetMessageOperations(platform string) MessageOperations
- func (m *AdapterManager) GetSessionOperations(platform string) SessionOperations
- func (m *AdapterManager) GetStatusProvider(platform string) base.StatusProvider
- func (m *AdapterManager) Handler() http.Handler
- func (m *AdapterManager) ListPlatforms() []string
- func (m *AdapterManager) NewStreamWriter(ctx context.Context, platform, channelID, threadTS string) base.StreamWriter
- func (m *AdapterManager) Register(adapter ChatAdapter) error
- func (m *AdapterManager) RegisterEngine(eng *engine.Engine)
- func (m *AdapterManager) RegisterRoutes(router *mux.Router)
- func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error
- func (m *AdapterManager) StartAll(ctx context.Context) error
- func (m *AdapterManager) StopAll() error
- func (m *AdapterManager) Unregister(platform string) error
- type AdapterMetrics
- type Attachment
- type ChatAdapter
- type ChatMessage
- type ChunkInfo
- type ChunkProcessor
- type ChunkProcessorOptions
- type CommandCallback
- type ConfigLoader
- func (c *ConfigLoader) Close() error
- func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig
- func (c *ConfigLoader) GetOptions(platform string) map[string]any
- func (c *ConfigLoader) GetSystemPrompt(platform string) string
- func (c *ConfigLoader) GetTaskInstructions(platform string) string
- func (c *ConfigLoader) HasPlatform(platform string) bool
- func (c *ConfigLoader) Load(configDir string) error
- func (c *ConfigLoader) Platforms() []string
- func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, ...) error
- type DingTalkConfig
- type DiscordEmbed
- type DiscordEmbedField
- type DiscordEmbedFooter
- type DiscordEmbedImage
- type DiscordEmbedThumbnail
- type Engine
- type EngineConfig
- type EngineHolder
- type EngineHolderOptions
- type EngineMessageHandler
- type EngineMessageHandlerOption
- type FormatConversionProcessor
- type HealthCheck
- type HealthChecker
- type InlineKeyboardButton
- type InlineKeyboardMarkup
- type InteractionCallback
- type InteractionManager
- func (m *InteractionManager) Complete(id string, response *InteractionResponse) error
- func (m *InteractionManager) Count() int
- func (m *InteractionManager) Delete(id string)
- func (m *InteractionManager) Expire(id string) error
- func (m *InteractionManager) Get(id string) (*PendingInteraction, bool)
- func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction
- func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error
- func (m *InteractionManager) PendingCount() int
- func (m *InteractionManager) Stop()
- func (m *InteractionManager) Store(interaction *PendingInteraction) string
- func (m *InteractionManager) TotalCount() int
- type InteractionManagerOptions
- type InteractionResponse
- type InteractionStatus
- type InteractionType
- type LifecycleManager
- func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)
- func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)
- func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)
- func (m *LifecycleManager) StartAll(ctx context.Context) error
- func (m *LifecycleManager) StopAll() error
- type Logger
- type MessageFilterProcessor
- type MessageHandler
- type MessageOperations
- type MessageProcessor
- type MessageQueue
- func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)
- func (q *MessageQueue) DLQLen() int
- func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)
- func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error
- func (q *MessageQueue) GetDLQ() []*QueuedMessage
- func (q *MessageQueue) Requeue(msg *QueuedMessage) error
- func (q *MessageQueue) Size() int
- func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), ...)
- func (q *MessageQueue) Stop()
- type ParseMode
- type PendingInteraction
- type PermissionConfig
- type PlatformConfig
- type ProcessorChain
- func (c *ProcessorChain) AddProcessor(processor MessageProcessor)
- func (c *ProcessorChain) Close()
- func (c *ProcessorChain) Name() string
- func (c *ProcessorChain) Order() int
- func (c *ProcessorChain) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
- func (c *ProcessorChain) ResetSession(platform, sessionID string)
- type ProcessorOrder
- type QueueError
- type QueuedMessage
- type RateLimiter
- type RetryConfig
- type RichContent
- type SecurityConfig
- type Session
- type SessionOperations
- type SessionStats
- type SlackBlock
- type StreamAdapter
- type StreamCallback
- type StreamHandler
- type ThreadInfo
- type ThreadProcessor
- func (p *ThreadProcessor) Delete(sessionID string)
- func (p *ThreadProcessor) GetThreadTS(sessionID string) string
- func (p *ThreadProcessor) Name() string
- func (p *ThreadProcessor) Order() int
- func (p *ThreadProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
- func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)
- func (p *ThreadProcessor) Stop()
- type ThreadProcessorOptions
- type WhatsAppConfig
Constants ¶
const ( StatusThinkingLabel = "🧠 深度推演规划中..." StatusToolExecutingLabel = "🛠️ 正在执行工具 [%s]..." StatusResultParsingLabel = "🧠 正在解析执行结果..." StatusErrorLabel = "❌ 执行过程中发生错误" StatusDangerBlockLabel = "⚠️ 拦截到高危操作,等待确认..." StatusProgressLabel = "⏳ 正在执行后台任务..." StatusProgressDoneLabel = "✅ 后台任务执行完毕" StatusStepStartLabel = "🔍 正在分析执行轨迹..." StatusStepFinishLabel = "✅ 当前任务阶段构建完成" StatusPlanModeLabel = "📝 正在制定作战计划..." StatusExitPlanModeLabel = "📝 作战计划就绪,等待您的批准..." StatusAskUserLabel = "⏳ 等待您提供更多信息..." StatusEngineStartingLabel = "🚀 正在唤醒推演引擎..." StatusPermissionLabel = "🛡️ 拦截到高危操作,等待提权审批..." // Contextual labels StatusToolResultThinkingLabel = "🧠 正在解析执行结果 (耗时: %dms)..." StatusSessionStartColdLabel = "🚀 正在初始化上下文..." StatusSessionStartHotLabel = "🚀 重新连接并恢复上下文..." StatusAnswerLabel = "✍️ 正在生成回答..." )
UI status indicator labels for the AI-native experience.
const ( ParseModeNone = base.ParseModeNone ParseModeMarkdown = base.ParseModeMarkdown ParseModeHTML = base.ParseModeHTML )
const (
// Metadata keys
MetadataKeyTransient = "is_transient"
)
Variables ¶
var ( // MessagesAggregatedTotal counts total messages aggregated MessagesAggregatedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "hotplex_aggregator_messages_aggregated_total", Help: "Total number of messages that have been aggregated", }, []string{"event_type", "platform"}, ) // MessagesFlushedTotal counts total flushes by reason MessagesFlushedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "hotplex_aggregator_messages_flushed_total", Help: "Total number of times the buffer was flushed", }, []string{"event_type", "platform", "reason"}, ) // MessagesDroppedTotal counts dropped messages MessagesDroppedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "hotplex_aggregator_messages_dropped_total", Help: "Total number of messages dropped due to buffer overflow", }, []string{"event_type", "platform", "reason"}, ) // BufferSizeGauge tracks current buffer size per platform BufferSizeGauge = promauto.NewGaugeVec( prometheus.GaugeOpts{ Name: "hotplex_aggregator_buffer_size", Help: "Current number of messages in the buffer per platform", }, []string{"platform"}, ) // BufferDurationHistogram tracks time from first message to flush BufferDurationHistogram = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "hotplex_aggregator_buffer_duration_seconds", Help: "Time in seconds from first message arrival to buffer flush", Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0}, }, []string{"platform"}, ) // MessageSizeHistogram tracks message size distribution MessageSizeHistogram = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "hotplex_aggregator_message_size_bytes", Help: "Size distribution of aggregated messages in bytes", Buckets: []float64{64, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}, }, []string{"event_type", "platform"}, ) )
var ErrQueueFull = &QueueError{Message: "queue is full"}
Functions ¶
func GenerateInteractionID ¶ added in v0.12.0
GenerateInteractionID generates a unique interaction ID.
func GetExtraChunks ¶ added in v0.12.0
func GetExtraChunks(msg *base.ChatMessage) []string
GetExtraChunks returns any extra chunks stored in message metadata. Returns nil if no extra chunks exist.
func IsEnabled ¶ added in v0.20.0
IsEnabled returns true if ChatApps should be activated based on environment variables or flags. It returns true if any of the following is true: 1. HOTPLEX_CHATAPPS_ENABLED environment variable is "true" 2. configDir parameter is not empty (explicitly set via --config flag) 3. HOTPLEX_CHATAPPS_CONFIG_DIR environment variable is not empty
func IsRetryableError ¶
IsRetryableError classifies errors as retryable or non-retryable
func RetryWithBackoff ¶
func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error
Types ¶
type AdapterManager ¶
type AdapterManager struct {
// contains filtered or unexported fields
}
func NewAdapterManager ¶
func NewAdapterManager(logger *slog.Logger) *AdapterManager
func Setup ¶ added in v0.11.0
func Setup(ctx context.Context, logger *slog.Logger, configDir ...string) (http.Handler, *AdapterManager, error)
Setup initializes all enabled ChatApps and their dedicated Engines. It returns an http.Handler that handles all webhook routes. The configDir parameter takes priority over HOTPLEX_CHATAPPS_CONFIG_DIR environment variable.
func (*AdapterManager) GetAdapter ¶
func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)
func (*AdapterManager) GetMessageOperations ¶ added in v0.15.5
func (m *AdapterManager) GetMessageOperations(platform string) MessageOperations
GetMessageOperations returns the platform-specific message operations interface. Returns nil if the platform doesn't support message operations.
func (*AdapterManager) GetSessionOperations ¶ added in v0.15.5
func (m *AdapterManager) GetSessionOperations(platform string) SessionOperations
GetSessionOperations returns the platform-specific session operations interface. Returns nil if the platform doesn't support session operations.
func (*AdapterManager) GetStatusProvider ¶ added in v0.18.0
func (m *AdapterManager) GetStatusProvider(platform string) base.StatusProvider
GetStatusProvider returns the platform-specific status provider interface. Returns nil if the platform doesn't support status operations.
func (*AdapterManager) Handler ¶ added in v0.11.0
func (m *AdapterManager) Handler() http.Handler
Handler returns an http.Handler with all adapter webhooks mounted. This is a convenience method for when you don't need gorilla/mux.
func (*AdapterManager) ListPlatforms ¶
func (m *AdapterManager) ListPlatforms() []string
func (*AdapterManager) NewStreamWriter ¶ added in v0.18.0
func (m *AdapterManager) NewStreamWriter(ctx context.Context, platform, channelID, threadTS string) base.StreamWriter
NewStreamWriter creates a platform-agnostic streaming writer for the given platform. Returns nil if the platform doesn't support streaming or the adapter is not found.
func (*AdapterManager) Register ¶
func (m *AdapterManager) Register(adapter ChatAdapter) error
func (*AdapterManager) RegisterEngine ¶ added in v0.11.0
func (m *AdapterManager) RegisterEngine(eng *engine.Engine)
func (*AdapterManager) RegisterRoutes ¶ added in v0.11.0
func (m *AdapterManager) RegisterRoutes(router *mux.Router)
RegisterRoutes registers all adapter webhooks on a unified router. Path format: /webhook/{platform} (e.g., /webhook/telegram, /webhook/discord).
func (*AdapterManager) SendMessage ¶
func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error
SendMessage sends a message to a specific platform
func (*AdapterManager) StopAll ¶
func (m *AdapterManager) StopAll() error
func (*AdapterManager) Unregister ¶
func (m *AdapterManager) Unregister(platform string) error
type AdapterMetrics ¶
type AdapterMetrics struct {
MessagesReceived atomic.Int64
MessagesSent atomic.Int64
MessagesFailed atomic.Int64
LastMessageAt atomic.Int64
Uptime atomic.Int64
}
func NewAdapterMetrics ¶
func NewAdapterMetrics() *AdapterMetrics
func (*AdapterMetrics) GetStats ¶
func (m *AdapterMetrics) GetStats() map[string]int64
func (*AdapterMetrics) RecordFailure ¶
func (m *AdapterMetrics) RecordFailure()
func (*AdapterMetrics) RecordReceive ¶
func (m *AdapterMetrics) RecordReceive()
func (*AdapterMetrics) RecordSend ¶
func (m *AdapterMetrics) RecordSend()
type Attachment ¶
type Attachment = base.Attachment
type ChatAdapter ¶
type ChatAdapter = base.ChatAdapter
type ChatMessage ¶
type ChatMessage = base.ChatMessage
func NewChatMessage ¶ added in v0.11.0
func NewChatMessage(platform, sessionID, userID, content string) *ChatMessage
type ChunkInfo ¶ added in v0.12.0
ChunkInfo holds metadata about chunked messages.
func GetChunkInfo ¶ added in v0.12.0
func GetChunkInfo(msg *base.ChatMessage) *ChunkInfo
GetChunkInfo returns the ChunkInfo from message metadata.
type ChunkProcessor ¶ added in v0.12.0
type ChunkProcessor struct {
// contains filtered or unexported fields
}
ChunkProcessor splits long messages into chunks that fit within platform limits. It uses the Slack chunker for Markdown-aware splitting.
func NewChunkProcessor ¶ added in v0.12.0
func NewChunkProcessor(logger *slog.Logger, opts ChunkProcessorOptions) *ChunkProcessor
NewChunkProcessor creates a new ChunkProcessor.
func (*ChunkProcessor) Name ¶ added in v0.12.0
func (p *ChunkProcessor) Name() string
Name returns the processor name.
func (*ChunkProcessor) Order ¶ added in v0.12.0
func (p *ChunkProcessor) Order() int
Order returns the processor order (runs after format conversion).
func (*ChunkProcessor) Process ¶ added in v0.12.0
func (p *ChunkProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process splits the message into chunks if it exceeds the character limit. Returns either a single message or a slice of messages.
type ChunkProcessorOptions ¶ added in v0.12.0
type ChunkProcessorOptions struct {
MaxChars int // Maximum characters per chunk (default: 4000 for Slack)
}
ChunkProcessorOptions configures the ChunkProcessor.
type CommandCallback ¶ added in v0.13.0
type CommandCallback struct {
// contains filtered or unexported fields
}
CommandCallback handles slash command progress events
func NewCommandCallback ¶ added in v0.13.0
func NewCommandCallback(ctx context.Context, platform, sessionID string, adapters *AdapterManager, logger *slog.Logger, metadata map[string]any) *CommandCallback
NewCommandCallback creates a new command callback
type ConfigLoader ¶
type ConfigLoader struct {
// contains filtered or unexported fields
}
func NewConfigLoader ¶
func NewConfigLoader(configDir string, logger *slog.Logger) (*ConfigLoader, error)
func (*ConfigLoader) Close ¶ added in v0.11.0
func (c *ConfigLoader) Close() error
Close stops all hot reload watchers and releases resources.
func (*ConfigLoader) GetConfig ¶
func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig
func (*ConfigLoader) GetOptions ¶ added in v0.11.0
func (c *ConfigLoader) GetOptions(platform string) map[string]any
func (*ConfigLoader) GetSystemPrompt ¶
func (c *ConfigLoader) GetSystemPrompt(platform string) string
func (*ConfigLoader) GetTaskInstructions ¶
func (c *ConfigLoader) GetTaskInstructions(platform string) string
func (*ConfigLoader) HasPlatform ¶
func (c *ConfigLoader) HasPlatform(platform string) bool
func (*ConfigLoader) Load ¶
func (c *ConfigLoader) Load(configDir string) error
func (*ConfigLoader) Platforms ¶
func (c *ConfigLoader) Platforms() []string
func (*ConfigLoader) StartHotReload ¶ added in v0.11.0
func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, onReload func(platform string, cfg *PlatformConfig)) error
StartHotReload starts watching all config files for changes and automatically reloads them. The onReload callback is called with the updated PlatformConfig for each platform.
type DingTalkConfig ¶
type DiscordEmbed ¶
type DiscordEmbed struct {
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
URL string `json:"url,omitempty"`
Color int `json:"color,omitempty"`
Fields []DiscordEmbedField `json:"fields,omitempty"`
Thumbnail *DiscordEmbedThumbnail `json:"thumbnail,omitempty"`
Image *DiscordEmbedImage `json:"image,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
}
type DiscordEmbedField ¶
type DiscordEmbedFooter ¶
type DiscordEmbedFooter struct {
}
type DiscordEmbedImage ¶
type DiscordEmbedImage struct {
URL string `json:"url"`
}
type DiscordEmbedThumbnail ¶
type DiscordEmbedThumbnail struct {
URL string `json:"url"`
}
type Engine ¶ added in v0.15.5
type Engine interface {
Execute(ctx context.Context, cfg *types.Config, prompt string, callback event.Callback) error
CheckDanger(prompt string) (blocked bool, operation, reason string)
GetSession(sessionID string) (Session, bool)
Close() error
GetSessionStats(sessionID string) *SessionStats
ValidateConfig(cfg *types.Config) error
StopSession(sessionID string, reason string) error
ResetSessionProvider(sessionID string)
SetDangerAllowPaths(paths []string)
SetDangerBypassEnabled(token string, enabled bool) error
SetAllowedTools(tools []string)
SetDisallowedTools(tools []string)
GetAllowedTools() []string
GetDisallowedTools() []string
}
Engine abstracts the engine functionality for dependency inversion
type EngineConfig ¶ added in v0.11.0
type EngineHolder ¶
type EngineHolder struct {
// contains filtered or unexported fields
}
EngineHolder holds the Engine instance and configuration for ChatApps integration
func NewEngineHolder ¶
func NewEngineHolder(opts EngineHolderOptions) (*EngineHolder, error)
NewEngineHolder creates a new EngineHolder with the given options
func (*EngineHolder) GetAdapterManager ¶
func (h *EngineHolder) GetAdapterManager() *AdapterManager
GetAdapterManager returns the AdapterManager for sending messages
func (*EngineHolder) GetEngine ¶
func (h *EngineHolder) GetEngine() Engine
GetEngine returns the underlying Engine instance
type EngineHolderOptions ¶
type EngineHolderOptions struct {
Logger *slog.Logger
Adapters *AdapterManager
Timeout time.Duration
IdleTimeout time.Duration
Namespace string
PermissionMode string
DangerouslySkipPermissions bool
AllowedTools []string
DisallowedTools []string
DefaultWorkDir string
DefaultTaskInstr string
}
EngineHolderOptions configures the EngineHolder
type EngineMessageHandler ¶
type EngineMessageHandler struct {
// contains filtered or unexported fields
}
EngineMessageHandler implements MessageHandler and integrates with Engine
func NewEngineMessageHandler ¶
func NewEngineMessageHandler(engine Engine, adapters *AdapterManager, opts ...EngineMessageHandlerOption) *EngineMessageHandler
NewEngineMessageHandler creates a new EngineMessageHandler
func (*EngineMessageHandler) Close ¶ added in v0.17.0
func (h *EngineMessageHandler) Close()
Close releases resources held by the handler
func (*EngineMessageHandler) Handle ¶
func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) error
Handle implements MessageHandler.
type EngineMessageHandlerOption ¶
type EngineMessageHandlerOption func(*EngineMessageHandler)
EngineMessageHandlerOption configures the EngineMessageHandler
func WithConfigLoader ¶
func WithConfigLoader(loader *ConfigLoader) EngineMessageHandlerOption
func WithLogger ¶
func WithLogger(logger *slog.Logger) EngineMessageHandlerOption
func WithTaskInstrFn ¶
func WithTaskInstrFn(fn func(sessionID string) string) EngineMessageHandlerOption
func WithWorkDirFn ¶
func WithWorkDirFn(fn func(sessionID string) string) EngineMessageHandlerOption
type FormatConversionProcessor ¶ added in v0.12.0
type FormatConversionProcessor struct {
// contains filtered or unexported fields
}
FormatConversionProcessor converts message content to platform-specific formats
func NewFormatConversionProcessor ¶ added in v0.12.0
func NewFormatConversionProcessor(logger *slog.Logger) *FormatConversionProcessor
NewFormatConversionProcessor creates a new FormatConversionProcessor
func (*FormatConversionProcessor) Name ¶ added in v0.12.0
func (p *FormatConversionProcessor) Name() string
Name returns the processor name
func (*FormatConversionProcessor) Order ¶ added in v0.12.0
func (p *FormatConversionProcessor) Order() int
Order returns the processor order
func (*FormatConversionProcessor) Process ¶ added in v0.12.0
func (p *FormatConversionProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process converts message content based on platform
type HealthCheck ¶
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
func NewHealthChecker ¶
func NewHealthChecker(interval time.Duration) *HealthChecker
func (*HealthChecker) GetStatus ¶
func (h *HealthChecker) GetStatus() map[string]HealthCheck
func (*HealthChecker) Register ¶
func (h *HealthChecker) Register(check HealthCheck)
func (*HealthChecker) Start ¶
func (h *HealthChecker) Start()
func (*HealthChecker) Stop ¶
func (h *HealthChecker) Stop()
type InlineKeyboardButton ¶
type InlineKeyboardMarkup ¶
type InlineKeyboardMarkup struct {
InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
}
type InteractionCallback ¶ added in v0.12.0
type InteractionCallback func(interaction *PendingInteraction) error
InteractionCallback is a function that handles an interaction callback.
type InteractionManager ¶ added in v0.12.0
type InteractionManager struct {
// contains filtered or unexported fields
}
InteractionManager manages pending interactions for Slack interactive components.
func NewInteractionManager ¶ added in v0.12.0
func NewInteractionManager(logger *slog.Logger, opts InteractionManagerOptions) *InteractionManager
NewInteractionManager creates a new InteractionManager.
func (*InteractionManager) Complete ¶ added in v0.12.0
func (m *InteractionManager) Complete(id string, response *InteractionResponse) error
Complete marks an interaction as completed with a response.
func (*InteractionManager) Count ¶ added in v0.12.0
func (m *InteractionManager) Count() int
Count returns the number of pending interactions.
func (*InteractionManager) Delete ¶ added in v0.12.0
func (m *InteractionManager) Delete(id string)
Delete removes a pending interaction.
func (*InteractionManager) Expire ¶ added in v0.12.0
func (m *InteractionManager) Expire(id string) error
Expire marks an interaction as expired.
func (*InteractionManager) Get ¶ added in v0.12.0
func (m *InteractionManager) Get(id string) (*PendingInteraction, bool)
Get retrieves a pending interaction by ID. Returns nil if not found or expired.
func (*InteractionManager) GetBySession ¶ added in v0.12.0
func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction
GetBySession retrieves all pending interactions for a session.
func (*InteractionManager) HandleCallback ¶ added in v0.12.0
func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error
HandleCallback processes an interaction callback. It looks up the interaction, calls the callback, and removes the interaction.
func (*InteractionManager) PendingCount ¶ added in v0.12.0
func (m *InteractionManager) PendingCount() int
PendingCount returns the number of pending (non-expired) interactions.
func (*InteractionManager) Stop ¶ added in v0.12.0
func (m *InteractionManager) Stop()
Stop stops the cleanup goroutine.
func (*InteractionManager) Store ¶ added in v0.12.0
func (m *InteractionManager) Store(interaction *PendingInteraction) string
Store adds a new pending interaction and returns its ID.
func (*InteractionManager) TotalCount ¶ added in v0.12.0
func (m *InteractionManager) TotalCount() int
TotalCount returns the total number of interactions (including expired).
type InteractionManagerOptions ¶ added in v0.12.0
type InteractionManagerOptions struct {
CleanupInterval time.Duration // How often to run cleanup (default: 1 min)
TTL time.Duration // How long to keep pending interactions (default: 10 min)
}
InteractionManagerOptions configures the InteractionManager.
type InteractionResponse ¶ added in v0.12.0
type InteractionResponse struct {
ActionID string `json:"action_id"`
Value string `json:"value"`
UserID string `json:"user_id"`
RespondedAt time.Time `json:"responded_at"`
}
InteractionResponse represents the user's response to an interaction.
type InteractionStatus ¶ added in v0.12.0
type InteractionStatus string
InteractionStatus is the status of a pending interaction.
const ( InteractionStatusPending InteractionStatus = "pending" InteractionStatusCompleted InteractionStatus = "completed" InteractionStatusExpired InteractionStatus = "expired" InteractionStatusCancelled InteractionStatus = "cancelled" )
type InteractionType ¶ added in v0.12.0
type InteractionType string
InteractionType defines the type of interactive message.
const ( // InteractionTypePermission is for Claude Code permission requests (Issue #39). InteractionTypePermission InteractionType = "permission" // InteractionTypeApproval is for general approval requests (Issue #37). InteractionTypeApproval InteractionType = "approval" // InteractionTypeSelection is for selection/choice requests. InteractionTypeSelection InteractionType = "selection" )
type LifecycleManager ¶
type LifecycleManager struct {
// contains filtered or unexported fields
}
func NewLifecycleManager ¶
func NewLifecycleManager() *LifecycleManager
func (*LifecycleManager) OnStart ¶
func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)
func (*LifecycleManager) OnStop ¶
func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)
func (*LifecycleManager) RegisterAdapter ¶
func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)
func (*LifecycleManager) StopAll ¶
func (m *LifecycleManager) StopAll() error
type MessageFilterProcessor ¶ added in v0.15.1
type MessageFilterProcessor struct {
// contains filtered or unexported fields
}
MessageFilterProcessor drops noise events before they enter the rest of the chain.
func NewMessageFilterProcessor ¶ added in v0.15.1
func NewMessageFilterProcessor(logger *slog.Logger) *MessageFilterProcessor
NewMessageFilterProcessor creates a new MessageFilterProcessor.
func (*MessageFilterProcessor) Name ¶ added in v0.15.1
func (p *MessageFilterProcessor) Name() string
Name returns the processor name.
func (*MessageFilterProcessor) Order ¶ added in v0.15.1
func (p *MessageFilterProcessor) Order() int
Order returns the processor order – must run before all others.
func (*MessageFilterProcessor) Process ¶ added in v0.15.1
func (p *MessageFilterProcessor) Process(_ context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process drops hidden events by returning nil.
type MessageHandler ¶
type MessageHandler = base.MessageHandler
type MessageOperations ¶ added in v0.15.5
type MessageOperations = base.MessageOperations
Re-export interfaces from base for convenience
type MessageProcessor ¶ added in v0.12.0
type MessageProcessor interface {
// Process processes a message and returns the processed message
// Can return the same message pointer if no modification needed
// Can return a new message pointer if modification needed
// Can return an error to stop processing
Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
// Name returns the processor name for logging and debugging
Name() string
// Order returns the processor order in the chain (lower = earlier)
Order() int
}
MessageProcessor defines the interface for processing messages before sending
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
func NewMessageQueue ¶
func NewMessageQueue(logger *slog.Logger, maxSize, dlqSize, workers int) *MessageQueue
func (*MessageQueue) AddToDLQ ¶
func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)
AddToDLQ stores a failed message to the Dead Letter Queue
func (*MessageQueue) DLQLen ¶
func (q *MessageQueue) DLQLen() int
DLQLen returns the number of messages in the Dead Letter Queue
func (*MessageQueue) Dequeue ¶
func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)
func (*MessageQueue) Enqueue ¶
func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error
func (*MessageQueue) GetDLQ ¶
func (q *MessageQueue) GetDLQ() []*QueuedMessage
GetDLQ returns all messages in the Dead Letter Queue
func (*MessageQueue) Requeue ¶
func (q *MessageQueue) Requeue(msg *QueuedMessage) error
Requeue adds a message back to the queue for retry
func (*MessageQueue) Size ¶
func (q *MessageQueue) Size() int
func (*MessageQueue) Start ¶
func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), sendFn func(context.Context, string, string, *ChatMessage) error)
func (*MessageQueue) Stop ¶
func (q *MessageQueue) Stop()
type PendingInteraction ¶ added in v0.12.0
type PendingInteraction struct {
ID string
SessionID string
ChannelID string
MessageTS string
ActionID string
UserID string
CallbackData string
Callback InteractionCallback
CreatedAt time.Time
ExpiresAt time.Time
Type InteractionType
ThreadTS string
Status InteractionStatus
Response *InteractionResponse
Metadata map[string]any
}
PendingInteraction represents a pending interactive action (e.g., button click).
func CreatePendingInteraction ¶ added in v0.12.0
func CreatePendingInteraction( sessionID string, userID string, channelID string, interactionType InteractionType, metadata map[string]any, ttl time.Duration, ) *PendingInteraction
CreatePendingInteraction creates a new PendingInteraction with default values.
func (*PendingInteraction) IsExpired ¶ added in v0.12.0
func (p *PendingInteraction) IsExpired() bool
IsExpired returns true if the interaction has expired.
func (*PendingInteraction) TimeUntilExpiry ¶ added in v0.12.0
func (p *PendingInteraction) TimeUntilExpiry() time.Duration
TimeUntilExpiry returns the duration until the interaction expires.
type PermissionConfig ¶ added in v0.17.0
type PermissionConfig struct {
DMPolicy string `yaml:"dm_policy"`
GroupPolicy string `yaml:"group_policy"`
BotUserID string `yaml:"bot_user_id"`
AllowedUsers []string `yaml:"allowed_users"`
BlockedUsers []string `yaml:"blocked_users"`
SlashCommandRateLimit float64 `yaml:"slash_command_rate_limit"`
}
type PlatformConfig ¶
type PlatformConfig struct {
Platform string `yaml:"platform"`
SystemPrompt string `yaml:"system_prompt"`
TaskInstructions string `yaml:"task_instructions"`
Engine EngineConfig `yaml:"engine"`
Provider provider.ProviderConfig `yaml:"provider"`
Security SecurityConfig `yaml:"security"`
DingTalk DingTalkConfig `yaml:"dingtalk"`
WhatsApp WhatsAppConfig `yaml:"whatsapp"`
Options map[string]any `yaml:"options,omitempty"`
SourceFile string `yaml:"-"` // Tracks which file this config was loaded from
}
type ProcessorChain ¶ added in v0.12.0
type ProcessorChain struct {
// contains filtered or unexported fields
}
ProcessorChain executes a chain of message processors
func NewDefaultProcessorChain ¶ added in v0.12.0
func NewDefaultProcessorChain(ctx context.Context, logger *slog.Logger) *ProcessorChain
NewDefaultProcessorChain creates a default processor chain with all standard processors
func NewProcessorChain ¶ added in v0.12.0
func NewProcessorChain(processors ...MessageProcessor) *ProcessorChain
NewProcessorChain creates a new processor chain with the given processors. Processors are automatically sorted by Order().
func (*ProcessorChain) AddProcessor ¶ added in v0.12.0
func (c *ProcessorChain) AddProcessor(processor MessageProcessor)
AddProcessor adds a processor to the chain and re-sorts it. Note: this method is thread-safe but should preferably be called during initialization.
func (*ProcessorChain) Close ¶ added in v0.17.0
func (c *ProcessorChain) Close()
Close stops all processors that have background goroutines
func (*ProcessorChain) Name ¶ added in v0.16.0
func (c *ProcessorChain) Name() string
Name returns the processor chain name
func (*ProcessorChain) Order ¶ added in v0.16.0
func (c *ProcessorChain) Order() int
Order returns the processor chain order
func (*ProcessorChain) Process ¶ added in v0.12.0
func (c *ProcessorChain) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process executes all processors in order
func (*ProcessorChain) ResetSession ¶ added in v0.15.1
func (c *ProcessorChain) ResetSession(platform, sessionID string)
ResetSession propagates a session reset to all processors that support it.
type ProcessorOrder ¶ added in v0.12.0
type ProcessorOrder int
ProcessorOrder defines standard processor ordering
const (
	// OrderFilter drops noise events before anything else
	OrderFilter ProcessorOrder = 10
	// OrderThread manages thread_ts caching for message chunking
	OrderThread ProcessorOrder = 15
	// OrderFormatConversion converts markdown to platform-specific format
	OrderFormatConversion ProcessorOrder = 40
	// OrderChunk splits long messages for Slack API limits
	OrderChunk ProcessorOrder = 50
)
type QueueError ¶
type QueueError struct {
Message string
}
func (*QueueError) Error ¶
func (e *QueueError) Error() string
type QueuedMessage ¶
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
func NewRateLimiter ¶
func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow() bool
type RetryConfig ¶
type RichContent ¶
type RichContent = base.RichContent
type SecurityConfig ¶ added in v0.17.0
type SecurityConfig struct {
VerifySignature bool `yaml:"verify_signature"`
Permission PermissionConfig `yaml:"permission"`
}
type SessionOperations ¶ added in v0.15.5
type SessionOperations = base.SessionOperations
SessionOperations is an alias that re-exports base.SessionOperations for convenience.
type SessionStats ¶ added in v0.15.5
type SessionStats struct {
SessionID string
Status string
TotalTokens int64
InputTokens int64
OutputTokens int64
CacheRead int64
CacheWrite int64
TotalCost float64
Duration time.Duration
ToolCallCount int
ErrorCount int
}
SessionStats holds session statistics
type SlackBlock ¶
type StreamAdapter ¶
type StreamAdapter interface {
ChatAdapter
SendStreamMessage(ctx context.Context, sessionID string, msg *ChatMessage) (StreamHandler, error)
UpdateMessage(ctx context.Context, sessionID, messageID string, msg *ChatMessage) error
}
type StreamCallback ¶
type StreamCallback struct {
// contains filtered or unexported fields
}
StreamCallback implements event.Callback to receive Engine events and forward them to ChatApp.
func NewStreamCallback ¶
func NewStreamCallback( ctx context.Context, sessionID, platform string, adapters *AdapterManager, logger *slog.Logger, isHot bool, metadata map[string]any, messageOps MessageOperations, sessionOps SessionOperations, ) *StreamCallback
NewStreamCallback creates a new StreamCallback with injected platform-specific operations
func (*StreamCallback) Close ¶ added in v0.17.0
func (c *StreamCallback) Close()
Close releases resources held by the callback
type StreamHandler ¶
type ThreadInfo ¶ added in v0.12.0
ThreadInfo holds thread-related metadata for a session.
type ThreadProcessor ¶ added in v0.12.0
type ThreadProcessor struct {
// contains filtered or unexported fields
}
ThreadProcessor manages thread_ts caching for message chunking. It tracks the first message's timestamp for each session to associate subsequent chunked messages in the same thread.
func NewThreadProcessor ¶ added in v0.12.0
func NewThreadProcessor(logger *slog.Logger, opts ThreadProcessorOptions) *ThreadProcessor
NewThreadProcessor creates a new ThreadProcessor.
func (*ThreadProcessor) Delete ¶ added in v0.12.0
func (p *ThreadProcessor) Delete(sessionID string)
Delete removes thread info for a session.
func (*ThreadProcessor) GetThreadTS ¶ added in v0.12.0
func (p *ThreadProcessor) GetThreadTS(sessionID string) string
GetThreadTS returns the stored thread_ts for a session. Returns empty string if no thread info exists.
func (*ThreadProcessor) Name ¶ added in v0.12.0
func (p *ThreadProcessor) Name() string
Name returns the processor name.
func (*ThreadProcessor) Order ¶ added in v0.12.0
func (p *ThreadProcessor) Order() int
Order returns the processor order. Per the ProcessorOrder constants, OrderThread (15) runs after OrderFilter (10) and before OrderFormatConversion (40).
func (*ThreadProcessor) Process ¶ added in v0.12.0
func (p *ThreadProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process manages thread_ts for the message. For the first message: stores thread_ts from metadata if present. For subsequent messages: attaches stored thread_ts to metadata.
func (*ThreadProcessor) SetThreadTS ¶ added in v0.12.0
func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)
SetThreadTS explicitly sets the thread_ts for a session. This is useful when the first message response provides the thread_ts.
func (*ThreadProcessor) Stop ¶ added in v0.12.0
func (p *ThreadProcessor) Stop()
Stop stops the cleanup goroutine.