Documentation ¶
Overview ¶
Package sessions owns Harbor's session-lifecycle subsystem.
A Session is a longer-lived multi-turn conversation that contains many Runs. Identity for runtime concerns is the triple `(tenant, user, session)`; runs are scoped within sessions (RFC §6.9). The Session record itself is keyed in the StateStore at `Kind = "session.lifecycle"` with `RunID = ""` — sessions are session-scoped, not run-scoped.
Phase 08 ships a single concrete *Registry implementation that sits over Phase 07's StateStore, codifying the typed-wrapper-over-generic contract from D-027. There is no driver pluralism at the session layer; driver pluralism lives at StateStore (in-mem / SQLite / Postgres at Phases 07 / 15 / 16). Per AGENTS.md §4.4, optional-capability ceremony is forbidden when all V1 drivers (here: implementations) will implement everything.
Four lifetime invariants are load-bearing and pinned by tests:
- Identity captured immutably on Open — Touch / Close re-save the same identity from the existing record; mismatched ctx identity is rejected with ErrIdentityMismatch.
- Reopen-after-close forbidden — clients open a new SessionID.
- Cross-tenant SessionID reuse rejected — `SessionID=S` opened under Tenant A then attempted under Tenant B returns ErrSessionIDReuse, even though the StateStore key (which contains the full Quadruple) would not naturally collide.
- GC never reaps a session with a RUNNING task — enforced when the TaskRegistry-backed probe (TaskRunningProbe) is wired via WithGCPolicy, as both reference assemblies do. The no-op default (returns false, nil) exists only for registries constructed without task awareness.
Lifecycle events (`session.opened / .touched / .closed / .gc_reaped`) land on the EventBus as `SafePayload` types: they are Harbor-internal markers with no secret-shaped fields by construction (RFC §6.13, D-028). Subscribers can extract typed fields directly, with no redactor walk in between.
Index ¶
- Constants
- Variables
- type Clock
- type GCPolicy
- type Option
- type Registry
- func (r *Registry) Close(ctx context.Context, id string, reason string) error
- func (r *Registry) CloseRegistry(_ context.Context) error
- func (r *Registry) EnsureOpen(ctx context.Context, ident identity.Identity) (*Session, error)
- func (r *Registry) GC(ctx context.Context, policy GCPolicy) (int, error)
- func (r *Registry) Get(ctx context.Context, id string) (*Session, error)
- func (r *Registry) Inspect(ctx context.Context, id string) (*SessionSnapshot, error)
- func (r *Registry) ListSnapshots(ctx context.Context, f SessionListFilter) ([]SessionSnapshot, error)
- func (r *Registry) Open(ctx context.Context, id string, ident identity.Identity) (*Session, error)
- func (r *Registry) Touch(ctx context.Context, id string) error
- type RunningProbe
- type Session
- type SessionClosedPayload
- type SessionGCReapedPayload
- type SessionLimits
- type SessionListFilter
- type SessionLister
- type SessionOpenedPayload
- type SessionRegistry
- type SessionSnapshot
- type SessionTouchedPayload
Constants ¶
const (
EventTypeSessionOpened events.EventType = "session.opened"
EventTypeSessionTouched events.EventType = "session.touched"
EventTypeSessionClosed events.EventType = "session.closed"
EventTypeSessionGCReaped events.EventType = "session.gc_reaped"
)
Session lifecycle event types. Each is registered with the events package's exhaustive registry via init() so Publish accepts them without ErrUnknownEventType. Subscribers can filter on these via events.Filter.Types.
Variables ¶
var (
// ErrReopenAfterClose: Open called for a SessionID whose existing
// record is Closed. Per RFC §6.9 ("Reopen-after-close is forbidden").
ErrReopenAfterClose = errors.New("sessions: reopen-after-close forbidden")

// ErrSessionIDReuse: Open called with a SessionID already opened
// under a different (tenant, user). Per RFC §6.9 ("reusing a session
// ID across tenants/users is rejected").
ErrSessionIDReuse = errors.New("sessions: SessionID reused across tenants/users")

// ErrIdentityMismatch: Touch / Close called with a ctx Identity
// that disagrees with the stored session's Identity. The triple is
// captured immutably on Open; mid-flight identity swaps are bugs.
ErrIdentityMismatch = errors.New("sessions: ctx identity mismatches stored session identity")

// ErrSessionNotFound: Get / Touch / Close / Inspect targeting a
// SessionID that has no record (or the record was Deleted).
ErrSessionNotFound = errors.New("sessions: session not found")

// ErrSessionAlreadyOpen: Open called twice with the same triple
// AND SessionID without an intervening Close. Distinct from
// ErrReopenAfterClose (which fires when Closed is true).
ErrSessionAlreadyOpen = errors.New("sessions: session already open")

// ErrRegistryClosed: any operation called after CloseRegistry.
ErrRegistryClosed = errors.New("sessions: registry is closed")
)
Sentinel errors. Callers compare via errors.Is.
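A minimal sketch of why errors.Is is the right comparison: intermediate layers often wrap a sentinel with `%w`, and a plain `==` then stops matching. The sentinels below are local stand-ins with the same messages as the package's, and `lookup` and `classify` are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinels. Real callers would
// reference sessions.ErrSessionNotFound and sessions.ErrReopenAfterClose.
var (
	ErrSessionNotFound  = errors.New("sessions: session not found")
	ErrReopenAfterClose = errors.New("sessions: reopen-after-close forbidden")
)

// lookup is a hypothetical caller-side helper that wraps the sentinel with
// extra context, as intermediate layers often do.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, ErrSessionNotFound)
}

// classify maps an error to a coarse client-facing category. errors.Is
// walks the wrap chain, so it still matches after fmt.Errorf with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrSessionNotFound):
		return "not_found"
	case errors.Is(err, ErrReopenAfterClose):
		return "conflict"
	default:
		return "internal"
	}
}

func main() {
	err := lookup("s-1")
	fmt.Println(err == ErrSessionNotFound) // false: the error is wrapped
	fmt.Println(classify(err))             // not_found
}
```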
Functions ¶
This section is empty.
Types ¶
type Clock ¶
Clock abstracts time so GC tests are deterministic without time.Sleep. Production code uses realClock; tests pass a fakeClock.
The interface intentionally returns time.Time directly (not a monotonic count) so GC's wall-clock math is identical between production and test paths.
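The following sketch illustrates the idea of a controllable clock. The Clock method set is not shown on this page, so a single `Now() time.Time` method is an assumption here, and `fakeClock`, `Advance`, and `idleFor` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Clock is an assumed shape: one Now method returning wall-clock time.
type Clock interface {
	Now() time.Time
}

// fakeClock is a hypothetical test double. Time moves only when Advance
// is called, so tests never need time.Sleep.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

func newFakeClock() *fakeClock {
	return &fakeClock{now: time.Unix(0, 0).UTC()}
}

func (c *fakeClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Advance moves the fake clock forward by d.
func (c *fakeClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
}

// idleFor reports how long a session last seen at lastSeen has been idle.
func idleFor(c Clock, lastSeen time.Time) time.Duration {
	return c.Now().Sub(lastSeen)
}

func main() {
	c := newFakeClock()
	lastSeen := c.Now()
	c.Advance(25 * time.Hour)
	fmt.Println(idleFor(c, lastSeen) > 24*time.Hour) // true
}
```

Because the fake returns time.Time just like a real clock would, the same comparison code runs in both paths.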
type GCPolicy ¶
type GCPolicy struct {
IdleTTL time.Duration
HardCap time.Duration
SweepInterval time.Duration
RunningProbe RunningProbe
}
GCPolicy bundles the GC sweeper's tunables. Defaults match RFC §6.9: IdleTTL 24h, HardCap 720h (30 days), SweepInterval 15m. The RunningProbe should be TaskRunningProbe wherever a TaskRegistry is in scope; default is the no-op.
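As a rough illustration of "defaults applied if zero", the sketch below fills unset durations with the values quoted above. The `gcPolicy` type mirrors only the three duration fields, and `withDefaults` is a hypothetical helper; how the package itself applies defaults is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// gcPolicy mirrors the three duration tunables of GCPolicy. RunningProbe is
// omitted to keep the sketch self-contained.
type gcPolicy struct {
	IdleTTL       time.Duration
	HardCap       time.Duration
	SweepInterval time.Duration
}

// withDefaults is a hypothetical helper: zero fields take the documented
// defaults (24h, 720h, 15m); non-zero fields are left untouched.
func withDefaults(p gcPolicy) gcPolicy {
	if p.IdleTTL == 0 {
		p.IdleTTL = 24 * time.Hour
	}
	if p.HardCap == 0 {
		p.HardCap = 720 * time.Hour
	}
	if p.SweepInterval == 0 {
		p.SweepInterval = 15 * time.Minute
	}
	return p
}

func main() {
	p := withDefaults(gcPolicy{})
	fmt.Println(p.IdleTTL, p.HardCap, p.SweepInterval) // 24h0m0s 720h0m0s 15m0s
}
```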
type Option ¶
type Option func(*Registry)
Option configures a Registry at construction. Two options are exported: WithClock and WithGCPolicy. Production code does not use WithClock; tests use it to inject a fake clock and drive GC deterministically.
func WithClock ¶
WithClock injects a custom Clock. Production code uses realClock; the test suite uses a controllable clock to exercise hard-cap GC without time.Sleep (per AGENTS.md §11).
func WithGCPolicy ¶
WithGCPolicy injects an explicit GCPolicy at construction. When omitted, the GCPolicy is built from the SessionsConfig defaults + the no-op RunningProbe. Production wiring (cmd/harbor and harbortest/devstack) passes a policy whose RunningProbe is TaskRunningProbe so GC honors RFC §6.9.
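The option functions follow the usual Go functional-options pattern. The sketch below shows that pattern with local stand-in types; the exact signatures of WithClock and WithGCPolicy are not listed on this page, so the parameter types here are assumptions, and `registry`, `newRegistry`, and `fixedClock` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Clock and GCPolicy are simplified stand-ins for the package's types.
type Clock interface{ Now() time.Time }

type GCPolicy struct{ IdleTTL time.Duration }

type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// fixedClock is a hypothetical test clock that always returns t.
type fixedClock struct{ t time.Time }

func (f fixedClock) Now() time.Time { return f.t }

type registry struct {
	clock  Clock
	policy GCPolicy
}

// Option mutates the registry while it is being constructed.
type Option func(*registry)

// WithClock and WithGCPolicy use assumed signatures.
func WithClock(c Clock) Option       { return func(r *registry) { r.clock = c } }
func WithGCPolicy(p GCPolicy) Option { return func(r *registry) { r.policy = p } }

// newRegistry sets defaults first, then lets each option override them.
func newRegistry(opts ...Option) *registry {
	r := &registry{clock: realClock{}, policy: GCPolicy{IdleTTL: 24 * time.Hour}}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newRegistry(WithGCPolicy(GCPolicy{IdleTTL: time.Hour}))
	fmt.Println(r.policy.IdleTTL) // 1h0m0s
}
```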
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is the StateStore-backed implementation of SessionRegistry. It is the single concrete impl in V1 — driver pluralism lives at the StateStore layer.
Concurrency model:
- StateStore writes go through `state.StateStore.Save` which is itself concurrent-safe per D-025.
- The cross-tenant SessionID-uniqueness map is guarded by mu.
- The sweeper goroutine's lifecycle is owned by `done` + `wg`.
func New ¶
func New(store state.StateStore, cfg config.SessionsConfig, bus events.EventBus, opts ...Option) (*Registry, error)
New constructs a Registry. The store and bus are required; cfg supplies the GC tunables (defaults applied if zero); opts can override the Clock or GCPolicy.
The sweeper goroutine starts immediately and runs at gcPolicy.SweepInterval until CloseRegistry is called. The probe defaults to no-op; assemblies that own a TaskRegistry MUST pass TaskRunningProbe via WithGCPolicy (both reference assemblies do) so GC never reaps a session with a RUNNING task.
func (*Registry) Close ¶
Close marks the session Closed and emits session.closed. Idempotent: closing an already-closed session is a no-op AND preserves the original ClosedReason.
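The idempotency rule can be sketched in a few lines. This is an illustration over a local `session` struct, not the package's code; `closeSession` is a hypothetical helper, and treating its boolean result as "state changed" is an assumption made for the example.

```go
package main

import "fmt"

// session holds only the two fields relevant to Close.
type session struct {
	Closed       bool
	ClosedReason string
}

// closeSession is a hypothetical helper. It reports true only when the
// call actually transitioned the session to Closed.
func closeSession(s *session, reason string) bool {
	if s.Closed {
		return false // already closed: keep the original reason
	}
	s.Closed = true
	s.ClosedReason = reason
	return true
}

func main() {
	s := &session{}
	fmt.Println(closeSession(s, "gc:idle"))  // true
	fmt.Println(closeSession(s, "operator")) // false
	fmt.Println(s.ClosedReason)              // gc:idle
}
```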
func (*Registry) CloseRegistry ¶
CloseRegistry cancels the sweeper goroutine and joins it. Idempotent. Subsequent operations return ErrRegistryClosed.
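One common way to get this behaviour is a `done` channel plus a WaitGroup, with sync.Once making the shutdown idempotent. The sketch below shows that pattern under those assumptions; the `registry` type, `newRegistry`, `closeRegistry`, and `touch` are hypothetical stand-ins, and the real implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errRegistryClosed = errors.New("sessions: registry is closed")

type registry struct {
	mu     sync.Mutex
	closed bool
	done   chan struct{}  // closed to signal the sweeper to stop
	wg     sync.WaitGroup // joined so shutdown waits for the sweeper
	once   sync.Once      // makes closeRegistry idempotent
}

// newRegistry starts the sweeper goroutine immediately.
func newRegistry(interval time.Duration, sweep func()) *registry {
	r := &registry{done: make(chan struct{})}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-r.done:
				return
			case <-t.C:
				sweep()
			}
		}
	}()
	return r
}

// closeRegistry signals the sweeper and waits for it to exit. Repeated
// calls are no-ops.
func (r *registry) closeRegistry() error {
	r.once.Do(func() {
		r.mu.Lock()
		r.closed = true
		r.mu.Unlock()
		close(r.done)
		r.wg.Wait()
	})
	return nil
}

// touch stands in for any operation that must fail after shutdown.
func (r *registry) touch() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closed {
		return errRegistryClosed
	}
	return nil
}

func main() {
	r := newRegistry(time.Hour, func() {})
	_ = r.closeRegistry()
	_ = r.closeRegistry() // second call is safe
	fmt.Println(errors.Is(r.touch(), errRegistryClosed)) // true
}
```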
func (*Registry) EnsureOpen ¶ added in v1.2.0
EnsureOpen is the create-on-first-use entry point (D-171). It returns the live session for `ident`, creating it if no record exists. The session id is `ident.SessionID` (the per-request session the client chose); the (tenant, user) come from the verified connection token.
Semantics:
- No record yet → Open it (create) and return the fresh session.
- Open record at this exact triple → no-op, return the existing session (a second turn in the same conversation is not an error).
- Closed record at this triple → ErrReopenAfterClose (RFC §6.9: a GC-reaped or operator-closed session is read-only; a NEW conversation must pick a NEW session id). The caller maps this to a clear client error; it is never a silent revive.
EnsureOpen is identity-mandatory: an incomplete triple fails closed. It is the seam the Protocol ControlSurface calls on `start` so the first turn of a brand-new conversation materialises the session row the Console's sessions.list reads.
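The three outcomes above, plus the fail-closed identity check, can be sketched against an in-memory map. Everything here is illustrative: the `identity` field names, the `store` type, the extra `created` return value, and `errIncompleteIdentity` are assumptions, since this page does not show identity.Identity or the error returned for an incomplete triple.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errReopenAfterClose = errors.New("sessions: reopen-after-close forbidden")
	// errIncompleteIdentity is hypothetical; the real error is not documented here.
	errIncompleteIdentity = errors.New("sessions: incomplete identity")
)

// identity uses assumed field names for the (tenant, user, session) triple.
type identity struct{ TenantID, UserID, SessionID string }

type session struct {
	Identity identity
	Closed   bool
}

// store is a toy stand-in for the StateStore, keyed by the full triple.
type store map[identity]*session

// ensureOpen returns the live session and whether this call created it.
func ensureOpen(st store, ident identity) (*session, bool, error) {
	if ident.TenantID == "" || ident.UserID == "" || ident.SessionID == "" {
		return nil, false, errIncompleteIdentity // fail closed
	}
	s, ok := st[ident]
	switch {
	case !ok: // no record yet: create it
		s = &session{Identity: ident}
		st[ident] = s
		return s, true, nil
	case s.Closed: // never silently revive a closed session
		return nil, false, errReopenAfterClose
	default: // already open: a later turn in the same conversation
		return s, false, nil
	}
}

func main() {
	st := store{}
	id := identity{TenantID: "t1", UserID: "u1", SessionID: "s1"}
	_, created, _ := ensureOpen(st, id)
	fmt.Println(created) // true
	_, created, _ = ensureOpen(st, id)
	fmt.Println(created) // false
}
```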
func (*Registry) GC ¶
GC performs a single sweep pass. For each currently-open session:
- if RunningProbe returns true: skip (per RFC §6.9 "GC never reaps a session with a RUNNING task").
- else if LastSeen + IdleTTL < clock.Now(): close with reason "gc:idle".
- else if OpenedAt + HardCap < clock.Now(): close with reason "gc:hard_cap" (the hard cap wins over recent Touch).
Returns the number of sessions reaped and the first probe / close error encountered (the sweep continues past errors so a single bad session doesn't block the rest).
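The per-session decision described above can be written as a small pure function. The sketch below follows the listed order (probe, then idle, then hard cap) using local stand-in types; `decide`, `at`, and `defaultPolicy` are hypothetical helpers, and an empty string is used here to mean "keep the session".

```go
package main

import (
	"fmt"
	"time"
)

type policy struct{ IdleTTL, HardCap time.Duration }

type session struct{ OpenedAt, LastSeen time.Time }

// defaultPolicy uses the documented defaults: IdleTTL 24h, HardCap 720h.
func defaultPolicy() policy {
	return policy{IdleTTL: 24 * time.Hour, HardCap: 720 * time.Hour}
}

// at returns a fixed base time plus h hours, to keep examples readable.
func at(h int) time.Time {
	return time.Unix(0, 0).UTC().Add(time.Duration(h) * time.Hour)
}

// decide returns the close reason for a session, or "" to keep it open.
func decide(s session, now time.Time, p policy, running bool) string {
	switch {
	case running: // never reap a session with a RUNNING task
		return ""
	case s.LastSeen.Add(p.IdleTTL).Before(now):
		return "gc:idle"
	case s.OpenedAt.Add(p.HardCap).Before(now): // applies despite recent Touch
		return "gc:hard_cap"
	default:
		return ""
	}
}

func main() {
	p := defaultPolicy()
	idle := session{OpenedAt: at(0), LastSeen: at(0)}
	busy := session{OpenedAt: at(0), LastSeen: at(720)}
	fmt.Println(decide(idle, at(25), p, false))      // gc:idle
	fmt.Println(decide(busy, at(721), p, false))     // gc:hard_cap
	fmt.Println(decide(idle, at(25), p, true) == "") // true
}
```

Note that the comparisons are strict: a session whose deadline equals the current time is kept until the next sweep.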
func (*Registry) Get ¶
Get loads the session with `id` for the identity in ctx. Returns ErrSessionNotFound when the record is absent. Identity-mandatory: the ctx Identity is used to scope the StateStore read; cross-tenant access is prevented because the StateStore key contains the full triple.
func (*Registry) Inspect ¶
Inspect returns a SessionSnapshot. Running is derived from the configured RunningProbe at inspection time.
func (*Registry) ListSnapshots ¶
func (r *Registry) ListSnapshots(ctx context.Context, f SessionListFilter) ([]SessionSnapshot, error)
ListSnapshots implements sessions.SessionLister — the Phase 72c `search.sessions` read-side projection. Returns snapshots for every session the registry has seen (open OR closed) matching the filter.
The registry's in-memory `idIndex` is the catalog of every SessionID that has been Opened during this registry's lifetime. The snapshot is built by Loading each matching session from the StateStore so the Closed / ClosedAt / LastSeen fields are current; Running is derived from the GCPolicy RunningProbe at inspection time (mirroring Inspect's contract).
The caller (the search subsystem) is responsible for the auth scope gate — ListSnapshots does NOT re-check scope. It DOES validate that every supplied `TenantIDs` / `UserIDs` / `SessionIDs` entry is non-empty (a no-op for empty filters).
Concurrent reuse (D-025): ListSnapshots only reads `idIndex` / `openSessions` under the registry's mutex; no per-call state lives on `*Registry`. One Registry serves N concurrent ListSnapshots safely.
func (*Registry) Open ¶
Open creates a new session record, captures the identity triple immutably, and emits session.opened. Cross-tenant SessionID reuse and reopen-after-close are rejected; same-triple double-open is rejected with ErrSessionAlreadyOpen.
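A sketch of how the three rejections might be checked against a mutex-guarded SessionID index follows. It is an illustration under assumptions: the `index`, `owner`, and `entry` types are hypothetical, and the order in which the checks run (reuse before reopen-after-close) is a choice made for this example, not something this page specifies.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errReopenAfterClose   = errors.New("sessions: reopen-after-close forbidden")
	errSessionIDReuse     = errors.New("sessions: SessionID reused across tenants/users")
	errSessionAlreadyOpen = errors.New("sessions: session already open")
)

// owner is the (tenant, user) pair a SessionID was first opened under.
type owner struct{ TenantID, UserID string }

type entry struct {
	owner  owner
	closed bool
}

// index tracks every SessionID seen, independent of the storage key.
type index struct {
	mu   sync.Mutex
	byID map[string]*entry
}

func newIndex() *index { return &index{byID: map[string]*entry{}} }

// open records a new SessionID or explains why it is rejected.
func (ix *index) open(id string, o owner) error {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	e, seen := ix.byID[id]
	switch {
	case !seen:
		ix.byID[id] = &entry{owner: o}
		return nil
	case e.owner != o:
		return errSessionIDReuse
	case e.closed:
		return errReopenAfterClose
	default:
		return errSessionAlreadyOpen
	}
}

// markClosed flags an existing SessionID as closed.
func (ix *index) markClosed(id string) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	if e, ok := ix.byID[id]; ok {
		e.closed = true
	}
}

func main() {
	ix := newIndex()
	a := owner{TenantID: "A", UserID: "u1"}
	b := owner{TenantID: "B", UserID: "u1"}
	fmt.Println(ix.open("S", a)) // <nil>
	fmt.Println(ix.open("S", b)) // sessions: SessionID reused across tenants/users
}
```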
type RunningProbe ¶
RunningProbe is the seam the GC sweeper consults so it can honor "never reap a session with a RUNNING task" (RFC §6.9). The TaskRegistry-backed implementation is TaskRunningProbe; both reference assemblies wire it via WithGCPolicy. A nil probe is treated as the no-op default (returns false, nil) — only for registries constructed without task awareness.
func TaskRunningProbe ¶ added in v1.3.0
func TaskRunningProbe(reg tasks.TaskRegistry) RunningProbe
TaskRunningProbe adapts a tasks.TaskRegistry into the RunningProbe seam so the GC sweeper can enforce RFC §6.9's "GC never reaps a session with a RUNNING task" invariant. The probe lists the session's tasks filtered to tasks.StatusRunning (the exact status the RFC names — PENDING / PAUSED tasks do not block GC) and reports whether any exist.
Wiring: both reference assemblies (cmd/harbor `bootDevStack` and harbortest/devstack `Assemble`) pass this probe via sessions.New(..., WithGCPolicy(GCPolicy{RunningProbe: ...})). A registry constructed without it falls back to the no-op default, which reports no running tasks — sessions with in-flight work become reapable, so assemblies that own a TaskRegistry MUST wire this adapter.
The import direction is sound by construction: internal/tasks does not import internal/sessions (verified at the time of authoring), so the adapter lives here, next to the RunningProbe type it satisfies.
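The adapter idea can be sketched as follows. Neither the RunningProbe signature nor the tasks.TaskRegistry API is shown on this page, so `runningProbe`, `taskLister`, its `List` method, and the status constants are all assumptions made for illustration; only the behaviour (RUNNING blocks GC, PENDING and PAUSED do not) comes from the text above.

```go
package main

import (
	"context"
	"fmt"
)

type status string

const (
	statusPending status = "PENDING"
	statusRunning status = "RUNNING"
	statusPaused  status = "PAUSED"
)

// runningProbe is an assumed signature for the probe seam.
type runningProbe func(ctx context.Context, sessionID string) (bool, error)

// taskLister is a hypothetical slice of a task registry's API.
type taskLister interface {
	List(ctx context.Context, sessionID string, want status) ([]string, error)
}

// taskRunningProbe adapts a taskLister into a probe that reports whether
// the session has at least one RUNNING task.
func taskRunningProbe(l taskLister) runningProbe {
	return func(ctx context.Context, sessionID string) (bool, error) {
		ids, err := l.List(ctx, sessionID, statusRunning)
		if err != nil {
			return false, err
		}
		return len(ids) > 0, nil
	}
}

// noopProbe mirrors the documented default: (false, nil).
func noopProbe(context.Context, string) (bool, error) { return false, nil }

// memTasks is a toy registry: sessionID -> taskID -> status.
type memTasks map[string]map[string]status

func (m memTasks) List(_ context.Context, sessionID string, want status) ([]string, error) {
	var out []string
	for id, st := range m[sessionID] {
		if st == want {
			out = append(out, id)
		}
	}
	return out, nil
}

// isRunning is a convenience wrapper for the example; errors count as false.
func isRunning(p runningProbe, sessionID string) bool {
	ok, err := p(context.Background(), sessionID)
	return err == nil && ok
}

func main() {
	m := memTasks{
		"s1": {"t1": statusRunning},
		"s2": {"t2": statusPaused, "t3": statusPending},
	}
	p := taskRunningProbe(m)
	fmt.Println(isRunning(p, "s1"))         // true
	fmt.Println(isRunning(p, "s2"))         // false
	fmt.Println(isRunning(noopProbe, "s1")) // false
}
```

The last line shows why the no-op default is unsafe once tasks exist: it reports no running work even for a session that has some.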
type Session ¶
type Session struct {
ID string
Identity identity.Identity
OpenedAt time.Time
LastSeen time.Time
Closed bool
ClosedAt time.Time
ClosedReason string
Limits SessionLimits
Context map[string]any
}
Session is the persisted lifecycle record for one session. The Identity field carries the triple captured on Open and is immutable afterwards. Closed transitions to true on Close; ClosedAt stays zero while Closed is false.
Limits and Context are reserved slots — Phase 36a/b will populate Limits with the cost / token ceilings; Phase 23+ will populate Context with the (version, hash, llm/tool ctx, memory, artifacts) quintuple sketched in RFC §6.9. Phase 08 round-trips both fields through marshal/unmarshal but applies no validation.
type SessionClosedPayload ¶
type SessionClosedPayload struct {
events.SafeSealed
SessionID string
ClosedAt int64 // unix nanoseconds
Reason string
}
SessionClosedPayload reports a Close. Carries the SessionID, the ClosedAt timestamp, and the operator-provided Reason. Reason is a short caller-controlled string — callers MUST NOT pass tool args, raw user input, or any secret-shaped material; the bus does not re-redact SafePayload types (D-020 / D-028).
type SessionGCReapedPayload ¶
type SessionGCReapedPayload struct {
events.SafeSealed
SessionID string
ReapedAt int64 // unix nanoseconds
Reason string
}
SessionGCReapedPayload reports a GC sweep reaping. Reason is one of "gc:idle" or "gc:hard_cap"; the same string is also stored in Session.ClosedReason. SafePayload by construction.
type SessionLimits ¶
type SessionLimits struct {
}
SessionLimits is reserved for Phase 36a (cost ceilings) and Phase 26 (tool catalog). Empty in Phase 08; round-trips through marshal.
type SessionListFilter ¶
type SessionListFilter struct {
TenantIDs []string
UserIDs []string
SessionIDs []string
SinceLastSeen time.Time
UntilLastSeen time.Time
IncludeClosed bool
}
SessionListFilter narrows the result set of ListSnapshots. The slice fields (TenantIDs, UserIDs, SessionIDs) are wildcards when empty. SinceLastSeen / UntilLastSeen filter by the LastSeen timestamp; a zero value means "no bound."
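A sketch of how such a filter could be validated and applied is shown below. It is not the package's implementation: `filter` and `snapshot` are local stand-ins, `validate`, `matches`, and `errEmptyEntry` are hypothetical, the time bounds are assumed to be inclusive, and IncludeClosed is left out because its exact semantics are not described here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errEmptyEntry is a hypothetical validation error.
var errEmptyEntry = errors.New("filter: empty ID entry")

type filter struct {
	TenantIDs, UserIDs, SessionIDs []string
	SinceLastSeen, UntilLastSeen   time.Time
}

type snapshot struct {
	TenantID, UserID, SessionID string
	LastSeen                    time.Time
}

// validate rejects empty strings inside any ID list; empty lists pass.
func validate(f filter) error {
	for _, list := range [][]string{f.TenantIDs, f.UserIDs, f.SessionIDs} {
		for _, v := range list {
			if v == "" {
				return errEmptyEntry
			}
		}
	}
	return nil
}

// in treats an empty list as a wildcard.
func in(list []string, v string) bool {
	if len(list) == 0 {
		return true
	}
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// matches applies the ID lists and the (assumed inclusive) time bounds.
func matches(f filter, s snapshot) bool {
	if !in(f.TenantIDs, s.TenantID) || !in(f.UserIDs, s.UserID) || !in(f.SessionIDs, s.SessionID) {
		return false
	}
	if !f.SinceLastSeen.IsZero() && s.LastSeen.Before(f.SinceLastSeen) {
		return false
	}
	if !f.UntilLastSeen.IsZero() && s.LastSeen.After(f.UntilLastSeen) {
		return false
	}
	return true
}

// at returns a fixed base time plus h hours.
func at(h int) time.Time {
	return time.Unix(0, 0).UTC().Add(time.Duration(h) * time.Hour)
}

func main() {
	s := snapshot{TenantID: "t1", UserID: "u1", SessionID: "s1", LastSeen: at(10)}
	fmt.Println(matches(filter{}, s))                            // true
	fmt.Println(matches(filter{TenantIDs: []string{"t2"}}, s))   // false
	fmt.Println(validate(filter{UserIDs: []string{""}}) != nil) // true
}
```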
type SessionLister ¶
type SessionLister interface {
// ListSnapshots returns session snapshots that match the filter,
// scoped to the requested tenants. Empty `TenantIDs` matches every
// tenant the registry has seen (the search subsystem gates this
// on the caller's auth.ScopeAdmin claim — the registry does NOT
// re-check scope). Empty `UserIDs` / `SessionIDs` are wildcards.
ListSnapshots(ctx context.Context, f SessionListFilter) ([]SessionSnapshot, error)
}
SessionLister is the narrow read-side capability the Phase 72c `search.sessions` Searcher consumes. The triple `(tenant, user, session)` is the load-bearing isolation key (CLAUDE.md §6); the listing is server-enforced per the supplied SessionListFilter. The returned snapshots include both currently-open and previously-closed sessions — search wants the union.
Intentionally NOT on the SessionRegistry interface: the lister is a projection over the in-memory open-session index plus the StateStore. Concrete `*Registry` implements it; future drivers add it when their backing store gains a `list` capability (StateStore List is post-V1).
type SessionOpenedPayload ¶
type SessionOpenedPayload struct {
events.SafeSealed
SessionID string
OpenedAt int64 // unix nanoseconds; identity-of-record across drivers
}
SessionOpenedPayload reports a successful Open. Carries the SessionID and the OpenedAt timestamp; the identity triple lives on the Event itself, so it is intentionally NOT duplicated here.
SafePayload by construction — no secret-shaped fields.
type SessionRegistry ¶
type SessionRegistry interface {
Open(ctx context.Context, id string, ident identity.Identity) (*Session, error)
// EnsureOpen is the create-on-first-use entry point (D-171): it
// returns the live session for ident, creating it if absent and
// no-opping if already open. A closed session is NOT revived
// (ErrReopenAfterClose). See the concrete impl for full semantics.
EnsureOpen(ctx context.Context, ident identity.Identity) (*Session, error)
Get(ctx context.Context, id string) (*Session, error)
Touch(ctx context.Context, id string) error
Close(ctx context.Context, id string, reason string) error
Inspect(ctx context.Context, id string) (*SessionSnapshot, error)
GC(ctx context.Context, policy GCPolicy) (int, error)
// CloseRegistry cancels the sweeper goroutine and joins it.
// Idempotent. Distinct method name (rather than Close) so it
// doesn't collide with Close(id, reason).
CloseRegistry(ctx context.Context) error
}
SessionRegistry is the public surface every consumer (Phase 09 envelope writer, Phase 60 Protocol surface, Phase 72-75 Console subscribers, etc.) talks to. One concrete impl ships in Phase 08 (`*Registry`); driver pluralism lives at the StateStore layer.
type SessionSnapshot ¶
SessionSnapshot is the read-side projection returned by Inspect and ListSnapshots. It carries the lifecycle fields plus a Running boolean derived from the GCPolicy.RunningProbe at inspection time. Running is intrinsically stale by the time the caller reads it; the same is true of any snapshot model.
type SessionTouchedPayload ¶
type SessionTouchedPayload struct {
events.SafeSealed
SessionID string
LastSeen int64 // unix nanoseconds
}
SessionTouchedPayload reports a Touch. Carries the SessionID and the new LastSeen timestamp. SafePayload by construction.