chatapps

package
v0.18.3
Published: Mar 5, 2026 License: MIT Imports: 32 Imported by: 0

README

ChatApps: Multi-Platform Connector

The chatapps package provides the bridge between HotPlex's core engine and various chat platforms (Slack, Feishu/Lark, Telegram, Discord, etc.). It normalizes platform-specific events and messages into a unified format for the engine to process.

πŸ› Architecture Overview

HotPlex uses an Adapter-based Pipeline architecture to achieve platform neutrality and consistent behavior across different IM apps.

🔄 End-to-End Bidirectional Flow

From the client terminal's perspective, HotPlex operates as a reactive streaming system.

sequenceDiagram
    participant User as 👤 User Terminal
    participant Platform as ☁️ Chat Platform (Slack/Feishu)
    participant Adapter as 🔌 Platform Adapter
    participant Pipeline as ⛓️ Processor Pipeline
    participant Handler as 🧠 Engine Handler
    participant Engine as 🔥 HotPlex Engine

    Note over User, Engine: 🟢 Phase 1: Request (C2S)
    User->>Platform: Sends message/command
    Platform->>Adapter: Webhook Event (JSON)
    Adapter->>Adapter: Parse & Normalize to base.ChatMessage
    Adapter->>Handler: Handle(ctx, msg)
    Handler->>Engine: Execute(ctx, prompt)

    Note over User, Engine: 🔵 Phase 2: Response Streaming (S2C)
    Engine-->>Handler: Stream Token/Event
    Handler->>Pipeline: Process(msg)
    Pipeline->>Pipeline: Filter -> RateLimit -> Format -> Chunk
    Pipeline->>Adapter: SendMessage/UpdateMessage
    Adapter->>Platform: Platform API Call (e.g., chat.update)
    Platform-->>User: Visual Update (Real-time)
    
    Note over User, Engine: ⚪ Phase 3: Finalization
    Engine-->>Handler: Execution Complete
    Handler->>Adapter: Send Session Stats/UI Controls
    Adapter->>Platform: Final Message Render
    Platform-->>User: Session Result
Data Normalization (Event to Message Mapping)

The chatapps layer normalizes raw provider events into a standard "Chat Language" using the base.MessageType Go type. While the underlying values often share names with Engine events, they represent UI Rendering Intents documented in base/types.go.

| Provider/Engine Event | base.MessageType Constant | UI Presentation |
| --- | --- | --- |
| thinking | MessageTypeThinking | Status indicators / Thinking bubbles |
| tool_use | MessageTypeToolUse | Tool invocation block (e.g., "Running ls...") |
| tool_result | MessageTypeToolResult | Collapsible tool output block |
| answer | MessageTypeAnswer | Standard Markdown text message |
| error | MessageTypeError | Red-themed alert block |
| plan_mode | MessageTypePlanMode | Planning phase indicator (Claude Code) |
| permission_request | MessageTypePermissionRequest | Interactive Allow/Deny buttons |
| session_stats | MessageTypeSessionStats | Usage summary (Tokens, Cost, Duration) |
| danger_block | MessageTypeDangerBlock | Critical warning with confirmation |
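The mapping in the table can be pictured as a simple lookup. The following is a minimal sketch, not the package's code: the local MessageType type, the string values of the constants, and the Normalize helper are all assumptions made for illustration (the real definitions live in base/types.go).

```go
package main

import "fmt"

// MessageType stands in for base.MessageType. The string values below are
// assumptions for this sketch, not the package's actual constant values.
type MessageType string

const (
	MessageTypeThinking   MessageType = "thinking"
	MessageTypeToolUse    MessageType = "tool_use"
	MessageTypeToolResult MessageType = "tool_result"
	MessageTypeAnswer     MessageType = "answer"
	MessageTypeError      MessageType = "error"
)

// eventToType is a hypothetical lookup table from provider event names to
// UI rendering intents, covering a subset of the rows in the table.
var eventToType = map[string]MessageType{
	"thinking":    MessageTypeThinking,
	"tool_use":    MessageTypeToolUse,
	"tool_result": MessageTypeToolResult,
	"answer":      MessageTypeAnswer,
	"error":       MessageTypeError,
}

// Normalize returns the rendering intent for a provider event and reports
// whether the event is known.
func Normalize(event string) (MessageType, bool) {
	t, ok := eventToType[event]
	return t, ok
}

func main() {
	for _, ev := range []string{"thinking", "tool_use", "heartbeat"} {
		if t, ok := Normalize(ev); ok {
			fmt.Printf("%s -> %s\n", ev, t)
		} else {
			fmt.Printf("%s -> (no mapping)\n", ev)
		}
	}
}
```

Returning a boolean alongside the type lets the caller decide what to do with unmapped events instead of silently rendering them as text.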
Key Architectural Concepts
  • ProcessorChain: A middleware-style pipeline that processes messages before they are sent or after they are received. Standard processors include:
    • Filter: Drops noise/bot events.
    • ZoneOrder: Ensures UI blocks (Thinking -> Action -> Answer) are displayed in the correct order.
    • RateLimit: Protects against message bursts.
    • FormatConversion: Converts Standard Markdown to platform-specific formats (e.g., Slack Block Kit, Feishu Card).
    • Chunking: Splits long messages to respect platform API limits.
  • EngineMessageHandler: The main business logic that connects normalized chat events to the HotPlex Engine.
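To make the middleware idea concrete, here is a minimal sketch of an ordered chain. It assumes a reduced ChatMessage with a single Content field and a hypothetical Chain type; the real ProcessorChain may differ. The MessageProcessor interface follows the one documented in this package, and the "nil message means dropped" convention is borrowed from the MessageFilterProcessor description.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
)

// ChatMessage is a stand-in for base.ChatMessage with only the field this
// sketch needs.
type ChatMessage struct {
	Content string
}

// MessageProcessor mirrors the interface documented by the package.
type MessageProcessor interface {
	Process(ctx context.Context, msg *ChatMessage) (*ChatMessage, error)
	Name() string
	Order() int
}

// Chain is a hypothetical, simplified processor chain.
type Chain struct {
	processors []MessageProcessor
}

// Add registers a processor and keeps the slice sorted by Order (lower first).
func (c *Chain) Add(p MessageProcessor) {
	c.processors = append(c.processors, p)
	sort.SliceStable(c.processors, func(i, j int) bool {
		return c.processors[i].Order() < c.processors[j].Order()
	})
}

// Process runs every processor in order. A nil result means a processor
// dropped the message, so the chain stops early.
func (c *Chain) Process(ctx context.Context, msg *ChatMessage) (*ChatMessage, error) {
	for _, p := range c.processors {
		out, err := p.Process(ctx, msg)
		if err != nil {
			return nil, fmt.Errorf("processor %s: %w", p.Name(), err)
		}
		if out == nil {
			return nil, nil
		}
		msg = out
	}
	return msg, nil
}

// dropEmpty and upper are toy processors used only for this demo.
type dropEmpty struct{}

func (dropEmpty) Name() string { return "drop_empty" }
func (dropEmpty) Order() int   { return 1 }
func (dropEmpty) Process(_ context.Context, m *ChatMessage) (*ChatMessage, error) {
	if strings.TrimSpace(m.Content) == "" {
		return nil, nil
	}
	return m, nil
}

type upper struct{}

func (upper) Name() string { return "upper" }
func (upper) Order() int   { return 50 }
func (upper) Process(_ context.Context, m *ChatMessage) (*ChatMessage, error) {
	m.Content = strings.ToUpper(m.Content)
	return m, nil
}

// Run pushes one message through the demo chain and reports whether it
// survived.
func Run(content string) (string, bool) {
	c := &Chain{}
	c.Add(upper{}) // added first, but runs second because of Order
	c.Add(dropEmpty{})
	out, err := c.Process(context.Background(), &ChatMessage{Content: content})
	if err != nil || out == nil {
		return "", false
	}
	return out.Content, true
}

func main() {
	fmt.Println(Run("hello"))
	fmt.Println(Run("   "))
}
```

Sorting by Order at registration time means callers can add processors in any sequence and still get a deterministic pipeline.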

🛠 Developer Guide

1. Implementing a New Platform Adapter

To add a platform (e.g., whatsapp), create a new package chatapps/whatsapp and follow these patterns:

Phase A: Core Interface (base.ChatAdapter)

The foundation of any adapter is the ChatAdapter interface.

type WhatsAppAdapter struct {
    config  Config
    handler base.MessageHandler // Injected via SetHandler
    logger  *slog.Logger
}

func (a *WhatsAppAdapter) Platform() string { return "whatsapp" }

// Handle incoming platform events (e.g., from webhook)
func (a *WhatsAppAdapter) HandleMessage(ctx context.Context, msg *base.ChatMessage) error {
    if a.handler != nil {
        return a.handler(ctx, msg) // Delegate to Engine
    }
    return nil
}

// Send outgoing formatted messages
func (a *WhatsAppAdapter) SendMessage(ctx context.Context, sessionID string, msg *base.ChatMessage) error {
    // 1. Convert msg (Markdown/Blocks) to WhatsApp format
    // 2. Call WhatsApp Cloud API
    return nil // placeholder: return the API call's error here
}

func (a *WhatsAppAdapter) SetHandler(h base.MessageHandler) { a.handler = h }
Phase B: Webhook Support (base.WebhookProvider)

If the platform uses webhooks, implement this interface so that its routes are registered automatically under /webhook/{platform}/ (for this example, /webhook/whatsapp/).

func (a *WhatsAppAdapter) WebhookHandler() http.Handler {
    mux := http.NewServeMux()
    mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
        // 1. Parse signature & event
        // 2. Convert to base.ChatMessage
        // 3. Call a.HandleMessage(r.Context(), normalizedMsg)
    })
    return mux
}
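As an illustration of how a handler like this could end up under a per-platform prefix, the sketch below mounts a sub-handler with the standard library's http.StripPrefix. This is an assumption about the mechanism, not the package's implementation (AdapterManager.RegisterRoutes is documented as taking a gorilla/mux router); Mount and StatusFor are hypothetical helpers.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// webhookHandler stands in for an adapter's WebhookHandler().
func webhookHandler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	return mux
}

// Mount registers h under /webhook/{platform}/ on root and strips the
// prefix, so the adapter only sees its own relative paths such as /events.
func Mount(root *http.ServeMux, platform string, h http.Handler) {
	prefix := "/webhook/" + platform
	root.Handle(prefix+"/", http.StripPrefix(prefix, h))
}

// StatusFor sends a POST to path against a router with only the
// "whatsapp" adapter mounted, and returns the response status code.
func StatusFor(path string) int {
	root := http.NewServeMux()
	Mount(root, "whatsapp", webhookHandler())
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, path, nil)
	root.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(StatusFor("/webhook/whatsapp/events"))
	fmt.Println(StatusFor("/webhook/telegram/events"))
}
```

Because the prefix is stripped before the request reaches the adapter, the adapter's own mux can keep short, platform-independent paths.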
Phase C: Advanced UI & Resiliency

Implement these optional interfaces to provide a premium experience:

  • base.StatusProvider: Handles "Thinking..." or "Running tool X..." visual indicators.
  • base.MessageOperations: Supports updating/deleting existing messages (critical for streaming and UI updates).
  • base.StreamWriter: A standard io.Writer interface for real-time token streaming.
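Optional interfaces like these are typically discovered with a type assertion. The sketch below shows that pattern with reduced stand-in interfaces; the method sets are assumptions, and MessageOps is a hypothetical helper that only mirrors the documented behavior of returning nil when a platform lacks support.

```go
package main

import "fmt"

// ChatAdapter and MessageOperations are reduced stand-ins for the
// interfaces in the base package; their method sets are assumptions.
type ChatAdapter interface {
	Platform() string
}

type MessageOperations interface {
	UpdateMessage(messageID, content string) error
}

// basicAdapter implements only the core interface.
type basicAdapter struct{}

func (basicAdapter) Platform() string { return "basic" }

// richAdapter also implements the optional MessageOperations interface.
type richAdapter struct{ updates int }

func (*richAdapter) Platform() string { return "rich" }
func (a *richAdapter) UpdateMessage(messageID, content string) error {
	a.updates++
	return nil
}

// MessageOps returns the adapter's MessageOperations, or nil when the
// adapter does not implement the optional interface.
func MessageOps(a ChatAdapter) MessageOperations {
	if ops, ok := a.(MessageOperations); ok {
		return ops
	}
	return nil
}

// SupportsUpdates reports whether in-place message updates are available.
func SupportsUpdates(a ChatAdapter) bool {
	return MessageOps(a) != nil
}

func main() {
	for _, a := range []ChatAdapter{basicAdapter{}, &richAdapter{}} {
		fmt.Printf("%s supports updates: %v\n", a.Platform(), SupportsUpdates(a))
	}
}
```

With this pattern an adapter opts in to a capability simply by adding methods; callers fall back to plain SendMessage when the assertion fails.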
2. Message Processor Pipeline

The ProcessorChain is a middleware system. You can add global or platform-specific processors.

// Example: Custom Privacy Masking Processor
type PrivacyMasker struct{}
func (p *PrivacyMasker) Name() string { return "privacy_mask" }
func (p *PrivacyMasker) Order() int { return 12 } // Run between RateLimit and Aggregation

func (p *PrivacyMasker) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error) {
    msg.Content = strings.ReplaceAll(msg.Content, "SECRET", "****")
    return msg, nil
}
3. Integration into setup.go

Once implemented, register your adapter in chatapps/setup.go:

// 1. Add to setupPlatform calls
setupPlatform(ctx, "whatsapp", loader, manager, logger, func(pc *PlatformConfig) ChatAdapter {
    // return whatsapp.NewAdapter(...)
})

// 2. Ensure credentials are in .env or config.yaml

πŸ— Interaction & Button Handling

For platforms supporting buttons (Slack Blocks, Feishu Cards), use the InteractionManager:

  1. Define Action IDs: Use a structured format {action_name}:{session_id} in buttons.
  2. Handle Callbacks: The adapter should capture button clicks and route them to InteractionManager.HandleCallback.
  3. UI Feedback: Use StatusProvider to show "Processing..." while the action executes.
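The {action_name}:{session_id} format from step 1 can be built and parsed with a few lines of standard-library code. This is a minimal sketch; BuildActionID and ParseActionID are hypothetical helper names, and splitting at the first colon is an assumption chosen so that session IDs containing colons survive a round trip.

```go
package main

import (
	"fmt"
	"strings"
)

// BuildActionID joins an action name and a session ID as
// "{action_name}:{session_id}". The action name must not contain a colon.
func BuildActionID(action, sessionID string) string {
	return action + ":" + sessionID
}

// ParseActionID splits at the first colon, so everything after it is
// treated as the session ID, even if it contains further colons.
func ParseActionID(id string) (action, sessionID string, err error) {
	action, sessionID, ok := strings.Cut(id, ":")
	if !ok || action == "" || sessionID == "" {
		return "", "", fmt.Errorf("malformed action ID %q", id)
	}
	return action, sessionID, nil
}

func main() {
	id := BuildActionID("permission_allow", "slack:C123:42")
	action, session, err := ParseActionID(id)
	fmt.Println(id, action, session, err)
}
```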

📊 Observability & Metrics

HOTPLEX_CHATAPPS_CONFIG_DIR: Path to platform-specific YAML configs.


Status: Active / Modular
Maintainer: HotPlex Core Team

Documentation

Constants

const (
	ZoneInitialization = 0 // Initialization Zone - session_start, engine_starting
	ZoneThinking       = 1 // Thinking Zone - thinking, plan_mode
	ZoneAction         = 2 // Action Zone - tool_use, tool_result, etc.
	ZoneOutput         = 3 // Output Zone - answer, ask_user_question
	ZoneSummary        = 4 // Summary Zone - session_stats
)

Zone indices: fixed ordering for message areas.
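A short sketch of how zone indices could drive display order. The event-to-zone assignments follow the comments on the constants; the ZoneOf and SortByZone helpers, and the fallback of unknown events to the output zone, are assumptions of this example rather than the package's ZoneOrder processor.

```go
package main

import (
	"fmt"
	"sort"
)

const (
	ZoneInitialization = 0
	ZoneThinking       = 1
	ZoneAction         = 2
	ZoneOutput         = 3
	ZoneSummary        = 4
)

// ZoneOf maps an event type to its zone, following the comments on the
// constants. Unknown events fall back to the output zone (an assumption).
func ZoneOf(eventType string) int {
	switch eventType {
	case "session_start", "engine_starting":
		return ZoneInitialization
	case "thinking", "plan_mode":
		return ZoneThinking
	case "tool_use", "tool_result":
		return ZoneAction
	case "answer", "ask_user_question":
		return ZoneOutput
	case "session_stats":
		return ZoneSummary
	default:
		return ZoneOutput
	}
}

// SortByZone returns a copy of events ordered by zone index, keeping
// arrival order within the same zone.
func SortByZone(events []string) []string {
	out := append([]string(nil), events...)
	sort.SliceStable(out, func(i, j int) bool {
		return ZoneOf(out[i]) < ZoneOf(out[j])
	})
	return out
}

func main() {
	in := []string{"answer", "tool_use", "thinking", "session_stats", "tool_result"}
	fmt.Println(SortByZone(in))
}
```

A stable sort matters here: a tool_use that arrived before its tool_result stays ahead of it, since both share the action zone.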

const (
	ParseModeNone     = base.ParseModeNone
	ParseModeMarkdown = base.ParseModeMarkdown
	ParseModeHTML     = base.ParseModeHTML
)

Variables

var (
	// MessagesAggregatedTotal counts total messages aggregated
	MessagesAggregatedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_aggregated_total",
			Help: "Total number of messages that have been aggregated",
		},
		[]string{"event_type", "platform"},
	)

	// MessagesFlushedTotal counts total flushes by reason
	MessagesFlushedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_flushed_total",
			Help: "Total number of times the buffer was flushed",
		},
		[]string{"event_type", "platform", "reason"},
	)

	// MessagesDroppedTotal counts dropped messages
	MessagesDroppedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_dropped_total",
			Help: "Total number of messages dropped due to buffer overflow",
		},
		[]string{"event_type", "platform", "reason"},
	)

	// BufferSizeGauge tracks current buffer size per platform
	BufferSizeGauge = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "hotplex_aggregator_buffer_size",
			Help: "Current number of messages in the buffer per platform",
		},
		[]string{"platform"},
	)

	// BufferDurationHistogram tracks time from first message to flush
	BufferDurationHistogram = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "hotplex_aggregator_buffer_duration_seconds",
			Help:    "Time in seconds from first message arrival to buffer flush",
			Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0},
		},
		[]string{"platform"},
	)

	// MessageSizeHistogram tracks message size distribution
	MessageSizeHistogram = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "hotplex_aggregator_message_size_bytes",
			Help:    "Size distribution of aggregated messages in bytes",
			Buckets: []float64{64, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768},
		},
		[]string{"event_type", "platform"},
	)

	// RateLimitDroppedTotal counts messages dropped by non-blocking rate limiter
	RateLimitDroppedTotal = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "hotplex_ratelimit_messages_dropped_total",
			Help: "Total number of messages dropped by the non-blocking rate limiter",
		},
	)
)
var ErrQueueFull = &QueueError{Message: "queue is full"}

Functions

func GenerateInteractionID (added in v0.12.0)

func GenerateInteractionID(sessionID string, timestamp time.Time) string

GenerateInteractionID generates a unique interaction ID.

func GetExtraChunks (added in v0.12.0)

func GetExtraChunks(msg *base.ChatMessage) []string

GetExtraChunks returns any extra chunks stored in message metadata. Returns nil if no extra chunks exist.

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError classifies errors as retryable or non-retryable

func RetryWithBackoff

func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error
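RetryWithBackoff is listed without a description, and the fields of RetryConfig are not shown on this page. As a general illustration of retrying with exponential backoff, here is a self-contained sketch. The RetryConfig fields (MaxAttempts, BaseDelay, MaxDelay), the Retry function, and the AttemptsUntilSuccess helper are all hypothetical and should not be read as the package's behavior; in particular, this sketch retries every error and does not consult IsRetryableError.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// RetryConfig is hypothetical; the real field names are not shown in this
// documentation.
type RetryConfig struct {
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
}

// Retry calls fn until it succeeds, the attempts run out, or ctx is
// cancelled. The delay doubles after each failure, capped at MaxDelay.
func Retry(ctx context.Context, cfg RetryConfig, fn func() error) error {
	if cfg.MaxAttempts < 1 {
		cfg.MaxAttempts = 1
	}
	delay := cfg.BaseDelay
	var err error
	for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt == cfg.MaxAttempts {
			break // no point sleeping after the final attempt
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		delay *= 2
		if delay > cfg.MaxDelay {
			delay = cfg.MaxDelay
		}
	}
	return fmt.Errorf("after %d attempts: %w", cfg.MaxAttempts, err)
}

// AttemptsUntilSuccess runs Retry against a function that fails the given
// number of times, and returns how many calls were made.
func AttemptsUntilSuccess(failures, maxAttempts int) (int, error) {
	calls := 0
	cfg := RetryConfig{
		MaxAttempts: maxAttempts,
		BaseDelay:   time.Millisecond,
		MaxDelay:    4 * time.Millisecond,
	}
	err := Retry(context.Background(), cfg, func() error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(AttemptsUntilSuccess(2, 5))
	fmt.Println(AttemptsUntilSuccess(10, 3))
}
```

Selecting on ctx.Done() during the wait keeps a long backoff from delaying shutdown.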

Types

type AdapterManager

type AdapterManager struct {
	// contains filtered or unexported fields
}

func NewAdapterManager

func NewAdapterManager(logger *slog.Logger) *AdapterManager

func Setup (added in v0.11.0)

func Setup(ctx context.Context, logger *slog.Logger, configDir ...string) (http.Handler, *AdapterManager, error)

Setup initializes all enabled ChatApps and their dedicated Engines. It returns an http.Handler that handles all webhook routes. The configDir parameter takes priority over HOTPLEX_CHATAPPS_CONFIG_DIR environment variable.

func (*AdapterManager) GetAdapter

func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)

func (*AdapterManager) GetMessageOperations (added in v0.15.5)

func (m *AdapterManager) GetMessageOperations(platform string) MessageOperations

GetMessageOperations returns the platform-specific message operations interface. It returns nil if the platform doesn't support message operations.

func (*AdapterManager) GetSessionOperations (added in v0.15.5)

func (m *AdapterManager) GetSessionOperations(platform string) SessionOperations

GetSessionOperations returns the platform-specific session operations interface. It returns nil if the platform doesn't support session operations.

func (*AdapterManager) GetStatusProvider (added in v0.18.0)

func (m *AdapterManager) GetStatusProvider(platform string) base.StatusProvider

GetStatusProvider returns the platform-specific status provider interface. It returns nil if the platform doesn't support status operations.

func (*AdapterManager) Handler (added in v0.11.0)

func (m *AdapterManager) Handler() http.Handler

Handler returns an http.Handler with all adapter webhooks mounted. This is a convenience method for when you don't need gorilla/mux.

func (*AdapterManager) ListPlatforms

func (m *AdapterManager) ListPlatforms() []string

func (*AdapterManager) NewStreamWriter (added in v0.18.0)

func (m *AdapterManager) NewStreamWriter(ctx context.Context, platform, channelID, threadTS string) base.StreamWriter

NewStreamWriter creates a platform-agnostic streaming writer for the given platform. It returns nil if the platform doesn't support streaming or the adapter is not found.

func (*AdapterManager) Register

func (m *AdapterManager) Register(adapter ChatAdapter) error

func (*AdapterManager) RegisterEngine (added in v0.11.0)

func (m *AdapterManager) RegisterEngine(eng *engine.Engine)

func (*AdapterManager) RegisterRoutes (added in v0.11.0)

func (m *AdapterManager) RegisterRoutes(router *mux.Router)

RegisterRoutes registers all adapter webhooks on a unified router. Path format: /webhook/{platform} (e.g., /webhook/telegram, /webhook/discord).

func (*AdapterManager) SendMessage

func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error

SendMessage sends a message to a specific platform

func (*AdapterManager) StartAll

func (m *AdapterManager) StartAll(ctx context.Context) error

func (*AdapterManager) StopAll

func (m *AdapterManager) StopAll() error

func (*AdapterManager) Unregister

func (m *AdapterManager) Unregister(platform string) error

type AdapterMetrics

type AdapterMetrics struct {
	MessagesReceived atomic.Int64
	MessagesSent     atomic.Int64
	MessagesFailed   atomic.Int64
	LastMessageAt    atomic.Int64
	Uptime           atomic.Int64
}
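Because every field is an atomic.Int64, counters like these can be updated from many goroutines without a mutex. The sketch below uses a reduced Metrics type to show the idea; the method bodies and the map keys returned by Stats are assumptions, since the implementations of RecordSend and GetStats are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Metrics is a reduced stand-in for AdapterMetrics.
type Metrics struct {
	MessagesSent   atomic.Int64
	MessagesFailed atomic.Int64
}

// RecordSend and RecordFailure increment their counters atomically.
func (m *Metrics) RecordSend()    { m.MessagesSent.Add(1) }
func (m *Metrics) RecordFailure() { m.MessagesFailed.Add(1) }

// Stats returns a snapshot of the counters. The key names are assumptions
// made for this sketch.
func (m *Metrics) Stats() map[string]int64 {
	return map[string]int64{
		"messages_sent":   m.MessagesSent.Load(),
		"messages_failed": m.MessagesFailed.Load(),
	}
}

// CountConcurrent records n sends from n goroutines and returns the
// resulting counter value.
func CountConcurrent(n int) int64 {
	var m Metrics
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.RecordSend()
		}()
	}
	wg.Wait()
	return m.Stats()["messages_sent"]
}

func main() {
	fmt.Println(CountConcurrent(100))
}
```

Note that a struct holding atomic values should be passed by pointer, which is why the methods use pointer receivers.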

func NewAdapterMetrics

func NewAdapterMetrics() *AdapterMetrics

func (*AdapterMetrics) GetStats

func (m *AdapterMetrics) GetStats() map[string]int64

func (*AdapterMetrics) RecordFailure

func (m *AdapterMetrics) RecordFailure()

func (*AdapterMetrics) RecordReceive

func (m *AdapterMetrics) RecordReceive()

func (*AdapterMetrics) RecordSend

func (m *AdapterMetrics) RecordSend()

type AggregatedMessageSender (added in v0.12.0)

type AggregatedMessageSender interface {
	SendAggregatedMessage(ctx context.Context, msg *base.ChatMessage) error
}

AggregatedMessageSender sends aggregated messages

type Attachment

type Attachment = base.Attachment

type ChatAdapter

type ChatAdapter = base.ChatAdapter

type ChatMessage

type ChatMessage = base.ChatMessage

func NewChatMessage (added in v0.11.0)

func NewChatMessage(platform, sessionID, userID, content string) *ChatMessage

type ChunkInfo (added in v0.12.0)

type ChunkInfo struct {
	TotalChunks  int
	CurrentChunk int
	ThreadTS     string
	ChannelID    string
}

ChunkInfo holds metadata about chunked messages.

func GetChunkInfo (added in v0.12.0)

func GetChunkInfo(msg *base.ChatMessage) *ChunkInfo

GetChunkInfo returns the ChunkInfo from message metadata.

type ChunkProcessor (added in v0.12.0)

type ChunkProcessor struct {
	// contains filtered or unexported fields
}

ChunkProcessor splits long messages into chunks that fit within platform limits. It uses the Slack chunker for Markdown-aware splitting.

func NewChunkProcessor (added in v0.12.0)

func NewChunkProcessor(logger *slog.Logger, opts ChunkProcessorOptions) *ChunkProcessor

NewChunkProcessor creates a new ChunkProcessor.

func (*ChunkProcessor) Name (added in v0.12.0)

func (p *ChunkProcessor) Name() string

Name returns the processor name.

func (*ChunkProcessor) Order (added in v0.12.0)

func (p *ChunkProcessor) Order() int

Order returns the processor order (runs after format conversion).

func (*ChunkProcessor) Process (added in v0.12.0)

func (p *ChunkProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process splits the message into chunks if it exceeds the character limit. Returns either a single message or a slice of messages.

type ChunkProcessorOptions (added in v0.12.0)

type ChunkProcessorOptions struct {
	MaxChars int // Maximum characters per chunk (default: 4000 for Slack)
}

ChunkProcessorOptions configures the ChunkProcessor.
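To show what a MaxChars limit means in practice, here is a minimal rune-based splitter. It is deliberately simpler than the package's chunker, which is described as Markdown-aware; SplitByRunes is a hypothetical helper, and only the 4000 default is taken from the field comment above.

```go
package main

import "fmt"

// SplitByRunes splits text into pieces of at most maxChars runes. It counts
// runes rather than bytes so multi-byte characters are never cut in half.
// Unlike a Markdown-aware chunker, it may split inside words or code blocks.
func SplitByRunes(text string, maxChars int) []string {
	if maxChars <= 0 {
		maxChars = 4000 // default mentioned in ChunkProcessorOptions
	}
	runes := []rune(text)
	if len(runes) == 0 {
		return nil
	}
	var chunks []string
	for start := 0; start < len(runes); start += maxChars {
		end := start + maxChars
		if end > len(runes) {
			end = len(runes)
		}
		chunks = append(chunks, string(runes[start:end]))
	}
	return chunks
}

func main() {
	for i, c := range SplitByRunes("héllo wörld", 4) {
		fmt.Printf("chunk %d: %q\n", i+1, c)
	}
}
```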

type CommandCallback (added in v0.13.0)

type CommandCallback struct {
	// contains filtered or unexported fields
}

CommandCallback handles slash command progress events

func NewCommandCallback (added in v0.13.0)

func NewCommandCallback(ctx context.Context, platform, sessionID string, adapters *AdapterManager, logger *slog.Logger, metadata map[string]any) *CommandCallback

NewCommandCallback creates a new command callback

func (*CommandCallback) Handle (added in v0.13.0)

func (c *CommandCallback) Handle(eventType string, data any) error

Handle implements event.Callback interface

type ConfigLoader

type ConfigLoader struct {
	// contains filtered or unexported fields
}

func NewConfigLoader

func NewConfigLoader(configDir string, logger *slog.Logger) (*ConfigLoader, error)

func (*ConfigLoader) Close (added in v0.11.0)

func (c *ConfigLoader) Close() error

Close stops all hot reload watchers and releases resources.

func (*ConfigLoader) GetConfig

func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig

func (*ConfigLoader) GetOptions (added in v0.11.0)

func (c *ConfigLoader) GetOptions(platform string) map[string]any

func (*ConfigLoader) GetSystemPrompt

func (c *ConfigLoader) GetSystemPrompt(platform string) string

func (*ConfigLoader) GetTaskInstructions

func (c *ConfigLoader) GetTaskInstructions(platform string) string

func (*ConfigLoader) HasPlatform

func (c *ConfigLoader) HasPlatform(platform string) bool

func (*ConfigLoader) Load

func (c *ConfigLoader) Load(configDir string) error

func (*ConfigLoader) Platforms

func (c *ConfigLoader) Platforms() []string

func (*ConfigLoader) StartHotReload (added in v0.11.0)

func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, onReload func(platform string, cfg *PlatformConfig)) error

StartHotReload starts watching all config files for changes and automatically reloads them. The onReload callback is called with the updated PlatformConfig for each platform.

type DingTalkConfig

type DingTalkConfig struct {
	AppID         string `yaml:"app_id"`
	AppSecret     string `yaml:"app_secret"`
	CallbackToken string `yaml:"callback_token"`
	CallbackKey   string `yaml:"callback_key"`
	MaxMessageLen int    `yaml:"max_message_len"`
}

type DiscordEmbed

type DiscordEmbed struct {
	Title       string                 `json:"title,omitempty"`
	Description string                 `json:"description,omitempty"`
	URL         string                 `json:"url,omitempty"`
	Color       int                    `json:"color,omitempty"`
	Fields      []DiscordEmbedField    `json:"fields,omitempty"`
	Footer      *DiscordEmbedFooter    `json:"footer,omitempty"`
	Thumbnail   *DiscordEmbedThumbnail `json:"thumbnail,omitempty"`
	Image       *DiscordEmbedImage     `json:"image,omitempty"`
	Timestamp   string                 `json:"timestamp,omitempty"`
}

type DiscordEmbedField

type DiscordEmbedField struct {
	Name   string `json:"name"`
	Value  string `json:"value"`
	Inline bool   `json:"inline,omitempty"`
}

type DiscordEmbedFooter

type DiscordEmbedFooter struct {
	Text    string `json:"text"`
	IconURL string `json:"icon_url,omitempty"`
}

type DiscordEmbedImage

type DiscordEmbedImage struct {
	URL string `json:"url"`
}

type DiscordEmbedThumbnail

type DiscordEmbedThumbnail struct {
	URL string `json:"url"`
}

type Engine (added in v0.15.5)

type Engine interface {
	Execute(ctx context.Context, cfg *types.Config, prompt string, callback event.Callback) error
	CheckDanger(prompt string) (blocked bool, operation, reason string)
	GetSession(sessionID string) (Session, bool)
	Close() error
	GetSessionStats(sessionID string) *SessionStats
	ValidateConfig(cfg *types.Config) error
	StopSession(sessionID string, reason string) error
	ResetSessionProvider(sessionID string)
	SetDangerAllowPaths(paths []string)
	SetDangerBypassEnabled(token string, enabled bool) error
	SetAllowedTools(tools []string)
	SetDisallowedTools(tools []string)
	GetAllowedTools() []string
	GetDisallowedTools() []string
}

Engine abstracts the engine functionality for dependency inversion

type EngineConfig (added in v0.11.0)

type EngineConfig struct {
	Timeout         time.Duration `yaml:"timeout"`
	IdleTimeout     time.Duration `yaml:"idle_timeout"`
	WorkDir         string        `yaml:"work_dir"`
	AllowedTools    []string      `yaml:"allowed_tools"`
	DisallowedTools []string      `yaml:"disallowed_tools"`
}

type EngineHolder

type EngineHolder struct {
	// contains filtered or unexported fields
}

EngineHolder holds the Engine instance and configuration for ChatApps integration

func NewEngineHolder

func NewEngineHolder(opts EngineHolderOptions) (*EngineHolder, error)

NewEngineHolder creates a new EngineHolder with the given options

func (*EngineHolder) GetAdapterManager

func (h *EngineHolder) GetAdapterManager() *AdapterManager

GetAdapterManager returns the AdapterManager for sending messages

func (*EngineHolder) GetEngine

func (h *EngineHolder) GetEngine() Engine

GetEngine returns the underlying Engine instance

type EngineHolderOptions

type EngineHolderOptions struct {
	Logger           *slog.Logger
	Adapters         *AdapterManager
	Timeout          time.Duration
	IdleTimeout      time.Duration
	Namespace        string
	PermissionMode   string
	AllowedTools     []string
	DisallowedTools  []string
	DefaultWorkDir   string
	DefaultTaskInstr string
}

EngineHolderOptions configures the EngineHolder

type EngineMessageHandler

type EngineMessageHandler struct {
	// contains filtered or unexported fields
}

EngineMessageHandler implements MessageHandler and integrates with Engine

func NewEngineMessageHandler

func NewEngineMessageHandler(engine Engine, adapters *AdapterManager, opts ...EngineMessageHandlerOption) *EngineMessageHandler

NewEngineMessageHandler creates a new EngineMessageHandler

func (*EngineMessageHandler) Close (added in v0.17.0)

func (h *EngineMessageHandler) Close()

Close releases resources held by the handler

func (*EngineMessageHandler) Handle

func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) error

Handle implements MessageHandler.

type EngineMessageHandlerOption

type EngineMessageHandlerOption func(*EngineMessageHandler)

EngineMessageHandlerOption configures the EngineMessageHandler

func WithConfigLoader

func WithConfigLoader(loader *ConfigLoader) EngineMessageHandlerOption

func WithLogger

func WithLogger(logger *slog.Logger) EngineMessageHandlerOption

func WithTaskInstrFn

func WithTaskInstrFn(fn func(sessionID string) string) EngineMessageHandlerOption

func WithWorkDirFn

func WithWorkDirFn(fn func(sessionID string) string) EngineMessageHandlerOption

type EventConfig (added in v0.12.0)

type EventConfig struct {
	Aggregate    bool // Whether to aggregate messages of this type
	SameTypeOnly bool // Only aggregate with same event type
	Immediate    bool // Send immediately, skip aggregation
	UseUpdate    bool // Use chat.update for streaming updates
	MinContent   int  // Minimum content length to skip aggregation (0 = use global default)
}

EventConfig defines aggregation behavior for specific event types

type FormatConversionProcessor (added in v0.12.0)

type FormatConversionProcessor struct {
	// contains filtered or unexported fields
}

FormatConversionProcessor converts message content to platform-specific formats

func NewFormatConversionProcessor (added in v0.12.0)

func NewFormatConversionProcessor(logger *slog.Logger) *FormatConversionProcessor

NewFormatConversionProcessor creates a new FormatConversionProcessor

func (*FormatConversionProcessor) Name (added in v0.12.0)

Name returns the processor name

func (*FormatConversionProcessor) Order (added in v0.12.0)

func (p *FormatConversionProcessor) Order() int

Order returns the processor order

func (*FormatConversionProcessor) Process (added in v0.12.0)

Process converts message content based on platform

type HealthCheck

type HealthCheck struct {
	Name      string
	Status    string
	LastCheck time.Time
	LastError string
	Interval  time.Duration
	CheckFunc func() error
}

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

func NewHealthChecker

func NewHealthChecker(interval time.Duration) *HealthChecker

func (*HealthChecker) GetStatus

func (h *HealthChecker) GetStatus() map[string]HealthCheck

func (*HealthChecker) Register

func (h *HealthChecker) Register(check HealthCheck)

func (*HealthChecker) Start

func (h *HealthChecker) Start()

func (*HealthChecker) Stop

func (h *HealthChecker) Stop()

type InlineKeyboardButton

type InlineKeyboardButton struct {
	Text         string `json:"text"`
	URL          string `json:"url,omitempty"`
	CallbackData string `json:"callback_data,omitempty"`
}

type InlineKeyboardMarkup

type InlineKeyboardMarkup struct {
	InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
}

type InteractionCallback (added in v0.12.0)

type InteractionCallback func(interaction *PendingInteraction) error

InteractionCallback is a function that handles an interaction callback.

type InteractionManager (added in v0.12.0)

type InteractionManager struct {
	// contains filtered or unexported fields
}

InteractionManager manages pending interactions for Slack interactive components.

func NewInteractionManager (added in v0.12.0)

func NewInteractionManager(logger *slog.Logger, opts InteractionManagerOptions) *InteractionManager

NewInteractionManager creates a new InteractionManager.

func (*InteractionManager) Complete (added in v0.12.0)

func (m *InteractionManager) Complete(id string, response *InteractionResponse) error

Complete marks an interaction as completed with a response.

func (*InteractionManager) Count (added in v0.12.0)

func (m *InteractionManager) Count() int

Count returns the number of pending interactions.

func (*InteractionManager) Delete (added in v0.12.0)

func (m *InteractionManager) Delete(id string)

Delete removes a pending interaction.

func (*InteractionManager) Expire (added in v0.12.0)

func (m *InteractionManager) Expire(id string) error

Expire marks an interaction as expired.

func (*InteractionManager) Get (added in v0.12.0)

Get retrieves a pending interaction by ID. Returns nil if not found or expired.

func (*InteractionManager) GetBySession (added in v0.12.0)

func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction

GetBySession retrieves all pending interactions for a session.

func (*InteractionManager) HandleCallback (added in v0.12.0)

func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error

HandleCallback processes an interaction callback. It looks up the interaction, calls the callback, and removes the interaction.

func (*InteractionManager) PendingCount (added in v0.12.0)

func (m *InteractionManager) PendingCount() int

PendingCount returns the number of pending (non-expired) interactions.

func (*InteractionManager) Stop (added in v0.12.0)

func (m *InteractionManager) Stop()

Stop stops the cleanup goroutine.

func (*InteractionManager) Store (added in v0.12.0)

func (m *InteractionManager) Store(interaction *PendingInteraction) string

Store adds a new pending interaction and returns its ID.

func (*InteractionManager) TotalCount (added in v0.12.0)

func (m *InteractionManager) TotalCount() int

TotalCount returns the total number of interactions (including expired).

type InteractionManagerOptions (added in v0.12.0)

type InteractionManagerOptions struct {
	CleanupInterval time.Duration // How often to run cleanup (default: 1 min)
	TTL             time.Duration // How long to keep pending interactions (default: 10 min)
}

InteractionManagerOptions configures the InteractionManager.

type InteractionResponse (added in v0.12.0)

type InteractionResponse struct {
	ActionID    string    `json:"action_id"`
	Value       string    `json:"value"`
	UserID      string    `json:"user_id"`
	RespondedAt time.Time `json:"responded_at"`
}

InteractionResponse represents the user's response to an interaction.

type InteractionStatus (added in v0.12.0)

type InteractionStatus string

InteractionStatus is the status of a pending interaction.

const (
	InteractionStatusPending   InteractionStatus = "pending"
	InteractionStatusCompleted InteractionStatus = "completed"
	InteractionStatusExpired   InteractionStatus = "expired"
	InteractionStatusCancelled InteractionStatus = "cancelled"
)

type InteractionType (added in v0.12.0)

type InteractionType string

InteractionType defines the type of interactive message.

const (
	// InteractionTypePermission is for Claude Code permission requests (Issue #39).
	InteractionTypePermission InteractionType = "permission"
	// InteractionTypeApproval is for general approval requests (Issue #37).
	InteractionTypeApproval InteractionType = "approval"
	// InteractionTypeSelection is for selection/choice requests.
	InteractionTypeSelection InteractionType = "selection"
)

type LifecycleManager

type LifecycleManager struct {
	// contains filtered or unexported fields
}

func NewLifecycleManager

func NewLifecycleManager() *LifecycleManager

func (*LifecycleManager) OnStart

func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)

func (*LifecycleManager) OnStop

func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)

func (*LifecycleManager) RegisterAdapter

func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)

func (*LifecycleManager) StartAll

func (m *LifecycleManager) StartAll(ctx context.Context) error

func (*LifecycleManager) StopAll

func (m *LifecycleManager) StopAll() error

type Logger

type Logger = slog.Logger

type MessageAggregatorProcessor (added in v0.12.0)

type MessageAggregatorProcessor struct {
	// contains filtered or unexported fields
}

MessageAggregatorProcessor aggregates multiple rapid messages into one

func NewMessageAggregatorProcessor (added in v0.12.0)

func NewMessageAggregatorProcessor(ctx context.Context, logger *slog.Logger, opts MessageAggregatorProcessorOptions) *MessageAggregatorProcessor

NewMessageAggregatorProcessor creates a new MessageAggregatorProcessor

func (*MessageAggregatorProcessor) Name (added in v0.12.0)

Name returns the processor name

func (*MessageAggregatorProcessor) Order (added in v0.12.0)

func (p *MessageAggregatorProcessor) Order() int

Order returns the processor order

func (*MessageAggregatorProcessor) Process (added in v0.12.0)

Process aggregates messages with event-type awareness

func (*MessageAggregatorProcessor) ResetSession (added in v0.15.1)

func (p *MessageAggregatorProcessor) ResetSession(platform, sessionID string)

ResetSession clears all buffers for a specific session.

func (*MessageAggregatorProcessor) SetSender (added in v0.12.0)

SetSender sets the sender for flushing aggregated messages

func (*MessageAggregatorProcessor) Stop (added in v0.12.0)

func (p *MessageAggregatorProcessor) Stop()

Stop stops the aggregator and cleans up buffers

type MessageAggregatorProcessorOptions (added in v0.12.0)

type MessageAggregatorProcessorOptions struct {
	Window     time.Duration // Time window to wait for more messages
	MinContent int           // Minimum characters before sending immediately
	MaxMsgs    int           // Maximum messages in buffer (default: maxBufferMsgs)
	MaxRunes   int           // Maximum total runes in buffer (default: maxBufferRunes)
}

MessageAggregatorProcessorOptions configures the aggregator
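The option comments suggest a buffer that flushes immediately once enough content has accumulated and otherwise waits for the Window timer. The following is a minimal, timer-free sketch of that idea, not the package's implementation: `aggOptions`, `aggBuffer`, `Add` and `Flush` are hypothetical names, joining parts with a newline is an assumption, and a zero limit is assumed to mean "no limit".

```go
package main

import (
	"strings"
	"unicode/utf8"
)

// aggOptions is a hypothetical subset of MessageAggregatorProcessorOptions.
type aggOptions struct {
	MinContent int // flush immediately once this many runes are buffered
	MaxMsgs    int // flush once this many messages are buffered (0 = no limit, assumed)
	MaxRunes   int // flush once this many runes are buffered (0 = no limit, assumed)
}

// aggBuffer is a hypothetical single-session buffer.
type aggBuffer struct {
	opts  aggOptions
	parts []string
	runes int
}

// Add buffers content and returns the combined text when a threshold is reached.
func (b *aggBuffer) Add(content string) (string, bool) {
	b.parts = append(b.parts, content)
	b.runes += utf8.RuneCountInString(content)

	full := b.runes >= b.opts.MinContent ||
		(b.opts.MaxMsgs > 0 && len(b.parts) >= b.opts.MaxMsgs) ||
		(b.opts.MaxRunes > 0 && b.runes >= b.opts.MaxRunes)
	if full {
		return b.Flush(), true
	}
	return "", false
}

// Flush returns everything buffered so far and empties the buffer.
// In a real aggregator this is what the Window timer would trigger.
func (b *aggBuffer) Flush() string {
	out := strings.Join(b.parts, "\n")
	b.parts, b.runes = nil, 0
	return out
}
```

Keeping the timer outside the buffer makes the threshold logic easy to test; a caller would pair it with something like `time.AfterFunc` to flush after the window elapses.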

type MessageFilterProcessor added in v0.15.1

type MessageFilterProcessor struct {
	// contains filtered or unexported fields
}

MessageFilterProcessor drops noise events before they enter the rest of the chain.

func NewMessageFilterProcessor added in v0.15.1

func NewMessageFilterProcessor(logger *slog.Logger) *MessageFilterProcessor

NewMessageFilterProcessor creates a new MessageFilterProcessor.

func (*MessageFilterProcessor) Name added in v0.15.1

func (p *MessageFilterProcessor) Name() string

Name returns the processor name.

func (*MessageFilterProcessor) Order added in v0.15.1

func (p *MessageFilterProcessor) Order() int

Order returns the processor order (must run before all others).

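func (*MessageFilterProcessor) Process added in v0.15.1

func (p *MessageFilterProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process drops hidden events by returning nil.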

type MessageHandler

type MessageHandler = base.MessageHandler

type MessageOperations added in v0.15.5

type MessageOperations = base.MessageOperations

Re-export interfaces from base for convenience

type MessageProcessor added in v0.12.0

type MessageProcessor interface {
	// Process processes a message and returns the processed message
	// Can return the same message pointer if no modification needed
	// Can return a new message pointer if modification needed
	// Can return an error to stop processing
	Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

	// Name returns the processor name for logging and debugging
	Name() string

	// Order returns the processor order in the chain (lower = earlier)
	Order() int
}

MessageProcessor defines the interface for processing messages before sending

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

func NewMessageQueue

func NewMessageQueue(logger *slog.Logger, maxSize, dlqSize, workers int) *MessageQueue

func (*MessageQueue) AddToDLQ

func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)

AddToDLQ stores a failed message to the Dead Letter Queue

func (*MessageQueue) DLQLen

func (q *MessageQueue) DLQLen() int

DLQLen returns the number of messages in the Dead Letter Queue

func (*MessageQueue) Dequeue

func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)

func (*MessageQueue) Enqueue

func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error

func (*MessageQueue) GetDLQ

func (q *MessageQueue) GetDLQ() []*QueuedMessage

GetDLQ returns all messages in the Dead Letter Queue

func (*MessageQueue) Requeue

func (q *MessageQueue) Requeue(msg *QueuedMessage) error

Requeue adds a message back to the queue for retry

func (*MessageQueue) Size

func (q *MessageQueue) Size() int

func (*MessageQueue) Start

func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), sendFn func(context.Context, string, string, *ChatMessage) error)

func (*MessageQueue) Stop

func (q *MessageQueue) Stop()
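The method set above (Enqueue, Dequeue, Requeue, AddToDLQ) suggests a bounded queue whose failed messages are retried and eventually parked in a Dead Letter Queue. The sketch below shows one way such a flow could work. It is not the package's implementation: `miniQueue`, `handleFailure`, the `maxRetries` parameter and the evict-oldest policy are assumptions, and the sketch omits locking and workers.

```go
package main

import "errors"

// queuedMessage is a hypothetical, reduced form of QueuedMessage.
type queuedMessage struct {
	Content string
	Retries int
}

// miniQueue is a hypothetical single-goroutine queue with a dead letter queue.
type miniQueue struct {
	items   []*queuedMessage
	dlq     []*queuedMessage
	maxSize int
	dlqSize int
}

var errQueueFull = errors.New("queue full")

// Enqueue appends a message, or fails when the queue is at capacity.
func (q *miniQueue) Enqueue(m *queuedMessage) error {
	if len(q.items) >= q.maxSize {
		return errQueueFull
	}
	q.items = append(q.items, m)
	return nil
}

// Dequeue removes and returns the oldest message, if any.
func (q *miniQueue) Dequeue() (*queuedMessage, bool) {
	if len(q.items) == 0 {
		return nil, false
	}
	m := q.items[0]
	q.items = q.items[1:]
	return m, true
}

// handleFailure requeues a failed message until maxRetries is exceeded,
// then moves it to the DLQ, evicting the oldest entry when the DLQ is full.
func (q *miniQueue) handleFailure(m *queuedMessage, maxRetries int) {
	m.Retries++
	if m.Retries <= maxRetries && q.Enqueue(m) == nil {
		return
	}
	if q.dlqSize > 0 && len(q.dlq) >= q.dlqSize {
		q.dlq = q.dlq[1:]
	}
	q.dlq = append(q.dlq, m)
}
```

A concurrent version would guard `items` and `dlq` with a mutex or replace the slice with a buffered channel.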

type ParseMode

type ParseMode = base.ParseMode

type PendingInteraction added in v0.12.0

type PendingInteraction struct {
	ID           string
	SessionID    string
	ChannelID    string
	MessageTS    string
	ActionID     string
	UserID       string
	CallbackData string
	Callback     InteractionCallback
	CreatedAt    time.Time
	ExpiresAt    time.Time
	Type         InteractionType
	ThreadTS     string
	Status       InteractionStatus
	Response     *InteractionResponse
	Metadata     map[string]any
}

PendingInteraction represents a pending interactive action (e.g., button click).

func CreatePendingInteraction added in v0.12.0

func CreatePendingInteraction(
	sessionID string,
	userID string,
	channelID string,
	interactionType InteractionType,
	metadata map[string]any,
	ttl time.Duration,
) *PendingInteraction

CreatePendingInteraction creates a new PendingInteraction with default values.

func (*PendingInteraction) IsExpired added in v0.12.0

func (p *PendingInteraction) IsExpired() bool

IsExpired returns true if the interaction has expired.

func (*PendingInteraction) TimeUntilExpiry added in v0.12.0

func (p *PendingInteraction) TimeUntilExpiry() time.Duration

TimeUntilExpiry returns the duration until the interaction expires.

type PermissionConfig added in v0.17.0

type PermissionConfig struct {
	DMPolicy              string   `yaml:"dm_policy"`
	GroupPolicy           string   `yaml:"group_policy"`
	BotUserID             string   `yaml:"bot_user_id"`
	AllowedUsers          []string `yaml:"allowed_users"`
	BlockedUsers          []string `yaml:"blocked_users"`
	SlashCommandRateLimit float64  `yaml:"slash_command_rate_limit"`
}

type PlatformConfig

type PlatformConfig struct {
	Platform         string                  `yaml:"platform"`
	SystemPrompt     string                  `yaml:"system_prompt"`
	TaskInstructions string                  `yaml:"task_instructions"`
	Engine           EngineConfig            `yaml:"engine"`
	Provider         provider.ProviderConfig `yaml:"provider"`
	Security         SecurityConfig          `yaml:"security"`
	DingTalk         DingTalkConfig          `yaml:"dingtalk"`
	WhatsApp         WhatsAppConfig          `yaml:"whatsapp"`
	Options          map[string]any          `yaml:"options,omitempty"`
}

type ProcessorChain added in v0.12.0

type ProcessorChain struct {
	// contains filtered or unexported fields
}

ProcessorChain executes a chain of message processors

func NewDefaultProcessorChain added in v0.12.0

func NewDefaultProcessorChain(ctx context.Context, logger *slog.Logger) *ProcessorChain

NewDefaultProcessorChain creates a default processor chain with all standard processors

func NewProcessorChain added in v0.12.0

func NewProcessorChain(processors ...MessageProcessor) *ProcessorChain

NewProcessorChain creates a new processor chain with the given processors. Processors are automatically sorted by Order().

func (*ProcessorChain) AddProcessor added in v0.12.0

func (c *ProcessorChain) AddProcessor(processor MessageProcessor)

AddProcessor adds a processor to the chain and re-sorts the chain. Note: this method is thread-safe but should preferably be called during initialization.

func (*ProcessorChain) Close added in v0.17.0

func (c *ProcessorChain) Close()

Close stops all processors that have background goroutines

func (*ProcessorChain) Name added in v0.16.0

func (c *ProcessorChain) Name() string

Name returns the processor chain name

func (*ProcessorChain) Order added in v0.16.0

func (c *ProcessorChain) Order() int

Order returns the processor chain order

func (*ProcessorChain) Process added in v0.12.0

func (c *ProcessorChain) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process executes all processors in order

func (*ProcessorChain) ResetSession added in v0.15.1

func (c *ProcessorChain) ResetSession(platform, sessionID string)

ResetSession propagates a session reset to all processors that support it.

func (*ProcessorChain) SetAggregatorSender added in v0.12.0

func (c *ProcessorChain) SetAggregatorSender(sender AggregatedMessageSender)

SetAggregatorSender sets the sender for MessageAggregatorProcessor if present

type ProcessorOrder added in v0.12.0

type ProcessorOrder int

ProcessorOrder defines standard processor ordering

const (
	// OrderRateLimit should run first to prevent abuse
	OrderRateLimit ProcessorOrder = 1
	// OrderZoneOrder ensures messages respect zone ordering (thinking → action → output → summary)
	OrderZoneOrder ProcessorOrder = 5
	// OrderFilter drops noise events before anything else
	OrderFilter ProcessorOrder = 10
	// OrderThread manages thread_ts caching for message chunking
	OrderThread ProcessorOrder = 15
	// OrderAggregation groups messages together
	OrderAggregation ProcessorOrder = 20
	// OrderFormatConversion converts markdown to platform-specific format
	OrderFormatConversion ProcessorOrder = 40
	// OrderChunk splits long messages for Slack API limits
	OrderChunk ProcessorOrder = 50
)

type QueueError

type QueueError struct {
	Message string
}

func (*QueueError) Error

func (e *QueueError) Error() string

type QueuedMessage

type QueuedMessage struct {
	Platform  string
	SessionID string
	Message   *ChatMessage
	Retries   int
	CreatedAt time.Time
}

type RateLimitProcessor added in v0.12.0

type RateLimitProcessor struct {
	// contains filtered or unexported fields
}

RateLimitProcessor implements non-blocking rate limiting for message sending. Instead of blocking the caller with time.After(), it drops messages that arrive too fast and lets the next one through after the minimum interval. This prevents stalling the engine callback goroutine during rapid event bursts.

func NewRateLimitProcessor added in v0.12.0

func NewRateLimitProcessor(logger *slog.Logger, opts RateLimitProcessorOptions) *RateLimitProcessor

NewRateLimitProcessor creates a new RateLimitProcessor

func (*RateLimitProcessor) Cleanup added in v0.12.0

func (p *RateLimitProcessor) Cleanup()

Cleanup removes old session rate limits

func (*RateLimitProcessor) GetSessionStats added in v0.12.0

func (p *RateLimitProcessor) GetSessionStats(platform, sessionID string) (lastSend time.Time, exists bool)

GetSessionStats returns rate limit stats for a session

func (*RateLimitProcessor) Name added in v0.12.0

func (p *RateLimitProcessor) Name() string

Name returns the processor name

func (*RateLimitProcessor) Order added in v0.12.0

func (p *RateLimitProcessor) Order() int

Order returns the processor order (should run first)

func (*RateLimitProcessor) Process added in v0.12.0

func (p *RateLimitProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process applies non-blocking rate limiting to the message. If a message arrives before the minimum interval has elapsed, it is dropped (returns nil) rather than blocking the caller. Messages with the Immediate flag in the aggregator config are always passed through.

type RateLimitProcessorOptions added in v0.12.0

type RateLimitProcessorOptions struct {
	MinInterval time.Duration // Minimum interval between messages
	MaxBurst    int           // Maximum messages in burst window
	BurstWindow time.Duration // Time window for burst calculation
}

RateLimitProcessorOptions configures the rate limit processor

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

func NewRateLimiter

func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

func (*RateLimiter) Wait

func (r *RateLimiter) Wait(ctx context.Context) error

type RetryConfig

type RetryConfig struct {
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
}

type RichContent

type RichContent = base.RichContent

type SecurityConfig added in v0.17.0

type SecurityConfig struct {
	VerifySignature bool             `yaml:"verify_signature"`
	Permission      PermissionConfig `yaml:"permission"`
}

type Session added in v0.15.5

type Session interface {
	ID() string
	Status() string
	CreatedAt() time.Time
	IsResumed() bool
}

Session abstracts session state and operations

type SessionOperations added in v0.15.5

type SessionOperations = base.SessionOperations

Re-export interfaces from base for convenience

type SessionStats added in v0.15.5

type SessionStats struct {
	SessionID     string
	Status        string
	TotalTokens   int64
	InputTokens   int64
	OutputTokens  int64
	CacheRead     int64
	CacheWrite    int64
	TotalCost     float64
	Duration      time.Duration
	ToolCallCount int
	ErrorCount    int
}

SessionStats holds session statistics

type SlackBlock

type SlackBlock map[string]any

type StreamAdapter

type StreamAdapter interface {
	ChatAdapter
	SendStreamMessage(ctx context.Context, sessionID string, msg *ChatMessage) (StreamHandler, error)
	UpdateMessage(ctx context.Context, sessionID, messageID string, msg *ChatMessage) error
}

type StreamCallback

type StreamCallback struct {
	// contains filtered or unexported fields
}

StreamCallback implements event.Callback to receive Engine events and forward to ChatApp

func NewStreamCallback

func NewStreamCallback(
	ctx context.Context,
	sessionID, platform string,
	adapters *AdapterManager,
	logger *slog.Logger,
	isHot bool,
	metadata map[string]any,
	messageOps MessageOperations,
	sessionOps SessionOperations,
) *StreamCallback

NewStreamCallback creates a new StreamCallback with injected platform-specific operations

func (*StreamCallback) Close added in v0.17.0

func (c *StreamCallback) Close()

Close releases resources held by the callback

func (*StreamCallback) Handle

func (c *StreamCallback) Handle(eventType string, data any) error

Handle implements event.Callback

func (*StreamCallback) SendAggregatedMessage added in v0.12.0

func (c *StreamCallback) SendAggregatedMessage(ctx context.Context, msg *ChatMessage) error

SendAggregatedMessage implements the AggregatedMessageSender interface. It is called by the MessageAggregatorProcessor when the timer flushes buffered messages.

type StreamHandler

type StreamHandler func(ctx context.Context, sessionID string, chunk string, isFinal bool) error

type ThreadInfo added in v0.12.0

type ThreadInfo struct {
	ThreadTS     string
	ChannelID    string
	LastActivity time.Time
}

ThreadInfo holds thread-related metadata for a session.

type ThreadProcessor added in v0.12.0

type ThreadProcessor struct {
	// contains filtered or unexported fields
}

ThreadProcessor manages thread_ts caching for message chunking. It tracks the first message's timestamp for each session to associate subsequent chunked messages in the same thread.

func NewThreadProcessor added in v0.12.0

func NewThreadProcessor(logger *slog.Logger, opts ThreadProcessorOptions) *ThreadProcessor

NewThreadProcessor creates a new ThreadProcessor.

func (*ThreadProcessor) Delete added in v0.12.0

func (p *ThreadProcessor) Delete(sessionID string)

Delete removes thread info for a session.

func (*ThreadProcessor) GetThreadTS added in v0.12.0

func (p *ThreadProcessor) GetThreadTS(sessionID string) string

GetThreadTS returns the stored thread_ts for a session. Returns empty string if no thread info exists.

func (*ThreadProcessor) Name added in v0.12.0

func (p *ThreadProcessor) Name() string

Name returns the processor name.

func (*ThreadProcessor) Order added in v0.12.0

func (p *ThreadProcessor) Order() int

Order returns the processor order (runs after rate limit, before aggregation).

func (*ThreadProcessor) Process added in v0.12.0

func (p *ThreadProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process manages thread_ts for the message. For the first message: stores thread_ts from metadata if present. For subsequent messages: attaches stored thread_ts to metadata.

func (*ThreadProcessor) SetThreadTS added in v0.12.0

func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)

SetThreadTS explicitly sets the thread_ts for a session. This is useful when the first message response provides the thread_ts.

func (*ThreadProcessor) Stop added in v0.12.0

func (p *ThreadProcessor) Stop()

Stop stops the cleanup goroutine.

type ThreadProcessorOptions added in v0.12.0

type ThreadProcessorOptions struct {
	CleanupInterval time.Duration // How often to run cleanup (default: 5 min)
	TTL             time.Duration // How long to keep thread info (default: 30 min)
}

ThreadProcessorOptions configures the ThreadProcessor.

type WhatsAppConfig

type WhatsAppConfig struct {
	PhoneNumberID string `yaml:"phone_number_id"`
	AccessToken   string `yaml:"access_token"`
	VerifyToken   string `yaml:"verify_token"`
	APIVersion    string `yaml:"api_version"`
}

type ZoneConfig added in v0.15.1

type ZoneConfig struct {
	MaxMsgs  int // Maximum messages kept in the zone (0 = no limit)
	MaxRunes int // Maximum total content runes (0 = no limit)
}

ZoneConfig defines per-zone aggregation limits.

type ZoneOrderProcessor added in v0.15.1

type ZoneOrderProcessor struct {
	// contains filtered or unexported fields
}

ZoneOrderProcessor ensures messages respect zone ordering within a session. Earlier zones (lower index) are always sent before later zones. If an event arrives for a zone that should have already passed, it is still allowed through (late arrival is better than lost messages).

func NewZoneOrderProcessor added in v0.15.1

func NewZoneOrderProcessor(logger *slog.Logger) *ZoneOrderProcessor

NewZoneOrderProcessor creates a new ZoneOrderProcessor.

func (*ZoneOrderProcessor) Name added in v0.15.1

func (p *ZoneOrderProcessor) Name() string

Name returns the processor name.

func (*ZoneOrderProcessor) Order added in v0.15.1

func (p *ZoneOrderProcessor) Order() int

Order returns the processor order.

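func (*ZoneOrderProcessor) Process added in v0.15.1

func (p *ZoneOrderProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process validates zone ordering. It annotates messages with their zone index in metadata for downstream processors (e.g., aggregator) to use.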

func (*ZoneOrderProcessor) ResetSession added in v0.15.1

func (p *ZoneOrderProcessor) ResetSession(platform, sessionID string)

ResetSession clears zone state for a session (call on session end).
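The zone behaviour described above (annotate each message with its zone index, let late arrivals through, clear state on reset) can be sketched as follows. The zone names come from the OrderZoneOrder comment; everything else is an assumption for illustration, including the `zone_index` metadata key, the `zoneTracker` type and tracking by session ID alone.

```go
package main

// zoneIndex maps zone names to their position in the expected sequence.
var zoneIndex = map[string]int{
	"thinking": 0,
	"action":   1,
	"output":   2,
	"summary":  3,
}

// zoneTracker is a hypothetical tracker of the furthest zone reached per session.
type zoneTracker struct {
	current map[string]int
}

func newZoneTracker() *zoneTracker {
	return &zoneTracker{current: make(map[string]int)}
}

// annotate writes the zone index into metadata and reports whether the
// message is a late arrival for an earlier zone. Late messages are only
// flagged, never dropped.
func (z *zoneTracker) annotate(sessionID, zone string, metadata map[string]any) (late bool) {
	idx, ok := zoneIndex[zone]
	if !ok {
		return false // unknown zone: leave the message untouched
	}
	metadata["zone_index"] = idx // hypothetical metadata key

	cur, seen := z.current[sessionID]
	if seen && idx < cur {
		return true
	}
	z.current[sessionID] = idx
	return false
}

// reset clears the zone state for a session, for example when the session ends.
func (z *zoneTracker) reset(sessionID string) {
	delete(z.current, sessionID)
}
```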

Directories

Path Synopsis
Package slack provides a high-performance, AI-native Slack adapter for the HotPlex engine.