presence

package
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package presence is the substrate layer that backs coord's Who and WatchPresence. A single JetStream KV bucket carries one entry per live agent, refreshed on Config.HeartbeatInterval cadence. Entry TTL is 3x HeartbeatInterval (invariant 19) — tightening the multiplier requires an ADR amendment.

This package is internal and unexported: callers outside github.com/danmestas/bones must not depend on it. The internal Entry and Event types translate through coord/types.go and coord/events.go into the public Presence DTO and PresenceChange event per ADR 0003's substrate-hiding rule.

Index

Constants

View Source
const TTLMultiplier = 3

TTLMultiplier is the fixed multiplier that derives the KV bucket TTL from Config.HeartbeatInterval (invariant 19). Three heartbeat intervals give two missed-heartbeat intervals of slack before an entry expires, which is the published convention for similar liveness systems. Changing this multiplier requires an ADR amendment.

Variables

View Source
var ErrClosed = errors.New("presence: manager is closed")

ErrClosed reports that a public method was called on a Manager whose Close has returned. Parallel to internal/chat.ErrClosed, internal/tasks.ErrClosed, and internal/holds.ErrClosed so every substrate manager surfaces the same close-race sentinel.

Functions

This section is empty.

Types

type Config

type Config struct {
	// AgentID identifies this coord instance across the substrate. It
	// is threaded into the presence Entry's AgentID field and used to
	// derive the KV key this Manager refreshes on heartbeat.
	AgentID string

	// Project is the <proj> segment used to scope presence queries.
	// Matches the project-prefix scheme Post/Ask use for NATS subjects
	// (ADR 0008). Presence is project-scoped: agents in project A
	// cannot see agents in project B.
	Project string

	// Bucket is the name of the JetStream KV bucket backing presence.
	// Coord supplies bones-presence; validated here so a
	// misconfiguration (empty) fails at Open rather than at first Put.
	Bucket string

	// NATSConn is the pre-connected NATS handle from coord. The
	// presence manager does not dial its own connection — it shares
	// the one coord opened.
	NATSConn *nats.Conn

	// HeartbeatInterval is the cadence at which the heartbeat goroutine
	// refreshes this agent's KV entry. Bucket TTL is 3x this value
	// (invariant 19); the multiplier is fixed in code, not
	// configurable.
	HeartbeatInterval time.Duration

	// ChanBuffer sets the channel buffer for Watch. If left zero, Open
	// substitutes defaultChanBuffer.
	ChanBuffer int
}

Config configures Open. Every field is required; there are no silent defaults. The operator supplies the numbers — coord.Open is the enforcement point for coord.Config.Validate and propagates its own validated inputs into this struct.

func (Config) Validate

func (c Config) Validate() error

Validate checks every Config field against its documented bounds and returns the first violation as an error. The error message follows the shape "presence.Config: <field>: <reason>". Validate is pure; it does not panic on bad operator input per invariant 9 — panics are reserved for programmer-error invariants inside Open and the method wrappers.

type Entry

type Entry struct {
	// AgentID identifies the agent the entry describes. It must be
	// non-empty per invariant 3 and matches Config.AgentID on the
	// owning Manager.
	AgentID string `json:"agent_id"`

	// Project scopes the entry. A Who call from project A filters out
	// entries whose Project is not A. Matches the coord.Config.AgentID
	// project-prefix scheme.
	Project string `json:"project"`

	// StartedAt is the UTC wall-clock time the owning Manager was
	// Opened. Immutable across heartbeats — a consumer watching
	// presence can tell "same agent, still up" from "agent restarted"
	// by comparing StartedAt across two reads.
	StartedAt time.Time `json:"started_at"`

	// LastSeen is refreshed on every heartbeat. Consumers computing
	// "is this entry fresh?" compare LastSeen against wall-clock now
	// plus the heartbeat cadence; TTL-based expiry also kicks the
	// entry out of the bucket, which is the final source of truth.
	LastSeen time.Time `json:"last_seen"`
}

Entry is the value stored at each agent key in the KV bucket. The struct is persisted as JSON; every timestamp is wall-clock UTC so that two processes reading the same entry reach consistent verdicts.

type Event

type Event struct {
	AgentID   string
	Project   string
	Kind      EventKind
	Timestamp time.Time
}

Event is delivered to Watch callers on each observed presence change. AgentID and Project identify the subject; Kind is the change shape; Timestamp is the wall-clock moment the watcher observed the change.

type EventKind

type EventKind int

EventKind identifies the shape of a presence change delivered by Watch.

const (
	// EventUp is delivered when a new entry appears in the bucket — a
	// fresh Put where the previous state was vacant, deleted, or
	// expired.
	EventUp EventKind = iota + 1

	// EventDown is delivered when an entry is removed from the bucket
	// — either an explicit Delete (clean shutdown) or a KV TTL
	// expiry (missed heartbeats).
	EventDown
)

func (EventKind) String

func (k EventKind) String() string

String returns a human-readable name for the EventKind.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns a JetStream KV bucket entry tracking this agent's liveness plus a heartbeat goroutine refreshing it on Config.HeartbeatInterval cadence. Every public method is safe to call concurrently. Close is idempotent.

The heartbeat goroutine is the distinguishing feature versus the other substrate managers (holds/tasks/chat): Open spawns it, Close joins it. Invariant 18 requires Close to return only after the goroutine has terminated.

func Open

func Open(ctx context.Context, cfg Config) (*Manager, error)

Open creates (or reattaches to) the presence KV bucket, writes this agent's initial entry, and starts the heartbeat goroutine. Constructing a Manager consumes one goroutine for the heartbeat loop plus any goroutines Watch callers spawn. Callers must invoke Close to release resources and stop the heartbeat.

Open does not dial NATS: the connection comes pre-wired from coord so reconnection policy stays a single-source concern. If bucket creation, initial Put, or heartbeat spawn fails, earlier steps are torn down before returning so no resources leak.

func (*Manager) Close

func (m *Manager) Close() error

Close stops the heartbeat goroutine, deletes this agent's entry from the KV bucket (so peers see "offline" immediately rather than waiting for TTL expiry), and marks the Manager as closed so subsequent public calls return ErrClosed. Safe to call more than once; subsequent calls are no-ops and return nil.

Close blocks until the heartbeat goroutine has returned (invariant 18). The KV delete uses a bounded ctx so a substrate blip at shutdown does not hang Close indefinitely — on failure the entry falls back to TTL-based cleanup and the error is swallowed (same shape as holds.Release after Coord.Close).

func (*Manager) Present

func (m *Manager) Present(
	ctx context.Context, agentID string,
) (bool, error)

Present reports whether agentID currently has a live presence entry in this Manager's project. A live entry is a KV Put that has not been deleted and has not yet aged past its TTL. Returns false (with nil error) for missing, tombstoned, or expired entries — the caller cannot distinguish those cases, and the presence substrate treats them identically (all three mean "not reachable").

Cheaper than Who for a single-recipient check: one Get, no list or scan. The ergonomic wrapper for admin-Ask-style pre-flights.

Returns ErrClosed after Close. Substrate errors are wrapped with the presence.Present prefix.

func (*Manager) Watch

func (m *Manager) Watch(
	ctx context.Context,
) (<-chan Event, error)

Watch returns a channel of presence Events scoped to this Manager's project. Up events fire on fresh Puts; Down events fire on Deletes and KV TTL expiries. The channel closes when ctx is canceled.

The initial snapshot is skipped: Watch reports deltas from the moment of subscription, not the set of already-present agents. Use Who for a snapshot; use Watch for changes. Callers that want both wire them together.

Returns ErrClosed after Close. Substrate errors are wrapped with the presence.Watch prefix. A nil Manager or nil ctx panics.

func (*Manager) Who

func (m *Manager) Who(ctx context.Context) ([]Entry, error)

Who returns every live agent in this Manager's project. A fresh scan; presence state is read-through the KV. Entries whose Project does not match Config.Project are filtered out at the client because the bucket is shared across projects (one bucket per project would require a dynamic bucket name and fragment operator-level observability).

Returns ErrClosed after Close. Substrate errors are wrapped with the presence.Who prefix.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL