chatapps

package
v0.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: MIT Imports: 32 Imported by: 0

README

ChatApps: Multi-Platform Connector

The chatapps package provides the bridge between HotPlex's core engine and various chat platforms (Slack, Feishu/Lark, Telegram, Discord, etc.). It normalizes platform-specific events and messages into a unified format for the engine to process.

🏛 Architecture Overview

HotPlex uses an Adapter-based Pipeline architecture to achieve platform neutrality and consistent behavior across different IM apps.

🔄 End-to-End Bidirectional Flow

From the client terminal's perspective, HotPlex operates as a reactive streaming system.

sequenceDiagram
    participant User as 👤 User Terminal
    participant Platform as ☁️ Chat Platform (Slack/Feishu)
    participant Adapter as 🔌 Platform Adapter
    participant Pipeline as ⛓️ Processor Pipeline
    participant Handler as 🧠 Engine Handler
    participant Engine as 🔥 HotPlex Engine

    Note over User, Engine: 🟢 Phase 1: Request (C2S)
    User->>Platform: Sends message/command
    Platform->>Adapter: Webhook Event (JSON)
    Adapter->>Adapter: Parse & Normalize to base.ChatMessage
    Adapter->>Handler: Handle(ctx, msg)
    Handler->>Engine: Execute(ctx, prompt)

    Note over User, Engine: 🔵 Phase 2: Response Streaming (S2C)
    Engine-->>Handler: Stream Token/Event
    Handler->>Pipeline: Process(msg)
    Pipeline->>Pipeline: Filter -> RateLimit -> Format -> Chunk
    Pipeline->>Adapter: SendMessage/UpdateMessage
    Adapter->>Platform: Platform API Call (e.g., chat.update)
    Platform-->>User: Visual Update (Real-time)
    
    Note over User, Engine: ⚪ Phase 3: Finalization
    Engine-->>Handler: Execution Complete
    Handler->>Adapter: Send Session Stats/UI Controls
    Adapter->>Platform: Final Message Render
    Platform-->>User: Session Result
Data Normalization (Event to Message Mapping)

The chatapps layer normalizes raw provider events into a standard "Chat Language" using the base.MessageType Go type. While the underlying values often share names with Engine events, they represent UI Rendering Intents documented in base/types.go.

Provider/Engine Event base.MessageType Constant UI Presentation
thinking MessageTypeThinking [Status Only] Thinking indicators / Bubbles
tool_use MessageTypeToolUse [Status Only] "Executing tool..." indicator
tool_result MessageTypeToolResult [Silent Success] Status only; Errors show blocks
answer MessageTypeAnswer Standard Markdown text / Streaming output
error MessageTypeError Red-themed alert block
plan_mode MessageTypePlanMode Planning phase indicator (Status + Text)
permission_request MessageTypePermissionRequest Interactive Allow/Deny buttons
session_stats MessageTypeSessionStats Usage summary (Tokens, Duration)
danger_block MessageTypeDangerBlock Critical warning with confirmation

[!NOTE] Slack Free Plan Compatibility: Some advanced features (Streaming, Status Bar) require a paid plan or Developer Sandbox. See docs/plans/slack_free_plan_compatibility.md for current tracking of fallback optimizations.

Key Architectural Concepts
  • ProcessorChain: A middleware-style pipeline that processes messages before they are sent or after they are received. Standard processors include:
    • Filter: [Black Hole] Silently drops noise events, unparsed raw outputs, and redundant user reflections within the integration layer.
    • Thread: Manages thread state and caching for multi-step responses to maintain context.
    • FormatConversion: Converts Standard Markdown to platform-specific formats (e.g., Slack Block Kit, Feishu Card).
    • Chunking: Splits long messages to respect platform API limits.
  • Space Folding: A policy where high-volume tool outputs (>2KB) are automatically diverted to thread replies or collapsed, preventing main channel pollution while preserving "Geek Transparency".
  • EngineMessageHandler: The main business logic that connects normalized chat events to the HotPlex Engine.

🛠 Developer Guide

1. Implementing a New Platform Adapter

To add a platform (e.g., whatsapp), create a new package chatapps/whatsapp and follow these patterns:

Phase A: Core Interface (base.ChatAdapter)

The foundation of any adapter is the ChatAdapter interface.

type WhatsAppAdapter struct {
    config  Config
    handler base.MessageHandler // Injected via SetHandler
    logger  *slog.Logger
}

func (a *WhatsAppAdapter) Platform() string { return "whatsapp" }

// Handle incoming platform events (e.g., from webhook)
func (a *WhatsAppAdapter) HandleMessage(ctx context.Context, msg *base.ChatMessage) error {
    if a.handler != nil {
        return a.handler(ctx, msg) // Delegate to Engine
    }
    return nil
}

// Send outgoing formatted messages
func (a *WhatsAppAdapter) SendMessage(ctx context.Context, sessionID string, msg *base.ChatMessage) error {
    // 1. Convert msg (Markdown/Blocks) to WhatsApp format
    // 2. Call WhatsApp Cloud API
}

func (a *WhatsAppAdapter) SetHandler(h base.MessageHandler) { a.handler = h }
Phase B: Webhook Support (base.WebhookProvider)

If the platform uses webhooks, implement this to register routes automatically under /webhook/myplatform/.

func (a *WhatsAppAdapter) WebhookHandler() http.Handler {
    mux := http.NewServeMux()
    mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
        // 1. Parse signature & event
        // 2. Convert to base.ChatMessage
        // 3. Call a.HandleMessage(r.Context(), normalizedMsg)
    })
    return mux
}
Phase C: Advanced UI & Resiliency

Implement these optional interfaces to provide a premium experience:

  • base.StatusProvider: Handles "Thinking..." or "Running tool X..." visual indicators.
  • base.MessageOperations: Supports updating/deleting existing messages (critical for streaming and UI updates).
  • base.StreamWriter: A standard io.Writer interface for real-time token streaming.
2. Message Processor Pipeline

The ProcessorChain is a middleware system. You can add global or platform-specific processors.

// Example: Custom Privacy Masking Processor
type PrivacyMasker struct {}
func (p *PrivacyMasker) Name() string { return "privacy_mask" }
func (p *PrivacyMasker) Order() int { return 12 } // Run between RateLimit and Aggregation

func (p *PrivacyMasker) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error) {
    msg.Content = strings.ReplaceAll(msg.Content, "SECRET", "****")
    return msg, nil
}
3. Integration into setup.go

Once implemented, register your adapter in chatapps/setup.go:

// 1. Add to setupPlatform calls
setupPlatform(ctx, "whatsapp", loader, manager, logger, func(pc *PlatformConfig) ChatAdapter {
    // return whatsapp.NewAdapter(...)
})

// 2. Ensure credentials are in .env or config.yaml

🏗 Interaction & Button Handling

For platforms supporting buttons (Slack Blocks, Feishu Cards), use the InteractionManager:

  1. Define Action IDs: Use a structured format {action_name}:{session_id} in buttons.
  2. Handle Callbacks: The adapter should capture button clicks and route them to InteractionManager.HandleAction.
  3. UI Feedback: Use StatusProvider to show "Processing..." while the action executes.

📊 Observability & Metrics

HOTPLEX_CHATAPPS_CONFIG_DIR: Path to platform-specific YAML configs.


Status: Active / Modular
Maintainer: HotPlex Core Team

Documentation

Index

Constants

View Source
const (
	StatusThinkingLabel       = "🧠 深度推演规划中..."
	StatusToolExecutingLabel  = "🛠️ 正在执行工具 [%s]..."
	StatusResultParsingLabel  = "🧠 正在解析执行结果..."
	StatusErrorLabel          = "❌ 执行过程中发生错误"
	StatusDangerBlockLabel    = "⚠️ 拦截到高危操作,等待确认..."
	StatusProgressLabel       = "⏳ 正在执行后台任务..."
	StatusProgressDoneLabel   = "✅ 后台任务执行完毕"
	StatusStepStartLabel      = "🔍 正在分析执行轨迹..."
	StatusStepFinishLabel     = "✅ 当前任务阶段构建完成"
	StatusPlanModeLabel       = "📝 正在制定作战计划..."
	StatusExitPlanModeLabel   = "📝 作战计划就绪,等待您的批准..."
	StatusAskUserLabel        = "⏳ 等待您提供更多信息..."
	StatusEngineStartingLabel = "🚀 正在唤醒推演引擎..."
	StatusPermissionLabel     = "🛡️ 拦截到高危操作,等待提权审批..."

	// Contextual labels
	StatusToolResultThinkingLabel = "🧠 正在解析执行结果 (耗时: %dms)..."
	StatusSessionStartColdLabel   = "🚀 正在初始化上下文..."
	StatusSessionStartHotLabel    = "🚀 重新连接并恢复上下文..."
	StatusAnswerLabel             = "✍️ 正在生成回答..."
)

UI Status indicator labels for AI Native experience

View Source
const (
	ParseModeNone     = base.ParseModeNone
	ParseModeMarkdown = base.ParseModeMarkdown
	ParseModeHTML     = base.ParseModeHTML
)
View Source
const (
	// Metadata keys
	MetadataKeyTransient = "is_transient"
)

Variables

View Source
var (
	// MessagesAggregatedTotal counts total messages aggregated
	MessagesAggregatedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_aggregated_total",
			Help: "Total number of messages that have been aggregated",
		},
		[]string{"event_type", "platform"},
	)

	// MessagesFlushedTotal counts total flushes by reason
	MessagesFlushedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_flushed_total",
			Help: "Total number of times the buffer was flushed",
		},
		[]string{"event_type", "platform", "reason"},
	)

	// MessagesDroppedTotal counts dropped messages
	MessagesDroppedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_dropped_total",
			Help: "Total number of messages dropped due to buffer overflow",
		},
		[]string{"event_type", "platform", "reason"},
	)

	// BufferSizeGauge tracks current buffer size per platform
	BufferSizeGauge = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "hotplex_aggregator_buffer_size",
			Help: "Current number of messages in the buffer per platform",
		},
		[]string{"platform"},
	)

	// BufferDurationHistogram tracks time from first message to flush
	BufferDurationHistogram = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "hotplex_aggregator_buffer_duration_seconds",
			Help:    "Time in seconds from first message arrival to buffer flush",
			Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0},
		},
		[]string{"platform"},
	)

	// MessageSizeHistogram tracks message size distribution
	MessageSizeHistogram = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "hotplex_aggregator_message_size_bytes",
			Help:    "Size distribution of aggregated messages in bytes",
			Buckets: []float64{64, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768},
		},
		[]string{"event_type", "platform"},
	)
)
View Source
var ErrQueueFull = &QueueError{Message: "queue is full"}

Functions

func GenerateInteractionID added in v0.12.0

func GenerateInteractionID(sessionID string, timestamp time.Time) string

GenerateInteractionID generates a unique interaction ID.

func GetExtraChunks added in v0.12.0

func GetExtraChunks(msg *base.ChatMessage) []string

GetExtraChunks returns any extra chunks stored in message metadata. Returns nil if no extra chunks exist.

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError classifies errors as retryable or non-retryable

func RetryWithBackoff

func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error

Types

type AdapterManager

type AdapterManager struct {
	// contains filtered or unexported fields
}

func NewAdapterManager

func NewAdapterManager(logger *slog.Logger) *AdapterManager

func Setup added in v0.11.0

func Setup(ctx context.Context, logger *slog.Logger, configDir ...string) (http.Handler, *AdapterManager, error)

Setup initializes all enabled ChatApps and their dedicated Engines. It returns an http.Handler that handles all webhook routes. The configDir parameter takes priority over HOTPLEX_CHATAPPS_CONFIG_DIR environment variable.

func (*AdapterManager) GetAdapter

func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)

func (*AdapterManager) GetMessageOperations added in v0.15.5

func (m *AdapterManager) GetMessageOperations(platform string) MessageOperations

GetMessageOperations returns platform-specific message operations interface Returns nil if the platform doesn't support message operations

func (*AdapterManager) GetSessionOperations added in v0.15.5

func (m *AdapterManager) GetSessionOperations(platform string) SessionOperations

GetSessionOperations returns platform-specific session operations interface Returns nil if the platform doesn't support session operations

func (*AdapterManager) GetStatusProvider added in v0.18.0

func (m *AdapterManager) GetStatusProvider(platform string) base.StatusProvider

GetStatusProvider returns platform-specific status provider interface Returns nil if the platform doesn't support status operations

func (*AdapterManager) Handler added in v0.11.0

func (m *AdapterManager) Handler() http.Handler

Handler returns an http.Handler with all adapter webhooks mounted This is a convenience method when you don't need gorilla/mux

func (*AdapterManager) ListPlatforms

func (m *AdapterManager) ListPlatforms() []string

func (*AdapterManager) NewStreamWriter added in v0.18.0

func (m *AdapterManager) NewStreamWriter(ctx context.Context, platform, channelID, threadTS string) base.StreamWriter

NewStreamWriter creates a platform-agnostic streaming writer for the given platform Returns nil if the platform doesn't support streaming or adapter not found

func (*AdapterManager) Register

func (m *AdapterManager) Register(adapter ChatAdapter) error

func (*AdapterManager) RegisterEngine added in v0.11.0

func (m *AdapterManager) RegisterEngine(eng *engine.Engine)

func (*AdapterManager) RegisterRoutes added in v0.11.0

func (m *AdapterManager) RegisterRoutes(router *mux.Router)

RegisterRoutes registers all adapter webhooks to a unified router Path format: /webhook/{platform} (e.g., /webhook/telegram, /webhook/discord)

func (*AdapterManager) SendMessage

func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error

SendMessage sends a message to a specific platform

func (*AdapterManager) StartAll

func (m *AdapterManager) StartAll(ctx context.Context) error

func (*AdapterManager) StopAll

func (m *AdapterManager) StopAll() error

func (*AdapterManager) Unregister

func (m *AdapterManager) Unregister(platform string) error

type AdapterMetrics

type AdapterMetrics struct {
	MessagesReceived atomic.Int64
	MessagesSent     atomic.Int64
	MessagesFailed   atomic.Int64
	LastMessageAt    atomic.Int64
	Uptime           atomic.Int64
}

func NewAdapterMetrics

func NewAdapterMetrics() *AdapterMetrics

func (*AdapterMetrics) GetStats

func (m *AdapterMetrics) GetStats() map[string]int64

func (*AdapterMetrics) RecordFailure

func (m *AdapterMetrics) RecordFailure()

func (*AdapterMetrics) RecordReceive

func (m *AdapterMetrics) RecordReceive()

func (*AdapterMetrics) RecordSend

func (m *AdapterMetrics) RecordSend()

type Attachment

type Attachment = base.Attachment

type ChatAdapter

type ChatAdapter = base.ChatAdapter

type ChatMessage

type ChatMessage = base.ChatMessage

func NewChatMessage added in v0.11.0

func NewChatMessage(platform, sessionID, userID, content string) *ChatMessage

type ChunkInfo added in v0.12.0

type ChunkInfo struct {
	TotalChunks  int
	CurrentChunk int
	ThreadTS     string
	ChannelID    string
}

ChunkInfo holds metadata about chunked messages.

func GetChunkInfo added in v0.12.0

func GetChunkInfo(msg *base.ChatMessage) *ChunkInfo

GetChunkInfo returns the ChunkInfo from message metadata.

type ChunkProcessor added in v0.12.0

type ChunkProcessor struct {
	// contains filtered or unexported fields
}

ChunkProcessor splits long messages into chunks that fit within platform limits. It uses the Slack chunker for Markdown-aware splitting.

func NewChunkProcessor added in v0.12.0

func NewChunkProcessor(logger *slog.Logger, opts ChunkProcessorOptions) *ChunkProcessor

NewChunkProcessor creates a new ChunkProcessor.

func (*ChunkProcessor) Name added in v0.12.0

func (p *ChunkProcessor) Name() string

Name returns the processor name.

func (*ChunkProcessor) Order added in v0.12.0

func (p *ChunkProcessor) Order() int

Order returns the processor order (runs after format conversion).

func (*ChunkProcessor) Process added in v0.12.0

func (p *ChunkProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process splits the message into chunks if it exceeds the character limit. Returns either a single message or a slice of messages.

type ChunkProcessorOptions added in v0.12.0

type ChunkProcessorOptions struct {
	MaxChars int // Maximum characters per chunk (default: 4000 for Slack)
}

ChunkProcessorOptions configures the ChunkProcessor.

type CommandCallback added in v0.13.0

type CommandCallback struct {
	// contains filtered or unexported fields
}

CommandCallback handles slash command progress events

func NewCommandCallback added in v0.13.0

func NewCommandCallback(ctx context.Context, platform, sessionID string, adapters *AdapterManager, logger *slog.Logger, metadata map[string]any) *CommandCallback

NewCommandCallback creates a new command callback

func (*CommandCallback) Handle added in v0.13.0

func (c *CommandCallback) Handle(eventType string, data any) error

Handle implements event.Callback interface

type ConfigLoader

type ConfigLoader struct {
	// contains filtered or unexported fields
}

func NewConfigLoader

func NewConfigLoader(configDir string, logger *slog.Logger) (*ConfigLoader, error)

func (*ConfigLoader) Close added in v0.11.0

func (c *ConfigLoader) Close() error

Close stops all hot reload watchers and releases resources.

func (*ConfigLoader) GetConfig

func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig

func (*ConfigLoader) GetOptions added in v0.11.0

func (c *ConfigLoader) GetOptions(platform string) map[string]any

func (*ConfigLoader) GetSystemPrompt

func (c *ConfigLoader) GetSystemPrompt(platform string) string

func (*ConfigLoader) GetTaskInstructions

func (c *ConfigLoader) GetTaskInstructions(platform string) string

func (*ConfigLoader) HasPlatform

func (c *ConfigLoader) HasPlatform(platform string) bool

func (*ConfigLoader) Load

func (c *ConfigLoader) Load(configDir string) error

func (*ConfigLoader) Platforms

func (c *ConfigLoader) Platforms() []string

func (*ConfigLoader) StartHotReload added in v0.11.0

func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, onReload func(platform string, cfg *PlatformConfig)) error

StartHotReload starts watching all config files for changes and automatically reloads them. The onReload callback is called with the updated PlatformConfig for each platform.

type DingTalkConfig

type DingTalkConfig struct {
	AppID         string `yaml:"app_id"`
	AppSecret     string `yaml:"app_secret"`
	CallbackToken string `yaml:"callback_token"`
	CallbackKey   string `yaml:"callback_key"`
	MaxMessageLen int    `yaml:"max_message_len"`
}

type DiscordEmbed

type DiscordEmbed struct {
	Title       string                 `json:"title,omitempty"`
	Description string                 `json:"description,omitempty"`
	URL         string                 `json:"url,omitempty"`
	Color       int                    `json:"color,omitempty"`
	Fields      []DiscordEmbedField    `json:"fields,omitempty"`
	Footer      *DiscordEmbedFooter    `json:"footer,omitempty"`
	Thumbnail   *DiscordEmbedThumbnail `json:"thumbnail,omitempty"`
	Image       *DiscordEmbedImage     `json:"image,omitempty"`
	Timestamp   string                 `json:"timestamp,omitempty"`
}

type DiscordEmbedField

type DiscordEmbedField struct {
	Name   string `json:"name"`
	Value  string `json:"value"`
	Inline bool   `json:"inline,omitempty"`
}

type DiscordEmbedFooter

type DiscordEmbedFooter struct {
	Text    string `json:"text"`
	IconURL string `json:"icon_url,omitempty"`
}

type DiscordEmbedImage

type DiscordEmbedImage struct {
	URL string `json:"url"`
}

type DiscordEmbedThumbnail

type DiscordEmbedThumbnail struct {
	URL string `json:"url"`
}

type Engine added in v0.15.5

type Engine interface {
	Execute(ctx context.Context, cfg *types.Config, prompt string, callback event.Callback) error
	CheckDanger(prompt string) (blocked bool, operation, reason string)
	GetSession(sessionID string) (Session, bool)
	Close() error
	GetSessionStats(sessionID string) *SessionStats
	ValidateConfig(cfg *types.Config) error
	StopSession(sessionID string, reason string) error
	ResetSessionProvider(sessionID string)
	SetDangerAllowPaths(paths []string)
	SetDangerBypassEnabled(token string, enabled bool) error
	SetAllowedTools(tools []string)
	SetDisallowedTools(tools []string)
	GetAllowedTools() []string
	GetDisallowedTools() []string
}

Engine abstracts the engine functionality for dependency inversion

type EngineConfig added in v0.11.0

type EngineConfig struct {
	Timeout         time.Duration `yaml:"timeout"`
	IdleTimeout     time.Duration `yaml:"idle_timeout"`
	WorkDir         string        `yaml:"work_dir"`
	AllowedTools    []string      `yaml:"allowed_tools"`
	DisallowedTools []string      `yaml:"disallowed_tools"`
}

type EngineHolder

type EngineHolder struct {
	// contains filtered or unexported fields
}

EngineHolder holds the Engine instance and configuration for ChatApps integration

func NewEngineHolder

func NewEngineHolder(opts EngineHolderOptions) (*EngineHolder, error)

NewEngineHolder creates a new EngineHolder with the given options

func (*EngineHolder) GetAdapterManager

func (h *EngineHolder) GetAdapterManager() *AdapterManager

GetAdapterManager returns the AdapterManager for sending messages

func (*EngineHolder) GetEngine

func (h *EngineHolder) GetEngine() Engine

GetEngine returns the underlying Engine instance

type EngineHolderOptions

type EngineHolderOptions struct {
	Logger                     *slog.Logger
	Adapters                   *AdapterManager
	Timeout                    time.Duration
	IdleTimeout                time.Duration
	Namespace                  string
	PermissionMode             string
	DangerouslySkipPermissions bool
	AllowedTools               []string
	DisallowedTools            []string
	DefaultWorkDir             string
	DefaultTaskInstr           string
}

EngineHolderOptions configures the EngineHolder

type EngineMessageHandler

type EngineMessageHandler struct {
	// contains filtered or unexported fields
}

EngineMessageHandler implements MessageHandler and integrates with Engine

func NewEngineMessageHandler

func NewEngineMessageHandler(engine Engine, adapters *AdapterManager, opts ...EngineMessageHandlerOption) *EngineMessageHandler

NewEngineMessageHandler creates a new EngineMessageHandler

func (*EngineMessageHandler) Close added in v0.17.0

func (h *EngineMessageHandler) Close()

Close releases resources held by the handler

func (*EngineMessageHandler) Handle

func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) error

Handle implements EngineMessageHandler

type EngineMessageHandlerOption

type EngineMessageHandlerOption func(*EngineMessageHandler)

EngineMessageHandlerOption configures the EngineMessageHandler

func WithConfigLoader

func WithConfigLoader(loader *ConfigLoader) EngineMessageHandlerOption

func WithLogger

func WithLogger(logger *slog.Logger) EngineMessageHandlerOption

func WithTaskInstrFn

func WithTaskInstrFn(fn func(sessionID string) string) EngineMessageHandlerOption

func WithWorkDirFn

func WithWorkDirFn(fn func(sessionID string) string) EngineMessageHandlerOption

type FormatConversionProcessor added in v0.12.0

type FormatConversionProcessor struct {
	// contains filtered or unexported fields
}

FormatConversionProcessor converts message content to platform-specific formats

func NewFormatConversionProcessor added in v0.12.0

func NewFormatConversionProcessor(logger *slog.Logger) *FormatConversionProcessor

NewFormatConversionProcessor creates a new FormatConversionProcessor

func (*FormatConversionProcessor) Name added in v0.12.0

Name returns the processor name

func (*FormatConversionProcessor) Order added in v0.12.0

func (p *FormatConversionProcessor) Order() int

Order returns the processor order

func (*FormatConversionProcessor) Process added in v0.12.0

Process converts message content based on platform

type HealthCheck

type HealthCheck struct {
	Name      string
	Status    string
	LastCheck time.Time
	LastError string
	Interval  time.Duration
	CheckFunc func() error
}

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

func NewHealthChecker

func NewHealthChecker(interval time.Duration) *HealthChecker

func (*HealthChecker) GetStatus

func (h *HealthChecker) GetStatus() map[string]HealthCheck

func (*HealthChecker) Register

func (h *HealthChecker) Register(check HealthCheck)

func (*HealthChecker) Start

func (h *HealthChecker) Start()

func (*HealthChecker) Stop

func (h *HealthChecker) Stop()

type InlineKeyboardButton

type InlineKeyboardButton struct {
	Text         string `json:"text"`
	URL          string `json:"url,omitempty"`
	CallbackData string `json:"callback_data,omitempty"`
}

type InlineKeyboardMarkup

type InlineKeyboardMarkup struct {
	InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
}

type InteractionCallback added in v0.12.0

type InteractionCallback func(interaction *PendingInteraction) error

InteractionCallback is a function that handles an interaction callback.

type InteractionManager added in v0.12.0

type InteractionManager struct {
	// contains filtered or unexported fields
}

InteractionManager manages pending interactions for Slack interactive components.

func NewInteractionManager added in v0.12.0

func NewInteractionManager(logger *slog.Logger, opts InteractionManagerOptions) *InteractionManager

NewInteractionManager creates a new InteractionManager.

func (*InteractionManager) Complete added in v0.12.0

func (m *InteractionManager) Complete(id string, response *InteractionResponse) error

Complete marks an interaction as completed with a response.

func (*InteractionManager) Count added in v0.12.0

func (m *InteractionManager) Count() int

Count returns the number of pending interactions.

func (*InteractionManager) Delete added in v0.12.0

func (m *InteractionManager) Delete(id string)

Delete removes a pending interaction.

func (*InteractionManager) Expire added in v0.12.0

func (m *InteractionManager) Expire(id string) error

Expire marks an interaction as expired.

func (*InteractionManager) Get added in v0.12.0

Get retrieves a pending interaction by ID. Returns nil if not found or expired.

func (*InteractionManager) GetBySession added in v0.12.0

func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction

GetBySession retrieves all pending interactions for a session.

func (*InteractionManager) HandleCallback added in v0.12.0

func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error

HandleCallback processes an interaction callback. It looks up the interaction, calls the callback, and removes the interaction.

func (*InteractionManager) PendingCount added in v0.12.0

func (m *InteractionManager) PendingCount() int

PendingCount returns the number of pending (non-expired) interactions.

func (*InteractionManager) Stop added in v0.12.0

func (m *InteractionManager) Stop()

Stop stops the cleanup goroutine.

func (*InteractionManager) Store added in v0.12.0

func (m *InteractionManager) Store(interaction *PendingInteraction) string

Store adds a new pending interaction and returns its ID.

func (*InteractionManager) TotalCount added in v0.12.0

func (m *InteractionManager) TotalCount() int

TotalCount returns the total number of interactions (including expired).

type InteractionManagerOptions added in v0.12.0

type InteractionManagerOptions struct {
	CleanupInterval time.Duration // How often to run cleanup (default: 1 min)
	TTL             time.Duration // How long to keep pending interactions (default: 10 min)
}

InteractionManagerOptions configures the InteractionManager.

type InteractionResponse added in v0.12.0

type InteractionResponse struct {
	ActionID    string    `json:"action_id"`
	Value       string    `json:"value"`
	UserID      string    `json:"user_id"`
	RespondedAt time.Time `json:"responded_at"`
}

InteractionResponse represents the user's response to an interaction.

type InteractionStatus added in v0.12.0

type InteractionStatus string

InteractionStatus is the status of a pending interaction.

const (
	InteractionStatusPending   InteractionStatus = "pending"
	InteractionStatusCompleted InteractionStatus = "completed"
	InteractionStatusExpired   InteractionStatus = "expired"
	InteractionStatusCancelled InteractionStatus = "cancelled"
)

type InteractionType added in v0.12.0

type InteractionType string

InteractionType defines the type of interactive message.

const (
	// InteractionTypePermission is for Claude Code permission requests (Issue #39).
	InteractionTypePermission InteractionType = "permission"
	// InteractionTypeApproval is for general approval requests (Issue #37).
	InteractionTypeApproval InteractionType = "approval"
	// InteractionTypeSelection is for selection/choice requests.
	InteractionTypeSelection InteractionType = "selection"
)

type LifecycleManager

type LifecycleManager struct {
	// contains filtered or unexported fields
}

func NewLifecycleManager

func NewLifecycleManager() *LifecycleManager

func (*LifecycleManager) OnStart

func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)

func (*LifecycleManager) OnStop

func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)

func (*LifecycleManager) RegisterAdapter

func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)

func (*LifecycleManager) StartAll

func (m *LifecycleManager) StartAll(ctx context.Context) error

func (*LifecycleManager) StopAll

func (m *LifecycleManager) StopAll() error

type Logger

type Logger = slog.Logger

type MessageFilterProcessor added in v0.15.1

type MessageFilterProcessor struct {
	// contains filtered or unexported fields
}

MessageFilterProcessor drops noise events before they enter the rest of the chain.

func NewMessageFilterProcessor added in v0.15.1

func NewMessageFilterProcessor(logger *slog.Logger) *MessageFilterProcessor

NewMessageFilterProcessor creates a new MessageFilterProcessor.

func (*MessageFilterProcessor) Name added in v0.15.1

func (p *MessageFilterProcessor) Name() string

Name returns the processor name.

func (*MessageFilterProcessor) Order added in v0.15.1

func (p *MessageFilterProcessor) Order() int

Order returns the processor order – must run before all others.

func (*MessageFilterProcessor) Process added in v0.15.1

Process drops hidden events by returning nil.

type MessageHandler

type MessageHandler = base.MessageHandler

type MessageOperations added in v0.15.5

type MessageOperations = base.MessageOperations

Re-export interfaces from base for convenience

type MessageProcessor added in v0.12.0

type MessageProcessor interface {
	// Process processes a message and returns the processed message
	// Can return the same message pointer if no modification needed
	// Can return a new message pointer if modification needed
	// Can return an error to stop processing
	Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

	// Name returns the processor name for logging and debugging
	Name() string

	// Order returns the processor order in the chain (lower = earlier)
	Order() int
}

MessageProcessor defines the interface for processing messages before sending

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

func NewMessageQueue

func NewMessageQueue(logger *slog.Logger, maxSize, dlqSize, workers int) *MessageQueue

func (*MessageQueue) AddToDLQ

func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)

AddToDLQ stores a failed message to the Dead Letter Queue

func (*MessageQueue) DLQLen

func (q *MessageQueue) DLQLen() int

DLQLen returns the number of messages in the Dead Letter Queue

func (*MessageQueue) Dequeue

func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)

func (*MessageQueue) Enqueue

func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error

func (*MessageQueue) GetDLQ

func (q *MessageQueue) GetDLQ() []*QueuedMessage

GetDLQ returns all messages in the Dead Letter Queue

func (*MessageQueue) Requeue

func (q *MessageQueue) Requeue(msg *QueuedMessage) error

Requeue adds a message back to the queue for retry

func (*MessageQueue) Size

func (q *MessageQueue) Size() int

func (*MessageQueue) Start

func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), sendFn func(context.Context, string, string, *ChatMessage) error)

func (*MessageQueue) Stop

func (q *MessageQueue) Stop()

type ParseMode

type ParseMode = base.ParseMode

type PendingInteraction added in v0.12.0

type PendingInteraction struct {
	ID           string
	SessionID    string
	ChannelID    string
	MessageTS    string
	ActionID     string
	UserID       string
	CallbackData string
	Callback     InteractionCallback
	CreatedAt    time.Time
	ExpiresAt    time.Time
	Type         InteractionType
	ThreadTS     string
	Status       InteractionStatus
	Response     *InteractionResponse
	Metadata     map[string]any
}

PendingInteraction represents a pending interactive action (e.g., button click).

func CreatePendingInteraction added in v0.12.0

func CreatePendingInteraction(
	sessionID string,
	userID string,
	channelID string,
	interactionType InteractionType,
	metadata map[string]any,
	ttl time.Duration,
) *PendingInteraction

CreatePendingInteraction creates a new PendingInteraction with default values.

func (*PendingInteraction) IsExpired added in v0.12.0

func (p *PendingInteraction) IsExpired() bool

IsExpired returns true if the interaction has expired.

func (*PendingInteraction) TimeUntilExpiry added in v0.12.0

func (p *PendingInteraction) TimeUntilExpiry() time.Duration

TimeUntilExpiry returns the duration until the interaction expires.

type PermissionConfig added in v0.17.0

type PermissionConfig struct {
	DMPolicy              string   `yaml:"dm_policy"`
	GroupPolicy           string   `yaml:"group_policy"`
	BotUserID             string   `yaml:"bot_user_id"`
	AllowedUsers          []string `yaml:"allowed_users"`
	BlockedUsers          []string `yaml:"blocked_users"`
	SlashCommandRateLimit float64  `yaml:"slash_command_rate_limit"`
}

type PlatformConfig

type PlatformConfig struct {
	Platform         string                  `yaml:"platform"`
	SystemPrompt     string                  `yaml:"system_prompt"`
	TaskInstructions string                  `yaml:"task_instructions"`
	Engine           EngineConfig            `yaml:"engine"`
	Provider         provider.ProviderConfig `yaml:"provider"`
	Security         SecurityConfig          `yaml:"security"`
	DingTalk         DingTalkConfig          `yaml:"dingtalk"`
	WhatsApp         WhatsAppConfig          `yaml:"whatsapp"`
	Options          map[string]any          `yaml:"options,omitempty"`
	SourceFile       string                  `yaml:"-"` // Tracks which file this config was loaded from
}

type ProcessorChain added in v0.12.0

type ProcessorChain struct {
	// contains filtered or unexported fields
}

ProcessorChain executes a chain of message processors

func NewDefaultProcessorChain added in v0.12.0

func NewDefaultProcessorChain(ctx context.Context, logger *slog.Logger) *ProcessorChain

NewDefaultProcessorChain creates a default processor chain with all standard processors

func NewProcessorChain added in v0.12.0

func NewProcessorChain(processors ...MessageProcessor) *ProcessorChain

NewProcessorChain creates a new processor chain with the given processors Processors are automatically sorted by Order()

func (*ProcessorChain) AddProcessor added in v0.12.0

func (c *ProcessorChain) AddProcessor(processor MessageProcessor)

AddProcessor adds a processor to the chain and re-sorts Note: This method is thread-safe but should preferably be called during initialization

func (*ProcessorChain) Close added in v0.17.0

func (c *ProcessorChain) Close()

Close stops all processors that have background goroutines

func (*ProcessorChain) Name added in v0.16.0

func (c *ProcessorChain) Name() string

Name returns the processor chain name

func (*ProcessorChain) Order added in v0.16.0

func (c *ProcessorChain) Order() int

Order returns the processor chain order

func (*ProcessorChain) Process added in v0.12.0

func (c *ProcessorChain) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process executes all processors in order

func (*ProcessorChain) ResetSession added in v0.15.1

func (c *ProcessorChain) ResetSession(platform, sessionID string)

ResetSession propagate session reset to all processors that support it

type ProcessorOrder added in v0.12.0

type ProcessorOrder int

ProcessorOrder defines standard processor ordering

const (
	// OrderFilter drops noise events before anything else
	OrderFilter ProcessorOrder = 10
	// OrderThread manages thread_ts caching for message chunking
	OrderThread ProcessorOrder = 15
	// OrderFormatConversion converts markdown to platform-specific format
	OrderFormatConversion ProcessorOrder = 40
	// OrderChunk splits long messages for Slack API limits
	OrderChunk ProcessorOrder = 50
)

type QueueError

type QueueError struct {
	Message string
}

func (*QueueError) Error

func (e *QueueError) Error() string

type QueuedMessage

type QueuedMessage struct {
	Platform  string
	SessionID string
	Message   *ChatMessage
	Retries   int
	CreatedAt time.Time
}

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

func NewRateLimiter

func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

func (*RateLimiter) Wait

func (r *RateLimiter) Wait(ctx context.Context) error

type RetryConfig

type RetryConfig struct {
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
}

type RichContent

type RichContent = base.RichContent

type SecurityConfig added in v0.17.0

type SecurityConfig struct {
	VerifySignature bool             `yaml:"verify_signature"`
	Permission      PermissionConfig `yaml:"permission"`
}

type Session added in v0.15.5

type Session interface {
	ID() string
	Status() string
	CreatedAt() time.Time
	IsResumed() bool
}

Session abstracts session state and operations

type SessionOperations added in v0.15.5

type SessionOperations = base.SessionOperations

Re-export interfaces from base for convenience

type SessionStats added in v0.15.5

type SessionStats struct {
	SessionID     string
	Status        string
	TotalTokens   int64
	InputTokens   int64
	OutputTokens  int64
	CacheRead     int64
	CacheWrite    int64
	TotalCost     float64
	Duration      time.Duration
	ToolCallCount int
	ErrorCount    int
}

SessionStats holds session statistics

type SlackBlock

type SlackBlock map[string]any

type StreamAdapter

type StreamAdapter interface {
	ChatAdapter
	SendStreamMessage(ctx context.Context, sessionID string, msg *ChatMessage) (StreamHandler, error)
	UpdateMessage(ctx context.Context, sessionID, messageID string, msg *ChatMessage) error
}

type StreamCallback

type StreamCallback struct {
	// contains filtered or unexported fields
}

StreamCallback implements event.Callback to receive Engine events and forward to ChatApp

func NewStreamCallback

func NewStreamCallback(
	ctx context.Context,
	sessionID, platform string,
	adapters *AdapterManager,
	logger *slog.Logger,
	isHot bool,
	metadata map[string]any,
	messageOps MessageOperations,
	sessionOps SessionOperations,
) *StreamCallback

NewStreamCallback creates a new StreamCallback with injected platform-specific operations

func (*StreamCallback) Close added in v0.17.0

func (c *StreamCallback) Close()

Close releases resources held by the callback

func (*StreamCallback) Handle

func (c *StreamCallback) Handle(eventType string, data any) error

Handle implements event.Callback

type StreamHandler

type StreamHandler func(ctx context.Context, sessionID string, chunk string, isFinal bool) error

type ThreadInfo added in v0.12.0

type ThreadInfo struct {
	ThreadTS     string
	ChannelID    string
	LastActivity time.Time
}

ThreadInfo holds thread-related metadata for a session.

type ThreadProcessor added in v0.12.0

type ThreadProcessor struct {
	// contains filtered or unexported fields
}

ThreadProcessor manages thread_ts caching for message chunking. It tracks the first message's timestamp for each session to associate subsequent chunked messages in the same thread.

func NewThreadProcessor added in v0.12.0

func NewThreadProcessor(logger *slog.Logger, opts ThreadProcessorOptions) *ThreadProcessor

NewThreadProcessor creates a new ThreadProcessor.

func (*ThreadProcessor) Delete added in v0.12.0

func (p *ThreadProcessor) Delete(sessionID string)

Delete removes thread info for a session.

func (*ThreadProcessor) GetThreadTS added in v0.12.0

func (p *ThreadProcessor) GetThreadTS(sessionID string) string

GetThreadTS returns the stored thread_ts for a session. Returns empty string if no thread info exists.

func (*ThreadProcessor) Name added in v0.12.0

func (p *ThreadProcessor) Name() string

Name returns the processor name.

func (*ThreadProcessor) Order added in v0.12.0

func (p *ThreadProcessor) Order() int

Order returns the processor order (runs after rate limit, before aggregation).

func (*ThreadProcessor) Process added in v0.12.0

Process manages thread_ts for the message. For the first message: stores thread_ts from metadata if present. For subsequent messages: attaches stored thread_ts to metadata.

func (*ThreadProcessor) SetThreadTS added in v0.12.0

func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)

SetThreadTS explicitly sets the thread_ts for a session. This is useful when the first message response provides the thread_ts.

func (*ThreadProcessor) Stop added in v0.12.0

func (p *ThreadProcessor) Stop()

Stop stops the cleanup goroutine.

type ThreadProcessorOptions added in v0.12.0

type ThreadProcessorOptions struct {
	CleanupInterval time.Duration // How often to run cleanup (default: 5 min)
	TTL             time.Duration // How long to keep thread info (default: 30 min)
}

ThreadProcessorOptions configures the ThreadProcessor.

type WhatsAppConfig

type WhatsAppConfig struct {
	PhoneNumberID string `yaml:"phone_number_id"`
	AccessToken   string `yaml:"access_token"`
	VerifyToken   string `yaml:"verify_token"`
	APIVersion    string `yaml:"api_version"`
}

Directories

Path Synopsis
Package slack provides a high-performance, AI-native Slack adapter for the HotPlex engine.
Package slack provides a high-performance, AI-native Slack adapter for the HotPlex engine.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL