Documentation ¶
Overview ¶
Package postgres provides PostgreSQL storage and pub/sub plumbing for the platform's session layer.
Index ¶
- Constants
- type Broadcaster
- type Config
- type Store
- func (s *Store) Cleanup(ctx context.Context) error
- func (s *Store) Close() error
- func (s *Store) Create(ctx context.Context, sess *session.Session) error
- func (s *Store) Delete(ctx context.Context, id string) error
- func (s *Store) Get(ctx context.Context, id string) (*session.Session, error)
- func (s *Store) List(ctx context.Context) ([]*session.Session, error)
- func (s *Store) StartCleanupRoutine(interval time.Duration)
- func (s *Store) Touch(ctx context.Context, id string) error
- func (s *Store) UpdateState(ctx context.Context, id string, state map[string]any) error
Constants ¶
const DefaultNotifyChannel = "mcp_notifications"
DefaultNotifyChannel is the postgres LISTEN/NOTIFY channel name used by the platform for cross-replica MCP notification fan-out. Hard-coded rather than per-deployment so every replica that joins the same database immediately participates in the same event stream without extra config. The string is a postgres identifier (no quoting); keep it lowercase and ASCII.
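The "lowercase and ASCII" guidance can be checked before a channel name is ever sent to the database. The sketch below is only an illustration: validChannelName is a hypothetical helper, not part of this package, and it deliberately accepts a conservative subset of unquoted postgres identifiers (a lowercase letter or underscore, then lowercase letters, digits, or underscores, at most 63 bytes, which is the default postgres identifier limit).

```go
package main

import (
	"fmt"
	"regexp"
)

// maxIdentLen is the default postgres identifier limit (NAMEDATALEN - 1).
const maxIdentLen = 63

// channelNameRE matches a conservative subset of unquoted identifiers.
var channelNameRE = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)

// validChannelName is a hypothetical helper, not part of package postgres.
// It reports whether name is safe to use as an unquoted channel identifier.
func validChannelName(name string) bool {
	return len(name) <= maxIdentLen && channelNameRE.MatchString(name)
}

func main() {
	for _, name := range []string{"mcp_notifications", "MCP-Notifications", ""} {
		fmt.Printf("%q valid: %v\n", name, validChannelName(name))
	}
}
```

Rejecting mixed-case names outright avoids surprises from postgres folding unquoted identifiers to lowercase.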
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶ added in v1.58.0
type Broadcaster struct {
// contains filtered or unexported fields
}
Broadcaster is a postgres-backed session.Broadcaster. Every active replica LISTENs on the same channel and re-publishes received notifications to its local in-memory subscribers, so a tool-list change on any replica fans out to all SSE long-poll clients across the cluster.
Wire model:
- Publish encodes the Event as JSON and issues `SELECT pg_notify($1, $2)`. No local short-circuit — every replica (including the publisher) hears the event via its own LISTEN connection, so behavior is symmetric and there is no missed-delivery edge case when a publisher goes down between local fan-out and the NOTIFY round-trip.
- The background goroutine reads pq.Listener.NotificationChannel() and forwards each Notification to the embedded local MemoryBroadcaster. Local SSE subscribers receive events exactly as if a same-process Publish had fired.
func NewBroadcaster ¶ added in v1.58.0
func NewBroadcaster(dsn string, db *sql.DB, channel string, logger *slog.Logger) (*Broadcaster, error)
NewBroadcaster builds a postgres-backed broadcaster bound to the given channel. dsn is the libpq connection string used by the dedicated LISTEN connection; db is the sql.DB used for outbound pg_notify calls (re-using the platform's connection pool keeps connection counts predictable).
On success the returned broadcaster has already issued LISTEN and the background goroutine is running. Returns an error if LISTEN fails — typically a missing privilege on the role or a syntactically invalid channel name. Does NOT block on initial connectivity to postgres beyond the LISTEN command itself.
func (*Broadcaster) Close ¶ added in v1.58.0
func (b *Broadcaster) Close() error
Close stops the background goroutine, closes the local broadcaster (releasing every subscriber), and tears down the listen connection. Idempotent. Returns the first non-nil error encountered.
func (*Broadcaster) Publish ¶ added in v1.58.0
Publish encodes the event and issues pg_notify. Returns ErrBroadcasterClosed after Close. Surfacing pg_notify errors to callers is intentional: callers decide whether to retry, and the broadcaster never retries on its own.
Latency note: there is no local short-circuit. Even the publishing replica's local subscribers receive the event only after the pg_notify round-trip plus LISTEN dispatch. Under normal postgres latency this is sub-millisecond; under DB pressure it can stretch into hundreds of milliseconds. The gateway's notifyDispatchTimeout (5s) bounds the worst case so a partitioned DB cannot leak goroutines. Callers expecting same-process delivery should use MemoryBroadcaster, or accept the round-trip cost as a deliberate trade-off for cross-replica symmetry (no missed-delivery edge cases when a publisher dies between local fan-out and the NOTIFY round-trip).
Listener-reconnect window: pg_notify uses the platform's shared db pool (outbound), but local subscribers receive events through the dedicated pq.Listener connection. During a Listener reconnect (Disconnected → ConnectionAttemptFailed* → Reconnected, up to listenerMaxReconnect = 60s), Publish can succeed while the publisher's own subscribers miss the event. lib/pq emits a nil notification on reconnect to signal "you may have missed events"; for tools/list_changed this is harmless because the next list call catches up, but callers that need at-least-once delivery would need to track a sequence number across the gap.
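For callers that do need to detect the gap, one possible shape of the sequence-number tracking is sketched below. It assumes each event carries a monotonically increasing sequence number from a single ordered source, which this package does not provide; gapTracker and observe are hypothetical names.

```go
package main

import "fmt"

// gapTracker is a hypothetical helper: it remembers the highest sequence
// number seen so far and reports how many events were skipped.
type gapTracker struct {
	last uint64
	seen bool
}

// observe records seq and returns the number of events missing between the
// previous high-water mark and seq. Duplicates and out-of-order deliveries
// report zero and do not move the high-water mark.
func (g *gapTracker) observe(seq uint64) (missed uint64) {
	if !g.seen {
		g.seen, g.last = true, seq
		return 0
	}
	if seq <= g.last {
		return 0
	}
	missed = seq - g.last - 1
	g.last = seq
	return missed
}

func main() {
	var g gapTracker
	for _, seq := range []uint64{1, 2, 5, 5, 6} {
		if n := g.observe(seq); n > 0 {
			fmt.Printf("seq %d: missed %d event(s), resync needed\n", seq, n)
		}
	}
}
```

A subscriber that sees a non-zero result (or the nil notification lib/pq emits after a reconnect) would then refetch authoritative state instead of trusting the event stream.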
func (*Broadcaster) Subscribe ¶ added in v1.58.0
func (b *Broadcaster) Subscribe(ctx context.Context, sessionID string) session.Subscription
Subscribe registers a local in-process SSE subscriber. The subscription receives every event NOTIFYed on the channel by any replica (including this one).
Honors the Broadcaster contract: after Close starts, Subscribe returns a closed-immediate subscription. Without this short-circuit, the ~2s Close drain window between b.closed.CompareAndSwap and b.local.Close would let Subscribe hand back a live subscription while Publish already returns ErrBroadcasterClosed. That would be a paired-backend asymmetry against MemoryBroadcaster, which gates Subscribe on its own closed flag in the same step.
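The closed-flag gate described above can be illustrated with a small stand-in. The hub type and its methods are hypothetical and much simpler than the real Broadcaster; the sketch only shows the pattern of flipping an atomic flag with CompareAndSwap first and having subscribe consult that flag before handing out a live channel.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hub is a hypothetical stand-in for a broadcaster; names are illustrative.
type hub struct {
	closed atomic.Bool
	mu     sync.Mutex
	subs   []chan string
}

// subscribe returns a live channel, or an already-closed one once close
// has started.
func (h *hub) subscribe() <-chan string {
	ch := make(chan string, 1)
	h.mu.Lock()
	defer h.mu.Unlock()
	// Checking the flag under the lock means a subscriber cannot be added
	// after close has drained the list.
	if h.closed.Load() {
		close(ch)
		return ch
	}
	h.subs = append(h.subs, ch)
	return ch
}

// close flips the flag first, then releases every subscriber. Only the
// first call does any work, which makes close idempotent.
func (h *hub) close() {
	if !h.closed.CompareAndSwap(false, true) {
		return
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.subs {
		close(ch)
	}
	h.subs = nil
}

func main() {
	h := &hub{}
	live := h.subscribe()
	h.close()
	h.close() // second call is a no-op

	_, ok := <-live
	fmt.Println("existing subscriber still open:", ok)
	_, ok = <-h.subscribe()
	fmt.Println("late subscriber open:", ok)
}
```

Because the flag is set before any teardown work begins, a late subscriber sees the same "closed" answer that a publisher would, regardless of how long the drain takes.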
func (*Broadcaster) SubscriberCount ¶ added in v1.58.0
func (b *Broadcaster) SubscriberCount() int
SubscriberCount mirrors MemoryBroadcaster.SubscriberCount for tests.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements session.Store using PostgreSQL.
func (*Store) Close ¶
func (s *Store) Close() error
Close stops the cleanup goroutine and waits for it to exit. It is safe to call Close even if StartCleanupRoutine was never called.
func (*Store) Create ¶
func (s *Store) Create(ctx context.Context, sess *session.Session) error
Create persists a new session. If a row with the same ID already exists (e.g. an expired row not yet cleaned up), it is replaced via upsert to avoid unique constraint violations during session revival.
func (*Store) StartCleanupRoutine ¶
func (s *Store) StartCleanupRoutine(interval time.Duration)
StartCleanupRoutine starts a background goroutine that periodically removes expired sessions. The goroutine is stopped when Close is called.