tasks

package
v0.14.4 Latest
Published: May 10, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package tasks is the substrate layer that stores task records in a NATS JetStream KV bucket. It exposes CAS-atomic primitives — Create, Get, Update, List, Watch — consumed exclusively by the coord package. See docs/adr/0005-tasks-in-nats-kv.md for the schema, bucket, and retention rules this package enforces.

This package is internal and unexported: callers outside github.com/danmestas/bones must not depend on it.

Constants

const AllEventsSubject = "tasks.events.>"

AllEventsSubject is the wildcard subject that matches every task event on the stream.

const DefaultBucketName = "bones-tasks"

DefaultBucketName is the canonical JetStream KV bucket name for task records, pinned by ADR 0005. Both the CLI's openManager helper and internal/coord must point at this same bucket; otherwise create-then-link/prime/autoclaim flows cross stores and silently miss the just-created task. Tests may override via Config.BucketName for isolation.

const EventRetentionWindow = 90 * 24 * time.Hour

EventRetentionWindow is the time-based fallback for compaction. Events older than this for a task that also has at least one newer event are eligible for compaction. ADR 0052 fixes this at 90 days.

const EventStreamName = "tasks-events"

EventStreamName is the JetStream stream name backing the task event log. Callers in internal/hub provision the stream with this name and the AllEventsSubject filter.

const PerTaskEventCap = 50

PerTaskEventCap is the maximum number of events kept per task before older events are compacted into a synthetic created summary. ADR 0052 fixes this at 50.

const RecentActivityCount = 20

RecentActivityCount is the default number of events `bones status` surfaces under "Recent activity" per ADR 0052. Tunable as a constant rather than a flag — operators wanting more reach for `bones tasks watch --since=<duration>`.

const SchemaVersion = 4

SchemaVersion is the currently-written task record schema. Every Create stamps this on the record so future migrations can fan out on observed version. v3 adds closed-task compaction metadata. v4 adds LastEventSeq, the projection-watermark wired by the event log per ADR 0052.

Variables

var ErrAlreadyExists = errors.New("tasks: record already exists")

ErrAlreadyExists reports that Create was called for a TaskID that already has a record in the bucket. Collisions are programmer errors under the ADR 0005 ID generator, but the sentinel exists so callers can distinguish a duplicate Create from an unrelated substrate error.

var ErrCASConflict = errors.New(
	"tasks: CAS conflict exceeded retries",
)

ErrCASConflict reports that Update exhausted jskv.MaxRetries without converging. Under normal contention this should never surface; its presence in a call site's error handling is the explicit surrender boundary for the CAS retry loop.

var ErrClosed = errors.New("tasks: manager is closed")

ErrClosed reports that a public method was called on a Manager whose Close has returned. Close-race with an in-flight call surfaces this error rather than a data race or nil dereference.

var ErrEventLogDisabled = errors.New(
	"tasks: event log disabled on this manager",
)

ErrEventLogDisabled reports that a Tx mutation was attempted on a Manager opened without an event log. coord's archive Manager opens in this mode for compaction-only writes; production Tx callers always have the event log enabled.

var ErrInvalidStatus = errors.New("tasks: invalid status value")

ErrInvalidStatus reports that a Task's Status field was outside the fixed {open, claimed, closed} enum. Any mutation that would write an unknown status returns this error before the CAS call.

var ErrInvalidTransition = errors.New(
	"tasks: invalid status transition",
)

ErrInvalidTransition reports that a mutation attempted a status edge outside the ADR 0005 DAG as amended by ADR 0007: legal edges are open→claimed, open→closed, claimed→closed, and claimed→open (the release-side un-claim edge). Any other backwards edge from closed or self-loop on closed is rejected.
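
The four legal edges listed above can be sketched as a small lookup table. This is a minimal illustration under stated assumptions, not the package's implementation: `legalEdges` and `canTransition` are hypothetical names, and the sketch makes no claim about how a same-status write on open or claimed is treated.

```go
package main

import "fmt"

// Status mirrors the three-value enum documented for this package.
type Status string

const (
	StatusOpen    Status = "open"
	StatusClaimed Status = "claimed"
	StatusClosed  Status = "closed"
)

// legalEdges is a hypothetical table of the four documented edges.
// StatusClosed has no entry because closed is a sink state.
var legalEdges = map[Status][]Status{
	StatusOpen:    {StatusClaimed, StatusClosed},
	StatusClaimed: {StatusClosed, StatusOpen},
}

// canTransition reports whether from→to is one of the four documented edges.
func canTransition(from, to Status) bool {
	for _, next := range legalEdges[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StatusOpen, StatusClaimed)) // true
	fmt.Println(canTransition(StatusClaimed, StatusOpen)) // true (release-side un-claim)
	fmt.Println(canTransition(StatusClosed, StatusOpen))  // false
}
```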

var ErrInvariant11 = errors.New(
	"tasks: claimed_by/status mismatch (invariant 11)",
)

ErrInvariant11 reports that a mutation violated invariant 11: the claimed_by field must be non-empty iff status == claimed. Rejected before the CAS call so bucket state never observes the inconsistent pair.
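
The "non-empty iff claimed" rule reduces to comparing two booleans. The sketch below assumes plain strings for status and agent ID; `checkInvariant11` and `errInvariant11` are hypothetical stand-ins, not the package's own identifiers.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvariant11 is a local stand-in for the package sentinel.
var errInvariant11 = errors.New("claimed_by/status mismatch (invariant 11)")

// checkInvariant11 rejects any pair where claimedBy is non-empty
// without status being "claimed", or the other way around.
func checkInvariant11(status, claimedBy string) error {
	isClaimed := status == "claimed"
	hasOwner := claimedBy != ""
	if isClaimed != hasOwner {
		return errInvariant11
	}
	return nil
}

func main() {
	fmt.Println(checkInvariant11("claimed", "agent-1")) // <nil>
	fmt.Println(checkInvariant11("open", "agent-1"))    // mismatch error
	fmt.Println(checkInvariant11("claimed", ""))        // mismatch error
}
```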

var ErrNoMutations = errors.New(
	"tasks: Tx callback made no mutations",
)

ErrNoMutations reports that a Tx callback returned without invoking any tx.X method. Tx is the only mutation entry point on Manager and returning without one is a programmer error caught at the API boundary rather than allowed to silently no-op.

var ErrNotFound = errors.New("tasks: record not found")

ErrNotFound reports that Get or Update was called for a TaskID absent from the bucket. Coord translates this into its own public sentinel when composing higher-level verbs.

var ErrValueTooLarge = errors.New(
	"tasks: encoded value exceeds max size",
)

ErrValueTooLarge reports that a Task, after JSON encoding, exceeded Config.MaxValueSize. Enforced at every write per invariant 14.
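
A size gate of this kind can be sketched as "encode first, then compare the byte length". The `record` type, `encodeBounded`, and `errValueTooLarge` below are hypothetical; the only behavior taken from the text above is that the check applies to the JSON-encoded value and uses a Config.MaxValueSize-style int32 bound.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errValueTooLarge is a local stand-in for the package sentinel.
var errValueTooLarge = errors.New("encoded value exceeds max size")

// record is a trimmed, hypothetical task shape used only for this sketch.
type record struct {
	ID    string `json:"id"`
	Title string `json:"title"`
}

// encodeBounded marshals r and rejects the result when the encoded
// bytes exceed maxValueSize, so the check runs before any write.
func encodeBounded(r record, maxValueSize int32) ([]byte, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}
	if len(b) > int(maxValueSize) {
		return nil, errValueTooLarge
	}
	return b, nil
}

func main() {
	r := record{ID: "t1", Title: "x"}
	b, err := encodeBounded(r, 8192)
	fmt.Println(string(b), err)

	_, err = encodeBounded(r, 10)
	fmt.Println(errors.Is(err, errValueTooLarge)) // true
}
```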

Functions

func CompactPerTask added in v0.13.0

func CompactPerTask(ctx context.Context, m *Manager, taskID string) (int, error)

CompactPerTask replaces the older events for one task with a single synthetic created event carrying the full post-compaction snapshot. Returns the number of events removed (i.e., per-task event count before minus after).

CompactPerTask is the per-task primitive used by the hub-side compaction worker. The worker is a follow-up; this primitive lets tests exercise the compaction semantics without scheduling.

Safety: compaction runs only on tasks whose latest projection exists in KV. Aborts (no-op) if the projection is missing — the task may have been purged.

func DecodePayload added in v0.13.0

func DecodePayload(env EventEnvelope) (any, error)

DecodePayload unmarshals env.Payload into a typed payload struct matching env.Type. The returned value is one of the *Payload types defined in this package.

func EncodeForTest

func EncodeForTest(t Task) ([]byte, error)

EncodeForTest exposes the package's internal task encoder so sibling test packages can produce wire-compatible bytes to Put directly into the KV bucket. Not part of the supported public API.

func EventSubject added in v0.13.0

func EventSubject(taskID string) string

EventSubject returns the JetStream subject for events scoped to one task ID. Subjects under tasks.events.> partition by task so a Watch can replay one task's history with a subject filter.
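
As a rough illustration of the per-task partitioning, the sketch below assumes the subject is the `tasks.events.` prefix followed by the task ID, which is the shape the LogReadOpts.FilterTaskID comment describes (`tasks.events.<task_id>`). `eventSubject` and `underAllEvents` are hypothetical helpers, and the wildcard check is a simplification of NATS matching for this one prefix.

```go
package main

import (
	"fmt"
	"strings"
)

// eventPrefix is the fixed part of every task event subject.
const eventPrefix = "tasks.events."

// eventSubject builds the per-task subject (assumed format).
func eventSubject(taskID string) string {
	return eventPrefix + taskID
}

// underAllEvents approximates matching against "tasks.events.>":
// the subject must carry at least one token after the prefix.
func underAllEvents(subject string) bool {
	return strings.HasPrefix(subject, eventPrefix) &&
		len(subject) > len(eventPrefix)
}

func main() {
	s := eventSubject("abc123")
	fmt.Println(s)                                   // tasks.events.abc123
	fmt.Println(underAllEvents(s))                   // true
	fmt.Println(underAllEvents("tasks.other.abc123")) // false
}
```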

func MarshalEnvelope added in v0.13.0

func MarshalEnvelope(env EventEnvelope) ([]byte, error)

MarshalEnvelope serializes env to the bytes published on the stream. JSON for parity with internal/chat.Envelope per ADR 0047.

func Migrate added in v0.13.0

func Migrate(ctx context.Context, m *Manager) (int, error)

Migrate emits synthetic events for every pre-existing KV record so the event log carries a baseline for recovery and audit. Idempotent via migrationMarkerKey; running Migrate twice is a no-op.

This is the one-time path for upgrading a workspace from before the event log was the source of truth (schema v3 or earlier). Documented limit per ADR 0052: pre-migration update history is lost — only created / claimed / closed bookmarks are reconstructible.

func PriorityRank added in v0.11.0

func PriorityRank(t Task) (int, bool)

PriorityRank returns the numeric rank of t's priority and whether the priority parsed cleanly. The convention matches beads: Context["priority"] holds a "P<N>" string (e.g. "P0", "P1", "P2") where lower N == higher priority. Unprioritized records report (0, false) — callers use the second return to decide ordering.
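
The "P<N>" convention can be parsed along these lines. This is a sketch, not the package's parser: `priorityRank` here takes the Context map directly, and rejecting signs or non-digits via strconv.ParseUint is an assumption about what "parsed cleanly" means.

```go
package main

import (
	"fmt"
	"strconv"
)

// priorityRank reads ctx["priority"] as "P<N>" and returns N plus
// whether the value parsed. Missing or malformed values give (0, false).
func priorityRank(ctx map[string]string) (int, bool) {
	p := ctx["priority"]
	if len(p) < 2 || p[0] != 'P' {
		return 0, false
	}
	// ParseUint rejects signs and non-digits; 31 bits keeps the
	// result inside a non-negative int on every platform.
	n, err := strconv.ParseUint(p[1:], 10, 31)
	if err != nil {
		return 0, false
	}
	return int(n), true
}

func main() {
	fmt.Println(priorityRank(map[string]string{"priority": "P1"}))   // 1 true
	fmt.Println(priorityRank(map[string]string{"priority": "high"})) // 0 false
	fmt.Println(priorityRank(nil))                                   // 0 false
}
```

Note that "P0" yields (0, true), which is why the boolean, not the rank, is what distinguishes the highest priority from an unprioritized record.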

func Recover added in v0.13.0

func Recover(ctx context.Context, m *Manager) (int, error)

Recover reconciles the KV projection with the event log on hub start. For each unique task ID with at least one event on the stream, the recovery loop walks events newer than the KV's LastEventSeq and applies them via AdminWrite (a no-op when the projection is already up to date). Returns the number of orphan events replayed.

Recovery is idempotent: repeated calls on a stable substrate replay no events. Concurrent Recover calls race only on the underlying KV CAS; the substrate serializes them.

func SetCASRetryHookForTest

func SetCASRetryHookForTest(fn func()) (restore func())

SetCASRetryHookForTest installs fn as the per-retry hook used by Update's CAS loop, returning a restore function the test must call (typically via t.Cleanup) to reinstate the no-op default. Tests use this to observe the number of CAS retries without instrumenting the KV transport. The hook fires exactly once per retry — never on the first attempt or after a final verdict.

func SetUpdatePreWriteHookForTest

func SetUpdatePreWriteHookForTest(fn func()) (restore func())

SetUpdatePreWriteHookForTest installs fn as the per-attempt hook used by Update's CAS loop between the Get and the Update calls. Tests use this seam to deterministically force CAS conflicts by performing a direct Put while fn runs — every attempt reliably fails under such a hook, making the retry-exhaustion path reachable in a single test. Restore must be called (typically via t.Cleanup) to reinstate the no-op default.

func SortReady added in v0.11.0

func SortReady(records []Task)

SortReady sorts records in place by priority (highest first) then CreatedAt ascending (oldest first within the same priority bucket). Priority is read from Context["priority"] and parsed as a P<N> rank where lower N == higher priority (P0 > P1 > P2). Tasks with no parsable priority sort to the END of the list — explicitly prioritized work always surfaces above unprioritized work.

The sort is stable so callers that pre-sort on a secondary key see that ordering preserved within ties.
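
The ordering rules above map naturally onto sort.SliceStable with a three-step comparison. The `task` type, `rank`, `sortReady`, and the `sample` fixture are all hypothetical; only the ordering (prioritized first, lower N first, then oldest first) comes from the description.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"time"
)

// task is a trimmed, hypothetical stand-in for the package's Task.
type task struct {
	ID        string
	Context   map[string]string
	CreatedAt time.Time
}

// rank parses Context["priority"] as "P<N>"; ok is false when absent or malformed.
func rank(t task) (int, bool) {
	p := t.Context["priority"]
	if len(p) < 2 || p[0] != 'P' {
		return 0, false
	}
	n, err := strconv.ParseUint(p[1:], 10, 31)
	if err != nil {
		return 0, false
	}
	return int(n), true
}

// sortReady orders prioritized tasks first (lower N first), then by
// CreatedAt ascending; the stable sort keeps input order on full ties.
func sortReady(records []task) {
	sort.SliceStable(records, func(i, j int) bool {
		ri, oki := rank(records[i])
		rj, okj := rank(records[j])
		if oki != okj {
			return oki // a parsable priority beats none
		}
		if oki && ri != rj {
			return ri < rj
		}
		return records[i].CreatedAt.Before(records[j].CreatedAt)
	})
}

// sample builds a small fixture; IDs and timestamps are made up.
func sample() []task {
	at := func(sec int64) time.Time { return time.Unix(sec, 0).UTC() }
	return []task{
		{ID: "a", CreatedAt: at(0)},
		{ID: "b", Context: map[string]string{"priority": "P2"}, CreatedAt: at(1)},
		{ID: "c", Context: map[string]string{"priority": "P0"}, CreatedAt: at(3)},
		{ID: "d", Context: map[string]string{"priority": "P0"}, CreatedAt: at(2)},
	}
}

func main() {
	records := sample()
	sortReady(records)
	ids := make([]string, 0, len(records))
	for _, r := range records {
		ids = append(ids, r.ID)
	}
	fmt.Println(ids) // [d c b a]
}
```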

Types

type AdminWrite added in v0.13.0

type AdminWrite struct {
	// contains filtered or unexported fields
}

AdminWrite is the import-restricted write surface for the hub-side migration / recovery loop and for test seeding. ADR 0052 makes Tx the only public mutation entry point on Manager; AdminWrite is the narrow exception. It is its own type so the reflection-based "no mutation outside Tx" test on Manager passes — Manager has no public mutation method, only Tx.

Production CLI code MUST NOT instantiate AdminWrite. The accepted callers are:

  • internal/hub: migration of pre-event-log KV records, recovery of orphaned events on hub start.
  • tests inside this repository: deterministic fixture seeding.

Every method here writes the KV without publishing an event. The caller is responsible for emitting a matching event when one is expected (e.g., the migration emits synthetic events explicitly).

func NewAdminWrite added in v0.13.0

func NewAdminWrite(mgr *Manager) AdminWrite

NewAdminWrite returns an AdminWrite bound to mgr. The constructor's signature signals intent at every call site; callers searching for "tasks.NewAdminWrite" find every place admin writes happen and can audit whether the exception is justified.

func (AdminWrite) Create added in v0.13.0

func (a AdminWrite) Create(ctx context.Context, t Task) error

Create writes a new task record without publishing an event. Hub-side migration uses this to seed records that are paired with synthetic events emitted directly to the stream.

func (AdminWrite) Purge added in v0.13.0

func (a AdminWrite) Purge(ctx context.Context, id string) error

Purge permanently removes a task key. Reachable from Manager.Purge already; re-exposed here for symmetry with the migration / recovery caller's surface, so a recovery loop never has to import multiple surfaces to do its job.

func (AdminWrite) Update added in v0.13.0

func (a AdminWrite) Update(
	ctx context.Context,
	id string,
	mutate func(Task) (Task, error),
) error

Update performs the same revision-gated CAS update as the legacy Manager.Update (now private), without publishing an event. Hub-side recovery uses this to bring a projection forward when the stream's latest event sequence outpaces the KV's LastEventSeq.

type ClaimArgs added in v0.13.0

type ClaimArgs struct {
	AgentID    string
	Slot       string
	ClaimEpoch uint64
	PrevAgent  string
	Mutate     func(Task) (Task, error)
}

ClaimArgs configures a tx.Claim call. AgentID and Mutate are required; Slot, PrevAgent, and ClaimEpoch are optional. The mutator runs inside the CAS loop and is responsible for the conditional rejection (already-claimed, epoch-stale) plus stamping ClaimedBy and any other fields. tx.Claim wraps it so callers do not have to remember to update LastEventSeq or UpdatedAt.

The Mutate callback IS the CAS body. It receives the current Task and returns the desired post-claim Task value or an error. Returning an error short-circuits the CAS loop and the Tx call returns the error unwrapped so coord can switch on its own sentinels.

type ClaimedPayload added in v0.13.0

type ClaimedPayload struct {
	AgentID    string `json:"agent_id"`
	Slot       string `json:"slot,omitempty"`
	ClaimEpoch uint64 `json:"claim_epoch"`
	PrevAgent  string `json:"prev_agent,omitempty"`
}

ClaimedPayload carries the agent that claimed and the optional slot. PrevAgent is non-empty for handoff/reclaim transitions where the claim moves from one agent to another without an intervening unclaimed event; replay in recovery uses it to reconstruct the audit trail correctly.

type ClosedPayload added in v0.13.0

type ClosedPayload struct {
	AgentID string `json:"agent_id"`
	Reason  string `json:"reason"`
}

ClosedPayload carries the closing agent and the close reason.

type Config

type Config struct {
	// BucketName is the JetStream KV bucket backing the task records.
	// ADR 0005 pins the coord-visible name to "bones-tasks"; this
	// package takes the name as input so tests can isolate by bucket.
	BucketName string

	// HistoryDepth is the per-key JetStream KV history depth. ADR 0005
	// recommends 8; Validate-equivalent rejection happens at Open.
	HistoryDepth uint8

	// MaxValueSize is the upper bound on an encoded task record value,
	// in bytes. Enforced at every Create and Update per invariant 14.
	MaxValueSize int32

	// ChanBuffer sets the channel buffer for Watch. Zero yields the
	// package default (defaultChanBuffer).
	ChanBuffer int

	// EnableEventLog opts the Manager into the append-only task event
	// log per ADR 0052. When true, Open creates (or attaches to) the
	// `tasks-events` JetStream stream and Tx publishes one event per
	// mutation. Coord's primary task Manager opens with this true; the
	// archive Manager (compacted closed-task snapshots) opens false
	// so that compaction-only writes do not flood the event log.
	EnableEventLog bool

	// RecoverOnOpen controls whether Open runs Recover() to reconcile
	// orphan events. Recovery is safe at hub start (single process,
	// no concurrent Tx in flight) but races against live Tx callers,
	// so every-call Open should leave this false. Hub start passes
	// true; coord callers, CLI verbs, and tests pass false.
	RecoverOnOpen bool
}

Config configures Open. Set every field deliberately: the only silent default is ChanBuffer, where zero selects the package default. ADR 0005 fixes the recommended values (HistoryDepth 8, MaxValueSize 8 KB), and coord.Config is the enforcement surface for operator input. BucketName must match the JetStream KV regex ([A-Za-z0-9_-]+); a violation is surfaced by the underlying CreateOrUpdateKeyValue call.

type CreatedPayload added in v0.13.0

type CreatedPayload struct {
	Title string   `json:"title"`
	Slot  string   `json:"slot,omitempty"`
	Files []string `json:"files,omitempty"`

	// Snapshot is the full Task projection at creation time. Carried so
	// migration-emitted synthetic events and compaction summaries can
	// hand a recovery loop the entire post-creation projection without
	// requiring it to walk subsequent updated events. Empty for live
	// Tx.Create emissions; populated by migration and compaction.
	Snapshot *Task `json:"snapshot,omitempty"`
}

CreatedPayload is the payload for an EventTypeCreated event. Slot is optional (empty when the task is not yet bound to a slot).

type Edge

type Edge struct {
	Type   EdgeType `json:"type"`
	Target string   `json:"target"`
}

Edge is an outgoing directed relationship carried on the source task. Storage is outgoing-only (ADR 0014); reverse lookups require a scan.

type EdgeType

type EdgeType string

EdgeType names a typed outgoing relationship from one task to another. See ADR 0014. Unknown string values decoded from storage are preserved as-is (invariant 26) so a future phase adding a new type stays round-trip compatible with records this version writes.

const (
	EdgeBlocks         EdgeType = "blocks"
	EdgeDiscoveredFrom EdgeType = "discovered-from"
	EdgeSupersedes     EdgeType = "supersedes"
	EdgeDuplicates     EdgeType = "duplicates"
)

type Event

type Event struct {
	ID   string
	Kind EventKind
	Task Task
}

Event is delivered to Watch callers on each observed task change. ID is the TaskID the event concerns; Kind identifies the shape; Task is populated for EventCreated and EventUpdated.

type EventEnvelope added in v0.13.0

type EventEnvelope struct {
	Type   EventType `json:"type"`
	TaskID string    `json:"task_id"`
	// Timestamp is the wall-clock instant the event was emitted.
	// timefmt.LoggedTime per #324: serializes as UTC RFC3339 so
	// recovery loops on a different host or different system zone
	// see the same instant a producing slot stamped. Decoding is
	// lenient (accepts RFC3339Nano with offset) so legacy records
	// pre-#324 still replay.
	Timestamp timefmt.LoggedTime `json:"timestamp"`
	Payload   json.RawMessage    `json:"payload"`

	// StreamSeq is the JetStream message sequence at the time the
	// envelope was consumed. Set by the recovery loop and the watch
	// consumer; not serialized.
	StreamSeq uint64 `json:"-"`
}

EventEnvelope is the wire format for one task event on the JetStream stream. JSON-serialized, persisted across the retention window; possibly read by a consumer running a different bones version. Additive-only on field changes — see ADR 0052.

StreamSeq is supplied by the consumer at decode time from the JetStream message metadata; it is NOT part of the JSON envelope and is zero for messages that have not yet been published.

func EncodeEnvelope added in v0.13.0

func EncodeEnvelope(t EventType, taskID string, payload any) (EventEnvelope, error)

EncodeEnvelope marshals a typed payload and wraps it, together with the event type and task ID, in a single EventEnvelope ready for js.PublishMsg. The Timestamp is stamped here in UTC so callers do not have to remember to convert.

func UnmarshalEnvelope added in v0.13.0

func UnmarshalEnvelope(raw []byte) (EventEnvelope, error)

UnmarshalEnvelope deserializes raw stream bytes into an EventEnvelope. StreamSeq is left as zero — the consumer fills it from message metadata.
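
The effect of the `json:"-"` tag on StreamSeq can be seen in a plain encoding/json round trip. The `envelope` type below is a local copy of the documented fields with two assumptions: Type is a bare uint8, and a plain time.Time stands in for timefmt.LoggedTime, whose exact encoding is not reproduced here. `sampleEnvelope` and `roundTrip` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// envelope mirrors the documented wire fields (see assumptions above).
type envelope struct {
	Type      uint8           `json:"type"`
	TaskID    string          `json:"task_id"`
	Timestamp time.Time       `json:"timestamp"`
	Payload   json.RawMessage `json:"payload"`
	StreamSeq uint64          `json:"-"` // never serialized
}

// sampleEnvelope builds a made-up closed event for task "t1".
func sampleEnvelope() envelope {
	return envelope{
		Type:      7,
		TaskID:    "t1",
		Timestamp: time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC),
		Payload:   json.RawMessage(`{"agent_id":"a1","reason":"done"}`),
		StreamSeq: 42, // dropped by Marshal because of the "-" tag
	}
}

// roundTrip encodes and decodes an envelope through JSON.
func roundTrip(in envelope) (envelope, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return envelope{}, err
	}
	var out envelope
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	out, err := roundTrip(sampleEnvelope())
	if err != nil {
		panic(err)
	}
	fmt.Println(out.StreamSeq) // 0: not part of the wire format

	// A consumer would fill this from the message metadata instead.
	out.StreamSeq = 42
	fmt.Println(out.TaskID, out.StreamSeq) // t1 42
}
```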

type EventKind

type EventKind int

EventKind identifies the shape of a task state change delivered by Watch.

const (
	// EventCreated fires on the first Put for a key. The accompanying
	// Event.Task holds the decoded record.
	EventCreated EventKind = iota + 1

	// EventUpdated fires on subsequent Puts for an existing key. The
	// accompanying Event.Task holds the decoded post-update record.
	EventUpdated

	// EventDeleted fires on Delete or Purge. Event.Task is the zero
	// value; Event.ID carries the affected key.
	EventDeleted
)

EventCreated, EventUpdated, and EventDeleted are the three shapes of task state changes a Watch subscriber observes.

func (EventKind) String

func (k EventKind) String() string

String returns a human-readable name for the EventKind.

type EventType added in v0.13.0

type EventType uint8

EventType identifies the shape of a task event published on the `tasks.events.>` JetStream stream. The set is closed (Go iota); a new event type requires a new constant, payload struct, tx.X method, row in ADR 0052, and a roundtrip test. There is no open-set string carrier for event names — a value outside the constants below is rejected at decode.

const (
	// EventTypeUnknown is the zero value used to flag a missing or
	// unknown type at decode. It is never published.
	EventTypeUnknown EventType = 0

	// EventTypeCreated stamps the first observation of a task (or the
	// synthetic baseline emitted by migration / compaction).
	EventTypeCreated EventType = iota

	// EventTypeClaimed marks a status edge open→claimed and stamps the
	// agent that took the claim plus the (optional) slot.
	EventTypeClaimed

	// EventTypeUnclaimed marks a status edge claimed→open with the
	// free-form unclaim reason.
	EventTypeUnclaimed

	// EventTypeUpdated carries one or more field-level changes as
	// (Field, Old, New) tuples. The log alone — no KV consult — must
	// suffice to reconstruct any prior state, so old and new values are
	// both carried.
	EventTypeUpdated

	// EventTypeLinked records a directed edge added to a task.
	EventTypeLinked

	// EventTypeSlotChanged records a slot reassignment.
	EventTypeSlotChanged

	// EventTypeClosed marks a status edge into closed and stamps the
	// closing agent and reason.
	EventTypeClosed
)

Event types per ADR 0052 §"Event types — closed iota set". Order is load-bearing only insofar as adding a new type appends; do not reorder, that would change wire encoding for stored events.

func (EventType) String added in v0.13.0

func (t EventType) String() string

String returns a human-readable name for the event type. Used by logging, the watch-line renderer, and the activity feed.

func (EventType) Valid added in v0.13.0

func (t EventType) Valid() bool

Valid reports whether t is one of the seven defined event types. EventTypeUnknown is the zero value and is invalid by convention.
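
The const block above mixes an explicit zero with iota, which is easy to misread. The sketch below reproduces the declarations and shows the resulting values: iota counts constant specs from the top of the block, so EventTypeCreated is 1 and EventTypeClosed is 7. The body of `Valid` is an assumption (a simple range check), not the package's source.

```go
package main

import "fmt"

type EventType uint8

const (
	EventTypeUnknown EventType = 0
	// iota is 1 here because this is the second spec in the block.
	EventTypeCreated EventType = iota
	EventTypeClaimed
	EventTypeUnclaimed
	EventTypeUpdated
	EventTypeLinked
	EventTypeSlotChanged
	EventTypeClosed
)

// Valid is sketched as a range check over the seven defined types.
func (t EventType) Valid() bool {
	return t >= EventTypeCreated && t <= EventTypeClosed
}

func main() {
	fmt.Println(EventTypeCreated, EventTypeClosed) // 1 7
	fmt.Println(EventTypeUnknown.Valid())          // false
	fmt.Println(EventType(8).Valid())              // false
}
```

Because stored events carry these numbers, appending a new constant after EventTypeClosed keeps existing values stable, while reordering would not.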

type FieldChange added in v0.13.0

type FieldChange struct {
	Field string          `json:"field"`
	Old   json.RawMessage `json:"old"`
	New   json.RawMessage `json:"new"`
}

FieldChange is one (field, old, new) tuple carried in an UpdatedPayload. Old and New are both rendered as raw JSON to preserve type fidelity across replays — a string and a number with the same lexical form do not collide. Field is the canonical Task struct field name (e.g. "title", "files", "context").

func FieldChangeFromAny added in v0.13.0

func FieldChangeFromAny(field string, oldV, newV any) (FieldChange, error)

FieldChangeFromAny builds a FieldChange tuple from any-typed old/new values, marshaling each to JSON. Convenient for Update callers that already have the typed values in hand.

func MustFieldChange added in v0.13.0

func MustFieldChange(field string, oldV, newV any) FieldChange

MustFieldChange is FieldChangeFromAny that panics on encode failure. Use only for values whose JSON shape is statically known (strings, integers, simple structs); a panic here is a programmer bug.
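
Carrying Old and New as raw JSON is what keeps the string "1" and the number 1 apart. The sketch below shows one plausible shape for the constructor using json.Marshal; `fieldChangeFromAny` is a hypothetical lower-case stand-in, and the package's actual error handling may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FieldChange mirrors the documented (field, old, new) tuple.
type FieldChange struct {
	Field string          `json:"field"`
	Old   json.RawMessage `json:"old"`
	New   json.RawMessage `json:"new"`
}

// fieldChangeFromAny marshals both values to JSON so their types
// survive a replay; it fails when either value cannot be encoded.
func fieldChangeFromAny(field string, oldV, newV any) (FieldChange, error) {
	o, err := json.Marshal(oldV)
	if err != nil {
		return FieldChange{}, fmt.Errorf("encode old %q: %w", field, err)
	}
	n, err := json.Marshal(newV)
	if err != nil {
		return FieldChange{}, fmt.Errorf("encode new %q: %w", field, err)
	}
	return FieldChange{Field: field, Old: o, New: n}, nil
}

func main() {
	fc, err := fieldChangeFromAny("title", "1", 1)
	if err != nil {
		panic(err)
	}
	// The string keeps its quotes; the number does not.
	fmt.Println(string(fc.Old), string(fc.New)) // "1" 1

	// Channels have no JSON form, so this reports an error.
	_, err = fieldChangeFromAny("bad", make(chan int), nil)
	fmt.Println(err != nil) // true
}
```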

type LinkedPayload added in v0.13.0

type LinkedPayload struct {
	OtherID  string `json:"other_id"`
	EdgeType string `json:"edge_type"`
}

LinkedPayload stamps a directed edge added to the task.

type LogReadOpts added in v0.13.0

type LogReadOpts struct {
	// FromSeq is the JetStream stream sequence to start at (inclusive).
	// 0 means unset.
	FromSeq uint64

	// Since is a wall-clock offset relative to time.Now(). 0 means
	// unset. Mutually exclusive with FromSeq.
	Since time.Duration

	// Limit caps the returned event count. 0 disables the cap.
	Limit int

	// FilterTaskID, when non-empty, restricts the read to events
	// matching `tasks.events.<task_id>`.
	FilterTaskID string
}

LogReadOpts configures a one-shot read of the event log via Replay. At most one of FromSeq and Since may be set. When both are zero, the reader returns the most recent Limit events (Limit defaults to RecentActivityCount).
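
The option rules can be expressed as a small normalization step. This is one reading of the text above, not the package's code: `normalize`, `errBothSet`, and `recentActivityCount` are hypothetical, and applying the default Limit only when FromSeq, Since, and Limit are all zero is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// recentActivityCount stands in for the RecentActivityCount constant.
const recentActivityCount = 20

var errBothSet = errors.New("FromSeq and Since are mutually exclusive")

// logReadOpts mirrors the documented option fields.
type logReadOpts struct {
	FromSeq      uint64
	Since        time.Duration
	Limit        int
	FilterTaskID string
}

// normalize rejects conflicting start points and, for a plain
// "most recent" read with no Limit, fills in the default cap.
func normalize(o logReadOpts) (logReadOpts, error) {
	if o.FromSeq != 0 && o.Since != 0 {
		return o, errBothSet
	}
	if o.FromSeq == 0 && o.Since == 0 && o.Limit == 0 {
		o.Limit = recentActivityCount
	}
	return o, nil
}

func main() {
	tail, _ := normalize(logReadOpts{})
	fmt.Println(tail.Limit) // 20

	since, _ := normalize(logReadOpts{Since: time.Hour})
	fmt.Println(since.Limit) // 0: no cap when reading by time window

	_, err := normalize(logReadOpts{FromSeq: 5, Since: time.Hour})
	fmt.Println(err != nil) // true
}
```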

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns a JetStream KV bucket that stores task records. Every public method is safe to call concurrently. Close is idempotent.

func Open

func Open(ctx context.Context, nc *nats.Conn, cfg Config) (*Manager, error)

Open creates (or reattaches to) the tasks KV bucket on the supplied NATS connection and returns a Manager. The caller owns nc's lifecycle; Open does not close it. Constructing a Manager does not consume a goroutine; Watch spawns one per call. Callers must invoke Close to release every live subscriber channel.

func (*Manager) Close

func (m *Manager) Close() error

Close releases resources held by the Manager. It closes every live Watch channel and marks the Manager as closed so subsequent public calls return ErrClosed. The NATS connection is owned by the caller and is not closed here. Safe to call more than once.

func (*Manager) Get

func (m *Manager) Get(
	ctx context.Context, id string,
) (Task, uint64, error)

Get reads a task record by ID. The second return is the KV revision the record was read at — callers that intend to Update pass this value into the mutate closure of Update. Returns ErrNotFound when the key is absent or carries a delete marker.

func (*Manager) KVForTest

func (m *Manager) KVForTest() jetstream.KeyValue

KVForTest returns the underlying JetStream KV handle so tests in sibling packages can stage CAS-conflict scenarios by writing directly to the bucket. Production code must not use this; the public API (Open, Close, Create, Get, Update, List, Watch) remains the sole supported surface. The export is intentionally verbose ("ForTest") so every call site reads as a test seam, not an accidental leak of the substrate.

func (*Manager) List

func (m *Manager) List(ctx context.Context) ([]Task, error)

List returns every task record currently readable in the bucket. Coord.Ready filters the slice client-side; this package performs no status filtering. Delete markers are skipped; malformed entries are skipped (they would indicate a corrupted write, which the watcher path would also drop).

func (*Manager) Live added in v0.14.0

func (m *Manager) Live(ctx context.Context) (<-chan EventEnvelope, error)

Live opens a live subscription to the task event log. The returned channel emits envelopes published *after* the call (DeliverNewPolicy) until ctx is canceled, the manager is closed, or the underlying consumer stops. Symmetric to Replay (one-shot drain) but for the continuous tail used by `bones tasks watch`.

Per ADR 0052 the event log is the source of truth for task state; Live consumers see the full envelope (Type, TaskID, Timestamp, Payload) on every event, so no projection lookup is required. Field-level changes for EventTypeUpdated land in the payload, so context-only updates surface here even when consumers could not distinguish them from the KV projection alone.

Callers must drain the channel promptly; a blocked reader stalls the forwarder. Buffer is sized for typical interactive use (64). Decode errors are dropped silently — one malformed record does not poison the stream.

func (*Manager) Purge

func (m *Manager) Purge(ctx context.Context, id string) error

Purge permanently removes a task key from the bucket so future Get/List calls do not observe it. Returns ErrNotFound when the key is absent.

func (*Manager) Recent added in v0.13.0

func (m *Manager) Recent(
	ctx context.Context, n int,
) ([]EventEnvelope, error)

Recent returns the most recent n events from the log in stream order (oldest of the slice first, newest last). Used by `bones status` for the Recent Activity surface.

func (*Manager) Replay added in v0.13.0

func (m *Manager) Replay(
	ctx context.Context, opts LogReadOpts,
) ([]EventEnvelope, error)

Replay reads events from the task event log according to opts and returns them in stream order. Used by `bones tasks watch`'s --from / --since backfill and by `bones status`'s recent-activity surface.

Replay is one-shot: it returns once the consumer drains; callers wanting live updates from the event log use Live instead (Watch covers KV state changes rather than the event log).

func (*Manager) Tx added in v0.13.0

func (m *Manager) Tx(
	ctx context.Context, taskID string,
	fn func(tx *Tx) error,
) error

Tx is the only public mutation entry point on Manager. The callback receives a *Tx and calls one or more tx.X methods to mutate the task. Each tx.X method publishes one event to the JetStream event log, then writes the corresponding KV projection. Returns ErrNoMutations if the callback completes without calling any tx.X method, or any error returned by the callback or any tx.X call.

Concurrency: multiple Tx invocations on the same task race; the KV CAS layer detects the conflict and the losing call retries inside the underlying jskv.MaxRetries-bounded loop.
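
The retry behavior described above follows the usual read, mutate, compare-and-swap pattern with a bounded attempt count. The sketch below models it against an in-memory store rather than JetStream: `store`, `update`, the `preWrite` hook, and `maxRetries = 3` are all hypothetical (the real bound is jskv.MaxRetries, whose value is not stated here).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// maxRetries is a made-up bound for this sketch.
const maxRetries = 3

var errCASConflict = errors.New("CAS conflict exceeded retries")

// store is a one-key, revision-stamped value.
type store struct {
	mu  sync.Mutex
	val int
	rev uint64
}

func (s *store) get() (int, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.val, s.rev
}

// put writes unconditionally and bumps the revision.
func (s *store) put(v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.val, s.rev = v, s.rev+1
}

// cas writes only if the revision still matches the one that was read.
func (s *store) cas(v int, expect uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.rev != expect {
		return false
	}
	s.val, s.rev = v, s.rev+1
	return true
}

// update re-reads and re-applies mutate until the CAS lands or the
// attempt budget runs out. preWrite runs between the read and the write.
func update(s *store, mutate func(int) int, preWrite func()) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		cur, rev := s.get()
		next := mutate(cur)
		if preWrite != nil {
			preWrite()
		}
		if s.cas(next, rev) {
			return nil
		}
	}
	return errCASConflict
}

func main() {
	s := &store{}
	inc := func(v int) int { return v + 1 }
	fmt.Println(update(s, inc, nil)) // <nil>

	// A competing write on every attempt exhausts the budget.
	err := update(s, inc, func() { s.put(100) })
	fmt.Println(errors.Is(err, errCASConflict)) // true
}
```

The `preWrite` parameter plays the same role as the test seam described under SetUpdatePreWriteHookForTest: a writer that sneaks in between the read and the write makes the revision check fail.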

func (*Manager) Watch

func (m *Manager) Watch(
	ctx context.Context,
) (<-chan Event, error)

Watch opens a subscription over all task state changes. The returned channel is closed when ctx is done, when Close is called on the Manager, or when the underlying JetStream watcher stops.

Watch emits the current bucket contents before live updates begin: each surviving key arrives as an EventCreated event. Callers that want only live changes can discard events until their first post-subscribe write round-trips. Deleted markers in the initial snapshot are skipped.

Callers must drain the channel promptly. A blocked reader stalls the watcher-forwarding goroutine and delays every other subscriber for at most one send. Buffer size is Config.ChanBuffer (default 32).

type SlotChangedPayload added in v0.13.0

type SlotChangedPayload struct {
	From string `json:"from,omitempty"`
	To   string `json:"to"`
}

SlotChangedPayload carries a slot reassignment. From may be empty for a first slot binding; To is always populated.

type Status

type Status string

Status is the task lifecycle state. ADR 0005 fixes the enum to exactly these three values; invariant 13 (amended by ADR 0007) enforces the transition DAG (open→claimed, open→closed, claimed→closed, claimed→open) at write time.

const (
	StatusOpen    Status = "open"
	StatusClaimed Status = "claimed"
	StatusClosed  Status = "closed"
)

StatusOpen marks a task that has been declared but not yet claimed. StatusClaimed marks a task currently held by one agent. StatusClosed marks a terminal task; closed is a sink state.

type Tally added in v0.13.0

type Tally struct {
	// Open is the count of tasks currently in StatusOpen.
	Open int
	// Claimed is the count of tasks currently in StatusClaimed.
	Claimed int
	// Closed is the count of tasks currently in StatusClosed.
	Closed int
	// Total is Open+Claimed+Closed.
	Total int
}

Tally is the per-status summary that both `bones status` and `bones tasks status` render. Counts are derived by replaying events (not by walking the KV) so the two surfaces share one source. Per ADR 0052 §"TaskTally — single source for counts".

func TaskTally added in v0.13.0

func TaskTally(events []EventEnvelope) Tally

TaskTally derives a Tally by replaying events into a per-task status projection and bucket-counting at the end. The events slice should be the full log (or a subset that includes the latest state-changing event for each task ID); ordering by stream sequence is the caller's responsibility — recovery and live consumers receive ordered events by construction.

Events with unknown types are ignored. A task is counted under the most recent status observed for it; a closed task that was later reopened (not legal under the current DAG, but handled defensively) reports the latest status seen.
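
The replay-then-count approach can be sketched with a per-task status map. Everything here is a simplified stand-in: the `event` type drops payloads, a created event is assumed to mean "open" (a migration or compaction snapshot could carry a different status), and `taskTally` is a hypothetical lower-case copy of the documented function.

```go
package main

import "fmt"

type eventType uint8

// Local copies of the seven documented event types, starting at 1.
const (
	created eventType = iota + 1
	claimed
	unclaimed
	updated
	linked
	slotChanged
	closed
)

// event is a trimmed envelope: type and task ID only.
type event struct {
	Type   eventType
	TaskID string
}

type tally struct{ Open, Claimed, Closed, Total int }

// taskTally replays events in order, keeps the latest status per
// task, then counts tasks per status.
func taskTally(events []event) tally {
	status := make(map[string]string)
	for _, e := range events {
		switch e.Type {
		case created, unclaimed:
			status[e.TaskID] = "open"
		case claimed:
			status[e.TaskID] = "claimed"
		case closed:
			status[e.TaskID] = "closed"
		}
		// Other and unknown types leave the status untouched.
	}
	var t tally
	for _, s := range status {
		switch s {
		case "open":
			t.Open++
		case "claimed":
			t.Claimed++
		case "closed":
			t.Closed++
		}
		t.Total++
	}
	return t
}

func main() {
	events := []event{
		{Type: created, TaskID: "t1"},
		{Type: claimed, TaskID: "t1"},
		{Type: created, TaskID: "t2"},
		{Type: created, TaskID: "t3"},
		{Type: claimed, TaskID: "t3"},
		{Type: closed, TaskID: "t3"},
		{Type: eventType(99), TaskID: "t2"}, // unknown type, ignored
	}
	fmt.Printf("%+v\n", taskTally(events)) // {Open:1 Claimed:1 Closed:1 Total:3}
}
```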

type Task

type Task struct {
	// ID is the TaskID. It must equal the KV key at Create time; the
	// duplication is deliberate — any future migration that walks the
	// bucket can use the value-side ID without parsing the key.
	ID string `json:"id"`

	// Title is the human-readable task summary. Bounded by the caller;
	// no enforcement here beyond the MaxValueSize cap.
	Title string `json:"title"`

	// Status is the lifecycle state; see Status/validStatus.
	Status Status `json:"status"`

	// ClaimedBy is the AgentID currently holding the claim. Invariant 11
	// requires non-empty iff Status == StatusClaimed; Update enforces.
	ClaimedBy string `json:"claimed_by,omitempty"`

	// Files is the absolute-path list of files this task touches. Sorted
	// by the writer (coord.OpenTask); this package stores it verbatim.
	Files []string `json:"files"`

	// Parent is the optional parent TaskID — empty when this is a root
	// task in a decomposition chain.
	Parent string `json:"parent,omitempty"`

	// Edges are outgoing typed relationships to other tasks. Invariant 25
	// forbids duplicate (Type, Target) pairs; coord.Link enforces this on
	// write. Additive in ADR 0014; nil in records written before that ADR.
	Edges []Edge `json:"edges,omitempty"`

	// Context is the caller-supplied free-form metadata. ADR 0005 caps
	// the effective size against MaxValueSize in concert with the other
	// fields; this package only size-checks the encoded whole.
	Context map[string]string `json:"context,omitempty"`

	// CreatedAt is the wall-clock time of the initial Create.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is the wall-clock time of the most recent write.
	UpdatedAt time.Time `json:"updated_at"`

	// DeferUntil hides the task from Ready until this UTC wall-clock
	// instant. Nil means immediately eligible subject to the other
	// readiness gates. Added in schema v2.
	DeferUntil *time.Time `json:"defer_until,omitempty"`

	// ClosedAt is the wall-clock time of transition into StatusClosed.
	// Nil when Status != StatusClosed; pointer makes the zero value
	// observable (and distinct from a legitimate January-1-0001 write).
	ClosedAt *time.Time `json:"closed_at,omitempty"`

	// ClosedBy is the AgentID that closed the task; empty if not closed.
	ClosedBy string `json:"closed_by,omitempty"`

	// ClosedReason is the free-form close reason; empty if not closed.
	ClosedReason string `json:"closed_reason,omitempty"`

	// ClaimEpoch is the monotonic counter bumped on every successful
	// Claim or Reclaim. Invariant 24 requires strict increase per Claim/
	// Reclaim; Commit and CloseTask fence against it to refuse zombie
	// writes after a Reclaim. Zero on records that never had a claim
	// (legacy records decode to zero; first Claim bumps to 1). ADR 0007.
	ClaimEpoch uint64 `json:"claim_epoch,omitempty"`

	// OriginalSize is the pre-compaction canonical source size for the
	// latest compaction level. Zero means the task has not been compacted.
	OriginalSize uint64 `json:"original_size,omitempty"`

	// CompactLevel is the number of compaction passes applied to this
	// task. Zero means un-compacted. Added in schema v3.
	CompactLevel uint8 `json:"compact_level,omitempty"`

	// CompactedAt is the wall-clock time of the latest compaction pass.
	// Nil means the task has not been compacted. Added in schema v3.
	CompactedAt *time.Time `json:"compacted_at,omitempty"`

	// SchemaVersion stamps the schema this record was written against.
	SchemaVersion int `json:"schema_version"`

	// LastEventSeq is the JetStream stream sequence of the most recent
	// task event whose effect has been applied to this projection. The
	// hub-side recovery loop on bones up reads the live KV record and
	// the stream's latest sequence per task subject; if the stream
	// sequence is greater, pending events are replayed and the
	// projection is brought to parity. Added in schema v4 (ADR 0052).
	LastEventSeq uint64 `json:"last_event_seq,omitempty"`
}

Task is the value stored at each TaskID key in the KV bucket. The struct is persisted as JSON; every timestamp is wall-clock UTC so that two processes reading the same entry reach the same verdict regardless of local clock skew. ADR 0005 is the canonical schema.
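To make the effect of the `omitempty` tags and pointer timestamps concrete, here is a small sketch that encodes a trimmed copy of the struct. `taskSubset` and the helper functions are hypothetical names for illustration; only the field names and JSON tags are taken from the Task definition above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// taskSubset copies a handful of Task fields and their JSON tags.
// It is a hypothetical trimmed type, not the package's Task.
type taskSubset struct {
	ID        string     `json:"id"`
	Status    string     `json:"status"`
	ClaimedBy string     `json:"claimed_by,omitempty"`
	Files     []string   `json:"files"`
	ClosedAt  *time.Time `json:"closed_at,omitempty"`
}

// hasKey reports whether the encoded JSON object contains key.
func hasKey(raw []byte, key string) bool {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(raw, &fields); err != nil {
		return false
	}
	_, ok := fields[key]
	return ok
}

// encodeOpen encodes an open task: ClosedAt is nil and ClaimedBy empty.
func encodeOpen() []byte {
	raw, err := json.Marshal(taskSubset{ID: "t1", Status: "open"})
	if err != nil {
		panic(err)
	}
	return raw
}

// encodeClosed encodes a closed task with a non-nil ClosedAt.
func encodeClosed() []byte {
	at := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	raw, err := json.Marshal(taskSubset{ID: "t1", Status: "closed", ClosedAt: &at})
	if err != nil {
		panic(err)
	}
	return raw
}

func main() {
	fmt.Println(string(encodeOpen()))
	fmt.Println(string(encodeClosed()))
}
```

Fields without `omitempty`, such as `files`, are always present in the encoded record, while a nil `ClosedAt` disappears entirely, which is what lets a reader distinguish "never closed" from any concrete timestamp.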

func FilterReady added in v0.11.0

func FilterReady(records []Task, now time.Time) []Task

FilterReady returns the subset of records that are eligible for an agent to claim right now: open, unclaimed, no incoming open blocks/supersedes/duplicates edges, no open child task naming the record as Parent, and not deferred into the future relative to now.

The DAG check mirrors coord.Ready (see internal/coord/ready.go) but operates directly on []Task records so consumers that already have a task list (the CLI's `tasks ready` verb) do not need to open a coord session. The two implementations stay in sync by sharing the same rule list — any new readiness gate must be added in both places.

discovered-from edges are intentionally ignored — they are audit metadata, not a ready-blocker.
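The readiness gates listed for FilterReady can be sketched over plain slices as follows. This is a rough illustration, not the package's code: the `task` and `edge` types are hypothetical trimmed stand-ins, the edge type strings are assumed to match the names used in the prose, and "open" blockers and children are read here as "not closed", which is an assumption about the intended rule.

```go
package main

import (
	"fmt"
	"time"
)

// edge and task are hypothetical, trimmed stand-ins for Edge and Task.
type edge struct{ Type, Target string }

type task struct {
	ID, Status, Parent string
	Edges              []edge
	DeferUntil         *time.Time
}

// filterReady keeps open, unblocked, childless, non-deferred records.
func filterReady(records []task, now time.Time) []task {
	// First pass: collect IDs held back by a not-yet-closed task.
	blocked := make(map[string]bool)
	for _, r := range records {
		if r.Status == "closed" {
			continue // closed tasks no longer block anything (assumption)
		}
		if r.Parent != "" {
			blocked[r.Parent] = true // a live child holds its parent back
		}
		for _, e := range r.Edges {
			switch e.Type {
			case "blocks", "supersedes", "duplicates":
				blocked[e.Target] = true
			}
			// "discovered-from" and other types are not ready-blockers.
		}
	}
	// Second pass: apply the per-record gates.
	var ready []task
	for _, r := range records {
		if r.Status != "open" || blocked[r.ID] {
			continue
		}
		if r.DeferUntil != nil && r.DeferUntil.After(now) {
			continue
		}
		ready = append(ready, r)
	}
	return ready
}

// readyIDs runs filterReady over a fixed sample and returns the IDs.
func readyIDs() []string {
	now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	later := now.Add(time.Hour)
	records := []task{
		{ID: "a", Status: "open"},
		{ID: "b", Status: "open", Edges: []edge{{Type: "blocks", Target: "c"}}},
		{ID: "c", Status: "open"},
		{ID: "d", Status: "open", DeferUntil: &later},
		{ID: "f", Status: "open", Edges: []edge{{Type: "discovered-from", Target: "a"}}},
	}
	var ids []string
	for _, r := range filterReady(records, now) {
		ids = append(ids, r.ID)
	}
	return ids
}

func main() {
	fmt.Println(readyIDs())
}
```

In the sample, c is held back by b's blocks edge and d by its deferral, while the discovered-from edge from f leaves a untouched.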

type Tx added in v0.13.0

type Tx struct {
	// contains filtered or unexported fields
}

Tx is the carrier passed to Manager.Tx's callback. Each tx.X method is one mutation. Tx is short-lived: its scope is the duration of one Manager.Tx call. Methods are not safe to call after the callback returns.

Tx is not concurrency-safe. The callback must serialize its tx.X calls; callers needing multiple parallel writes should call Manager.Tx once per task and let the substrate serialize.

func (*Tx) Claim added in v0.13.0

func (tx *Tx) Claim(args ClaimArgs) error

Claim publishes a claimed event and applies the supplied mutator inside a CAS loop. The mutator owns the conditional rejection semantics — coord.Claim's claimMutator, coord.Reclaim's reclaimMutator, and coord.HandoffClaim's handoffMutator all plug in here unchanged.

Because the event is published before the CAS, a mutator that rejects after the publish leaves a spurious claimed event in the log. Recovery's LastEventSeq guard prevents replay from clobbering a later projection, and the closed set of event types together with the (field, old, new) tuples is what keeps the audit trail correct under replay (see ADR 0052 §"Recovery").
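The publish-then-CAS ordering and its "spurious event" consequence can be illustrated with an in-memory model. Everything here is hypothetical: `store`, `claim`, and `claimMutator` are invented for the sketch and stand in for the JetStream stream, the KV bucket, and coord's mutators respectively.

```go
package main

import (
	"errors"
	"fmt"
)

// record is a trimmed stand-in for the KV projection.
type record struct {
	Status    string
	ClaimedBy string
}

// store is a hypothetical in-memory model of a revisioned KV entry
// plus an append-only event log.
type store struct {
	rec record
	rev uint64
	log []string
}

var errConflict = errors.New("revision conflict")

// update writes next only if the caller saw the current revision.
func (s *store) update(next record, expected uint64) error {
	if expected != s.rev {
		return errConflict
	}
	s.rec = next
	s.rev++
	return nil
}

// claim publishes first, then runs the mutator inside a CAS loop.
func claim(s *store, agent string, mutate func(record) (record, error)) error {
	s.log = append(s.log, "claimed:"+agent) // publish before the CAS
	for {
		cur, rev := s.rec, s.rev
		next, err := mutate(cur)
		if err != nil {
			return err // rejected: the event above stays in the log
		}
		err = s.update(next, rev)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
		// Revision moved underneath us: re-read and retry.
	}
}

// claimMutator rejects unless the task is currently open.
func claimMutator(agent string) func(record) (record, error) {
	return func(r record) (record, error) {
		if r.Status != "open" {
			return r, errors.New("task is not open")
		}
		r.Status, r.ClaimedBy = "claimed", agent
		return r, nil
	}
}

func main() {
	s := &store{rec: record{Status: "open"}}
	fmt.Println(claim(s, "agent-a", claimMutator("agent-a")))
	fmt.Println(claim(s, "agent-b", claimMutator("agent-b")))
	fmt.Println(len(s.log), s.rec.ClaimedBy)
}
```

The second claim is rejected by its mutator, yet its event is already in the log, so the log holds two claimed entries while the projection still names the first agent.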

func (*Tx) Close added in v0.13.0

func (tx *Tx) Close(agentID, reason string, mutate func(Task) (Task, error)) error

Close publishes a closed event and updates the KV record into the terminal closed state. The mutator-shaped logic (invariant 12, invariant 24, agent identity check) lives on the caller side; tx.Close accepts a mutate callback so coord-level guards stay where they belong.

func (*Tx) Create added in v0.13.0

func (tx *Tx) Create(t Task) error

Create publishes a created event and writes the initial KV record. t.ID must equal tx's taskID. The event Snapshot field is left unpopulated; readers reconstruct the projection from the live KV.

func (tx *Tx) Link(otherID, edgeType string) error

Link publishes a linked event and updates the KV record's Edges slice. Duplicate edges are caller-prevented (coord.Link enforces invariant 25); this method does not re-check.

func (*Tx) SlotChange added in v0.13.0

func (tx *Tx) SlotChange(from, to string) error

SlotChange publishes a slot_changed event. The KV projection does not store slot directly (slots are an external swarm concept); the event is purely audit. From may be empty for first slot bind.

func (*Tx) Unclaim added in v0.13.0

func (tx *Tx) Unclaim(reason string, mutate func(Task) (Task, error)) error

Unclaim publishes an unclaimed event and applies the supplied mutator inside a CAS loop. The mutator owns the conditional rejection semantics (e.g. coord's "is the claim still ours" check surfacing as errClaimCASNoOp); a nil mutator falls back to the default "set status=open, clear claimed_by" projection.

reason is the free-form audit string recorded on the event but NOT stamped on the KV — operators recover it from the log.
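The nil-mutator fallback described for Unclaim amounts to a default projection function. A minimal sketch, assuming a trimmed `task` type and hypothetical helper names (`defaultUnclaim`, `resolveMutator`) that are not part of the package:

```go
package main

import "fmt"

// task is a hypothetical, trimmed stand-in for Task.
type task struct {
	Status    string
	ClaimedBy string
}

// defaultUnclaim is the documented fallback projection:
// set status=open and clear claimed_by.
func defaultUnclaim(t task) (task, error) {
	t.Status = "open"
	t.ClaimedBy = ""
	return t, nil
}

// resolveMutator returns the caller's mutator, or the default when nil.
func resolveMutator(mutate func(task) (task, error)) func(task) (task, error) {
	if mutate == nil {
		return defaultUnclaim
	}
	return mutate
}

func main() {
	held := task{Status: "claimed", ClaimedBy: "agent-a"}
	released, err := resolveMutator(nil)(held)
	fmt.Println(released, err)
}
```

Note that the reason string does not appear anywhere in the projection, matching the statement that it is recorded on the event only.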

func (*Tx) Update added in v0.13.0

func (tx *Tx) Update(
	mutate func(Task) (Task, error),
	changes ...FieldChange,
) error

Update publishes an updated event carrying one or more (field, old, new) tuples and applies the mutator to the KV record. The mutator is responsible for actually writing the new field values; the FieldChange tuples are descriptive (for the log) and must agree with the projection the mutator produces. Callers compose the mutate closure to keep the legacy invariant-checking semantics; the FieldChange tuples are caller-provided too.
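One way a caller might keep the mutator and its descriptive tuples in agreement is to build both from the same inputs. The sketch below assumes a `fieldChange` struct with `Field`, `Old`, and `New` string members; the real FieldChange type is not shown in this section, so those names, like the `retitle` helper, are hypothetical.

```go
package main

import "fmt"

// task is a hypothetical, trimmed stand-in for Task.
type task struct {
	Title string
}

// fieldChange is an assumed shape for the (field, old, new) tuple.
type fieldChange struct {
	Field, Old, New string
}

// retitle builds the mutate closure and the matching tuple from the
// same inputs, so the log entry cannot drift from the projection.
func retitle(cur task, title string) (func(task) (task, error), fieldChange) {
	change := fieldChange{Field: "title", Old: cur.Title, New: title}
	mutate := func(t task) (task, error) {
		t.Title = title
		return t, nil
	}
	return mutate, change
}

func main() {
	cur := task{Title: "draft"}
	mutate, change := retitle(cur, "final")
	next, err := mutate(cur)
	fmt.Println(next.Title, change, err)
}
```

The resulting pair would then be handed to tx.Update as the mutate argument and one of the variadic changes.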

type UnclaimedPayload added in v0.13.0

type UnclaimedPayload struct {
	Reason string `json:"reason"`
}

UnclaimedPayload carries the un-claim reason. The previously-claiming agent is recoverable from the prior claimed event in the log; we do not duplicate it here.

type UpdatedPayload added in v0.13.0

type UpdatedPayload struct {
	Changes []FieldChange `json:"changes"`
}

UpdatedPayload aggregates one or more FieldChange tuples. A Tx that changes multiple fields in one tx.Update call emits one updated event with all changes; multiple sequential tx.Update calls emit one event each.
