postgres

package
v1.60.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package postgres provides PostgreSQL storage and pub/sub plumbing for the platform's session layer.

Package postgres provides PostgreSQL storage for sessions.

Index

Constants

View Source
const DefaultNotifyChannel = "mcp_notifications"

DefaultNotifyChannel is the postgres LISTEN/NOTIFY channel name used by the platform for cross-replica MCP notification fan-out. Hard-coded rather than per-deployment so every replica that joins the same database immediately participates in the same event stream without extra config. The string is a postgres identifier (no quoting); keep it lowercase and ASCII.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broadcaster added in v1.58.0

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster is a postgres-backed session.Broadcaster. Every active replica LISTENs on the same channel and re-publishes received notifications to its local in-memory subscribers, so a tool-list change on any replica fan-outs to all SSE long-poll clients across the cluster.

Wire model:

  • Publish encodes the Event as JSON and issues `SELECT pg_notify($1, $2)`. No local short-circuit — every replica (including the publisher) hears the event via its own LISTEN connection, so behavior is symmetric and there is no missed-delivery edge case when a publisher goes down between local fan-out and the NOTIFY round-trip.
  • The background goroutine reads pq.Listener.NotificationChannel() and forwards each Notification to the embedded local MemoryBroadcaster. Local SSE subscribers receive events exactly as if a same-process Publish had fired.

func NewBroadcaster added in v1.58.0

func NewBroadcaster(dsn string, db *sql.DB, channel string, logger *slog.Logger) (*Broadcaster, error)

NewBroadcaster builds a postgres-backed broadcaster bound to the given channel. dsn is the libpq connection string used by the dedicated LISTEN connection; db is the sql.DB used for outbound pg_notify calls (re-using the platform's connection pool keeps connection counts predictable).

On success the returned broadcaster has already issued LISTEN and the background goroutine is running. Returns an error if LISTEN fails — typically a missing privilege on the role or a syntactically invalid channel name. Does NOT block on initial connectivity to postgres beyond the LISTEN command itself.

func (*Broadcaster) Close added in v1.58.0

func (b *Broadcaster) Close() error

Close stops the background goroutine, closes the local broadcaster (releasing every subscriber), and tears down the listen connection. Idempotent. Returns the first non-nil error encountered.

func (*Broadcaster) Publish added in v1.58.0

func (b *Broadcaster) Publish(ctx context.Context, ev session.Event) error

Publish encodes the event and issues pg_notify. Returns ErrBroadcasterClosed after Close. Surfacing pg_notify errors to callers is intentional — they pick whether to retry; we do not.

Latency note: there is no local short-circuit. Even the publishing replica's local subscribers receive the event only after the pg_notify round-trip + LISTEN dispatch. Under normal postgres latency this is sub-millisecond; under DB pressure it can stretch into 100s of ms. The gateway's notifyDispatchTimeout (5s) bounds the worst case so a partitioned DB cannot leak goroutines, but callers expecting same-process delivery should use MemoryBroadcaster or accept the round-trip cost as a deliberate trade-off for cross-replica symmetry (no missed-delivery edge cases when a publisher dies between local fan-out and the NOTIFY round-trip).

Listener-reconnect window: pg_notify uses the platform's shared db pool (outbound), but local subscribers receive events through the dedicated pq.Listener connection. During a Listener reconnect (Disconnected → ConnectionAttemptFailed* → Reconnected, up to listenerMaxReconnect = 60s), Publish can succeed while the publisher's own subscribers miss the event. lib/pq emits a nil notification on reconnect to signal "you may have missed events"; for tools/list_changed this is harmless because the next list call catches up, but callers that need at-least-once delivery would need to track a sequence number across the gap.

func (*Broadcaster) Subscribe added in v1.58.0

func (b *Broadcaster) Subscribe(ctx context.Context, sessionID string) session.Subscription

Subscribe registers a local in-process SSE subscriber. The subscription receives every event NOTIFYed on the channel by any replica (including this one).

Honors the Broadcaster contract: after Close starts, Subscribe returns a closed-immediate subscription. Without this short-circuit, the ~2s Close drain window between b.closed.CompareAndSwap and b.local.Close would let Subscribe hand back a live subscription while Publish already returns ErrBroadcasterClosed — a paired- backend asymmetry against MemoryBroadcaster, which gates Subscribe on its own closed flag in the same step.

func (*Broadcaster) SubscriberCount added in v1.58.0

func (b *Broadcaster) SubscriberCount() int

SubscriberCount mirrors MemoryBroadcaster.SubscriberCount for tests.

type Config

type Config struct {
	TTL time.Duration
}

Config configures the PostgreSQL session store.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store implements session.Store using PostgreSQL.

func New

func New(db *sql.DB, cfg Config) *Store

New creates a new PostgreSQL session store.

func (*Store) Cleanup

func (s *Store) Cleanup(ctx context.Context) error

Cleanup removes expired sessions.

func (*Store) Close

func (s *Store) Close() error

Close stops the cleanup goroutine and waits for it to exit. It is safe to call Close even if StartCleanupRoutine was never called.

func (*Store) Create

func (s *Store) Create(ctx context.Context, sess *session.Session) error

Create persists a new session. If a row with the same ID already exists (e.g. an expired row not yet cleaned up), it is replaced via upsert to avoid unique constraint violations during session revival.

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete removes a session.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*session.Session, error)

Get retrieves a session by ID. Returns nil, nil if not found or expired.

func (*Store) List

func (s *Store) List(ctx context.Context) ([]*session.Session, error)

List returns all non-expired sessions.

func (*Store) StartCleanupRoutine

func (s *Store) StartCleanupRoutine(interval time.Duration)

StartCleanupRoutine starts a background goroutine that periodically removes expired sessions. The goroutine is stopped when Close is called.

func (*Store) Touch

func (s *Store) Touch(ctx context.Context, id string) error

Touch updates LastActiveAt and extends ExpiresAt by the store's TTL.

func (*Store) UpdateState

func (s *Store) UpdateState(ctx context.Context, id string, state map[string]any) error

UpdateState merges state into the session's State map using JSONB concatenation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL