swarm

package
v0.1.1 Latest
Published: Apr 29, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package swarm holds the per-slot session record schema and the JetStream-KV-backed Manager that bones swarm verbs use to track active swarm sessions in a workspace.

State lives in a single KV bucket (DefaultBucketName) keyed by slot name. ADR 0028 §"Lifecycle and state" explains why session state went to KV instead of a per-slot state.json on disk: every field in this struct is either already authoritative somewhere else in JetStream KV (task_id in bones-tasks, agent_id in bones-presence, the claim hold in bones-holds) or describes the host-local OS process owning the leaf (host, leaf_pid). Putting them in one bucket gives cross-host visibility and a single source of truth for `bones swarm status` and `bones doctor` without re-deriving state from N substrate buckets per call.

The Manager is intentionally simpler than internal/presence: there is no background heartbeat goroutine. Heartbeats are driven by agent-process calls to `bones swarm commit`, which extends TTL via CAS. A slot whose agent has crashed stops heartbeating and the bucket TTL evicts the entry — exactly the surface `bones doctor` is meant to see.

Constants

const DefaultBucketName = "bones-swarm-sessions"

DefaultBucketName is the JetStream KV bucket holding swarm sessions. The name is part of the substrate contract (parallel to bones-tasks, bones-holds, bones-presence) and SHOULD NOT change without a substrate-evolution ADR.

const DefaultTTL = 5 * time.Minute

DefaultTTL is the JetStream KV bucket TTL. Sessions are heartbeated via `bones swarm commit`; a slot whose agent has crashed stops renewing and the bucket evicts the entry after this interval. Five minutes mirrors ADR 0028's "stale after 5min no renewal" rule.
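
The "stale after 5min no renewal" rule can be pictured as a comparison against the last renewal time. The following is a minimal sketch, not the package's code: `isStale`, `staleAfter`, and `minutesIn` are hypothetical names, and treating exactly five minutes as still fresh is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// staleAfter mirrors the documented DefaultTTL of five minutes.
const staleAfter = 5 * time.Minute

// isStale is a hypothetical helper: it reports whether a session last
// renewed at lastRenewed has gone longer than staleAfter without renewal.
func isStale(lastRenewed, now time.Time) bool {
	return now.Sub(lastRenewed) > staleAfter
}

// minutesIn returns a fixed UTC instant m minutes after an arbitrary
// reference point, so the demo does not depend on the wall clock.
func minutesIn(m int) time.Time {
	ref := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
	return ref.Add(time.Duration(m) * time.Minute)
}

func main() {
	renewed := minutesIn(0)
	fmt.Println(isStale(renewed, minutesIn(4))) // renewed 4 minutes ago
	fmt.Println(isStale(renewed, minutesIn(6))) // renewed 6 minutes ago
}
```

In the real system the bucket TTL performs the eviction; a check like this would only matter to a reader that wants to classify a record it can still see.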

Variables

var ErrCASConflict = errors.New("swarm: CAS conflict")

ErrCASConflict reports that a CAS-gated Update or Delete saw a revision mismatch — another writer raced ours. Mirrors internal/tasks.ErrCASConflict so callers can react identically.

var ErrClosed = errors.New("swarm: manager is closed")

ErrClosed reports that a public method was called on a Manager whose Close has returned. Parallel to internal/presence.ErrClosed.

var ErrNotFound = errors.New("swarm: session not found")

ErrNotFound reports that the requested slot has no live session record in the bucket. Distinct from a substrate error so callers (swarm status, swarm commit) can distinguish "no session yet" from "NATS unreachable."
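
A caller might branch on these sentinels roughly as follows. This is a sketch under assumptions: the three variables are local stand-ins with the same messages as the documented errors, `classify` is a hypothetical helper, and `errors.Is` is used so the check holds whether or not the error arrives wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors.
var (
	errNotFound    = errors.New("swarm: session not found")
	errCASConflict = errors.New("swarm: CAS conflict")
	errClosed      = errors.New("swarm: manager is closed")
)

// classify maps an error to the reaction a caller would likely want.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errNotFound):
		return "no session yet"
	case errors.Is(err, errCASConflict):
		return "lost a race; re-read and decide"
	case errors.Is(err, errClosed):
		return "manager closed"
	default:
		return "substrate error"
	}
}

func main() {
	wrapped := fmt.Errorf("get slot alpha: %w", errNotFound)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(errors.New("nats: timeout")))
}
```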

Functions

func SlotDir

func SlotDir(workspaceDir, slot string) string

SlotDir returns the on-disk directory for the slot's per-leaf state under the workspace root. Layout:

<workspace>/.bones/swarm/<slot>/
├── leaf.fossil          libfossil repo (cloned from hub)
├── leaf.pid             host-local PID tracker
└── wt/                  worktree (Leaf.WT() result)

Pure path derivation; no KV lookup. Callers can compute this without opening a Manager — `bones swarm cwd` exploits exactly that to avoid a NATS round-trip for a path query.

func SlotPidFile

func SlotPidFile(workspaceDir, slot string) string

SlotPidFile returns the slot's host-local PID-tracker path. Used by swarm join to write and swarm close to remove.

func SlotWorktree

func SlotWorktree(workspaceDir, slot string) string

SlotWorktree returns the slot's worktree path. Equivalent to filepath.Join(SlotDir(workspaceDir, slot), "wt"). Pulled out as a distinct helper because `bones swarm cwd` always wants this — the repo file and pid file are not user-facing.
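
The three path helpers amount to pure `filepath.Join` calls over the layout shown under SlotDir. A minimal sketch follows; the lowercase function names are stand-ins, and the bodies are inferred from the documented layout rather than copied from the package.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// slotDir derives <workspace>/.bones/swarm/<slot> with no I/O at all.
func slotDir(workspaceDir, slot string) string {
	return filepath.Join(workspaceDir, ".bones", "swarm", slot)
}

// slotPidFile is the host-local PID tracker inside the slot directory.
func slotPidFile(workspaceDir, slot string) string {
	return filepath.Join(slotDir(workspaceDir, slot), "leaf.pid")
}

// slotWorktree is the worktree directory inside the slot directory.
func slotWorktree(workspaceDir, slot string) string {
	return filepath.Join(slotDir(workspaceDir, slot), "wt")
}

func main() {
	ws := filepath.Join("work", "repo") // example workspace root
	fmt.Println(slotDir(ws, "alpha"))
	fmt.Println(slotPidFile(ws, "alpha"))
	fmt.Println(slotWorktree(ws, "alpha"))
}
```

Because nothing here touches NATS or the filesystem, a path query can be answered without opening a Manager.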

Types

type Config

type Config struct {
	// NATSConn is the live NATS connection. Manager does not dial; the
	// caller owns connect/disconnect lifecycle. Required.
	NATSConn *nats.Conn

	// BucketName overrides DefaultBucketName. Empty string falls back
	// to the package default.
	BucketName string

	// TTL overrides DefaultTTL on bucket creation. Zero falls back to
	// the package default. Ignored if the bucket already exists with
	// a different TTL — JetStream KV is not destructive on Update.
	TTL time.Duration
}

Config configures a Manager. Only NATSConn is required; bucket name and TTL fall back to the package defaults if zero. Defaults match production; tests sometimes pass shorter TTLs to exercise expiry.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error describing the first invalid field, or nil if the config is acceptable. Manager.Open calls Validate before any substrate work so callers see config issues with no NATS round-trip.
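
The validate-then-default flow can be sketched as below. Everything here is illustrative: `conn` is a placeholder for `*nats.Conn` so the example needs no NATS client, `withDefaults` is a hypothetical helper, the error messages are invented, and rejecting a negative TTL is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	defaultBucketName = "bones-swarm-sessions"
	defaultTTL        = 5 * time.Minute
)

// conn is a placeholder for *nats.Conn so the sketch needs no NATS client.
type conn struct{}

type config struct {
	NATSConn   *conn
	BucketName string
	TTL        time.Duration
}

// validate reports the first invalid field. Rejecting a negative TTL is
// an assumption of this sketch, not documented behavior.
func (c config) validate() error {
	if c.NATSConn == nil {
		return errors.New("swarm: NATSConn is required")
	}
	if c.TTL < 0 {
		return errors.New("swarm: TTL must not be negative")
	}
	return nil
}

// withDefaults fills zero-valued optional fields with package defaults.
func (c config) withDefaults() config {
	if c.BucketName == "" {
		c.BucketName = defaultBucketName
	}
	if c.TTL == 0 {
		c.TTL = defaultTTL
	}
	return c
}

func main() {
	fmt.Println(config{}.validate())
	cfg := config{NATSConn: &conn{}}.withDefaults()
	fmt.Println(cfg.validate(), cfg.BucketName, cfg.TTL)
}
```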

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns the JetStream KV bucket holding swarm session records. Every public method is safe to call concurrently. Close is idempotent. Unlike presence.Manager, there is no heartbeat goroutine — callers (the bones swarm verbs) drive renewal via Update.

func Open

func Open(ctx context.Context, cfg Config) (*Manager, error)

Open creates (or reattaches to) the swarm sessions KV bucket and returns a Manager. Caller must Close at shutdown. Open does not dial NATS: the connection comes pre-wired so reconnect policy stays a single-source concern (same shape as presence.Open).

func (*Manager) BucketName

func (m *Manager) BucketName() string

BucketName returns the bucket name in use. Useful for diagnostics and for `bones doctor` to mention the right bucket in its messages.

func (*Manager) Close

func (m *Manager) Close() error

Close marks the Manager closed so subsequent public calls return ErrClosed. Safe to call more than once. Does not delete bucket contents; sessions outlive the process that wrote them (TTL eventually evicts).

func (*Manager) Delete

func (m *Manager) Delete(
	ctx context.Context, slot string, expectedRev uint64,
) error

Delete removes the session record at slot via a CAS gate against expectedRev. It returns ErrCASConflict on a revision mismatch; callers should re-Get and retry only if they still want to delete the (newer) record. It returns ErrNotFound if the key was already missing.

Delete is the close path: `bones swarm close` removes the session after posting the dispatch result and stopping the leaf.

func (*Manager) Get

func (m *Manager) Get(
	ctx context.Context, slot string,
) (Session, uint64, error)

Get reads the session record for slot. Returns ErrNotFound if no record exists. The returned revision is the JetStream KV sequence number suitable for a follow-up CAS Update or Delete.

func (*Manager) List

func (m *Manager) List(ctx context.Context) ([]Session, error)

List returns every live session in the bucket. Order matches the underlying JetStream KV list order (key-name lexicographic in practice; callers that need a specific order should sort).
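
Callers that need a deterministic order can sort the returned slice themselves. A small sketch, assuming a trimmed-down `session` type and a hypothetical `sortBySlot` helper:

```go
package main

import (
	"fmt"
	"sort"
)

// session keeps only the field this sketch sorts on.
type session struct {
	Slot string
}

// sortBySlot orders sessions by slot name, in place.
func sortBySlot(ss []session) {
	sort.Slice(ss, func(i, j int) bool { return ss[i].Slot < ss[j].Slot })
}

func main() {
	ss := []session{{Slot: "gamma"}, {Slot: "alpha"}, {Slot: "beta"}}
	sortBySlot(ss)
	for _, s := range ss {
		fmt.Println(s.Slot)
	}
}
```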

func (*Manager) Put

func (m *Manager) Put(ctx context.Context, sess Session) error

Put writes the session record for sess.Slot as a fresh entry. Fails (with ErrCASConflict) if a record already exists at that slot — callers must Delete first to take over an abandoned slot. Mirrors the create-or-update split in internal/tasks.
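
The delete-then-put takeover can be sketched against an in-memory stand-in. Assumptions: `memStore`, `newMemStore`, and `takeOver` are hypothetical, the revision counter is simplified, and deciding whether a slot is truly abandoned is left to the caller.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errCASConflict = errors.New("swarm: CAS conflict")
	errNotFound    = errors.New("swarm: session not found")
)

type session struct {
	Slot string
	Host string
}

// memStore is an in-memory stand-in for the Manager; not concurrency-safe.
type memStore struct {
	recs map[string]session
	revs map[string]uint64
	seq  uint64
}

func newMemStore() *memStore {
	return &memStore{recs: map[string]session{}, revs: map[string]uint64{}}
}

func (m *memStore) Put(_ context.Context, s session) error {
	if _, ok := m.recs[s.Slot]; ok {
		return errCASConflict // create-only: an existing record blocks Put
	}
	m.seq++
	m.recs[s.Slot], m.revs[s.Slot] = s, m.seq
	return nil
}

func (m *memStore) Get(_ context.Context, slot string) (session, uint64, error) {
	s, ok := m.recs[slot]
	if !ok {
		return session{}, 0, errNotFound
	}
	return s, m.revs[slot], nil
}

func (m *memStore) Delete(_ context.Context, slot string, expectedRev uint64) error {
	rev, ok := m.revs[slot]
	if !ok {
		return errNotFound
	}
	if rev != expectedRev {
		return errCASConflict
	}
	delete(m.recs, slot)
	delete(m.revs, slot)
	return nil
}

// takeOver deletes any existing record at the revision it just read, then
// writes the new one. A conflict from either step means another writer
// moved first and is returned to the caller.
func takeOver(ctx context.Context, m *memStore, s session) error {
	_, rev, err := m.Get(ctx, s.Slot)
	switch {
	case err == nil:
		if err := m.Delete(ctx, s.Slot, rev); err != nil {
			return err
		}
	case !errors.Is(err, errNotFound):
		return err
	}
	return m.Put(ctx, s)
}

func main() {
	ctx := context.Background()
	m := newMemStore()
	_ = m.Put(ctx, session{Slot: "alpha", Host: "old-host"})
	fmt.Println(m.Put(ctx, session{Slot: "alpha", Host: "new-host"}))
	fmt.Println(takeOver(ctx, m, session{Slot: "alpha", Host: "new-host"}))
	got, _, _ := m.Get(ctx, "alpha")
	fmt.Println(got.Host)
}
```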

func (*Manager) Update

func (m *Manager) Update(
	ctx context.Context, sess Session, expectedRev uint64,
) error

Update overwrites the session record for sess.Slot using a CAS gate against expectedRev. Returns ErrCASConflict if the bucket's current revision differs from expectedRev — another writer raced ours and the caller should re-Get and decide whether to retry.

Update is the heartbeat path: `bones swarm commit` reads the current session, updates LastRenewed, and CAS-writes back.
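
The read, stamp, CAS-write cycle with a bounded retry might look like the sketch below. It runs against an in-memory stand-in, not JetStream: `memStore`, its `raceOnce` switch, `renew`, and the retry bound are all hypothetical, and the revision counter only imitates real KV revisions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var (
	errCASConflict = errors.New("swarm: CAS conflict")
	errNotFound    = errors.New("swarm: session not found")
)

// session keeps only the fields this sketch touches.
type session struct {
	Slot        string
	LastRenewed time.Time
}

// memStore is an in-memory stand-in for the KV-backed Manager. It is not
// safe for concurrent use; it exists only to make the sketch runnable.
type memStore struct {
	recs     map[string]session
	revs     map[string]uint64
	raceOnce bool // when set, the next Update loses to a simulated writer
}

func (m *memStore) Get(_ context.Context, slot string) (session, uint64, error) {
	s, ok := m.recs[slot]
	if !ok {
		return session{}, 0, errNotFound
	}
	return s, m.revs[slot], nil
}

func (m *memStore) Update(_ context.Context, s session, expectedRev uint64) error {
	if m.raceOnce {
		m.raceOnce = false
		m.revs[s.Slot]++ // another writer bumped the revision first
	}
	if m.revs[s.Slot] != expectedRev {
		return errCASConflict
	}
	m.recs[s.Slot] = s
	m.revs[s.Slot]++
	return nil
}

// renew reads the record, stamps LastRenewed, and CAS-writes it back,
// re-reading on conflict for at most attempts rounds.
func renew(ctx context.Context, m *memStore, slot string, attempts int) error {
	for i := 0; i < attempts; i++ {
		sess, rev, err := m.Get(ctx, slot)
		if err != nil {
			return err
		}
		sess.LastRenewed = time.Now().UTC()
		err = m.Update(ctx, sess, rev)
		if !errors.Is(err, errCASConflict) {
			return err // nil on success, or an error retrying cannot fix
		}
	}
	return errCASConflict
}

func main() {
	m := &memStore{
		recs:     map[string]session{"alpha": {Slot: "alpha"}},
		revs:     map[string]uint64{"alpha": 1},
		raceOnce: true,
	}
	err := renew(context.Background(), m, "alpha", 3)
	fmt.Println(err, m.revs["alpha"])
}
```

Whether to retry at all after a conflict is the caller's decision; the bounded loop here is one option, chosen to keep the example short.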

type Session

type Session struct {
	// Slot is the slot name (matches the plan's `[slot: X]` annotation).
	// Doubles as the KV bucket key — slot is the primary identifier.
	Slot string `json:"slot"`

	// TaskID identifies the task this slot is currently working on.
	// Look it up in bones-tasks for full task state; this field is a
	// denormalized pointer used by `bones swarm status` and the close
	// verb so they don't need a second lookup.
	TaskID string `json:"task_id"`

	// AgentID is "slot-<name>" — the slot's fossil + coord identity.
	// Matches the agent_id used for the holds and presence buckets so
	// claim ownership and liveness join cleanly across substrates.
	AgentID string `json:"agent_id"`

	// Host is os.Hostname() of the machine running the leaf. A value
	// that differs from the workspace host means another machine owns
	// this slot; `bones swarm` verbs abort with a clear cross-host
	// error rather than try to manage a remote leaf process.
	Host string `json:"host"`

	// LeafPID is the host-local PID of the bones CLI process that
	// wrote this record at join time. Informational only — every
	// swarm verb is its own short-lived CLI invocation, so this PID
	// is already dead by the time any later verb reads it. Kept as a
	// debugging crumb (a pid that was alive at join means the join
	// did happen on this host) but NOT a liveness signal; use
	// LastRenewed for active/stale classification instead.
	LeafPID int `json:"leaf_pid"`

	// HubURL is the hub fossil HTTP URL recorded at join time. Later
	// verbs read this so they can post-commit push the slot's
	// leaf.fossil to the hub via /xfer without re-deriving the URL
	// from a flag. Empty on records written by older clients;
	// callers fall back to the join-flag default.
	HubURL string `json:"hub_url,omitempty"`

	// StartedAt is when this session record was first written.
	// Immutable once set; later renewals only touch LastRenewed.
	StartedAt time.Time `json:"started_at"`

	// LastRenewed is updated on every `swarm commit` (which heartbeats
	// the session) and used by `bones doctor` to flag stale entries.
	// This is the canonical active/stale signal — see LeafPID for the
	// "why we don't use the pid" discussion.
	LastRenewed time.Time `json:"last_renewed"`
}

Session is the per-slot record persisted in the bones-swarm-sessions JetStream KV bucket. One record per active slot in a workspace. Slot names are workspace-scoped because the bucket itself is per-NATS-deployment.

Marshaled as JSON for human-readable debugging via `nats kv get`. All time fields are RFC3339-encoded UTC. New fields must default safely on a missing-key read so older clients can read newer records (additive evolution only).
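
The wire format and the additive-evolution rule can be illustrated with `encoding/json`. In this sketch the struct copies the documented fields and tags, while `encode`, `decode`, and the sample values are hypothetical. Note that `time.Time` marshals with whatever offset the value carries, so a writer would presumably call `.UTC()` before storing.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// session copies the documented Session fields and JSON tags.
type session struct {
	Slot        string    `json:"slot"`
	TaskID      string    `json:"task_id"`
	AgentID     string    `json:"agent_id"`
	Host        string    `json:"host"`
	LeafPID     int       `json:"leaf_pid"`
	HubURL      string    `json:"hub_url,omitempty"`
	StartedAt   time.Time `json:"started_at"`
	LastRenewed time.Time `json:"last_renewed"`
}

// encode renders a session as the JSON text stored in the bucket.
func encode(s session) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

// decode parses stored JSON. Unknown keys are ignored and missing keys
// leave zero values, which is what lets old and new clients coexist.
func decode(raw string) (session, error) {
	var s session
	err := json.Unmarshal([]byte(raw), &s)
	return s, err
}

func main() {
	start := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
	out, err := encode(session{
		Slot:        "alpha",
		TaskID:      "t1",
		AgentID:     "slot-alpha",
		Host:        "build-01",
		LeafPID:     4242,
		StartedAt:   start,
		LastRenewed: start.Add(2 * time.Minute),
	})
	fmt.Println(out, err)

	// A record from an older client: no hub_url key at all.
	old, err := decode(`{"slot":"alpha","started_at":"2026-04-29T12:00:00Z"}`)
	fmt.Println(old.HubURL == "", err)
}
```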
