Overview ¶
Package session provides session management for the MCP data platform. It defines the Store interface for session persistence and the Session type that represents an active client connection.
Index ¶
- Variables
- func AwareSessionID(ctx context.Context) string
- func WithAwareSessionID(ctx context.Context, sessionID string) context.Context
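- type AwareHandler
- func NewAwareHandler(inner http.Handler, cfg HandlerConfig) *AwareHandler
- func (h *AwareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type Broadcaster
- type Event
- type HandlerConfig
- type MemoryBroadcaster
- func NewMemoryBroadcaster(logger *slog.Logger) *MemoryBroadcaster
- func (b *MemoryBroadcaster) Close() error
- func (b *MemoryBroadcaster) Publish(_ context.Context, ev Event) error
- func (b *MemoryBroadcaster) Subscribe(ctx context.Context, sessionID string) Subscription
- func (b *MemoryBroadcaster) SubscriberCount() int
- type MemoryStore
- func NewMemoryStore(ttl time.Duration) *MemoryStore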
- func (s *MemoryStore) Cleanup(_ context.Context) error
- func (s *MemoryStore) Close() error
- func (s *MemoryStore) Create(_ context.Context, sess *Session) error
- func (s *MemoryStore) Delete(_ context.Context, id string) error
- func (s *MemoryStore) Get(_ context.Context, id string) (*Session, error)
- func (s *MemoryStore) List(_ context.Context) ([]*Session, error)
- func (s *MemoryStore) StartCleanupRoutine(interval time.Duration)
- func (s *MemoryStore) Touch(_ context.Context, id string) error
- func (s *MemoryStore) UpdateState(_ context.Context, id string, state map[string]any) error
- type Session
- type Store
- type Subscription
Constants ¶
This section is empty.
Variables ¶
var ErrBroadcasterClosed = errors.New("session: broadcaster closed")
ErrBroadcasterClosed is returned by Publish on a Broadcaster that has been Closed. Callers should drop the event and avoid retry loops.
Functions ¶
func AwareSessionID ¶ added in v0.33.4
AwareSessionID returns the session ID set by AwareHandler, or "".
func WithAwareSessionID ¶ added in v0.33.4
WithAwareSessionID returns a context carrying the given session ID. This is used by AwareHandler internally and exposed for middleware that needs to read the session ID via AwareSessionID.
Types ¶
type AwareHandler ¶
type AwareHandler struct {
// contains filtered or unexported fields
}
AwareHandler wraps an HTTP handler to manage MCP sessions against an external Store. It is used when the SDK runs in stateless mode to provide session persistence (e.g. for zero-downtime restarts).
func NewAwareHandler ¶
func NewAwareHandler(inner http.Handler, cfg HandlerConfig) *AwareHandler
NewAwareHandler creates a handler that manages sessions externally.
func (*AwareHandler) ServeHTTP ¶
func (h *AwareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP dispatches the request based on session state.
type Broadcaster ¶ added in v1.58.0
type Broadcaster interface {
// Subscribe registers a new subscriber and returns the subscription.
// ctx cancellation closes the subscription automatically. The
// sessionID is recorded for log attribution only — events are
// fan-out broadcast to every subscriber regardless of sessionID.
// (tools/list_changed today is a server-wide signal, so per-
// session targeting is intentionally not implemented; if a future
// notification needs per-session delivery, add an Event.SessionID
// filter at Publish time and document the contract change here.)
Subscribe(ctx context.Context, sessionID string) Subscription
// Publish delivers ev to every active subscription. Best-effort:
// returns nil even when individual subscribers are dropped due to
// buffer overflow. The error path is reserved for unrecoverable
// transport faults (e.g., the underlying postgres LISTEN connection
// is closed) so callers can decide whether to retry or surface.
Publish(ctx context.Context, ev Event) error
// Close releases any background resources. After Close, Subscribe
// returns a closed-immediate subscription and Publish returns
// ErrBroadcasterClosed.
Close() error
}
Broadcaster fans out server-originated MCP notifications to every active SSE long-poll subscriber attached to the platform's session-aware HTTP handler.
Implementations are concurrency-safe. Publish must not block on a slow subscriber — buffer-full subscribers are dropped (with a warning) so a single misbehaving downstream client cannot stall the event pipeline for everyone else.
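The non-blocking requirement on Publish can be met with a select that has a default branch. The sketch below illustrates that technique with hypothetical stand-in types (event, fanout); it assumes a dropped subscriber has its channel closed and is removed from the set, which the contract above suggests but does not spell out.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for the package's Event type.
type event struct{ Method string }

// fanout is a hypothetical, simplified broadcaster.
type fanout struct {
	mu   sync.Mutex
	subs map[int]chan event
	next int
}

func newFanout() *fanout { return &fanout{subs: make(map[int]chan event)} }

// subscribe registers a subscriber with the given buffer size.
func (f *fanout) subscribe(buf int) <-chan event {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan event, buf)
	f.subs[f.next] = ch
	f.next++
	return ch
}

// publish never blocks: a subscriber whose buffer is full is closed and
// removed instead of stalling delivery to everyone else.
func (f *fanout) publish(ev event) (dropped int) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for id, ch := range f.subs {
		select {
		case ch <- ev:
		default:
			close(ch)
			delete(f.subs, id)
			dropped++
		}
	}
	return dropped
}

func main() {
	f := newFanout()
	slow := f.subscribe(1) // small buffer, never drained
	fast := f.subscribe(4)
	ev := event{Method: "notifications/tools/list_changed"}
	first := f.publish(ev)
	second := f.publish(ev)
	fmt.Println("dropped:", first, second)
	fmt.Println("fast buffered:", len(fast), "slow buffered:", len(slow))
}
```

A real implementation would also log the drop; the counter returned here exists only to make the behavior observable in the example.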
type Event ¶ added in v1.58.0
Event is a server-originated MCP notification (a JSON-RPC notification that has no id and expects no response). The broadcaster delivers these to every connected SSE long-poll subscriber.
Method is the JSON-RPC method, e.g. "notifications/tools/list_changed". Params is the JSON-RPC params object — usually a small map with no payload (the method name itself is the signal). Pass nil when the notification carries no payload; the SSE writer will emit "{}".
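To make the nil-params rule concrete, the following sketch encodes an event as a JSON-RPC 2.0 notification. The event struct and the encode helper are hypothetical; the field types of the real Event and the exact framing used by the SSE writer are not shown in these docs, so treat this as one plausible encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a stand-in for Event. The field types are assumptions.
type event struct {
	Method string
	Params any
}

// encode renders ev as a JSON-RPC 2.0 notification (no id field).
// A nil Params is emitted as an empty object, matching the "{}" rule
// described above.
func encode(ev event) ([]byte, error) {
	params := ev.Params
	if params == nil {
		params = struct{}{}
	}
	return json.Marshal(struct {
		JSONRPC string `json:"jsonrpc"`
		Method  string `json:"method"`
		Params  any    `json:"params"`
	}{"2.0", ev.Method, params})
}

func main() {
	b, err := encode(event{Method: "notifications/tools/list_changed"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```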
type HandlerConfig ¶
type HandlerConfig struct {
Store Store
TTL time.Duration
// Broadcaster delivers server-pushed MCP notifications (e.g.
// notifications/tools/list_changed) to the per-session SSE
// long-poll stream. When nil, GET requests are forwarded to the
// inner handler unchanged — useful for tests or for transport
// modes that don't need server push.
Broadcaster Broadcaster
}
HandlerConfig configures an AwareHandler.
type MemoryBroadcaster ¶ added in v1.58.0
type MemoryBroadcaster struct {
// contains filtered or unexported fields
}
MemoryBroadcaster is the in-memory Broadcaster implementation. It holds every active subscriber's channel under a single mutex and fans out each Publish synchronously across all of them. Suitable for single-replica deployments where all SSE long-poll clients hit the same process. Multi-replica deployments must wrap or compose this with a Postgres LISTEN/NOTIFY broadcaster (pkg/session/postgres).
func NewMemoryBroadcaster ¶ added in v1.58.0
func NewMemoryBroadcaster(logger *slog.Logger) *MemoryBroadcaster
NewMemoryBroadcaster builds a fresh in-memory broadcaster. Pass a non-nil logger to capture slow-subscriber warnings; nil falls back to slog.Default().
func (*MemoryBroadcaster) Close ¶ added in v1.58.0
func (b *MemoryBroadcaster) Close() error
Close releases every subscriber's channel and marks the broadcaster as closed. Subsequent Publish calls return ErrBroadcasterClosed and subsequent Subscribe calls return a closed-immediate subscription. Safe to call more than once.
func (*MemoryBroadcaster) Publish ¶ added in v1.58.0
func (b *MemoryBroadcaster) Publish(_ context.Context, ev Event) error
Publish fans out ev to every active subscriber. Best-effort delivery per the Broadcaster contract — slow subscribers (full channel) are dropped with slog.Warn rather than blocking the publisher.
func (*MemoryBroadcaster) Subscribe ¶ added in v1.58.0
func (b *MemoryBroadcaster) Subscribe(ctx context.Context, sessionID string) Subscription
Subscribe registers a new subscriber. The returned Subscription is always non-nil even when the broadcaster is already closed — in that case the channel is closed immediately so callers' range loops exit without special-casing the closed state.
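The closed-immediate behavior means a consumer can always range over the events channel. A small sketch of why that works, using a hypothetical event type and helper functions in place of a real Subscription:

```go
package main

import "fmt"

// event is a stand-in for the package's Event type.
type event struct{ Method string }

// closedEvents returns an already-closed channel, standing in for what
// Events() would yield when the broadcaster is already closed.
func closedEvents() <-chan event {
	ch := make(chan event)
	close(ch)
	return ch
}

// drain consumes events until the channel is closed and reports how
// many it saw. No closed-state check is needed before the loop.
func drain(events <-chan event) int {
	n := 0
	for range events {
		n++
	}
	return n
}

func main() {
	fmt.Println(drain(closedEvents()))
}
```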
func (*MemoryBroadcaster) SubscriberCount ¶ added in v1.58.0
func (b *MemoryBroadcaster) SubscriberCount() int
SubscriberCount returns the number of active subscribers. Used by tests and by the postgres bridge to decide whether to bother republishing a received NOTIFY locally.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore implements Store using an in-memory map with TTL-based expiration.
func NewMemoryStore ¶
func NewMemoryStore(ttl time.Duration) *MemoryStore
NewMemoryStore creates a new in-memory session store.
func (*MemoryStore) Cleanup ¶
func (s *MemoryStore) Cleanup(_ context.Context) error
Cleanup removes expired sessions.
func (*MemoryStore) Close ¶
func (s *MemoryStore) Close() error
Close stops the cleanup goroutine and waits for it to exit. It is safe to call Close even if StartCleanupRoutine was never called.
func (*MemoryStore) Create ¶
func (s *MemoryStore) Create(_ context.Context, sess *Session) error
Create persists a new session.
func (*MemoryStore) Delete ¶
func (s *MemoryStore) Delete(_ context.Context, id string) error
Delete removes a session.
func (*MemoryStore) List ¶
func (s *MemoryStore) List(_ context.Context) ([]*Session, error)
List returns all non-expired sessions.
func (*MemoryStore) StartCleanupRoutine ¶
func (s *MemoryStore) StartCleanupRoutine(interval time.Duration)
StartCleanupRoutine starts a background goroutine that periodically removes expired sessions. The goroutine is stopped when Close is called.
func (*MemoryStore) Touch ¶
func (s *MemoryStore) Touch(_ context.Context, id string) error
Touch updates LastActiveAt and extends ExpiresAt by the store's TTL.
func (*MemoryStore) UpdateState ¶
func (s *MemoryStore) UpdateState(_ context.Context, id string, state map[string]any) error
UpdateState merges state into the session's State map.
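A merge of this kind might look like the sketch below. It assumes a shallow, key-by-key merge in which incoming values overwrite existing ones; whether the real method merges nested maps is not stated here, and mergeState is a hypothetical helper.

```go
package main

import "fmt"

// mergeState copies every key from src into dst, overwriting on
// conflict and leaving other keys in dst untouched. A nil dst is
// allocated on demand.
func mergeState(dst, src map[string]any) map[string]any {
	if dst == nil {
		dst = make(map[string]any, len(src))
	}
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

func main() {
	state := map[string]any{"seen": 1, "mode": "a"}
	state = mergeState(state, map[string]any{"mode": "b", "extra": true})
	fmt.Println(state["seen"], state["mode"], state["extra"])
}
```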
type Session ¶
type Session struct {
// ID is the unique session identifier.
ID string
// UserID identifies the session owner. For authenticated sessions this is
// a hash of the bearer token; for anonymous sessions it is empty.
UserID string
// CreatedAt is when the session was established.
CreatedAt time.Time
// LastActiveAt is the most recent activity timestamp.
LastActiveAt time.Time
// ExpiresAt is when the session expires if not touched.
ExpiresAt time.Time
// State holds extensible session data (e.g. enrichment dedup state).
State map[string]any
}
Session represents an active client session.
type Store ¶
type Store interface {
// Create persists a new session.
Create(ctx context.Context, s *Session) error
// Get retrieves a session by ID. Returns nil, nil if not found or expired.
Get(ctx context.Context, id string) (*Session, error)
// Touch updates LastActiveAt and extends ExpiresAt by the store's TTL.
Touch(ctx context.Context, id string) error
// Delete removes a session.
Delete(ctx context.Context, id string) error
// List returns all non-expired sessions.
List(ctx context.Context) ([]*Session, error)
// UpdateState merges state into the session's State map.
UpdateState(ctx context.Context, id string, state map[string]any) error
// Cleanup removes expired sessions.
Cleanup(ctx context.Context) error
// Close stops background routines and releases resources.
Close() error
}
Store defines the interface for session persistence.
type Subscription ¶ added in v1.58.0
type Subscription interface {
// Events returns the channel events are delivered on. The channel is
// closed by Close, after which receives yield zero values.
Events() <-chan Event
// Close releases the subscription. Safe to call more than once.
Close()
}
Subscription is a per-subscriber stream of Events. Subscribers MUST call Close when they no longer want events; otherwise the underlying buffered channel and a slot in the broadcaster's fan-out map are leaked.
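The "safe to call more than once" guarantee on Close is typically implemented with sync.Once. The sketch below shows that idea with a hypothetical subscription type. It leaves out the broadcaster-side bookkeeping (removing the slot from the fan-out map, and guarding sends against a concurrent Close) that a real implementation would need.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for the package's Event type.
type event struct{ Method string }

// subscription is a hypothetical, minimal Subscription.
type subscription struct {
	ch   chan event
	once sync.Once
}

func newSubscription(buf int) *subscription {
	return &subscription{ch: make(chan event, buf)}
}

// Events returns the receive side of the subscription's channel.
func (s *subscription) Events() <-chan event { return s.ch }

// Close closes the channel exactly once, so repeated calls do not panic.
func (s *subscription) Close() { s.once.Do(func() { close(s.ch) }) }

func main() {
	sub := newSubscription(1)
	defer sub.Close() // always release, even on early return

	sub.ch <- event{Method: "notifications/tools/list_changed"}
	fmt.Println((<-sub.Events()).Method)

	sub.Close()
	ev, ok := <-sub.Events() // closed: zero value, ok == false
	fmt.Println(ev.Method == "", ok)
}
```

Pairing every Subscribe with a deferred Close is the simplest way to satisfy the MUST above.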