notify

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.
Published: Apr 24, 2026 License: MIT Imports: 9 Imported by: 0

README

pkg/notify

Notification dispatch and subscription management for bc.

Overview

This package receives inbound events from pkg/gateway adapters and routes them to subscribed agents via tmux send-keys. Gateway adapters forward each raw platform payload as JSON in the Notification's Raw field; this package handles dispatch, filtering, and delivery logging. Agents respond to platforms through the gateway API proxy, not through this package.

Key Types

  • Service -- dispatch core: receives notifications, queries subscribers, delivers via tmux, publishes to SSE hub
  • Store -- SQLite/Postgres persistence for subscriptions, delivery log, messages, and gateway state
  • Notification -- JSON payload sent to agents (includes raw platform payload)
  • Subscription -- ties an agent to a channel with mention_only flag

Dispatch Pipeline

  1. Adapter calls Service.Dispatch() with the normalized message fields and the raw platform JSON
  2. Service saves the message to notify_messages for the activity feed
  3. Service queries notify_subscriptions for matching agents
  4. For each subscriber: self-skip filter (skip when sender == agent), then mention filter (applies to mention_only subscriptions)
  5. Deliver via tmux send-keys with the full JSON payload
  6. Log delivery result to notify_delivery_log
  7. Publish gateway.message event to SSE hub for web UI
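The filtering in step 4 can be sketched as follows. This is a minimal illustration, not the package's implementation: the lowercase subscription and notification types are simplified stand-ins, shouldDeliver and recipients are hypothetical names, and matching a mention by exact agent name is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// subscription and notification are simplified stand-ins for the package's
// Subscription and Notification types, holding only what the filters need.
type subscription struct {
	Agent       string
	MentionOnly bool
}

type notification struct {
	Channel  string
	Sender   string
	Content  string
	Mentions []string
}

// shouldDeliver applies the two filters in order: skip the sender's own
// message, then honor mention_only. Exact-name matching is an assumption.
func shouldDeliver(sub subscription, n notification) bool {
	if n.Sender == sub.Agent {
		return false // self-skip: do not echo an agent's own message back
	}
	if !sub.MentionOnly {
		return true
	}
	for _, m := range n.Mentions {
		if m == sub.Agent {
			return true
		}
	}
	return false
}

// recipients returns the agents that pass both filters, in subscription order.
func recipients(subs []subscription, n notification) []string {
	var out []string
	for _, s := range subs {
		if shouldDeliver(s, n) {
			out = append(out, s.Agent)
		}
	}
	return out
}

func main() {
	subs := []subscription{
		{Agent: "alice"},
		{Agent: "bob", MentionOnly: true},
		{Agent: "carol", MentionOnly: true},
	}
	n := notification{
		Channel:  "slack:engineering",
		Sender:   "alice",
		Content:  "ping @carol",
		Mentions: []string{"carol"},
	}
	// alice is the sender, bob is not mentioned, carol is mentioned.
	fmt.Println(strings.Join(recipients(subs, n), ","))
}
```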

Database Tables

Table                 Purpose
notify_subscriptions  Agent-to-channel mappings
notify_delivery_log   Delivery attempt records
notify_messages       Inbound messages for activity feed
notify_gateways       Gateway connection state
notify_channels       Channel discovery persistence

Architecture

See docs/architecture/notifications.md for the full notification architecture, including the 3-part adapter pattern, filtering logic, and database schema.

Documentation

Overview

Package notify implements the notification gateway for delivering external platform events (Slack, Telegram, Discord, etc.) to subscribed bc agents via tmux send-keys.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentSender

type AgentSender interface {
	Send(ctx context.Context, name, message string) error
	// SendAll broadcasts a message to all running agents.
	SendAll(ctx context.Context, message string) (sent int, err error)
}

AgentSender is the interface for sending a message to an agent's tmux session. Implemented by *agent.AgentService (Send method).
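For tests, an in-memory implementation of this interface may be handy. The sketch below copies the interface from the declaration above; recordingSender, newRecordingSender, and notifyAll are hypothetical names, and the 2-second timeout is an arbitrary choice.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// AgentSender is copied from the declaration above.
type AgentSender interface {
	Send(ctx context.Context, name, message string) error
	SendAll(ctx context.Context, message string) (sent int, err error)
}

// recordingSender is a hypothetical test double: it records messages per
// agent instead of writing to a tmux session.
type recordingSender struct {
	mu     sync.Mutex
	agents []string            // names treated as running; fixed at creation
	sent   map[string][]string // agent name -> messages received
}

func newRecordingSender(agents ...string) *recordingSender {
	return &recordingSender{agents: agents, sent: make(map[string][]string)}
}

func (r *recordingSender) Send(ctx context.Context, name, message string) error {
	if err := ctx.Err(); err != nil {
		return err // respect cancellation and deadlines
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, a := range r.agents {
		if a == name {
			r.sent[name] = append(r.sent[name], message)
			return nil
		}
	}
	return fmt.Errorf("agent %q is not running", name)
}

func (r *recordingSender) SendAll(ctx context.Context, message string) (int, error) {
	sent := 0
	for _, a := range r.agents {
		if err := r.Send(ctx, a, message); err != nil {
			return sent, err
		}
		sent++
	}
	return sent, nil
}

// Compile-time check that the double satisfies the interface.
var _ AgentSender = (*recordingSender)(nil)

// notifyAll is a hypothetical caller that bounds a broadcast with a timeout.
func notifyAll(s AgentSender, message string) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return s.SendAll(ctx, message)
}

func main() {
	s := newRecordingSender("alice", "bob")
	n, err := notifyAll(s, "hello")
	fmt.Println(n, err)
}
```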

type Attachment

type Attachment struct {
	Filename  string `json:"filename"`
	MimeType  string `json:"mime_type"`
	URL       string `json:"url,omitempty"`
	LocalPath string `json:"local_path,omitempty"`
	Size      int64  `json:"size"`
}

Attachment describes a file shared on a channel.

type Broadcaster

type Broadcaster interface {
	Publish(eventType string, data map[string]any)
}

Broadcaster pushes events to connected web clients via SSE/WebSocket. Implemented by *ws.Hub.
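A recording implementation can stand in for the hub when no web client is attached. In this sketch, memoryHub and event are hypothetical names, and the payload key "channel" is an assumption; only the gateway.message event name comes from the README.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcaster is copied from the declaration above.
type Broadcaster interface {
	Publish(eventType string, data map[string]any)
}

// event is a hypothetical record of one Publish call.
type event struct {
	Type string
	Data map[string]any
}

// memoryHub records published events instead of pushing them to clients.
type memoryHub struct {
	mu     sync.Mutex
	events []event
}

func (h *memoryHub) Publish(eventType string, data map[string]any) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.events = append(h.events, event{Type: eventType, Data: data})
}

// count reports how many events of the given type were published.
func (h *memoryHub) count(eventType string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	n := 0
	for _, e := range h.events {
		if e.Type == eventType {
			n++
		}
	}
	return n
}

// Compile-time check that memoryHub satisfies the interface.
var _ Broadcaster = (*memoryHub)(nil)

func main() {
	hub := &memoryHub{}
	// The payload shape here is illustrative only.
	hub.Publish("gateway.message", map[string]any{"channel": "slack:engineering"})
	fmt.Println(hub.count("gateway.message"))
}
```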

type ChannelKey

type ChannelKey = string

ChannelKey is the canonical identifier for an external channel. Format: "<platform>:<channel_name>", e.g., "slack:engineering".
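Building and splitting a key in this format could look like the sketch below. makeChannelKey and splitChannelKey are hypothetical helpers, not part of the package; splitting at the first colon is an assumption that keeps any later colons inside the channel name.

```go
package main

import (
	"fmt"
	"strings"
)

// ChannelKey is copied from the declaration above.
type ChannelKey = string

// makeChannelKey joins a platform and channel name into "<platform>:<channel_name>".
func makeChannelKey(platform, name string) ChannelKey {
	return platform + ":" + name
}

// splitChannelKey splits at the first colon and rejects keys with an empty
// platform or name.
func splitChannelKey(key ChannelKey) (platform, name string, ok bool) {
	platform, name, ok = strings.Cut(key, ":")
	if !ok || platform == "" || name == "" {
		return "", "", false
	}
	return platform, name, true
}

func main() {
	key := makeChannelKey("slack", "engineering")
	platform, name, ok := splitChannelKey(key)
	fmt.Println(key, platform, name, ok)
}
```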

type DeliveryEntry

type DeliveryEntry struct {
	LoggedAt time.Time      `json:"logged_at"`
	Channel  string         `json:"channel"`
	Agent    string         `json:"agent"`
	Status   DeliveryStatus `json:"status"`
	Error    string         `json:"error,omitempty"`
	Preview  string         `json:"preview"`
	ID       int64          `json:"id"`
}

DeliveryEntry records one delivery attempt in the activity log.

type DeliveryStatus

type DeliveryStatus string

DeliveryStatus is the outcome of a tmux send-keys attempt.

const (
	StatusDelivered DeliveryStatus = "delivered"
	StatusFailed    DeliveryStatus = "failed"
	StatusPending   DeliveryStatus = "pending"
)
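As an illustration of how a send result might map onto these types, the sketch below builds a DeliveryEntry from the error returned by a send attempt. newEntry, truncate, and errNoSession are hypothetical, and the 80-character preview length is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// DeliveryStatus, its constants, and DeliveryEntry are copied from the
// declarations above.
type DeliveryStatus string

const (
	StatusDelivered DeliveryStatus = "delivered"
	StatusFailed    DeliveryStatus = "failed"
	StatusPending   DeliveryStatus = "pending"
)

type DeliveryEntry struct {
	LoggedAt time.Time      `json:"logged_at"`
	Channel  string         `json:"channel"`
	Agent    string         `json:"agent"`
	Status   DeliveryStatus `json:"status"`
	Error    string         `json:"error,omitempty"`
	Preview  string         `json:"preview"`
	ID       int64          `json:"id"`
}

// errNoSession is a made-up error used only for this example.
var errNoSession = errors.New("tmux session not found")

// truncate returns at most limit runes of s.
func truncate(s string, limit int) string {
	r := []rune(s)
	if len(r) <= limit {
		return s
	}
	return string(r[:limit])
}

// newEntry maps the outcome of one send attempt to a log entry: a nil error
// becomes StatusDelivered, anything else StatusFailed with the error text.
func newEntry(channel, agent, content string, sendErr error) DeliveryEntry {
	e := DeliveryEntry{
		LoggedAt: time.Now().UTC(),
		Channel:  channel,
		Agent:    agent,
		Status:   StatusDelivered,
		Preview:  truncate(content, 80), // preview length is an assumption
	}
	if sendErr != nil {
		e.Status = StatusFailed
		e.Error = sendErr.Error()
	}
	return e
}

func main() {
	ok := newEntry("slack:engineering", "alice", "deploy finished", nil)
	bad := newEntry("slack:engineering", "bob", "deploy finished", errNoSession)
	fmt.Println(ok.Status, bad.Status, bad.Error)
}
```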

type MessageRecord

type MessageRecord struct {
	CreatedAt time.Time `json:"created_at"`
	Channel   string    `json:"channel"`
	Sender    string    `json:"sender"`
	Content   string    `json:"content"`
	ID        int64     `json:"id"`
}

MessageRecord is a stored inbound gateway message for the activity feed.

type Notification

type Notification struct {
	Raw         json.RawMessage `json:"raw,omitempty"`
	Timestamp   string          `json:"timestamp"`
	Channel     string          `json:"channel"`
	Platform    string          `json:"platform"`
	Sender      string          `json:"sender"`
	Content     string          `json:"content"`
	MessageID   string          `json:"message_id,omitempty"`
	Mentions    []string        `json:"mentions,omitempty"`
	Attachments []Attachment    `json:"attachments,omitempty"`
}

Notification is the JSON payload sent to subscribed agents via tmux send-keys.
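To show what the encoded payload looks like, the sketch below marshals a Notification with encoding/json. The struct definitions are copied from this page; encode is a hypothetical helper, and the sample values (including the timestamp layout and the form of the Channel value) are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Attachment and Notification are copied from the declarations on this page.
type Attachment struct {
	Filename  string `json:"filename"`
	MimeType  string `json:"mime_type"`
	URL       string `json:"url,omitempty"`
	LocalPath string `json:"local_path,omitempty"`
	Size      int64  `json:"size"`
}

type Notification struct {
	Raw         json.RawMessage `json:"raw,omitempty"`
	Timestamp   string          `json:"timestamp"`
	Channel     string          `json:"channel"`
	Platform    string          `json:"platform"`
	Sender      string          `json:"sender"`
	Content     string          `json:"content"`
	MessageID   string          `json:"message_id,omitempty"`
	Mentions    []string        `json:"mentions,omitempty"`
	Attachments []Attachment    `json:"attachments,omitempty"`
}

// encode renders a Notification as a single-line JSON string.
func encode(n Notification) (string, error) {
	b, err := json.Marshal(n)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	n := Notification{
		Raw:       json.RawMessage(`{"ts":"1"}`), // stand-in platform payload
		Timestamp: "2026-04-24T12:00:00Z",        // layout is an assumption
		Channel:   "slack:engineering",
		Platform:  "slack",
		Sender:    "dana",
		Content:   "deploy finished",
	}
	s, err := encode(n)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

Fields tagged omitempty (raw, message_id, mentions, attachments) are left out of the output when they are empty, so a minimal notification carries only the five required keys.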

type PersistedChannel

type PersistedChannel struct {
	BCChannel  string
	Platform   string
	PlatformID string
}

PersistedChannel is a saved bc_channel → platform_id mapping.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the notification dispatch core. It receives inbound messages from gateway adapters and routes them to subscribed agents via tmux send-keys.

func NewService

func NewService(store *Store, agents AgentSender, hub Broadcaster) *Service

NewService creates a new notify service.

func (*Service) AllSubscriptions

func (s *Service) AllSubscriptions(ctx context.Context) ([]Subscription, error)

AllSubscriptions returns all subscriptions across all channels.

func (*Service) ChannelActivity

func (s *Service) ChannelActivity(ctx context.Context, channel string, limit int) ([]DeliveryEntry, error)

ChannelActivity returns recent delivery log entries for a channel.

func (*Service) ChannelMessages

func (s *Service) ChannelMessages(ctx context.Context, channel string, limit int, before int64) ([]MessageRecord, error)

ChannelMessages returns recent messages for a channel (newest first).
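The documentation does not say how the before parameter is interpreted. The sketch below shows one plausible reading, purely as an assumption: before is an exclusive upper bound on the message ID, and a value of 0 starts from the newest message. The message type and page function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// message is a simplified stand-in for MessageRecord.
type message struct {
	ID      int64
	Content string
}

// page returns up to limit messages with ID < before, newest first.
// before <= 0 means "start from the newest message". Both conventions are
// assumptions of this sketch, not documented behavior.
func page(all []message, limit int, before int64) []message {
	sorted := make([]message, len(all))
	copy(sorted, all)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ID > sorted[j].ID })

	out := []message{}
	for _, m := range sorted {
		if len(out) >= limit {
			break
		}
		if before > 0 && m.ID >= before {
			continue // not older than the cursor
		}
		out = append(out, m)
	}
	return out
}

func main() {
	all := []message{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"}}
	first := page(all, 2, 0)                     // IDs 5, 4
	next := page(all, 2, first[len(first)-1].ID) // IDs 3, 2
	fmt.Println(first, next)
}
```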

func (*Service) ChannelSubscriptions

func (s *Service) ChannelSubscriptions(ctx context.Context, channel string) ([]Subscription, error)

ChannelSubscriptions returns all subscriptions for a channel.

func (*Service) Dispatch

func (s *Service) Dispatch(channel, platform, sender, senderID, content, messageID string, attachments []Attachment, raw json.RawMessage)

Dispatch receives a normalized inbound message and delivers it only to subscribed agents. The delivery work runs in its own goroutine, so the call never blocks the adapter.
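One common way to get this non-blocking behavior is to start a goroutine per call. The sketch below shows the pattern with a hypothetical dispatcher type; whether Service is structured this way is not stated here, and the Wait method is an addition for tests and shutdown that the documented API does not list.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical stand-in for Service, reduced to the
// fire-and-forget pattern.
type dispatcher struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	handled []string
}

// Dispatch returns immediately; the work happens in a new goroutine.
func (d *dispatcher) Dispatch(channel, content string) {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		d.mu.Lock()
		defer d.mu.Unlock()
		// A real service would save, filter, deliver, and log here.
		d.handled = append(d.handled, channel+": "+content)
	}()
}

// Wait blocks until every in-flight dispatch has finished.
func (d *dispatcher) Wait() { d.wg.Wait() }

func main() {
	d := &dispatcher{}
	d.Dispatch("slack:engineering", "one")
	d.Dispatch("slack:engineering", "two")
	d.Dispatch("telegram:ops", "three")
	d.Wait()
	fmt.Println(len(d.handled))
}
```

Because each call runs concurrently, this pattern does not guarantee that messages are processed in the order they were dispatched.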

func (*Service) PruneOldActivity

func (s *Service) PruneOldActivity(ctx context.Context, keepPerChannel int) error

PruneOldActivity removes old delivery log entries for every channel, keeping the most recent keepPerChannel entries in each.
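The keep-the-newest-N-per-channel rule can be illustrated in memory. This sketch is not the store's SQL: entry is a simplified stand-in for DeliveryEntry, pruneKeepLast is a hypothetical function, and treating a higher ID as more recent is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a simplified stand-in for DeliveryEntry.
type entry struct {
	ID      int64
	Channel string
}

// pruneKeepLast returns the entries that survive pruning: for each channel,
// the keepPerChannel entries with the highest IDs.
func pruneKeepLast(entries []entry, keepPerChannel int) []entry {
	if keepPerChannel < 0 {
		keepPerChannel = 0
	}
	sorted := make([]entry, len(entries))
	copy(sorted, entries)
	// Newest first, so the first keepPerChannel seen per channel are kept.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ID > sorted[j].ID })

	kept := make([]entry, 0, len(sorted))
	seen := make(map[string]int)
	for _, e := range sorted {
		if seen[e.Channel] < keepPerChannel {
			seen[e.Channel]++
			kept = append(kept, e)
		}
	}
	return kept
}

func main() {
	log := []entry{
		{1, "slack:engineering"}, {2, "slack:engineering"}, {3, "telegram:ops"},
		{4, "slack:engineering"}, {5, "telegram:ops"},
	}
	// With keepPerChannel = 2, only entry 1 is dropped.
	for _, e := range pruneKeepLast(log, 2) {
		fmt.Println(e.ID, e.Channel)
	}
}
```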

func (*Service) SetMentionOnly

func (s *Service) SetMentionOnly(ctx context.Context, channel, agent string, mentionOnly bool) error

SetMentionOnly updates the @mention-only toggle for a subscription.

func (*Service) Store

func (s *Service) Store() *Store

Store returns the underlying store for direct access by handlers.

func (*Service) Subscribe

func (s *Service) Subscribe(ctx context.Context, channel, agent string, mentionOnly bool) error

Subscribe adds an agent to a channel.

func (*Service) Unsubscribe

func (s *Service) Unsubscribe(ctx context.Context, channel, agent string) error

Unsubscribe removes an agent from a channel.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the SQLite/Postgres-backed persistence layer for subscriptions, the delivery log, inbound messages, and channel mappings. Uses the shared workspace database.

func OpenStore

func OpenStore(workspacePath string) (*Store, error)

OpenStore opens the notify store using the shared workspace database.

func (*Store) AllSubscriptions

func (s *Store) AllSubscriptions(ctx context.Context) ([]Subscription, error)

AllSubscriptions returns all subscriptions across all channels.

func (*Store) Close

func (s *Store) Close() error

Close is a no-op — the shared DB is owned by the caller.

func (*Store) DeliveryChannels

func (s *Store) DeliveryChannels(ctx context.Context) ([]string, error)

DeliveryChannels returns the distinct channel names in the delivery log.

func (*Store) GetMessages

func (s *Store) GetMessages(ctx context.Context, channel string, limit int, before int64) ([]MessageRecord, error)

GetMessages returns recent messages for a channel (newest first).

func (*Store) LoadChannels

func (s *Store) LoadChannels(ctx context.Context) ([]PersistedChannel, error)

LoadChannels returns all persisted channel mappings.

func (*Store) LogDelivery

func (s *Store) LogDelivery(ctx context.Context, e DeliveryEntry) error

LogDelivery records a delivery attempt.

func (*Store) PruneActivity

func (s *Store) PruneActivity(ctx context.Context, channel string, keepLast int) error

PruneActivity deletes old delivery log entries for the given channel, keeping the most recent keepLast.

func (*Store) RecentActivity

func (s *Store) RecentActivity(ctx context.Context, channel string, limit int) ([]DeliveryEntry, error)

RecentActivity returns the most recent delivery log entries for a channel.

func (*Store) SaveChannel

func (s *Store) SaveChannel(ctx context.Context, bcChannel, platform, platformID string) error

SaveChannel persists a channel mapping so it survives server restarts.

func (*Store) SaveMessage

func (s *Store) SaveMessage(ctx context.Context, channel, sender, content string) error

SaveMessage stores an inbound gateway message for the activity feed.

func (*Store) SetMentionOnly

func (s *Store) SetMentionOnly(ctx context.Context, channel, agent string, mentionOnly bool) error

SetMentionOnly updates the mention_only flag for a subscription.

func (*Store) Subscribe

func (s *Store) Subscribe(ctx context.Context, channel, agent string, mentionOnly bool) error

Subscribe adds an agent to a channel. If already subscribed, this is a no-op.
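The no-op behavior can be illustrated with an in-memory map. memoryStore and subKey are hypothetical stand-ins, not the real Store; the sketch assumes that "no-op" also means the existing mention_only flag is left unchanged, with SetMentionOnly as the way to change it.

```go
package main

import "fmt"

// subKey identifies a subscription by channel and agent.
type subKey struct{ Channel, Agent string }

// memoryStore is a hypothetical in-memory stand-in for Store.
type memoryStore struct {
	subs map[subKey]bool // value is the mention_only flag
}

func newMemoryStore() *memoryStore {
	return &memoryStore{subs: make(map[subKey]bool)}
}

// Subscribe adds the pair if absent. An existing subscription is left
// untouched, including its mention_only flag.
func (m *memoryStore) Subscribe(channel, agent string, mentionOnly bool) {
	k := subKey{channel, agent}
	if _, exists := m.subs[k]; exists {
		return
	}
	m.subs[k] = mentionOnly
}

// SetMentionOnly updates the flag and reports whether the subscription exists.
func (m *memoryStore) SetMentionOnly(channel, agent string, mentionOnly bool) bool {
	k := subKey{channel, agent}
	if _, exists := m.subs[k]; !exists {
		return false
	}
	m.subs[k] = mentionOnly
	return true
}

func main() {
	s := newMemoryStore()
	s.Subscribe("slack:engineering", "alice", false)
	s.Subscribe("slack:engineering", "alice", true) // no-op: already subscribed
	fmt.Println(len(s.subs), s.subs[subKey{"slack:engineering", "alice"}])
}
```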

func (*Store) Subscribers

func (s *Store) Subscribers(ctx context.Context, channel string) ([]Subscription, error)

Subscribers returns all subscriptions for a channel.

func (*Store) TotalMessageCount

func (s *Store) TotalMessageCount(ctx context.Context) (int, error)

TotalMessageCount returns the total number of stored messages across all channels.

func (*Store) Unsubscribe

func (s *Store) Unsubscribe(ctx context.Context, channel, agent string) error

Unsubscribe removes an agent from a channel.

type Subscription

type Subscription struct {
	CreatedAt   time.Time `json:"created_at"`
	Channel     string    `json:"channel"`
	Agent       string    `json:"agent"`
	ID          int64     `json:"id"`
	MentionOnly bool      `json:"mention_only"`
}

Subscription ties an agent to a channel with delivery settings.
