sessions

package
v1.3.1 Latest
Published: Jun 11, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package sessions owns Harbor's session-lifecycle subsystem.

A Session is a longer-lived multi-turn conversation that contains many Runs. Identity for runtime concerns is the triple `(tenant, user, session)`; runs are scoped within sessions (RFC §6.9). The Session record itself is keyed in the StateStore at `Kind = "session.lifecycle"` with `RunID = ""` — sessions are session-scoped, not run-scoped.

Phase 08 ships a single concrete *Registry implementation that sits over Phase 07's StateStore, codifying the typed-wrapper-over-generic contract from D-027. There is no driver pluralism at the session layer; driver pluralism lives at StateStore (in-mem / SQLite / Postgres at Phases 07 / 15 / 16). Per AGENTS.md §4.4, optional-capability ceremony is forbidden when all V1 drivers (here: implementations) will implement everything.

Four lifetime invariants are load-bearing and pinned by tests:

  1. Identity captured immutably on Open — Touch / Close re-save the same identity from the existing record; mismatched ctx identity is rejected with ErrIdentityMismatch.
  2. Reopen-after-close forbidden — clients open a new SessionID.
  3. Cross-tenant SessionID reuse rejected — `SessionID=S` opened under Tenant A then attempted under Tenant B returns ErrSessionIDReuse, even though the StateStore key (which contains the full Quadruple) would not naturally collide.
  4. GC never reaps a session with a RUNNING task — enforced when the TaskRegistry-backed probe (TaskRunningProbe) is wired via WithGCPolicy, as both reference assemblies do. The no-op default (returns false, nil) exists only for registries constructed without task awareness.

Lifecycle events (`session.opened / .touched / .closed / .gc_reaped`) land on the EventBus as `SafePayload` types: they are Harbor-internal markers with no secret-shaped fields by construction (RFC §6.13, D-028). Subscribers can extract typed fields directly, with no redactor walk in between.

Index

Constants

const (
	EventTypeSessionOpened   events.EventType = "session.opened"
	EventTypeSessionTouched  events.EventType = "session.touched"
	EventTypeSessionClosed   events.EventType = "session.closed"
	EventTypeSessionGCReaped events.EventType = "session.gc_reaped"
)

Session lifecycle event types. Each is registered with the events package's exhaustive registry via init() so Publish accepts them without ErrUnknownEventType. Subscribers can filter on these via events.Filter.Types.

Variables

var (
	// ErrReopenAfterClose — Open called for a SessionID whose existing
	// record is Closed. Per RFC §6.9 ("Reopen-after-close is forbidden").
	ErrReopenAfterClose = errors.New("sessions: reopen-after-close forbidden")
	// ErrSessionIDReuse — Open called with a SessionID already opened
	// under a different (tenant, user). Per RFC §6.9 ("reusing a session
	// ID across tenants/users is rejected").
	ErrSessionIDReuse = errors.New("sessions: SessionID reused across tenants/users")
	// ErrIdentityMismatch — Touch / Close called with a ctx Identity
	// that disagrees with the stored session's Identity. The triple is
	// captured immutably on Open; mid-flight identity swaps are bugs.
	ErrIdentityMismatch = errors.New("sessions: ctx identity mismatches stored session identity")
	// ErrSessionNotFound — Get / Touch / Close / Inspect targeting a
	// SessionID that has no record (or the record was Deleted).
	ErrSessionNotFound = errors.New("sessions: session not found")
	// ErrSessionAlreadyOpen — Open called twice with the same triple
	// AND SessionID without an intervening Close. Distinct from
	// ErrReopenAfterClose (which fires when Closed is true).
	ErrSessionAlreadyOpen = errors.New("sessions: session already open")
	// ErrRegistryClosed — any operation called after CloseRegistry.
	ErrRegistryClosed = errors.New("sessions: registry is closed")
)

Sentinel errors. Callers compare via errors.Is.
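
A minimal sketch of the errors.Is comparison, assuming the registry may wrap a sentinel with extra context before returning it. The sentinel is redeclared locally so the example compiles on its own, and `touch` and `isNotFound` are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSessionNotFound mirrors the package sentinel; it is redeclared here
// only so the sketch is self-contained.
var ErrSessionNotFound = errors.New("sessions: session not found")

// touch is a hypothetical stand-in that wraps the sentinel with context,
// the way a caller-facing layer might.
func touch(id string) error {
	return fmt.Errorf("touch %q: %w", id, ErrSessionNotFound)
}

// isNotFound compares via errors.Is, which sees through %w wrapping.
func isNotFound(err error) bool {
	return errors.Is(err, ErrSessionNotFound)
}

func main() {
	err := touch("s-1")
	// A plain == comparison fails on the wrapped error; errors.Is does not.
	fmt.Println(isNotFound(err), err == ErrSessionNotFound)
}
```

Comparing with errors.Is keeps callers correct whether or not an intermediate layer adds context to the error.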

Functions

This section is empty.

Types

type Clock

type Clock interface {
	Now() time.Time
}

Clock abstracts time so GC tests are deterministic without time.Sleep. Production code uses realClock; tests pass a fakeClock.

The interface intentionally returns time.Time directly (not a monotonic count) so GC's wall-clock math is identical between production and test paths.
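
As an illustration of how a controllable clock removes the need for time.Sleep, here is a small sketch. The `fakeClock` type, its `Advance` method, and `idleFor` are assumptions made for this example; the package's own fakeClock is unexported and may look different.

```go
package main

import (
	"fmt"
	"time"
)

// Clock mirrors the interface documented above.
type Clock interface {
	Now() time.Time
}

// fakeClock is a hypothetical test double. It is not goroutine-safe; a
// clock shared with a background sweeper would need a mutex.
type fakeClock struct{ now time.Time }

func (c *fakeClock) Now() time.Time { return c.now }

// Advance moves the fake clock forward without sleeping.
func (c *fakeClock) Advance(d time.Duration) { c.now = c.now.Add(d) }

// idleFor reports how long a session has been idle according to clk.
func idleFor(clk Clock, lastSeen time.Time) time.Duration {
	return clk.Now().Sub(lastSeen)
}

// demo advances a fake clock by 25 hours and returns the observed idle
// time in hours.
func demo() float64 {
	start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	clk := &fakeClock{now: start}
	clk.Advance(25 * time.Hour)
	return idleFor(clk, start).Hours()
}

func main() {
	fmt.Println(demo())
}
```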

type GCPolicy

type GCPolicy struct {
	IdleTTL       time.Duration
	HardCap       time.Duration
	SweepInterval time.Duration
	RunningProbe  RunningProbe
}

GCPolicy bundles the GC sweeper's tunables. Defaults match RFC §6.9: IdleTTL 24h, HardCap 720h (30 days), SweepInterval 15m. The RunningProbe should be TaskRunningProbe wherever a TaskRegistry is in scope; default is the no-op.
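
The documented defaults can be sketched as a zero-value fill. This is an illustration only: `withDefaults` is a hypothetical helper, the RunningProbe field is omitted for brevity, and the real defaulting happens inside New from SessionsConfig.

```go
package main

import (
	"fmt"
	"time"
)

// GCPolicy mirrors the duration fields documented above; RunningProbe is
// left out to keep the sketch short.
type GCPolicy struct {
	IdleTTL       time.Duration
	HardCap       time.Duration
	SweepInterval time.Duration
}

// withDefaults is a hypothetical helper that fills zero fields with the
// defaults named in the documentation (24h, 720h, 15m).
func withDefaults(p GCPolicy) GCPolicy {
	if p.IdleTTL == 0 {
		p.IdleTTL = 24 * time.Hour
	}
	if p.HardCap == 0 {
		p.HardCap = 720 * time.Hour
	}
	if p.SweepInterval == 0 {
		p.SweepInterval = 15 * time.Minute
	}
	return p
}

func main() {
	// Only IdleTTL is overridden; the other two fall back to defaults.
	p := withDefaults(GCPolicy{IdleTTL: time.Hour})
	fmt.Println(p.IdleTTL, p.HardCap, p.SweepInterval)
}
```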

type Option

type Option func(*Registry)

Option configures a Registry at construction. Two options are exported: WithClock and WithGCPolicy. Production code does not use WithClock; tests use it to inject a fake clock and drive GC deterministically.

func WithClock

func WithClock(c Clock) Option

WithClock injects a custom Clock. Production code uses realClock; the test suite uses a controllable clock to exercise hard-cap GC without time.Sleep (per AGENTS.md §11).

func WithGCPolicy

func WithGCPolicy(p GCPolicy) Option

WithGCPolicy injects an explicit GCPolicy at construction. When omitted, the GCPolicy is built from the SessionsConfig defaults + the no-op RunningProbe. Production wiring (cmd/harbor and harbortest/devstack) passes a policy whose RunningProbe is TaskRunningProbe so GC honors RFC §6.9.
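
The option-plus-default behavior described here can be sketched with pared-down stand-ins. Everything below is an assumption for illustration: `newRegistry`, `noopProbe`, `alwaysRunning`, and `probeResult` are hypothetical, and the probe takes a plain session ID where the real RunningProbe takes an identity.Quadruple.

```go
package main

import (
	"context"
	"fmt"
)

// RunningProbe is simplified: the real seam receives an identity.Quadruple.
type RunningProbe func(ctx context.Context, sessionID string) (bool, error)

// GCPolicy carries only the field this sketch needs.
type GCPolicy struct{ RunningProbe RunningProbe }

// Registry is a stand-in holding just the policy.
type Registry struct{ policy GCPolicy }

// Option follows the documented shape: func(*Registry).
type Option func(*Registry)

// WithGCPolicy replaces the registry's policy wholesale.
func WithGCPolicy(p GCPolicy) Option {
	return func(r *Registry) { r.policy = p }
}

// noopProbe reports that nothing is running, like the documented default.
func noopProbe(context.Context, string) (bool, error) { return false, nil }

// alwaysRunning is a hypothetical probe standing in for TaskRunningProbe.
func alwaysRunning(context.Context, string) (bool, error) { return true, nil }

// newRegistry is hypothetical: it installs the no-op default, applies the
// options, then treats a nil probe as the no-op again.
func newRegistry(opts ...Option) *Registry {
	r := &Registry{policy: GCPolicy{RunningProbe: noopProbe}}
	for _, opt := range opts {
		opt(r)
	}
	if r.policy.RunningProbe == nil {
		r.policy.RunningProbe = noopProbe
	}
	return r
}

// probeResult builds a registry and asks its probe about one session.
func probeResult(opts ...Option) bool {
	running, _ := newRegistry(opts...).policy.RunningProbe(context.Background(), "s-1")
	return running
}

func main() {
	fmt.Println(probeResult())                                                    // default probe
	fmt.Println(probeResult(WithGCPolicy(GCPolicy{RunningProbe: alwaysRunning}))) // explicit probe
}
```

The sketch shows why omitting the option is risky for assemblies with a TaskRegistry: the default probe never reports running work, so nothing shields a busy session from the sweeper.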

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the StateStore-backed implementation of SessionRegistry. It is the single concrete impl in V1 — driver pluralism lives at the StateStore layer.

Concurrency model:

  • StateStore writes go through `state.StateStore.Save` which is itself concurrent-safe per D-025.
  • The cross-tenant SessionID-uniqueness map is guarded by mu.
  • The sweeper goroutine's lifecycle is owned by `done` + `wg`.

func New

func New(store state.StateStore, cfg config.SessionsConfig, bus events.EventBus, opts ...Option) (*Registry, error)

New constructs a Registry. The store and bus are required; cfg supplies the GC tunables (defaults applied if zero); opts can override the Clock or GCPolicy.

The sweeper goroutine starts immediately and runs at gcPolicy.SweepInterval until CloseRegistry is called. The probe defaults to no-op; assemblies that own a TaskRegistry MUST pass TaskRunningProbe via WithGCPolicy (both reference assemblies do) so GC never reaps a session with a RUNNING task.

func (*Registry) Close

func (r *Registry) Close(ctx context.Context, id string, reason string) error

Close marks the session Closed and emits session.closed. Idempotent: closing an already-closed session is a no-op AND preserves the original ClosedReason.

func (*Registry) CloseRegistry

func (r *Registry) CloseRegistry(_ context.Context) error

CloseRegistry cancels the sweeper goroutine and joins it. Idempotent. Subsequent operations return ErrRegistryClosed.

func (*Registry) EnsureOpen added in v1.2.0

func (r *Registry) EnsureOpen(ctx context.Context, ident identity.Identity) (*Session, error)

EnsureOpen is the create-on-first-use entry point (D-171). It returns the live session for `ident`, creating it if no record exists. The session id is `ident.SessionID` (the per-request session the client chose); the (tenant, user) come from the verified connection token.

Semantics:

  • No record yet → Open it (create) and return the fresh session.
  • Open record at this exact triple → no-op, return the existing session (a second turn in the same conversation is not an error).
  • Closed record at this triple → ErrReopenAfterClose (RFC §6.9: a GC-reaped or operator-closed session is read-only; a NEW conversation must pick a NEW session id). The caller maps this to a clear client error; it is never a silent revive.

EnsureOpen is identity-mandatory: an incomplete triple fails closed. It is the seam the Protocol ControlSurface calls on `start` so the first turn of a brand-new conversation materialises the session row the Console's sessions.list reads.
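
The three cases above can be sketched over an in-memory map. This is a simplification under stated assumptions: `record`, `store`, `ensureOpen`, and `outcome` are hypothetical, identity checks and locking are omitted, and the real method persists through the StateStore.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrReopenAfterClose mirrors the package sentinel for this sketch.
var ErrReopenAfterClose = errors.New("sessions: reopen-after-close forbidden")

// record is a pared-down stand-in for Session.
type record struct {
	ID     string
	Closed bool
}

// store is a hypothetical in-memory stand-in for the registry's backing state.
type store map[string]*record

// ensureOpen follows the documented cases: create when absent, return the
// existing record when open, refuse when closed. The bool reports creation.
func (s store) ensureOpen(id string) (*record, bool, error) {
	rec, ok := s[id]
	switch {
	case !ok:
		rec = &record{ID: id}
		s[id] = rec
		return rec, true, nil
	case rec.Closed:
		return nil, false, ErrReopenAfterClose
	default:
		return rec, false, nil
	}
}

// outcome summarizes one ensureOpen call as a short label.
func outcome(s store, id string) string {
	_, created, err := s.ensureOpen(id)
	switch {
	case errors.Is(err, ErrReopenAfterClose):
		return "reopen-after-close"
	case err != nil:
		return "error"
	case created:
		return "created"
	default:
		return "existing"
	}
}

func main() {
	s := store{}
	fmt.Println(outcome(s, "s-1")) // first turn creates the session
	fmt.Println(outcome(s, "s-1")) // second turn is a no-op
	s["s-1"].Closed = true
	fmt.Println(outcome(s, "s-1")) // a closed session is never revived
}
```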

func (*Registry) GC

func (r *Registry) GC(ctx context.Context, policy GCPolicy) (int, error)

GC performs a single sweep pass. For each currently-open session:

  • if RunningProbe returns true: skip (per RFC §6.9 "GC never reaps a session with a RUNNING task").
  • else if LastSeen + IdleTTL < clock.Now(): close with reason "gc:idle".
  • else if OpenedAt + HardCap < clock.Now(): close with reason "gc:hard_cap" (the hard cap wins over recent Touch).

Returns the number of sessions reaped and the first probe / close error encountered (the sweep continues past errors so a single bad session doesn't block the rest).
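
The per-session decision in the list above can be written as a pure function. A minimal sketch: `reapReason` and `demo` are hypothetical names, and the real sweep also handles probe errors, persistence, and event emission.

```go
package main

import (
	"fmt"
	"time"
)

// reapReason returns the close reason for one open session, or "" to keep
// it. The order mirrors the documentation: probe, then idle TTL, then hard cap.
func reapReason(running bool, openedAt, lastSeen, now time.Time, idleTTL, hardCap time.Duration) string {
	switch {
	case running:
		return "" // never reap a session with a RUNNING task
	case lastSeen.Add(idleTTL).Before(now):
		return "gc:idle"
	case openedAt.Add(hardCap).Before(now):
		return "gc:hard_cap" // applies even if the session was touched recently
	default:
		return ""
	}
}

// demo evaluates a session opened ageHours ago and last touched idleHours
// ago, using the documented defaults (24h idle TTL, 720h hard cap).
func demo(running bool, ageHours, idleHours int) string {
	now := time.Date(2026, 6, 11, 0, 0, 0, 0, time.UTC)
	openedAt := now.Add(-time.Duration(ageHours) * time.Hour)
	lastSeen := now.Add(-time.Duration(idleHours) * time.Hour)
	return reapReason(running, openedAt, lastSeen, now, 24*time.Hour, 720*time.Hour)
}

func main() {
	fmt.Println(demo(false, 48, 25))       // idle for more than 24h
	fmt.Println(demo(false, 721, 1))       // recently touched but past the hard cap
	fmt.Println(demo(true, 721, 25) == "") // running work blocks both rules
	fmt.Println(demo(false, 48, 1) == "")  // healthy session is kept
}
```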

func (*Registry) Get

func (r *Registry) Get(ctx context.Context, id string) (*Session, error)

Get loads the session with `id` for the identity in ctx. Returns ErrSessionNotFound when the record is absent. Identity-mandatory: the ctx Identity is used to scope the StateStore read; cross-tenant access is prevented because the StateStore key contains the full triple.

func (*Registry) Inspect

func (r *Registry) Inspect(ctx context.Context, id string) (*SessionSnapshot, error)

Inspect returns a SessionSnapshot. Running is derived from the configured RunningProbe at inspection time.

func (*Registry) ListSnapshots

func (r *Registry) ListSnapshots(ctx context.Context, f SessionListFilter) ([]SessionSnapshot, error)

ListSnapshots implements sessions.SessionLister — the Phase 72c `search.sessions` read-side projection. Returns snapshots for every session the registry has seen (open OR closed) matching the filter.

The registry's in-memory `idIndex` is the catalog of every SessionID that has been Opened during this registry's lifetime. The snapshot is built by Loading each matching session from the StateStore so the Closed / ClosedAt / LastSeen fields are current; Running is derived from the GCPolicy RunningProbe at inspection time (mirroring Inspect's contract).

The caller (the search subsystem) is responsible for the auth scope gate — ListSnapshots does NOT re-check scope. It DOES validate that every supplied `TenantIDs` / `UserIDs` / `SessionIDs` entry is non-empty (a no-op for empty filters).

Concurrent reuse (D-025): ListSnapshots only reads `idIndex` / `openSessions` under the registry's mutex; no per-call state lives on `*Registry`. One Registry serves N concurrent ListSnapshots safely.

func (*Registry) Open

func (r *Registry) Open(ctx context.Context, id string, ident identity.Identity) (*Session, error)

Open creates a new session record, captures the identity triple immutably, and emits session.opened. Cross-tenant SessionID reuse and reopen-after-close are rejected; same-triple double-open is rejected with ErrSessionAlreadyOpen.
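
To illustrate the three rejections, here is a sketch of a mutex-guarded SessionID index. The `index`, `entry`, and `owner` types and the helper names are hypothetical, and the order in which the checks run (reuse first, then closed, then already open) is an assumption; the documentation does not state it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Sentinels redeclared locally so the sketch is self-contained.
var (
	ErrReopenAfterClose   = errors.New("sessions: reopen-after-close forbidden")
	ErrSessionIDReuse     = errors.New("sessions: SessionID reused across tenants/users")
	ErrSessionAlreadyOpen = errors.New("sessions: session already open")
)

// owner is the (tenant, user) pair captured on first Open.
type owner struct{ Tenant, User string }

type entry struct {
	owner  owner
	closed bool
}

// index is a hypothetical stand-in for the uniqueness map guarded by mu.
type index struct {
	mu   sync.Mutex
	byID map[string]entry
}

func newIndex() *index { return &index{byID: map[string]entry{}} }

// open records a new SessionID or explains why it is rejected.
func (ix *index) open(id string, o owner) error {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	e, seen := ix.byID[id]
	switch {
	case !seen:
		ix.byID[id] = entry{owner: o}
		return nil
	case e.owner != o:
		return ErrSessionIDReuse
	case e.closed:
		return ErrReopenAfterClose
	default:
		return ErrSessionAlreadyOpen
	}
}

// markClosed flags an existing entry as closed.
func (ix *index) markClosed(id string) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	if e, ok := ix.byID[id]; ok {
		e.closed = true
		ix.byID[id] = e
	}
}

// openResult returns "ok" or the error text for one open attempt.
func openResult(ix *index, id, tenant, user string) string {
	if err := ix.open(id, owner{tenant, user}); err != nil {
		return err.Error()
	}
	return "ok"
}

func main() {
	ix := newIndex()
	fmt.Println(openResult(ix, "S", "A", "u1")) // first open succeeds
	fmt.Println(openResult(ix, "S", "B", "u1")) // other tenant: reuse rejected
	fmt.Println(openResult(ix, "S", "A", "u1")) // same owner: already open
	ix.markClosed("S")
	fmt.Println(openResult(ix, "S", "A", "u1")) // closed: reopen forbidden
}
```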

func (*Registry) Touch

func (r *Registry) Touch(ctx context.Context, id string) error

Touch updates LastSeen and re-saves. Identity-mandatory; ctx Identity is compared against the stored Identity, mismatch returns ErrIdentityMismatch. Touch on a Closed session returns ErrReopenAfterClose (Closed records are read-only).

type RunningProbe

type RunningProbe func(ctx context.Context, q identity.Quadruple) (bool, error)

RunningProbe is the seam the GC sweeper consults so it can honor "never reap a session with a RUNNING task" (RFC §6.9). The TaskRegistry-backed implementation is TaskRunningProbe; both reference assemblies wire it via WithGCPolicy. A nil probe is treated as the no-op default (returns false, nil) — only for registries constructed without task awareness.

func TaskRunningProbe added in v1.3.0

func TaskRunningProbe(reg tasks.TaskRegistry) RunningProbe

TaskRunningProbe adapts a tasks.TaskRegistry into the RunningProbe seam so the GC sweeper can enforce RFC §6.9's "GC never reaps a session with a RUNNING task" invariant. The probe lists the session's tasks filtered to tasks.StatusRunning (the exact status the RFC names — PENDING / PAUSED tasks do not block GC) and reports whether any exist.

Wiring: both reference assemblies (cmd/harbor `bootDevStack` and harbortest/devstack `Assemble`) pass this probe via sessions.New(..., WithGCPolicy(GCPolicy{RunningProbe: ...})). A registry constructed without it falls back to the no-op default, which reports no running tasks — sessions with in-flight work become reapable, so assemblies that own a TaskRegistry MUST wire this adapter.

The import direction is sound by construction: internal/tasks does not import internal/sessions (verified at the time of authoring), so the adapter lives here, next to the RunningProbe type it satisfies.
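
The adapter shape can be sketched as a closure over a task lister. The tasks.TaskRegistry interface is not shown in this documentation, so `taskLister`, its `ListByStatus` method, the status constants, and `memTasks` are all hypothetical stand-ins; the probe also takes a session ID where the real one takes an identity.Quadruple.

```go
package main

import (
	"context"
	"fmt"
)

type status string

// Status values are assumptions named after the statuses in the text.
const (
	statusPending status = "PENDING"
	statusRunning status = "RUNNING"
	statusPaused  status = "PAUSED"
)

// taskLister is a hypothetical stand-in for tasks.TaskRegistry.
type taskLister interface {
	ListByStatus(ctx context.Context, sessionID string, st status) ([]string, error)
}

// RunningProbe is simplified to take a session ID.
type RunningProbe func(ctx context.Context, sessionID string) (bool, error)

// taskRunningProbe adapts a lister into a probe: a session counts as
// running only if at least one of its tasks is in the RUNNING status.
func taskRunningProbe(reg taskLister) RunningProbe {
	return func(ctx context.Context, sessionID string) (bool, error) {
		ids, err := reg.ListByStatus(ctx, sessionID, statusRunning)
		if err != nil {
			return false, err
		}
		return len(ids) > 0, nil
	}
}

// memTasks maps sessionID to taskID to status for the demo.
type memTasks map[string]map[string]status

func (m memTasks) ListByStatus(_ context.Context, sessionID string, st status) ([]string, error) {
	var ids []string
	for id, s := range m[sessionID] {
		if s == st {
			ids = append(ids, id)
		}
	}
	return ids, nil
}

// isRunning runs the adapted probe against one session.
func isRunning(m memTasks, sessionID string) bool {
	running, _ := taskRunningProbe(m)(context.Background(), sessionID)
	return running
}

func main() {
	m := memTasks{
		"s-1": {"t-1": statusRunning, "t-2": statusPaused},
		"s-2": {"t-3": statusPending},
	}
	// PENDING and PAUSED tasks alone do not block GC.
	fmt.Println(isRunning(m, "s-1"), isRunning(m, "s-2"))
}
```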

type Session

type Session struct {
	ID           string
	Identity     identity.Identity
	OpenedAt     time.Time
	LastSeen     time.Time
	Closed       bool
	ClosedAt     time.Time
	ClosedReason string
	Limits       SessionLimits
	Context      map[string]any
}

Session is the persisted lifecycle record for one session. The Identity field carries the triple captured on Open and is immutable afterwards. Closed transitions to true on Close; ClosedAt stays zero while Closed is false.

Limits and Context are reserved slots — Phase 36a/b will populate Limits with the cost / token ceilings; Phase 23+ will populate Context with the (version, hash, llm/tool ctx, memory, artifacts) quintuple sketched in RFC §6.9. Phase 08 round-trips both fields through marshal/unmarshal but applies no validation.

type SessionClosedPayload

type SessionClosedPayload struct {
	events.SafeSealed
	SessionID string
	ClosedAt  int64 // unix nanoseconds
	Reason    string
}

SessionClosedPayload reports a Close. Carries the SessionID, the ClosedAt timestamp, and the operator-provided Reason. Reason is a short caller-controlled string — callers MUST NOT pass tool args, raw user input, or any secret-shaped material; the bus does not re-redact SafePayload types (D-020 / D-028).

type SessionGCReapedPayload

type SessionGCReapedPayload struct {
	events.SafeSealed
	SessionID string
	ReapedAt  int64 // unix nanoseconds
	Reason    string
}

SessionGCReapedPayload reports a GC sweep reaping. Reason is one of "gc:idle" or "gc:hard_cap"; the same string is also stored in Session.ClosedReason. SafePayload by construction.

type SessionLimits

type SessionLimits struct {
}

SessionLimits is reserved for Phase 36a (cost ceilings) and Phase 26 (tool catalog). Empty in Phase 08; round-trips through marshal.

type SessionListFilter

type SessionListFilter struct {
	TenantIDs     []string
	UserIDs       []string
	SessionIDs    []string
	SinceLastSeen time.Time
	UntilLastSeen time.Time
	IncludeClosed bool
}

SessionListFilter narrows ListSnapshots's result set. All fields are wildcards when empty. SinceLastSeen / UntilLastSeen filter by the LastSeen timestamp; zero means "no bound."
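
A sketch of the wildcard and time-bound matching, under stated assumptions: only TenantIDs and the two LastSeen bounds are modeled, both bounds are treated as inclusive (the documentation does not say), and `filter`, `matches`, and `demoMatch` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// filter is a pared-down stand-in for SessionListFilter.
type filter struct {
	TenantIDs     []string
	SinceLastSeen time.Time
	UntilLastSeen time.Time
}

// matchAny treats an empty list as a wildcard.
func matchAny(allowed []string, v string) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == v {
			return true
		}
	}
	return false
}

// matches applies the tenant list and the LastSeen window; a zero time
// means "no bound". Inclusive bounds are an assumption of this sketch.
func matches(f filter, tenant string, lastSeen time.Time) bool {
	if !matchAny(f.TenantIDs, tenant) {
		return false
	}
	if !f.SinceLastSeen.IsZero() && lastSeen.Before(f.SinceLastSeen) {
		return false
	}
	if !f.UntilLastSeen.IsZero() && lastSeen.After(f.UntilLastSeen) {
		return false
	}
	return true
}

// demoMatch checks a session last seen idleHours ago against a filter whose
// lower bound is sinceHoursAgo hours back (0 leaves the bound unset).
func demoMatch(tenants []string, tenant string, sinceHoursAgo, idleHours int) bool {
	now := time.Date(2026, 6, 11, 0, 0, 0, 0, time.UTC)
	f := filter{TenantIDs: tenants}
	if sinceHoursAgo > 0 {
		f.SinceLastSeen = now.Add(-time.Duration(sinceHoursAgo) * time.Hour)
	}
	return matches(f, tenant, now.Add(-time.Duration(idleHours)*time.Hour))
}

func main() {
	fmt.Println(demoMatch(nil, "a", 0, 5))            // empty filter matches everything
	fmt.Println(demoMatch([]string{"b"}, "a", 0, 5))  // tenant not listed
	fmt.Println(demoMatch([]string{"a"}, "a", 2, 5))  // last seen before the lower bound
	fmt.Println(demoMatch([]string{"a"}, "a", 10, 5)) // inside the window
}
```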

type SessionLister

type SessionLister interface {
	// ListSnapshots returns session snapshots that match the filter,
	// scoped to the requested tenants. Empty `TenantIDs` matches every
	// tenant the registry has seen (the search subsystem gates this
	// on the caller's auth.ScopeAdmin claim — the registry does NOT
	// re-check scope). Empty `UserIDs` / `SessionIDs` are wildcards.
	ListSnapshots(ctx context.Context, f SessionListFilter) ([]SessionSnapshot, error)
}

SessionLister is the narrow read-side capability the Phase 72c `search.sessions` Searcher consumes. The triple `(tenant, user, session)` is the load-bearing isolation key (CLAUDE.md §6); the listing is server-enforced per the supplied SessionListFilter. The returned snapshots include both currently-open and previously-closed sessions — search wants the union.

Intentionally NOT on the SessionRegistry interface: the lister is a projection over the in-memory open-session index plus the StateStore. Concrete `*Registry` implements it; future drivers add it when their backing store gains a `list` capability (StateStore List is post-V1).

type SessionOpenedPayload

type SessionOpenedPayload struct {
	events.SafeSealed
	SessionID string
	OpenedAt  int64 // unix nanoseconds; identity-of-record across drivers
}

SessionOpenedPayload reports a successful Open. Carries the SessionID and the OpenedAt timestamp; the identity triple lives on the Event itself, so it is intentionally NOT duplicated here.

SafePayload by construction — no secret-shaped fields.

type SessionRegistry

type SessionRegistry interface {
	Open(ctx context.Context, id string, ident identity.Identity) (*Session, error)
	// EnsureOpen is the create-on-first-use entry point (D-171): it
	// returns the live session for ident, creating it if absent and
	// no-opping if already open. A closed session is NOT revived
	// (ErrReopenAfterClose). See the concrete impl for full semantics.
	EnsureOpen(ctx context.Context, ident identity.Identity) (*Session, error)
	Get(ctx context.Context, id string) (*Session, error)
	Touch(ctx context.Context, id string) error
	Close(ctx context.Context, id string, reason string) error
	Inspect(ctx context.Context, id string) (*SessionSnapshot, error)
	GC(ctx context.Context, policy GCPolicy) (int, error)

	// CloseRegistry cancels the sweeper goroutine and joins it.
	// Idempotent. Distinct method name (rather than Close) so it
	// doesn't collide with Close(id, reason).
	CloseRegistry(ctx context.Context) error
}

SessionRegistry is the public surface every consumer (Phase 09 envelope writer, Phase 60 Protocol surface, Phase 72-75 Console subscribers, etc.) talks to. One concrete impl ships in Phase 08 (`*Registry`); driver pluralism lives at the StateStore layer.

type SessionSnapshot

type SessionSnapshot struct {
	Session
	Running bool
}

SessionSnapshot is the read-side projection returned by Inspect and ListSnapshots. It carries the lifecycle fields plus a Running boolean derived from GCPolicy.RunningProbe at inspection time. Running is intrinsically stale by the time the caller reads it, as with any snapshot model.

type SessionTouchedPayload

type SessionTouchedPayload struct {
	events.SafeSealed
	SessionID string
	LastSeen  int64 // unix nanoseconds
}

SessionTouchedPayload reports a Touch. Carries the SessionID and the new LastSeen timestamp. SafePayload by construction.
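
Because the payload timestamps are unix nanoseconds, a subscriber can recover a time.Time with time.Unix(0, ns). A small sketch: the struct below omits the embedded events.SafeSealed marker so it compiles on its own, and `lastSeenTime` and `roundTrips` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// SessionTouchedPayload mirrors the documented fields; the embedded
// events.SafeSealed marker is omitted to keep the sketch self-contained.
type SessionTouchedPayload struct {
	SessionID string
	LastSeen  int64 // unix nanoseconds
}

// lastSeenTime converts the nanosecond timestamp back into a time.Time.
func lastSeenTime(p SessionTouchedPayload) time.Time {
	return time.Unix(0, p.LastSeen).UTC()
}

// roundTrips checks that a timestamp survives the int64 encoding.
func roundTrips() bool {
	t := time.Date(2026, 6, 11, 12, 0, 0, 123, time.UTC)
	p := SessionTouchedPayload{SessionID: "s-1", LastSeen: t.UnixNano()}
	return lastSeenTime(p).Equal(t)
}

func main() {
	fmt.Println(roundTrips())
}
```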

Directories

Path Synopsis
Package protocol implements the two `sessions.*` Protocol methods the Console Sessions page (Phase 73c / D-122) consumes:
