channels

package
v0.1.0
Published: Mar 1, 2026 License: MIT Imports: 16 Imported by: 0

README

channels — bidirectional messaging with hot-reload

channels manages multiple messaging platform connectors (webhook, Discord, Telegram, WhatsApp) with configuration stored in SQLite and hot-reloaded via PRAGMA data_version.

                           ┌─────────────┐
  Telegram  ◄────────────►│             │
  Discord   ◄────────────►│  Dispatcher  │◄──── InboundHandler
  Webhook   ◄────────────►│             │       (your logic)
  WhatsApp  ◄────────────►│             │
                           └──────┬──────┘
                                  │ watch
                           ┌──────▼──────┐
                           │   channels   │  SQLite table
                           │   (config)   │
                           └─────────────┘

Quick start

dispatcher := channels.NewDispatcher(handler,
    channels.WithMaxConcurrent(10),
)
dispatcher.RegisterPlatform("webhook", channels.WebhookFactory())

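if err := channels.Init(db); err != nil {
    log.Fatal(err)
}
go dispatcher.Watch(ctx, db, 2*time.Second) // hot-reload; Watch is a Dispatcher method

if err := dispatcher.Send(ctx, channels.Message{ChannelName: "alerts", Text: "hello"}); err != nil {
    log.Print(err)
}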

Schema

CREATE TABLE channels (
    name       TEXT PRIMARY KEY,
    platform   TEXT NOT NULL,  -- webhook, telegram, discord, whatsapp, signal, matrix
    enabled    INTEGER NOT NULL DEFAULT 1,
    config     TEXT,           -- JSON, platform-specific
    auth_state TEXT,           -- JSON, managed by platform SDK
    updated_at INTEGER
);

Hot-reload behavior

The watcher polls PRAGMA data_version. On change the dispatcher:

  • Starts new channels.
  • Stops removed or disabled channels.
  • Restarts channels whose config fingerprint changed (auth_state changes are ignored since they are managed by platform SDKs).
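The three rules above can be sketched as a pure reconciliation step. This is an illustration only, not the package's implementation: the `row`, `plan`, `fingerprint`, and `reconcile` names are hypothetical, and hashing platform plus config with SHA-256 is an assumption about what a "config fingerprint" might be.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// row mirrors the channels-table columns that matter for reconciliation.
type row struct {
	Name     string
	Platform string
	Enabled  bool
	Config   string // raw JSON from the config column
}

// fingerprint hashes platform and config only. auth_state is deliberately
// excluded so SDK-managed session updates never cause a restart.
// The exact hash input is an assumption.
func fingerprint(r row) string {
	sum := sha256.Sum256([]byte(r.Platform + "\x00" + r.Config))
	return hex.EncodeToString(sum[:])
}

// plan lists the channel names to start, stop, and restart.
type plan struct{ Start, Stop, Restart []string }

// reconcile compares the active set (name -> fingerprint) with the table rows.
func reconcile(active map[string]string, rows []row) plan {
	var p plan
	wanted := make(map[string]bool)
	for _, r := range rows {
		if !r.Enabled {
			continue // disabled rows are handled like removed rows
		}
		wanted[r.Name] = true
		old, running := active[r.Name]
		switch {
		case !running:
			p.Start = append(p.Start, r.Name)
		case old != fingerprint(r):
			p.Restart = append(p.Restart, r.Name)
		}
	}
	for name := range active {
		if !wanted[name] {
			p.Stop = append(p.Stop, name)
		}
	}
	sort.Strings(p.Start)
	sort.Strings(p.Stop)
	sort.Strings(p.Restart)
	return p
}

func main() {
	active := map[string]string{
		"alerts": fingerprint(row{Platform: "webhook", Config: `{"listen_addr":":8080"}`}),
		"old":    "stale",
	}
	rows := []row{
		{Name: "alerts", Platform: "webhook", Enabled: true, Config: `{"listen_addr":":9090"}`},
		{Name: "tg_support", Platform: "telegram", Enabled: true, Config: `{}`},
		{Name: "old", Platform: "webhook", Enabled: false},
	}
	fmt.Printf("%+v\n", reconcile(active, rows))
}
```

Keeping the decision separate from the side effects (opening and closing connections) makes the rules easy to test without a database.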

Webhook channel

  • Inbound: starts an HTTP server, validates optional HMAC-SHA256 signature.
  • Outbound: POSTs JSON to msg.Metadata["callback_url"] with SSRF guard.
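The README does not describe how the SSRF guard works. As a rough sketch of what such a check commonly looks like, the hypothetical `checkCallbackURL` below rejects non-HTTP schemes and hosts that resolve to loopback, private, or link-local addresses. The function names and the exact block list are assumptions, not the package's behavior.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// blocked reports whether ip points at this host or an internal network.
func blocked(ip net.IP) bool {
	return ip.IsLoopback() || ip.IsPrivate() || ip.IsUnspecified() ||
		ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsMulticast()
}

// checkCallbackURL is a hypothetical pre-flight check for callback_url.
// A production guard should also re-check the address at dial time,
// because DNS answers can change between this check and the request.
func checkCallbackURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return err
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	host := u.Hostname()
	if host == "" {
		return errors.New("missing host")
	}
	var ips []net.IP
	if ip := net.ParseIP(host); ip != nil {
		ips = []net.IP{ip} // literal address, no DNS lookup needed
	} else {
		ips, err = net.LookupIP(host)
		if err != nil {
			return err
		}
	}
	for _, ip := range ips {
		if blocked(ip) {
			return fmt.Errorf("callback host %s resolves to blocked address %s", host, ip)
		}
	}
	return nil
}

func main() {
	for _, raw := range []string{
		"http://127.0.0.1:8080/hook",
		"https://169.254.169.254/latest/meta-data",
		"https://93.184.216.34/hook",
	} {
		fmt.Println(raw, "->", checkCallbackURL(raw))
	}
}
```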

Exported API

Symbol                Description
Channel               Interface: Listen, Send, Status, Close
Dispatcher            Manages active channels with hot-reload
Message               Platform-normalized message
ChannelFactory        func(name, config) (Channel, error)
InboundHandler        func(ctx, Message) ([]Message, error)
WebhookFactory()      Factory for HTTP webhook channels
WithMaxConcurrent(n)  Limit concurrent inbound handler calls

Documentation

Overview

Package channels provides bidirectional messaging connectors for platforms like WhatsApp, Telegram, Discord, and generic webhooks.

While connectivity handles synchronous request-response routing (bytes in, bytes out), channels handles long-lived, event-driven connections where messages arrive unprompted (inbound) and responses are pushed back (outbound).

The two packages are complementary: channels uses connectivity.Router for outbound processing (LLM calls, tool invocations) while managing the platform-specific connection lifecycle (authentication, reconnection, keepalive) that doesn't fit the request-response model.

d := channels.NewDispatcher(handler, channels.WithLogger(logger))
d.RegisterPlatform("whatsapp", channels.WhatsAppFactory())
d.RegisterPlatform("telegram", channels.TelegramFactory())
go d.Watch(ctx, db, 200*time.Millisecond)

The channels table in SQLite decides which connectors are active. Change it at runtime and the Dispatcher picks up the new config without a process restart; only channels whose rows changed are started, stopped, or restarted.

Index

Constants

const Schema = `` /* 642-byte string literal not displayed */

Schema defines the channels table that drives the bidirectional connector lifecycle. Each row maps a channel name to a platform and its configuration.

Platforms:

  • "whatsapp": WhatsApp via whatsmeow (multi-device, QR pairing).
  • "telegram": Telegram via bot API or MTProto client.
  • "discord": Discord via gateway WebSocket + REST API.
  • "signal": Signal via signald subprocess.
  • "webhook": Generic inbound HTTP webhook endpoint.
  • "matrix": Matrix via mautrix-go client.

The config column holds per-channel JSON (credentials path, device name, etc.). The auth_state column stores persistent authentication state (session data, device ID) — not raw credentials, which belong in config or env vars. The enabled column implements the noop pattern: set enabled=0 to shut down a channel without deleting its config.

Any committed write to this table (INSERT, UPDATE, or DELETE) changes the value reported by PRAGMA data_version, which the Dispatcher.Watch loop detects to trigger a hot-reload.

Variables

This section is empty.

Functions

func Init

func Init(db *sql.DB) error

Init creates the channels table if it doesn't exist.

func OpenDB

func OpenDB(path string) (*sql.DB, error)

OpenDB opens a SQLite database at path with production-safe pragmas:

  • journal_mode=WAL: concurrent reads during writes
  • busy_timeout=5000: wait up to 5s for locks instead of immediate SQLITE_BUSY
  • foreign_keys=ON: enforce FK constraints

The caller must blank-import the SQLite driver:

import _ "modernc.org/sqlite"

Use this instead of sql.Open for any database that will be shared between Admin writes, Dispatcher.Reload reads, and Watch polling.

Types

type Admin

type Admin struct {
	// contains filtered or unexported fields
}

Admin provides CRUD operations on the channels table, suitable for exposure as MCP tools so an LLM can administer channels at runtime.

All mutations go through SQLite, so the Watch loop automatically picks up changes — no need to call Reload manually.

func NewAdmin

func NewAdmin(db *sql.DB) *Admin

NewAdmin creates an Admin backed by the given database. The database must have the channels schema applied (via Init).

func (*Admin) DeleteChannel

func (a *Admin) DeleteChannel(ctx context.Context, name string) error

DeleteChannel removes a channel from the channels table. The watcher will detect the change and close the associated connection.

func (*Admin) GetChannel

func (a *Admin) GetChannel(ctx context.Context, name string) (*ChannelRow, error)

GetChannel returns a single channel by name.

func (*Admin) ListChannels

func (a *Admin) ListChannels(ctx context.Context) ([]ChannelRow, error)

ListChannels returns all channels from the SQLite table.

func (*Admin) SetEnabled

func (a *Admin) SetEnabled(ctx context.Context, name string, enabled bool) error

SetEnabled enables or disables a channel without deleting its config. Set enabled=false to shut down the connection; set enabled=true to restart. Analogous to connectivity's noop strategy toggle.

func (*Admin) UpdateAuthState

func (a *Admin) UpdateAuthState(ctx context.Context, name string, authState json.RawMessage) error

UpdateAuthState updates the persistent auth state for a channel. This is called by channel implementations when authentication state changes (e.g., WhatsApp pairing completes, token refresh).

func (*Admin) UpsertChannel

func (a *Admin) UpsertChannel(ctx context.Context, name, platform string, enabled bool, config json.RawMessage) error

UpsertChannel inserts or updates a channel in the channels table. On conflict (same name), only platform, enabled, and config are updated; auth_state is preserved so that active sessions (e.g. WhatsApp pairing) are not lost when an admin changes the channel config. The watcher will detect the change and trigger a Reload automatically.

type Attachment

type Attachment struct {
	Type     string `json:"type"`               // "image", "audio", "video", "document"
	URL      string `json:"url"`                // download URL or local path
	MimeType string `json:"mime_type"`          // e.g. "image/jpeg"
	Caption  string `json:"caption,omitempty"`  // optional caption
	Filename string `json:"filename,omitempty"` // original filename
}

Attachment is a media file attached to a message.

type Channel

type Channel interface {
	// Listen returns a read-only channel of inbound messages.
	// The returned channel is closed when ctx is cancelled or Close is called.
	Listen(ctx context.Context) <-chan Message

	// Send pushes an outbound message to the platform.
	Send(ctx context.Context, msg Message) error

	// Status returns the current connection status.
	Status() ChannelStatus

	// Close shuts down the connection and releases resources.
	// After Close, the channel returned by Listen will be closed.
	Close() error
}

Channel is a bidirectional connection to a messaging platform. Listen returns a channel that emits inbound messages; that channel is closed when the context is cancelled, Close is called, or the connection is lost. Send pushes an outbound message to the platform.

type ChannelFactory

type ChannelFactory func(name string, config json.RawMessage) (Channel, error)

ChannelFactory creates a Channel from a name and JSON config. Analogous to connectivity.TransportFactory but for bidirectional connections. The name is the channel's identifier in the channels table (e.g. "wa_principal"). The config is the per-channel JSON from the config column.

func DiscordFactory

func DiscordFactory() ChannelFactory

DiscordFactory returns a ChannelFactory for Discord connections using discordgo (bwmarrin). Discordgo handles the gateway WebSocket, REST API, slash commands, and voice connections.

Config example:

{"bot_token": "Bot MTk...", "guild_id": "123456789"}

func TelegramFactory

func TelegramFactory() ChannelFactory

TelegramFactory returns a ChannelFactory for Telegram connections.

By default, uses the bot API with long-polling (go-telegram-bot-api or similar). Set use_mtproto=true in config for the full MTProto client (gotd/td), which supports user accounts and more message types.

Config example (bot API):

{"bot_token": "123456:ABC-DEF"}

Config example (MTProto):

{"bot_token": "123456:ABC-DEF", "use_mtproto": true}

func WebhookFactory

func WebhookFactory() ChannelFactory

WebhookFactory returns a ChannelFactory for generic inbound HTTP webhooks. This allows any external system to push messages into the channels pipeline via HTTP POST.

Inbound messages are received as JSON POST bodies. If the config includes a secret, the handler verifies the X-Signature-256 HMAC header before accepting.

Outbound messages are POSTed as JSON to msg.Metadata["callback_url"] when present; otherwise they are silently dropped (the caller is responsible for including a callback URL if it expects responses).

Config example:

{"listen_addr": ":8080", "path": "/webhook/inbound", "secret": "hmac_key"}
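A sender has to compute the signature the same way the receiver checks it. The sketch below shows one way to produce and verify a hex-encoded HMAC-SHA256 of the request body using only the standard library. The `sign` and `verify` helpers are hypothetical, and the assumption that the header carries bare hex (no `sha256=` prefix) is taken from the WebhookConfig field comment rather than from the implementation.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 of body under secret.
// A client would send this value in the X-Signature-256 header.
func sign(secret string, body []byte) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the MAC and compares it in constant time.
func verify(secret string, body []byte, header string) bool {
	got, err := hex.DecodeString(header)
	if err != nil {
		return false // header is not valid hex
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	body := []byte(`{"text":"hello"}`)
	sig := sign("hmac_key", body)
	fmt.Println("X-Signature-256:", sig)
	fmt.Println("valid:", verify("hmac_key", body, sig))
	fmt.Println("tampered:", verify("hmac_key", []byte(`{"text":"bye"}`), sig))
}
```

Using hmac.Equal instead of a plain string comparison avoids leaking how many leading bytes of the signature matched.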

func WhatsAppFactory

func WhatsAppFactory() ChannelFactory

WhatsAppFactory returns a ChannelFactory for WhatsApp connections using whatsmeow (tulir). Whatsmeow handles multi-device pairing, QR code auth, automatic reconnection, and all message types.

The factory is a stub: it creates a whatsAppChannel that implements the Channel interface. The actual whatsmeow integration (WA client, event handlers, media download) is wired up when the dependency is available.

Config example:

{"device_name": "horos-ariege", "store_path": "/data/wa_session.db"}

type ChannelInfo

type ChannelInfo struct {
	Name      string        `json:"name"`
	Platform  string        `json:"platform"`
	Status    ChannelStatus `json:"status"`
	Connected bool          `json:"connected"`
}

ChannelInfo describes an active channel as seen by the dispatcher at a point in time. The struct is a snapshot; the dispatcher may have reloaded since this was created.

type ChannelRow

type ChannelRow struct {
	Name      string          `json:"name"`
	Platform  string          `json:"platform"`
	Enabled   bool            `json:"enabled"`
	Config    json.RawMessage `json:"config,omitempty"`
	AuthState json.RawMessage `json:"auth_state,omitempty"`
	UpdatedAt int64           `json:"updated_at"`
}

ChannelRow represents a single row from the channels table.

type ChannelStatus

type ChannelStatus struct {
	Connected   bool      `json:"connected"`
	Platform    string    `json:"platform"`
	AuthState   string    `json:"auth_state"` // "paired", "pending_qr", "token_valid", etc.
	LastMessage time.Time `json:"last_message"`
	Error       string    `json:"error,omitempty"`
}

ChannelStatus describes the current state of a channel connection.

type Direction

type Direction int

Direction indicates whether a message is inbound (received from a user) or outbound (sent by the system).

const (
	Inbound  Direction = iota // Message received from a platform user.
	Outbound                  // Message sent to a platform user.
)

func (Direction) String

func (d Direction) String() string

String returns "inbound" or "outbound".

type DiscordConfig

type DiscordConfig struct {
	// BotToken is the Discord bot token (from Developer Portal).
	BotToken string `json:"bot_token"`
	// GuildID restricts the bot to a specific guild. If empty, the bot
	// listens on all guilds it has been invited to.
	GuildID string `json:"guild_id,omitempty"`
	// ChannelIDs restricts listening to specific channel IDs.
	// If empty, all channels the bot can see are monitored.
	ChannelIDs []string `json:"channel_ids,omitempty"`
	// Intents specifies the gateway intents. Defaults to MessageContent +
	// GuildMessages + DirectMessages if empty.
	Intents int `json:"intents,omitempty"`
}

DiscordConfig is the per-channel JSON config for Discord connections.
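To make the defaulting rule for Intents concrete, here is a minimal sketch of decoding the config column and filling in the default. The `parseDiscordConfig` helper and its "bot_token is required" check are assumptions; the bit positions are the gateway intent values published in the Discord API documentation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// DiscordConfig is copied from the type documented above.
type DiscordConfig struct {
	BotToken   string   `json:"bot_token"`
	GuildID    string   `json:"guild_id,omitempty"`
	ChannelIDs []string `json:"channel_ids,omitempty"`
	Intents    int      `json:"intents,omitempty"`
}

// Gateway intent bits as listed in the Discord API documentation.
const (
	intentGuildMessages  = 1 << 9
	intentDirectMessages = 1 << 12
	intentMessageContent = 1 << 15
)

// parseDiscordConfig is a hypothetical helper: it decodes the JSON from the
// config column and applies the documented default when intents is omitted.
func parseDiscordConfig(raw string) (DiscordConfig, error) {
	var cfg DiscordConfig
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return cfg, err
	}
	if cfg.BotToken == "" {
		return cfg, errors.New("bot_token is required") // assumed validation
	}
	if cfg.Intents == 0 {
		cfg.Intents = intentMessageContent | intentGuildMessages | intentDirectMessages
	}
	return cfg, nil
}

func main() {
	cfg, err := parseDiscordConfig(`{"bot_token": "Bot MTk...", "guild_id": "123456789"}`)
	fmt.Println(cfg.GuildID, cfg.Intents, err)
}
```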

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher manages active channels and routes inbound messages through a processing pipeline. It watches the SQLite channels table for changes and creates/closes channels accordingly.

The Dispatcher is the integration point between the channels package (bidirectional messaging) and the connectivity package (request-response routing). The InboundHandler typically calls connectivity.Router.Call to process messages through an LLM pipeline.

func NewDispatcher

func NewDispatcher(handler InboundHandler, opts ...DispatcherOption) *Dispatcher

NewDispatcher creates a Dispatcher with the given inbound handler. Register platform factories before calling Watch.

func (*Dispatcher) Close

func (d *Dispatcher) Close() error

Close shuts down all active channels and cancels the lifecycle context.

func (*Dispatcher) Inspect

func (d *Dispatcher) Inspect(name string) (info ChannelInfo, ok bool)

Inspect returns detailed information about a single active channel. Returns ok=false if the channel is not active in the dispatcher.

func (*Dispatcher) ListChannels

func (d *Dispatcher) ListChannels() iter.Seq[ChannelInfo]

ListChannels returns an iterator over all active channels in the dispatcher.

func (*Dispatcher) RegisterPlatform

func (d *Dispatcher) RegisterPlatform(platform string, f ChannelFactory)

RegisterPlatform registers a ChannelFactory for a platform name. Must be called before Watch. Example: d.RegisterPlatform("whatsapp", WhatsAppFactory())

func (*Dispatcher) Reload

func (d *Dispatcher) Reload(ctx context.Context, db *sql.DB) error

Reload reads the channels table and reconciles the active channel set. New enabled channels are started, removed or disabled channels are closed, and channels with changed config are restarted.

Channel listen contexts are parented to the Dispatcher's lifecycle context, not the ctx passed here. This ensures that channels survive beyond a short-lived request context (e.g. an admin HTTP handler with a timeout).

func (*Dispatcher) Send

func (d *Dispatcher) Send(ctx context.Context, msg Message) error

Send sends an outbound message through the named channel. Returns ErrChannelNotFound if the channel doesn't exist or is not active.

func (*Dispatcher) Status

func (d *Dispatcher) Status(name string) (ChannelStatus, bool)

Status returns the ChannelStatus for a named channel. Returns ok=false if the channel is not active.

func (*Dispatcher) Watch

func (d *Dispatcher) Watch(ctx context.Context, db *sql.DB, interval time.Duration)

Watch polls PRAGMA data_version on the database at the given interval. When the version changes (meaning any write to the channels table or any other table in the same database occurred), it triggers a Reload.

data_version is auto-incremented by SQLite on any write — no triggers needed. This is the same proven pattern used by connectivity.Router.Watch and the mcprt tool registry.

Watch blocks until ctx is cancelled. Run it in a goroutine:

go dispatcher.Watch(ctx, db, 200*time.Millisecond)
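The polling pattern described above can be sketched without a database by injecting the version lookup. Everything here is hypothetical (`poller`, `tick`, `watch`); in a real program the `version` function would run `PRAGMA data_version` on a dedicated connection, since SQLite documents the value as changing when another connection commits. Whether the real Watch performs an initial Reload is not stated, so this sketch only records a baseline on the first tick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// poller remembers the last data_version it observed.
type poller struct {
	last    int64
	primed  bool
	version func() (int64, error) // stand-in for reading PRAGMA data_version
	reload  func()                // stand-in for Dispatcher.Reload
}

// tick reads the version once and calls reload if it changed.
// The first tick only records the baseline.
func (p *poller) tick() error {
	v, err := p.version()
	if err != nil {
		return err
	}
	if p.primed && v != p.last {
		p.reload()
	}
	p.last, p.primed = v, true
	return nil
}

// watch calls tick on every interval and blocks until ctx is cancelled.
func (p *poller) watch(ctx context.Context, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := p.tick(); err != nil {
				fmt.Println("poll failed:", err)
			}
		}
	}
}

func main() {
	versions := []int64{1, 1, 2, 2, 3} // simulated data_version readings
	i, reloads := 0, 0
	p := &poller{
		version: func() (int64, error) { v := versions[i%len(versions)]; i++; return v, nil },
		reload:  func() { reloads++ },
	}
	for range versions {
		_ = p.tick()
	}
	fmt.Println("reloads:", reloads)
}
```

A shorter interval lowers reload latency at the cost of more PRAGMA queries; the reading itself is cheap, so values in the hundreds of milliseconds are common for this pattern.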

type DispatcherOption

type DispatcherOption func(*Dispatcher)

DispatcherOption configures a Dispatcher.

func WithLogger

func WithLogger(l *slog.Logger) DispatcherOption

WithLogger sets a custom logger for the dispatcher.

func WithMaxConcurrent

func WithMaxConcurrent(n int) DispatcherOption

WithMaxConcurrent sets the maximum number of concurrent InboundHandler calls across all channels. Use this to prevent unbounded goroutine growth when a high-throughput channel (e.g. WhatsApp group) produces messages faster than the handler (typically an LLM call) can process them. Zero or negative means unlimited (default).
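One common way to implement this kind of cap is a buffered channel used as a counting semaphore. The sketch below is an assumption about the technique, not the package's code; `limiter`, `newLimiter`, and `peak` are hypothetical names. It also mirrors the documented rule that zero or negative means unlimited.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter caps how many handler calls run at once.
// A nil slots channel means unlimited.
type limiter struct{ slots chan struct{} }

func newLimiter(n int) *limiter {
	if n <= 0 {
		return &limiter{} // zero or negative: no limit
	}
	return &limiter{slots: make(chan struct{}, n)}
}

// run blocks until a slot is free, then calls fn.
func (l *limiter) run(fn func()) {
	if l.slots != nil {
		l.slots <- struct{}{}
		defer func() { <-l.slots }()
	}
	fn()
}

// peak pushes jobs through a limiter and reports the highest
// number of calls that were ever running at the same time.
func peak(limit, jobs int) int64 {
	l := newLimiter(limit)
	var running, high atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.run(func() {
				n := running.Add(1)
				for {
					h := high.Load()
					if n <= h || high.CompareAndSwap(h, n) {
						break
					}
				}
				time.Sleep(time.Millisecond) // stand-in for a slow handler
				running.Add(-1)
			})
		}()
	}
	wg.Wait()
	return high.Load()
}

func main() {
	fmt.Println("peak with limit 3:", peak(3, 50))
}
```

Note that this sketch still spawns one goroutine per message and only bounds the handler calls; bounding the goroutines themselves would require acquiring the slot before spawning.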

type ErrChannelDisabled

type ErrChannelDisabled struct {
	Channel string
}

ErrChannelDisabled is returned when Send is called on a disabled channel.

func (*ErrChannelDisabled) Error

func (e *ErrChannelDisabled) Error() string

type ErrChannelNotFound

type ErrChannelNotFound struct {
	Channel string
}

ErrChannelNotFound is returned when an operation targets a channel that does not exist in the channels table or the dispatcher's active set.

func (*ErrChannelNotFound) Error

func (e *ErrChannelNotFound) Error() string

type ErrNoPlatformFactory

type ErrNoPlatformFactory struct {
	Channel  string
	Platform string
}

ErrNoPlatformFactory is returned during reload when a channel's platform has no registered ChannelFactory.

func (*ErrNoPlatformFactory) Error

func (e *ErrNoPlatformFactory) Error() string

type ErrSendFailed

type ErrSendFailed struct {
	Channel  string
	Platform string
	Cause    error
}

ErrSendFailed is returned when a message could not be delivered to the platform.

func (*ErrSendFailed) Error

func (e *ErrSendFailed) Error() string

func (*ErrSendFailed) Unwrap

func (e *ErrSendFailed) Unwrap() error
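Since these errors are pointer-receiver struct types and ErrSendFailed implements Unwrap, callers can inspect them with errors.As and errors.Is. The sketch below redefines two of the types locally so it runs on its own; the Error message formats, the `errTimeout` sentinel, and the `classify` and `wrap` helpers are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented error types; message formats are assumed.
type ErrChannelNotFound struct{ Channel string }

func (e *ErrChannelNotFound) Error() string { return "channel not found: " + e.Channel }

type ErrSendFailed struct {
	Channel  string
	Platform string
	Cause    error
}

func (e *ErrSendFailed) Error() string {
	return fmt.Sprintf("send failed on %s (%s): %v", e.Channel, e.Platform, e.Cause)
}
func (e *ErrSendFailed) Unwrap() error { return e.Cause }

// errTimeout is a hypothetical transient cause.
var errTimeout = errors.New("timeout")

// wrap adds context the way a caller typically would.
func wrap(err error) error { return fmt.Errorf("dispatch: %w", err) }

// classify decides what to do with an error returned by Send.
func classify(err error) string {
	var nf *ErrChannelNotFound
	var sf *ErrSendFailed
	switch {
	case errors.As(err, &nf):
		return "not found"
	case errors.As(err, &sf) && errors.Is(sf, errTimeout):
		return "retry" // Unwrap exposes the cause to errors.Is
	case errors.As(err, &sf):
		return "failed"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(wrap(&ErrChannelNotFound{Channel: "alerts"})))
	fmt.Println(classify(wrap(&ErrSendFailed{Channel: "alerts", Platform: "webhook", Cause: errTimeout})))
}
```

Because the methods have pointer receivers, the target passed to errors.As must be a pointer to a pointer (for example `&sf` where `sf` is `*ErrSendFailed`).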

type InboundHandler

type InboundHandler func(ctx context.Context, msg Message) ([]Message, error)

InboundHandler processes an inbound message and returns zero or more outbound response messages. This is the integration point where anonymization, LLM processing, and de-anonymization happen.

The handler may return nil to indicate no response should be sent.
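A minimal handler might look like the sketch below. It uses trimmed local copies of Message and Direction so it runs on its own. The `reply` and `echoHandler` names are hypothetical, and addressing the response to the inbound SenderID with ReplyTo set to the inbound ID is an assumption about how replies are routed.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

type Direction int

const (
	Inbound Direction = iota
	Outbound
)

// Message is a trimmed copy of the documented struct.
type Message struct {
	ID          string
	ChannelName string
	Platform    string
	Direction   Direction
	SenderID    string
	RecipientID string
	Text        string
	ReplyTo     string
	Timestamp   time.Time
}

// reply builds an outbound message addressed to the inbound sender.
func reply(in Message, text string) Message {
	return Message{
		ChannelName: in.ChannelName,
		Platform:    in.Platform,
		Direction:   Outbound,
		RecipientID: in.SenderID,
		Text:        text,
		ReplyTo:     in.ID,
		Timestamp:   time.Now(),
	}
}

// echoHandler has the InboundHandler shape. It returns nil, nil for
// blank messages, which means "send no response".
func echoHandler(ctx context.Context, msg Message) ([]Message, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // the dispatcher is shutting down or the call timed out
	}
	text := strings.TrimSpace(msg.Text)
	if text == "" {
		return nil, nil
	}
	return []Message{reply(msg, "you said: "+text)}, nil
}

func main() {
	in := Message{ID: "m1", ChannelName: "alerts", Platform: "webhook", SenderID: "u42", Text: " hello "}
	out, err := echoHandler(context.Background(), in)
	fmt.Println(len(out), err)
}
```

In a real deployment the body of the handler is where the LLM or routing call would go; the surrounding shape stays the same.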

type Message

type Message struct {
	ID          string            `json:"id"`
	ChannelName string            `json:"channel"`            // e.g. "whatsapp_main", "tg_support"
	Platform    string            `json:"platform"`           // "whatsapp", "telegram", "discord", "webhook"
	Direction   Direction         `json:"direction"`          // Inbound or Outbound
	SenderID    string            `json:"sender_id"`          // platform-specific user ID
	RecipientID string            `json:"recipient_id"`       // platform-specific recipient ID
	Text        string            `json:"text"`               // message body
	ReplyTo     string            `json:"reply_to,omitempty"` // ID of message being replied to
	Attachments []Attachment      `json:"attachments,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"` // platform-specific extras
	Timestamp   time.Time         `json:"timestamp"`
}

Message is a platform-normalized inbound or outbound message. Platform-specific structure is normalized away; any platform-specific extras that need to be kept are carried in the Metadata map.

type TelegramConfig

type TelegramConfig struct {
	// BotToken is the Telegram bot API token (from @BotFather).
	// For security, prefer passing via environment variable and referencing
	// the env var name here.
	BotToken string `json:"bot_token"`
	// UseMTProto enables the full MTProto client (gotd/td) instead of the
	// simpler bot API. Required for user accounts, optional for bots.
	UseMTProto bool `json:"use_mtproto,omitempty"`
	// WebhookURL, if set, uses webhook mode instead of long-polling.
	// Must be a publicly reachable HTTPS URL.
	WebhookURL string `json:"webhook_url,omitempty"`
}

TelegramConfig is the per-channel JSON config for Telegram connections.

type WebhookConfig

type WebhookConfig struct {
	// ListenAddr is the address to bind the HTTP server (e.g. ":8080").
	ListenAddr string `json:"listen_addr"`
	// Path is the URL path to listen on (e.g. "/webhook/inbound").
	Path string `json:"path"`
	// Secret is an optional shared secret for HMAC-SHA256 signature verification.
	// When set, inbound requests must include an X-Signature-256 header with
	// the hex-encoded HMAC-SHA256 of the request body.
	Secret string `json:"secret,omitempty"`
	// MaxBodyBytes limits the request body size. Defaults to 1MB.
	MaxBodyBytes int64 `json:"max_body_bytes,omitempty"`
}

WebhookConfig is the per-channel JSON config for generic inbound webhooks.

type WhatsAppConfig

type WhatsAppConfig struct {
	// DeviceName is the display name for the linked device.
	DeviceName string `json:"device_name"`
	// StorePath is the path to whatsmeow's SQLite session database.
	StorePath string `json:"store_path"`
}

WhatsAppConfig is the per-channel JSON config for WhatsApp connections.
