Documentation
¶
Overview ¶
Package channels provides bidirectional messaging connectors for platforms like WhatsApp, Telegram, Discord, and generic webhooks.
While connectivity handles synchronous request-response routing (bytes in, bytes out), channels handles long-lived, event-driven connections where messages arrive unprompted (inbound) and responses are pushed back (outbound).
The two packages are complementary: channels uses connectivity.Router for outbound processing (LLM calls, tool invocations) while managing the platform-specific connection lifecycle (authentication, reconnection, keepalive) that doesn't fit the request-response model.
d := channels.NewDispatcher(router, handler, channels.WithLogger(logger))
d.RegisterPlatform("whatsapp", channels.WhatsAppFactory())
d.RegisterPlatform("telegram", channels.TelegramFactory())
go d.Watch(ctx, db, 200*time.Millisecond)
The channels table in SQLite decides which connectors are active. Change it at runtime and the Dispatcher picks up the new config — zero downtime.
Index ¶
- Constants
- func Init(db *sql.DB) error
- func OpenDB(path string) (*sql.DB, error)
- type Admin
- func (a *Admin) DeleteChannel(ctx context.Context, name string) error
- func (a *Admin) GetChannel(ctx context.Context, name string) (*ChannelRow, error)
- func (a *Admin) ListChannels(ctx context.Context) ([]ChannelRow, error)
- func (a *Admin) SetEnabled(ctx context.Context, name string, enabled bool) error
- func (a *Admin) UpdateAuthState(ctx context.Context, name string, authState json.RawMessage) error
- func (a *Admin) UpsertChannel(ctx context.Context, name, platform string, enabled bool, ...) error
- type Attachment
- type Channel
- type ChannelFactory
- type ChannelInfo
- type ChannelRow
- type ChannelStatus
- type Direction
- type DiscordConfig
- type Dispatcher
- func (d *Dispatcher) Close() error
- func (d *Dispatcher) Inspect(name string) (info ChannelInfo, ok bool)
- func (d *Dispatcher) ListChannels() iter.Seq[ChannelInfo]
- func (d *Dispatcher) RegisterPlatform(platform string, f ChannelFactory)
- func (d *Dispatcher) Reload(ctx context.Context, db *sql.DB) error
- func (d *Dispatcher) Send(ctx context.Context, msg Message) error
- func (d *Dispatcher) Status(name string) (ChannelStatus, bool)
- func (d *Dispatcher) Watch(ctx context.Context, db *sql.DB, interval time.Duration)
- type DispatcherOption
- type ErrChannelDisabled
- type ErrChannelNotFound
- type ErrNoPlatformFactory
- type ErrSendFailed
- type InboundHandler
- type Message
- type TelegramConfig
- type WebhookConfig
- type WhatsAppConfig
Constants ¶
const Schema = `` /* 642-byte string literal not displayed */
Schema defines the channels table that drives the bidirectional connector lifecycle. Each row maps a channel name to a platform and its configuration.
Platforms:
- "whatsapp": WhatsApp via whatsmeow (multi-device, QR pairing).
- "telegram": Telegram via bot API or MTProto client.
- "discord": Discord via gateway WebSocket + REST API.
- "signal": Signal via signald subprocess.
- "webhook": Generic inbound HTTP webhook endpoint.
- "matrix": Matrix via mautrix-go client.
The config column holds per-channel JSON (credentials path, device name, etc.). The auth_state column stores persistent authentication state (session data, device ID) — not raw credentials, which belong in config or env vars. The enabled column implements the noop pattern: set enabled=0 to shut down a channel without deleting its config.
Any UPDATE to this table automatically increments PRAGMA data_version, which the Dispatcher.Watch loop detects to trigger a hot-reload.
Variables ¶
This section is empty.
Functions ¶
func OpenDB ¶
OpenDB opens a SQLite database at path with production-safe pragmas:
- journal_mode=WAL: concurrent reads during writes
- busy_timeout=5000: wait up to 5s for locks instead of immediate SQLITE_BUSY
- foreign_keys=ON: enforce FK constraints
The caller must blank-import the SQLite driver:
import _ "modernc.org/sqlite"
Use this instead of sql.Open for any database that will be shared between Admin writes, Dispatcher.Reload reads, and Watch polling.
Types ¶
type Admin ¶
type Admin struct {
// contains filtered or unexported fields
}
Admin provides CRUD operations on the channels table, suitable for exposure as MCP tools so an LLM can administer channels at runtime.
All mutations go through SQLite, so the Watch loop automatically picks up changes — no need to call Reload manually.
func NewAdmin ¶
NewAdmin creates an Admin backed by the given database. The database must have the channels schema applied (via Init).
func (*Admin) DeleteChannel ¶
DeleteChannel removes a channel from the channels table. The watcher will detect the change and close the associated connection.
func (*Admin) GetChannel ¶
GetChannel returns a single channel by name.
func (*Admin) ListChannels ¶
func (a *Admin) ListChannels(ctx context.Context) ([]ChannelRow, error)
ListChannels returns all channels from the SQLite table.
func (*Admin) SetEnabled ¶
SetEnabled enables or disables a channel without deleting its config. Set enabled=false to shut down the connection; set enabled=true to restart. Analogous to connectivity's noop strategy toggle.
func (*Admin) UpdateAuthState ¶
UpdateAuthState updates the persistent auth state for a channel. This is called by channel implementations when authentication state changes (e.g., WhatsApp pairing completes, token refresh).
func (*Admin) UpsertChannel ¶
func (a *Admin) UpsertChannel(ctx context.Context, name, platform string, enabled bool, config json.RawMessage) error
UpsertChannel inserts or updates a channel in the channels table. On conflict (same name), only platform, enabled, and config are updated; auth_state is preserved so that active sessions (e.g. WhatsApp pairing) are not lost when an admin changes the channel config. The watcher will detect the change and trigger a Reload automatically.
type Attachment ¶
type Attachment struct {
Type string `json:"type"` // "image", "audio", "video", "document"
URL string `json:"url"` // download URL or local path
MimeType string `json:"mime_type"` // e.g. "image/jpeg"
Caption string `json:"caption,omitempty"` // optional caption
Filename string `json:"filename,omitempty"` // original filename
}
Attachment is a media file attached to a message.
type Channel ¶
type Channel interface {
// Listen returns a read-only channel of inbound messages.
// The returned channel is closed when ctx is cancelled or Close is called.
Listen(ctx context.Context) <-chan Message
// Send pushes an outbound message to the platform.
Send(ctx context.Context, msg Message) error
// Status returns the current connection status.
Status() ChannelStatus
// Close shuts down the connection and releases resources.
// After Close, the channel returned by Listen will be closed.
Close() error
}
Channel is a bidirectional connection to a messaging platform. Listen returns a channel that emits inbound messages; the channel is closed when the context is cancelled or the connection is lost. Send pushes an outbound message to the platform.
type ChannelFactory ¶
type ChannelFactory func(name string, config json.RawMessage) (Channel, error)
ChannelFactory creates a Channel from a name and JSON config. Analogous to connectivity.TransportFactory but for bidirectional connections. The name is the channel's identifier in the channels table (e.g. "wa_principal"). The config is the per-channel JSON from the config column.
func DiscordFactory ¶
func DiscordFactory() ChannelFactory
DiscordFactory returns a ChannelFactory for Discord connections using discordgo (bwmarrin). Discordgo handles the gateway WebSocket, REST API, slash commands, and voice connections.
Config example:
{"bot_token": "Bot MTk...", "guild_id": "123456789"}
func TelegramFactory ¶
func TelegramFactory() ChannelFactory
TelegramFactory returns a ChannelFactory for Telegram connections.
By default, uses the bot API with long-polling (go-telegram-bot-api or similar). Set use_mtproto=true in config for the full MTProto client (gotd/td), which supports user accounts and more message types.
Config example (bot API):
{"bot_token": "123456:ABC-DEF"}
Config example (MTProto):
{"bot_token": "123456:ABC-DEF", "use_mtproto": true}
func WebhookFactory ¶
func WebhookFactory() ChannelFactory
WebhookFactory returns a ChannelFactory for generic inbound HTTP webhooks. This allows any external system to push messages into the channels pipeline via HTTP POST.
Inbound messages are received as JSON POST bodies. If the config includes a secret, the handler verifies the X-Signature-256 HMAC header before accepting.
Outbound messages are POSTed as JSON to msg.Metadata["callback_url"] when present; otherwise they are silently dropped (the caller is responsible for including a callback URL if it expects responses).
Config example:
{"listen_addr": ":8080", "path": "/webhook/inbound", "secret": "hmac_key"}
func WhatsAppFactory ¶
func WhatsAppFactory() ChannelFactory
WhatsAppFactory returns a ChannelFactory for WhatsApp connections using whatsmeow (tulir). Whatsmeow handles multi-device pairing, QR code auth, automatic reconnection, and all message types.
The factory is a stub: it creates a whatsAppChannel that implements the Channel interface. The actual whatsmeow integration (WA client, event handlers, media download) is wired up when the dependency is available.
Config example:
{"device_name": "horos-ariege", "store_path": "/data/wa_session.db"}
type ChannelInfo ¶
type ChannelInfo struct {
Name string `json:"name"`
Platform string `json:"platform"`
Status ChannelStatus `json:"status"`
Connected bool `json:"connected"`
}
ChannelInfo describes an active channel as seen by the dispatcher at a point in time. The struct is a snapshot; the dispatcher may have reloaded since this was created.
type ChannelRow ¶
type ChannelRow struct {
Name string `json:"name"`
Platform string `json:"platform"`
Enabled bool `json:"enabled"`
Config json.RawMessage `json:"config,omitempty"`
AuthState json.RawMessage `json:"auth_state,omitempty"`
UpdatedAt int64 `json:"updated_at"`
}
ChannelRow represents a single row from the channels table.
type ChannelStatus ¶
type ChannelStatus struct {
Connected bool `json:"connected"`
Platform string `json:"platform"`
AuthState string `json:"auth_state"` // "paired", "pending_qr", "token_valid", etc.
LastMessage time.Time `json:"last_message"`
Error string `json:"error,omitempty"`
}
ChannelStatus describes the current state of a channel connection.
type Direction ¶
type Direction int
Direction indicates whether a message is inbound (received from a user) or outbound (sent by the system).
type DiscordConfig ¶
type DiscordConfig struct {
// BotToken is the Discord bot token (from Developer Portal).
BotToken string `json:"bot_token"`
// GuildID restricts the bot to a specific guild. If empty, the bot
// listens on all guilds it has been invited to.
GuildID string `json:"guild_id,omitempty"`
// ChannelIDs restricts listening to specific channel IDs.
// If empty, all channels the bot can see are monitored.
ChannelIDs []string `json:"channel_ids,omitempty"`
// Intents specifies the gateway intents. Defaults to MessageContent +
// GuildMessages + DirectMessages if empty.
Intents int `json:"intents,omitempty"`
}
DiscordConfig is the per-channel JSON config for Discord connections.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher manages active channels and routes inbound messages through a processing pipeline. It watches the SQLite channels table for changes and creates/closes channels accordingly.
The Dispatcher is the integration point between the channels package (bidirectional messaging) and the connectivity package (request-response routing). The InboundHandler typically calls connectivity.Router.Call to process messages through an LLM pipeline.
func NewDispatcher ¶
func NewDispatcher(handler InboundHandler, opts ...DispatcherOption) *Dispatcher
NewDispatcher creates a Dispatcher with the given inbound handler. Register platform factories before calling Watch.
func (*Dispatcher) Close ¶
func (d *Dispatcher) Close() error
Close shuts down all active channels and cancels the lifecycle context.
func (*Dispatcher) Inspect ¶
func (d *Dispatcher) Inspect(name string) (info ChannelInfo, ok bool)
Inspect returns detailed information about a single active channel. Returns ok=false if the channel is not active in the dispatcher.
func (*Dispatcher) ListChannels ¶
func (d *Dispatcher) ListChannels() iter.Seq[ChannelInfo]
ListChannels returns an iterator over all active channels in the dispatcher.
func (*Dispatcher) RegisterPlatform ¶
func (d *Dispatcher) RegisterPlatform(platform string, f ChannelFactory)
RegisterPlatform registers a ChannelFactory for a platform name. Must be called before Watch. Example: d.RegisterPlatform("whatsapp", WhatsAppFactory())
func (*Dispatcher) Reload ¶
Reload reads the channels table and reconciles the active channel set. New enabled channels are started, removed or disabled channels are closed, and channels with changed config are restarted.
Channel listen contexts are parented to the Dispatcher's lifecycle context, not the ctx passed here. This ensures that channels survive beyond a short-lived request context (e.g. an admin HTTP handler with a timeout).
func (*Dispatcher) Send ¶
func (d *Dispatcher) Send(ctx context.Context, msg Message) error
Send sends an outbound message through the named channel. Returns ErrChannelNotFound if the channel doesn't exist or is not active.
func (*Dispatcher) Status ¶
func (d *Dispatcher) Status(name string) (ChannelStatus, bool)
Status returns the ChannelStatus for a named channel. Returns ok=false if the channel is not active.
func (*Dispatcher) Watch ¶
Watch polls PRAGMA data_version on the database at the given interval. When the version changes (meaning any write to the channels table or any other table in the same database occurred), it triggers a Reload.
data_version is auto-incremented by SQLite on any write — no triggers needed. This is the same proven pattern used by connectivity.Router.Watch and the mcprt tool registry.
Watch blocks until ctx is cancelled. Run it in a goroutine:
go dispatcher.Watch(ctx, db, 200*time.Millisecond)
type DispatcherOption ¶
type DispatcherOption func(*Dispatcher)
DispatcherOption configures a Dispatcher.
func WithLogger ¶
func WithLogger(l *slog.Logger) DispatcherOption
WithLogger sets a custom logger for the dispatcher.
func WithMaxConcurrent ¶
func WithMaxConcurrent(n int) DispatcherOption
WithMaxConcurrent sets the maximum number of concurrent InboundHandler calls across all channels. Use this to prevent unbounded goroutine growth when a high-throughput channel (e.g. WhatsApp group) produces messages faster than the handler (typically an LLM call) can process them. Zero or negative means unlimited (default).
type ErrChannelDisabled ¶
type ErrChannelDisabled struct {
Channel string
}
ErrChannelDisabled is returned when Send is called on a disabled channel.
func (*ErrChannelDisabled) Error ¶
func (e *ErrChannelDisabled) Error() string
type ErrChannelNotFound ¶
type ErrChannelNotFound struct {
Channel string
}
ErrChannelNotFound is returned when an operation targets a channel that does not exist in the channels table or the dispatcher's active set.
func (*ErrChannelNotFound) Error ¶
func (e *ErrChannelNotFound) Error() string
type ErrNoPlatformFactory ¶
ErrNoPlatformFactory is returned during reload when a channel's platform has no registered ChannelFactory.
func (*ErrNoPlatformFactory) Error ¶
func (e *ErrNoPlatformFactory) Error() string
type ErrSendFailed ¶
ErrSendFailed is returned when a message could not be delivered to the platform.
func (*ErrSendFailed) Error ¶
func (e *ErrSendFailed) Error() string
func (*ErrSendFailed) Unwrap ¶
func (e *ErrSendFailed) Unwrap() error
type InboundHandler ¶
InboundHandler processes an inbound message and returns zero or more outbound response messages. This is the integration point where anonymization, LLM processing, and de-anonymization happen.
The handler may return nil to indicate no response should be sent.
type Message ¶
type Message struct {
ID string `json:"id"`
ChannelName string `json:"channel"` // e.g. "whatsapp_main", "tg_support"
Platform string `json:"platform"` // "whatsapp", "telegram", "discord", "webhook"
Direction Direction `json:"direction"` // Inbound or Outbound
SenderID string `json:"sender_id"` // platform-specific user ID
RecipientID string `json:"recipient_id"` // platform-specific recipient ID
Text string `json:"text"` // message body
ReplyTo string `json:"reply_to,omitempty"` // ID of message being replied to
Attachments []Attachment `json:"attachments,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"` // platform-specific extras
Timestamp time.Time `json:"timestamp"`
}
Message is a platform-normalized inbound or outbound message. All platform-specific details are stripped; platform-specific metadata can be carried in the Metadata map.
type TelegramConfig ¶
type TelegramConfig struct {
// BotToken is the Telegram bot API token (from @BotFather).
// For security, prefer passing via environment variable and referencing
// the env var name here.
BotToken string `json:"bot_token"`
// UseMTProto enables the full MTProto client (gotd/td) instead of the
// simpler bot API. Required for user accounts, optional for bots.
UseMTProto bool `json:"use_mtproto,omitempty"`
// WebhookURL, if set, uses webhook mode instead of long-polling.
// Must be a publicly reachable HTTPS URL.
WebhookURL string `json:"webhook_url,omitempty"`
}
TelegramConfig is the per-channel JSON config for Telegram connections.
type WebhookConfig ¶
type WebhookConfig struct {
// ListenAddr is the address to bind the HTTP server (e.g. ":8080").
ListenAddr string `json:"listen_addr"`
// Path is the URL path to listen on (e.g. "/webhook/inbound").
Path string `json:"path"`
// Secret is an optional shared secret for HMAC-SHA256 signature verification.
// When set, inbound requests must include an X-Signature-256 header with
// the hex-encoded HMAC-SHA256 of the request body.
Secret string `json:"secret,omitempty"`
// MaxBodyBytes limits the request body size. Defaults to 1MB.
MaxBodyBytes int64 `json:"max_body_bytes,omitempty"`
}
WebhookConfig is the per-channel JSON config for generic inbound webhooks.
type WhatsAppConfig ¶
type WhatsAppConfig struct {
// DeviceName is the display name for the linked device.
DeviceName string `json:"device_name"`
// StorePath is the path to whatsmeow's SQLite session database.
StorePath string `json:"store_path"`
}
WhatsAppConfig is the per-channel JSON config for WhatsApp connections.