session

package
v1.67.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package session provides session management for the MCP data platform. It defines the Store interface for session persistence and the Session type that represents an active client connection.

Index

Constants

This section is empty.

Variables

View Source
var ErrBroadcasterClosed = errors.New("session: broadcaster closed")

ErrBroadcasterClosed is returned by Publish on a Broadcaster that has been Closed. Callers should drop the event and avoid retry loops.

Functions

func AwareSessionID added in v0.33.4

func AwareSessionID(ctx context.Context) string

AwareSessionID returns the session ID set by AwareHandler, or "".

func WithAwareSessionID added in v0.33.4

func WithAwareSessionID(ctx context.Context, sessionID string) context.Context

WithAwareSessionID returns a context carrying the given session ID. This is used by AwareHandler internally and exposed for middleware that needs to read the session ID via AwareSessionID.

Types

type AwareHandler

type AwareHandler struct {
	// contains filtered or unexported fields
}

AwareHandler wraps an HTTP handler to manage MCP sessions against an external Store. It is used when the SDK runs in stateless mode to provide session persistence (e.g. for zero-downtime restarts).

func NewAwareHandler

func NewAwareHandler(inner http.Handler, cfg HandlerConfig) *AwareHandler

NewAwareHandler creates a handler that manages sessions externally.

func (*AwareHandler) ServeHTTP

func (h *AwareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP dispatches the request based on session state.

type Broadcaster added in v1.58.0

type Broadcaster interface {
	// Subscribe registers a new subscriber and returns the subscription.
	// ctx cancellation closes the subscription automatically. The
	// sessionID is recorded for log attribution only — events are
	// fan-out broadcast to every subscriber regardless of sessionID.
	// (tools/list_changed today is a server-wide signal, so per-
	// session targeting is intentionally not implemented; if a future
	// notification needs per-session delivery, add an Event.SessionID
	// filter at Publish time and document the contract change here.)
	Subscribe(ctx context.Context, sessionID string) Subscription
	// Publish delivers ev to every active subscription. Best-effort:
	// returns nil even when individual subscribers are dropped due to
	// buffer overflow. The error path is reserved for unrecoverable
	// transport faults (e.g., the underlying postgres LISTEN connection
	// is closed) so callers can decide whether to retry or surface.
	Publish(ctx context.Context, ev Event) error
	// Close releases any background resources. After Close, Subscribe
	// returns a closed-immediate subscription and Publish returns
	// ErrBroadcasterClosed.
	Close() error
}

Broadcaster fans out server-originated MCP notifications to every active SSE long-poll subscriber attached to the platform's session-aware HTTP handler.

Implementations are concurrency-safe. Publish must not block on a slow subscriber — buffer-full subscribers are dropped (with a warning) so a single misbehaving downstream client cannot stall the event pipeline for everyone else.

type Event added in v1.58.0

type Event struct {
	Method string         `json:"method"`
	Params map[string]any `json:"params,omitempty"`
}

Event is a server-originated MCP notification (a JSON-RPC notification that has no id and expects no response). The broadcaster delivers these to every connected SSE long-poll subscriber.

Method is the JSON-RPC method, e.g. "notifications/tools/list_changed". Params is the JSON-RPC params object — usually a small map with no payload (the method name itself is the signal). Pass nil when the notification carries no payload; the SSE writer will emit "{}".

type HandlerConfig

type HandlerConfig struct {
	Store Store
	TTL   time.Duration
	// Broadcaster delivers server-pushed MCP notifications (e.g.
	// notifications/tools/list_changed) to the per-session SSE
	// long-poll stream. When nil, GET requests are forwarded to the
	// inner handler unchanged — useful for tests or for transport
	// modes that don't need server push.
	Broadcaster Broadcaster
}

HandlerConfig configures an AwareHandler.

type MemoryBroadcaster added in v1.58.0

type MemoryBroadcaster struct {
	// contains filtered or unexported fields
}

MemoryBroadcaster is the in-memory Broadcaster implementation. It holds every active subscriber's channel under a single mutex and fan-outs each Publish synchronously across all of them. Suitable for single-replica deployments where all SSE long-poll clients hit the same process. Multi-replica deployments must wrap or compose this with a Postgres LISTEN/NOTIFY broadcaster (pkg/session/postgres).

func NewMemoryBroadcaster added in v1.58.0

func NewMemoryBroadcaster(logger *slog.Logger) *MemoryBroadcaster

NewMemoryBroadcaster builds a fresh in-memory broadcaster. Pass a non-nil logger to capture slow-subscriber warnings; nil falls back to slog.Default().

func (*MemoryBroadcaster) Close added in v1.58.0

func (b *MemoryBroadcaster) Close() error

Close releases every subscriber's channel and marks the broadcaster as closed. Subsequent Publish calls return ErrBroadcasterClosed and subsequent Subscribe calls return a closed-immediate subscription. Safe to call more than once.

func (*MemoryBroadcaster) Publish added in v1.58.0

func (b *MemoryBroadcaster) Publish(_ context.Context, ev Event) error

Publish fans out ev to every active subscriber. Best-effort delivery per the Broadcaster contract — slow subscribers (full channel) are dropped with slog.Warn rather than blocking the publisher.

func (*MemoryBroadcaster) Subscribe added in v1.58.0

func (b *MemoryBroadcaster) Subscribe(ctx context.Context, sessionID string) Subscription

Subscribe registers a new subscriber. The returned Subscription is always non-nil even when the broadcaster is already closed — in that case the channel is closed immediately so callers' range loops exit without special-casing the closed state.

func (*MemoryBroadcaster) SubscriberCount added in v1.58.0

func (b *MemoryBroadcaster) SubscriberCount() int

SubscriberCount returns the number of active subscribers. Used by tests and by the postgres bridge to decide whether to bother republishing a received NOTIFY locally.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore implements Store using an in-memory map with TTL-based expiration.

func NewMemoryStore

func NewMemoryStore(ttl time.Duration) *MemoryStore

NewMemoryStore creates a new in-memory session store.

func (*MemoryStore) Cleanup

func (s *MemoryStore) Cleanup(_ context.Context) error

Cleanup removes expired sessions.

func (*MemoryStore) Close

func (s *MemoryStore) Close() error

Close stops the cleanup goroutine and waits for it to exit. It is safe to call Close even if StartCleanupRoutine was never called.

func (*MemoryStore) Create

func (s *MemoryStore) Create(_ context.Context, sess *Session) error

Create persists a new session.

func (*MemoryStore) Delete

func (s *MemoryStore) Delete(_ context.Context, id string) error

Delete removes a session.

func (*MemoryStore) Get

func (s *MemoryStore) Get(_ context.Context, id string) (*Session, error)

Get retrieves a session by ID. Returns nil, nil if not found or expired.

func (*MemoryStore) List

func (s *MemoryStore) List(_ context.Context) ([]*Session, error)

List returns all non-expired sessions.

func (*MemoryStore) StartCleanupRoutine

func (s *MemoryStore) StartCleanupRoutine(interval time.Duration)

StartCleanupRoutine starts a background goroutine that periodically removes expired sessions. The goroutine is stopped when Close is called.

func (*MemoryStore) Touch

func (s *MemoryStore) Touch(_ context.Context, id string) error

Touch updates LastActiveAt and extends ExpiresAt by the store's TTL.

func (*MemoryStore) UpdateState

func (s *MemoryStore) UpdateState(_ context.Context, id string, state map[string]any) error

UpdateState merges state into the session's State map.

type Session

type Session struct {
	// ID is the unique session identifier.
	ID string

	// UserID identifies the session owner. For authenticated sessions this is
	// a hash of the bearer token; for anonymous sessions it is empty.
	UserID string

	// CreatedAt is when the session was established.
	CreatedAt time.Time

	// LastActiveAt is the most recent activity timestamp.
	LastActiveAt time.Time

	// ExpiresAt is when the session expires if not touched.
	ExpiresAt time.Time

	// State holds extensible session data (e.g. enrichment dedup state).
	State map[string]any
}

Session represents an active client session.

type Store

type Store interface {
	// Create persists a new session.
	Create(ctx context.Context, s *Session) error

	// Get retrieves a session by ID. Returns nil, nil if not found or expired.
	Get(ctx context.Context, id string) (*Session, error)

	// Touch updates LastActiveAt and extends ExpiresAt by the store's TTL.
	Touch(ctx context.Context, id string) error

	// Delete removes a session.
	Delete(ctx context.Context, id string) error

	// List returns all non-expired sessions.
	List(ctx context.Context) ([]*Session, error)

	// UpdateState merges state into the session's State map.
	UpdateState(ctx context.Context, id string, state map[string]any) error

	// Cleanup removes expired sessions.
	Cleanup(ctx context.Context) error

	// Close stops background routines and releases resources.
	Close() error
}

Store defines the interface for session persistence.

type Subscription added in v1.58.0

type Subscription interface {
	// Events returns the channel events are delivered on. The channel is
	// closed by Close, after which receives yield zero values.
	Events() <-chan Event
	// Close releases the subscription. Safe to call more than once.
	Close()
}

Subscription is a per-subscriber stream of Events. Subscribers MUST call Close when they no longer want events; failing to do so would leak the underlying buffered channel and a slot in the broadcaster's fan-out map.

Directories

Path Synopsis
Package postgres provides PostgreSQL storage and pub/sub plumbing for the platform's session layer.
Package postgres provides PostgreSQL storage and pub/sub plumbing for the platform's session layer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL