Documentation
¶
Overview ¶
Package tasks is the substrate layer that stores task records in a NATS JetStream KV bucket. It exposes CAS-atomic primitives — Create, Get, Update, List, Watch — consumed exclusively by the coord package. See docs/adr/0005-tasks-in-nats-kv.md for the schema, bucket, and retention rules this package enforces.
This package is internal and unexported: callers outside github.com/danmestas/bones must not depend on it.
Index ¶
- Constants
- Variables
- func CompactPerTask(ctx context.Context, m *Manager, taskID string) (int, error)
- func DecodePayload(env EventEnvelope) (any, error)
- func EncodeForTest(t Task) ([]byte, error)
- func EventSubject(taskID string) string
- func MarshalEnvelope(env EventEnvelope) ([]byte, error)
- func Migrate(ctx context.Context, m *Manager) (int, error)
- func PriorityRank(t Task) (int, bool)
- func Recover(ctx context.Context, m *Manager) (int, error)
- func SetCASRetryHookForTest(fn func()) (restore func())
- func SetUpdatePreWriteHookForTest(fn func()) (restore func())
- func SortReady(records []Task)
- type AdminWrite
- type ClaimArgs
- type ClaimedPayload
- type ClosedPayload
- type Config
- type CreatedPayload
- type Edge
- type EdgeType
- type Event
- type EventEnvelope
- type EventKind
- type EventType
- type FieldChange
- type LinkedPayload
- type LogReadOpts
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) Get(ctx context.Context, id string) (Task, uint64, error)
- func (m *Manager) KVForTest() jetstream.KeyValue
- func (m *Manager) List(ctx context.Context) ([]Task, error)
- func (m *Manager) Live(ctx context.Context) (<-chan EventEnvelope, error)
- func (m *Manager) Purge(ctx context.Context, id string) error
- func (m *Manager) Recent(ctx context.Context, n int) ([]EventEnvelope, error)
- func (m *Manager) Replay(ctx context.Context, opts LogReadOpts) ([]EventEnvelope, error)
- func (m *Manager) Tx(ctx context.Context, taskID string, fn func(tx *Tx) error) error
- func (m *Manager) Watch(ctx context.Context) (<-chan Event, error)
- type SlotChangedPayload
- type Status
- type Tally
- type Task
- type Tx
- func (tx *Tx) Claim(args ClaimArgs) error
- func (tx *Tx) Close(agentID, reason string, mutate func(Task) (Task, error)) error
- func (tx *Tx) Create(t Task) error
- func (tx *Tx) Link(otherID, edgeType string) error
- func (tx *Tx) SlotChange(from, to string) error
- func (tx *Tx) Unclaim(reason string, mutate func(Task) (Task, error)) error
- func (tx *Tx) Update(mutate func(Task) (Task, error), changes ...FieldChange) error
- type UnclaimedPayload
- type UpdatedPayload
Constants ¶
const AllEventsSubject = "tasks.events.>"
AllEventsSubject is the wildcard subject that matches every task event on the stream.
const DefaultBucketName = "bones-tasks"
DefaultBucketName is the canonical JetStream KV bucket name for task records, pinned by ADR 0005. Both the CLI's openManager helper and internal/coord must point at this same bucket; otherwise create-then- link/prime/autoclaim cross-store and silently miss the just-created task. Tests may override via Config.BucketName for isolation.
const EventRetentionWindow = 90 * 24 * time.Hour
EventRetentionWindow is the time-based fallback for compaction. Events older than this for a task that also has at least one newer event are eligible for compaction. ADR 0052 fixes this at 90 days.
const EventStreamName = "tasks-events"
EventStreamName is the JetStream stream name backing the task event log. Callers in internal/hub provision the stream with this name and the AllEventsSubject filter.
const PerTaskEventCap = 50
PerTaskEventCap is the maximum number of events kept per task before older events are compacted into a synthetic created summary. ADR 0052 fixes this at 50.
const RecentActivityCount = 20
RecentActivityCount is the default number of events `bones status` surfaces under "Recent activity" per ADR 0052. Tunable as a constant rather than a flag — operators wanting more reach for `bones tasks watch --since=<duration>`.
const SchemaVersion = 4
SchemaVersion is the currently-written task record schema. Every Create stamps this on the record so future migrations can fan out on observed version. v3 adds closed-task compaction metadata. v4 adds LastEventSeq, the projection-watermark wired by the event log per ADR 0052.
Variables ¶
var ErrAlreadyExists = errors.New("tasks: record already exists")
ErrAlreadyExists reports that Create was called for a TaskID that already has a record in the bucket. Collisions are programmer errors under the ADR 0005 ID generator, but the sentinel exists so callers can distinguish a duplicate Create from an unrelated substrate error.
var ErrCASConflict = errors.New(
"tasks: CAS conflict exceeded retries",
)
ErrCASConflict reports that Update exhausted jskv.MaxRetries without converging. Under normal contention this should never surface; its presence in a call site's error handling is the explicit surrender boundary for the CAS retry loop.
var ErrClosed = errors.New("tasks: manager is closed")
ErrClosed reports that a public method was called on a Manager whose Close has returned. Close-race with an in-flight call surfaces this error rather than a data race or nil dereference.
var ErrEventLogDisabled = errors.New(
"tasks: event log disabled on this manager",
)
ErrEventLogDisabled reports that a Tx mutation was attempted on a Manager opened without an event log. coord's archive Manager opens in this mode for compaction-only writes; production Tx callers always have the event log enabled.
var ErrInvalidStatus = errors.New("tasks: invalid status value")
ErrInvalidStatus reports that a Task's Status field was outside the fixed {open, claimed, closed} enum. Any mutation that would write an unknown status returns this error before the CAS call.
var ErrInvalidTransition = errors.New(
"tasks: invalid status transition",
)
ErrInvalidTransition reports that a mutation attempted a status edge outside the ADR 0005 DAG as amended by ADR 0007: legal edges are open→claimed, open→closed, claimed→closed, and claimed→open (the release-side un-claim edge). Any other backwards edge from closed or self-loop on closed is rejected.
var ErrInvariant11 = errors.New(
"tasks: claimed_by/status mismatch (invariant 11)",
)
ErrInvariant11 reports that a mutation violated invariant 11: the claimed_by field must be non-empty iff status == claimed. Rejected before the CAS call so bucket state never observes the inconsistent pair.
var ErrNoMutations = errors.New(
"tasks: Tx callback made no mutations",
)
ErrNoMutations reports that a Tx callback returned without invoking any tx.X method. Tx is the only mutation entry point on Manager and returning without one is a programmer error caught at the API boundary rather than allowed to silently no-op.
var ErrNotFound = errors.New("tasks: record not found")
ErrNotFound reports that Get or Update was called for a TaskID absent from the bucket. Coord translates this into its own public sentinel when composing higher-level verbs.
var ErrValueTooLarge = errors.New(
"tasks: encoded value exceeds max size",
)
ErrValueTooLarge reports that a Task, after JSON encoding, exceeded Config.MaxValueSize. Enforced at every write per invariant 14.
Functions ¶
func CompactPerTask ¶ added in v0.13.0
CompactPerTask replaces the older events for one task with a single synthetic created event carrying the full post-compaction snapshot. Returns the number of events removed (i.e., per-task event count before minus after).
CompactPerTask is the per-task primitive used by the hub-side compaction worker. The worker is a follow-up; this primitive lets tests exercise the compaction semantics without scheduling.
Safety: compaction runs only on tasks whose latest projection exists in KV. Aborts (no-op) if the projection is missing — the task may have been purged.
func DecodePayload ¶ added in v0.13.0
func DecodePayload(env EventEnvelope) (any, error)
DecodePayload unmarshals env.Payload into a typed payload struct matching env.Type. The returned value is one of the *Payload types defined in this file.
func EncodeForTest ¶
EncodeForTest exposes the package's internal task encoder so sibling test packages can produce wire-compatible bytes to Put directly into the KV bucket. Not part of the supported public API.
func EventSubject ¶ added in v0.13.0
EventSubject returns the JetStream subject for events scoped to one task ID. Subjects under tasks.events.> partition by task so a Watch can replay one task's history with a subject filter.
func MarshalEnvelope ¶ added in v0.13.0
func MarshalEnvelope(env EventEnvelope) ([]byte, error)
MarshalEnvelope serializes env to the bytes published on the stream. JSON for parity with internal/chat.Envelope per ADR 0047.
func Migrate ¶ added in v0.13.0
Migrate emits synthetic events for every pre-existing KV record so the event log carries a baseline for recovery and audit. Idempotent via migrationMarkerKey; running Migrate twice is a no-op.
This is the one-time path for upgrading a workspace from before the event log was the source of truth (schema v3 or earlier). Documented limit per ADR 0052: pre-migration update history is lost — only created / claimed / closed bookmarks are reconstructible.
func PriorityRank ¶ added in v0.11.0
PriorityRank returns the numeric rank of t's priority and whether the priority parsed cleanly. The convention matches beads: Context["priority"] holds a "P<N>" string (e.g. "P0", "P1", "P2") where lower N == higher priority. Unprioritized records report (0, false) — callers use the second return to decide ordering.
func Recover ¶ added in v0.13.0
Recover reconciles the KV projection with the event log on hub start. For each unique task ID with at least one event on the stream, the recovery loop walks events newer than the KV's LastEventSeq and applies them via AdminWrite (a no-op when the projection is already up to date). Returns the number of orphan events replayed.
Recovery is idempotent: repeated calls on a stable substrate replay no events. Concurrent Recover calls race only on the underlying KV CAS; the substrate serializes them.
func SetCASRetryHookForTest ¶
func SetCASRetryHookForTest(fn func()) (restore func())
SetCASRetryHookForTest installs fn as the per-retry hook used by Update's CAS loop, returning a restore function the test must call (typically via t.Cleanup) to reinstate the no-op default. Tests use this to observe the number of CAS retries without instrumenting the KV transport. The hook fires exactly once per retry — never on the first attempt or after a final verdict.
func SetUpdatePreWriteHookForTest ¶
func SetUpdatePreWriteHookForTest(fn func()) (restore func())
SetUpdatePreWriteHookForTest installs fn as the per-attempt hook used by Update's CAS loop between the Get and the Update calls. Tests use this seam to deterministically force CAS conflicts by performing a direct Put while fn runs — every attempt reliably fails under such a hook, making the retry-exhaustion path reachable in a single test. Restore must be called (typically via t.Cleanup) to reinstate the no-op default.
func SortReady ¶ added in v0.11.0
func SortReady(records []Task)
SortReady sorts records in place by priority (highest first) then CreatedAt ascending (oldest first within the same priority bucket). Priority is read from Context["priority"] and parsed as a P<N> rank where lower N == higher priority (P0 > P1 > P2). Tasks with no parsable priority sort to the END of the list — explicitly prioritized work always surfaces above unprioritized work.
The sort is stable so callers that pre-sort on a secondary key see that ordering preserved within ties.
Types ¶
type AdminWrite ¶ added in v0.13.0
type AdminWrite struct {
// contains filtered or unexported fields
}
AdminWrite is the import-restricted write surface for the hub-side migration / recovery loop and for test seeding. ADR 0052 makes Tx the only public mutation entry point on Manager; AdminWrite is the narrow exception. It is its own type so the reflection-based "no mutation outside Tx" test on Manager passes — Manager has no public mutation method, only Tx.
Production CLI code MUST NOT instantiate AdminWrite. The accepted callers are:
- internal/hub: migration of pre-event-log KV records, recovery of orphaned events on hub start.
- tests inside this repository: deterministic fixture seeding.
Every method here writes the KV without publishing an event. The caller is responsible for emitting a matching event when one is expected (e.g., the migration emits synthetic events explicitly).
func NewAdminWrite ¶ added in v0.13.0
func NewAdminWrite(mgr *Manager) AdminWrite
NewAdminWrite returns an AdminWrite bound to mgr. The constructor's signature signals intent at every call site; callers searching for "tasks.NewAdminWrite" find every place admin writes happen and can audit whether the exception is justified.
func (AdminWrite) Create ¶ added in v0.13.0
func (a AdminWrite) Create(ctx context.Context, t Task) error
Create writes a new task record without publishing an event. Hub- side migration uses this to seed records that are paired with synthetic events emitted directly to the stream.
func (AdminWrite) Purge ¶ added in v0.13.0
func (a AdminWrite) Purge(ctx context.Context, id string) error
Purge permanently removes a task key. Reachable from Manager.Purge already; re-exposed here for symmetry with the migration / recovery caller's surface, so a recovery loop never has to import multiple surfaces to do its job.
func (AdminWrite) Update ¶ added in v0.13.0
func (a AdminWrite) Update( ctx context.Context, id string, mutate func(Task) (Task, error), ) error
Update performs the same revision-gated CAS update as the legacy Manager.Update (now private), without publishing an event. Hub-side recovery uses this to bring a projection forward when the stream's latest event sequence outpaces the KV's LastEventSeq.
type ClaimArgs ¶ added in v0.13.0
type ClaimArgs struct {
AgentID string
Slot string
ClaimEpoch uint64
PrevAgent string
Mutate func(Task) (Task, error)
}
ClaimArgs configures a tx.Claim call. AgentID and Mutate are required; Slot, PrevAgent, and ClaimEpoch are optional. The mutator runs inside the CAS loop and is responsible for the conditional rejection (already-claimed, epoch-stale) plus stamping ClaimedBy and any other fields. tx.Claim wraps it so callers do not have to remember to update LastEventSeq or UpdatedAt.
The Mutate callback IS the CAS body. It receives the current Task and returns the desired post-claim Task value or an error. Returning an error short-circuits the CAS loop and the Tx call returns the error unwrapped so coord can switch on its own sentinels.
type ClaimedPayload ¶ added in v0.13.0
type ClaimedPayload struct {
AgentID string `json:"agent_id"`
Slot string `json:"slot,omitempty"`
ClaimEpoch uint64 `json:"claim_epoch"`
PrevAgent string `json:"prev_agent,omitempty"`
}
ClaimedPayload carries the agent that claimed and the optional slot. PrevAgent is non-empty for handoff/reclaim transitions where the claim moves from one agent to another without an intervening unclaimed event; replay in recovery uses it to reconstruct the audit trail correctly.
type ClosedPayload ¶ added in v0.13.0
ClosedPayload carries the closing agent and the close reason.
type Config ¶
type Config struct {
// BucketName is the JetStream KV bucket backing the task records.
// ADR 0005 pins the coord-visible name to "bones-tasks"; this
// package takes the name as input so tests can isolate by bucket.
BucketName string
// HistoryDepth is the per-key JetStream KV history depth. ADR 0005
// recommends 8; Validate-equivalent rejection happens at Open.
HistoryDepth uint8
// MaxValueSize is the upper bound on an encoded task record value,
// in bytes. Enforced at every Create and Update per invariant 14.
MaxValueSize int32
// ChanBuffer sets the channel buffer for Watch. Zero yields the
// package default (defaultChanBuffer).
ChanBuffer int
// EnableEventLog opts the Manager into the append-only task event
// log per ADR 0052. When true, Open creates (or attaches to) the
// `tasks-events` JetStream stream and Tx publishes one event per
// mutation. Coord's primary task Manager opens with this true; the
// archive Manager (compacted closed-task snapshots) opens false
// so that compaction-only writes do not flood the event log.
EnableEventLog bool
// RecoverOnOpen controls whether Open runs Recover() to reconcile
// orphan events. Recovery is safe at hub start (single process,
// no concurrent Tx in flight) but races against live Tx callers,
// so every-call Open should leave this false. Hub start passes
// true; coord callers, CLI verbs, and tests pass false.
RecoverOnOpen bool
}
Config configures Open. Every field is required; there are no silent defaults. ADR 0005 fixes the recommended values — HistoryDepth 8, MaxValueSize 8 KB — and coord.Config is the enforcement surface for operator input. BucketName must match the JetStream KV regex ([A-Za-z0-9_-]+); violation is surfaced by the underlying CreateOrUpdateKeyValue call.
type CreatedPayload ¶ added in v0.13.0
type CreatedPayload struct {
Title string `json:"title"`
Slot string `json:"slot,omitempty"`
Files []string `json:"files,omitempty"`
// Snapshot is the full Task projection at creation time. Carried so
// migration-emitted synthetic events and compaction summaries can
// hand a recovery loop the entire post-creation projection without
// requiring it to walk subsequent updated events. Empty for live
// Tx.Create emissions; populated by migration and compaction.
Snapshot *Task `json:"snapshot,omitempty"`
}
CreatedPayload is the payload for an EventTypeCreated event. Slot is optional (empty when the task is not yet bound to a slot).
type Edge ¶
Edge is an outgoing directed relationship carried on the source task. Storage is outgoing-only (ADR 0014); reverse lookups require a scan.
type EdgeType ¶
type EdgeType string
EdgeType names a typed outgoing relationship from one task to another. See ADR 0014. Unknown string values decoded from storage are preserved as-is (invariant 26) so a future phase adding a new type stays round-trip compatible with records this version writes.
type Event ¶
Event is delivered to Watch callers on each observed task change. ID is the TaskID the event concerns; Kind identifies the shape; Task is populated for EventCreated and EventUpdated.
type EventEnvelope ¶ added in v0.13.0
type EventEnvelope struct {
Type EventType `json:"type"`
TaskID string `json:"task_id"`
// Timestamp is the wall-clock instant the event was emitted.
// timefmt.LoggedTime per #324: serializes as UTC RFC3339 so
// recovery loops on a different host or different system zone
// see the same instant a producing slot stamped. Decoding is
// lenient (accepts RFC3339Nano with offset) so legacy records
// pre-#324 still replay.
Timestamp timefmt.LoggedTime `json:"timestamp"`
Payload json.RawMessage `json:"payload"`
// StreamSeq is the JetStream message sequence at the time the
// envelope was consumed. Set by the recovery loop and the watch
// consumer; not serialized.
StreamSeq uint64 `json:"-"`
}
EventEnvelope is the wire format for one task event on the JetStream stream. JSON-serialized, persisted across the retention window; possibly read by a consumer running a different bones version. Additive-only on field changes — see ADR 0052.
StreamSeq is supplied by the consumer at decode time from the JetStream message metadata; it is NOT part of the JSON envelope and is zero for messages that have not yet been published.
func EncodeEnvelope ¶ added in v0.13.0
func EncodeEnvelope(t EventType, taskID string, payload any) (EventEnvelope, error)
EncodeEnvelope marshals an envelope and a typed payload into a single EventEnvelope ready for js.PublishMsg. The Timestamp is stamped here to UTC so callers do not have to remember to convert.
func UnmarshalEnvelope ¶ added in v0.13.0
func UnmarshalEnvelope(raw []byte) (EventEnvelope, error)
UnmarshalEnvelope deserializes raw stream bytes into an EventEnvelope. StreamSeq is left as zero — the consumer fills it from message metadata.
type EventKind ¶
type EventKind int
EventKind identifies the shape of a task state change delivered by Watch.
const ( // EventCreated fires on the first Put for a key. The accompanying // Event.Task holds the decoded record. EventCreated EventKind = iota + 1 // EventUpdated fires on subsequent Puts for an existing key. The // accompanying Event.Task holds the decoded post-update record. EventUpdated // EventDeleted fires on Delete or Purge. Event.Task is the zero // value; Event.ID carries the affected key. EventDeleted )
EventCreated, EventUpdated, and EventDeleted are the three shapes of task state changes a Watch subscriber observes.
type EventType ¶ added in v0.13.0
type EventType uint8
EventType identifies the shape of a task event published on the `tasks.events.>` JetStream stream. The set is closed (Go iota); a new event type requires a new constant, payload struct, tx.X method, row in ADR 0052, and a roundtrip test. There is no open-set string carrier for event names — a value outside the constants below is rejected at decode.
const ( // EventTypeUnknown is the zero value used to flag a missing or // unknown type at decode. It is never published. EventTypeUnknown EventType = 0 // EventTypeCreated stamps the first observation of a task (or the // synthetic baseline emitted by migration / compaction). EventTypeCreated EventType = iota // EventTypeClaimed marks a status edge open→claimed and stamps the // agent that took the claim plus the (optional) slot. EventTypeClaimed // EventTypeUnclaimed marks a status edge claimed→open with the free- // form unclaim reason. EventTypeUnclaimed // EventTypeUpdated carries one or more field-level changes as // (Field, Old, New) tuples. The log alone — no KV consult — must // suffice to reconstruct any prior state, so old and new values are // both carried. EventTypeUpdated // EventTypeLinked records a directed edge added to a task. EventTypeLinked // EventTypeSlotChanged records a slot reassignment. EventTypeSlotChanged // EventTypeClosed marks a status edge into closed and stamps the // closing agent and reason. EventTypeClosed )
Event types per ADR 0052 §"Event types — closed iota set". Order is load-bearing only insofar as adding a new type appends; do not reorder, that would change wire encoding for stored events.
type FieldChange ¶ added in v0.13.0
type FieldChange struct {
Field string `json:"field"`
Old json.RawMessage `json:"old"`
New json.RawMessage `json:"new"`
}
FieldChange is one (field, old, new) tuple carried in an UpdatedPayload. Old and New are both rendered as raw JSON to preserve type fidelity across replays — a string and a number with the same lexical form do not collide. Field is the canonical Task struct field name (e.g. "title", "files", "context").
func FieldChangeFromAny ¶ added in v0.13.0
func FieldChangeFromAny(field string, oldV, newV any) (FieldChange, error)
FieldChangeFromAny builds a FieldChange tuple from any-typed old/new values, marshaling each to JSON. Convenient for Update callers that already have the typed values in hand.
func MustFieldChange ¶ added in v0.13.0
func MustFieldChange(field string, oldV, newV any) FieldChange
MustFieldChange is FieldChangeFromAny that panics on encode failure. Use only for values whose JSON shape is statically known (strings, integers, simple structs); a panic here is a programmer bug.
type LinkedPayload ¶ added in v0.13.0
LinkedPayload stamps a directed edge added to the task.
type LogReadOpts ¶ added in v0.13.0
type LogReadOpts struct {
// FromSeq is the JetStream stream sequence to start at (inclusive).
// 0 means unset.
FromSeq uint64
// Since is a wall-clock offset relative to time.Now(). 0 means
// unset. Mutually exclusive with FromSeq.
Since time.Duration
// Limit caps the returned event count. 0 disables the cap.
Limit int
// FilterTaskID, when non-empty, restricts the read to events
// matching `tasks.events.<task_id>`.
FilterTaskID string
}
LogReadOpts configures a one-shot read of the event log via Replay. At most one of FromSeq and Since may be set. When both are zero, the reader returns the most recent Limit events (Limit defaults to RecentActivityCount).
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager owns a JetStream KV bucket that stores task records. Every public method is safe to call concurrently. Close is idempotent.
func Open ¶
Open creates (or reattaches to) the tasks KV bucket on the supplied NATS connection and returns a Manager. The caller owns nc's lifecycle; Open does not close it. Constructing a Manager does not consume a goroutine; Watch spawns one per call. Callers must invoke Close to release every live subscriber channel.
func (*Manager) Close ¶
Close releases resources held by the Manager. It closes every live Watch channel and marks the Manager as closed so subsequent public calls return ErrClosed. The NATS connection is owned by the caller and is not closed here. Safe to call more than once.
func (*Manager) Get ¶
Get reads a task record by ID. The second return is the KV revision the record was read at — callers that intend to Update pass this value into the mutate closure of Update. Returns ErrNotFound when the key is absent or carries a delete marker.
func (*Manager) KVForTest ¶
KVForTest returns the underlying JetStream KV handle so tests in sibling packages can stage CAS-conflict scenarios by writing directly to the bucket. Production code must not use this; the public API (Open, Close, Create, Get, Update, List, Watch) remains the sole supported surface. The export is intentionally verbose ("ForTest") so every call site reads as a test seam, not an accidental leak of the substrate.
func (*Manager) List ¶
List returns every task record currently readable in the bucket. Coord.Ready filters the slice client-side; this package performs no status filtering. Delete markers are skipped; malformed entries are skipped (they would indicate a corrupted write, which the watcher path would also drop).
func (*Manager) Live ¶ added in v0.14.0
func (m *Manager) Live(ctx context.Context) (<-chan EventEnvelope, error)
Live opens a live subscription to the task event log. The returned channel emits envelopes published *after* the call (DeliverNewPolicy) until ctx is canceled, the manager is closed, or the underlying consumer stops. Symmetric to Replay (one-shot drain) but for the continuous tail used by `bones tasks watch`.
Per ADR 0052 the event log is the source of truth for task state; Live consumers see the full envelope (Type, TaskID, Timestamp, Payload) on every event — no projection lookup required. Field-level changes for EventTypeUpdated land in the payload, so context-only updates surface here even though they don't mutate the KV projection shape consumers can distinguish.
Callers must drain the channel promptly; a blocked reader stalls the forwarder. Buffer is sized for typical interactive use (64). Decode errors are dropped silently — one malformed record does not poison the stream.
func (*Manager) Purge ¶
Purge permanently removes a task key from the bucket so future Get/List calls do not observe it. Returns ErrNotFound when the key is absent.
func (*Manager) Recent ¶ added in v0.13.0
Recent returns the most recent n events from the log in stream order (oldest of the slice first, newest last). Used by `bones status` for the Recent Activity surface.
func (*Manager) Replay ¶ added in v0.13.0
func (m *Manager) Replay( ctx context.Context, opts LogReadOpts, ) ([]EventEnvelope, error)
Replay reads events from the task event log according to opts and returns them in stream order. Used by `bones tasks watch`'s --from / --since backfill and by `bones status`'s recent-activity surface.
Replay is one-shot: it returns once the consumer drains; callers wanting live updates use Watch instead.
func (*Manager) Tx ¶ added in v0.13.0
Tx is the only public mutation entry point on Manager. The callback receives a *Tx and calls one or more tx.X methods to mutate the task. Each tx.X method publishes one event to the JetStream event log, then writes the corresponding KV projection. Returns ErrNoMutations if the callback completes without calling any tx.X method, or any error returned by the callback or any tx.X call.
Concurrency: multiple Tx invocations on the same task race; the KV CAS layer detects the conflict and the losing call retries inside the underlying jskv.MaxRetries-bounded loop.
func (*Manager) Watch ¶
Watch opens a subscription over all task state changes. The returned channel is closed when ctx is done, when Close is called on the Manager, or when the underlying JetStream watcher stops.
Watch emits the current bucket contents before live updates begin: each surviving key arrives as an EventCreated event. Callers that want only live changes can discard events until their first post- subscribe write round-trips. Deleted markers in the initial snapshot are skipped.
Callers must drain the channel promptly. A blocked reader stalls the watcher-forwarding goroutine and delays every other subscriber for at most one send. Buffer size is Config.ChanBuffer (default 32).
type SlotChangedPayload ¶ added in v0.13.0
SlotChangedPayload carries a slot reassignment. From may be empty for a first slot binding; To is always populated.
type Status ¶
type Status string
Status is the task lifecycle state. ADR 0005 fixes the enum to exactly these three values; invariant 13 (amended by ADR 0007) enforces the transition DAG (open→claimed, open→closed, claimed→closed, claimed→open) at write time.
type Tally ¶ added in v0.13.0
type Tally struct {
// Open is the count of tasks currently in StatusOpen.
Open int
// Claimed is the count of tasks currently in StatusClaimed.
Claimed int
// Closed is the count of tasks currently in StatusClosed.
Closed int
// Total is Open+Claimed+Closed.
Total int
}
Tally is the per-status summary that both `bones status` and `bones tasks status` render. Counts are derived by replaying events (not by walking the KV) so the two surfaces share one source. Per ADR 0052 §"TaskTally — single source for counts".
func TaskTally ¶ added in v0.13.0
func TaskTally(events []EventEnvelope) Tally
TaskTally derives a Tally by replaying events into a per-task status projection and bucket-counting at the end. The events slice should be the full log (or a subset that includes the latest state-changing event for each task ID); ordering by stream sequence is the caller's responsibility — recovery and live consumers receive ordered events by construction.
Events with unknown types are ignored. A task is counted under its most recent terminal status; a closed task that was previously reopened (not legal under the current DAG, but defensive) reports the latest status seen.
type Task ¶
type Task struct {
// ID is the TaskID. It must equal the KV key at Create time; the
// duplication is deliberate — any future migration that walks the
// bucket can use the value-side ID without parsing the key.
ID string `json:"id"`
// Title is the human-readable task summary. Bounded by the caller;
// no enforcement here beyond the MaxValueSize cap.
Title string `json:"title"`
// Status is the lifecycle state; see Status/validStatus.
Status Status `json:"status"`
// ClaimedBy is the AgentID currently holding the claim. Invariant 11
// requires non-empty iff Status == StatusClaimed; Update enforces.
ClaimedBy string `json:"claimed_by,omitempty"`
// Files is the absolute-path list of files this task touches. Sorted
// by the writer (coord.OpenTask); this package stores it verbatim.
Files []string `json:"files"`
// Parent is the optional parent TaskID — empty when this is a root
// task in a decomposition chain.
Parent string `json:"parent,omitempty"`
// Edges are outgoing typed relationships to other tasks. Invariant 25
// forbids duplicate (Type, Target) pairs; coord.Link enforces this on
// write. Additive in ADR 0014; nil in records written before that ADR.
Edges []Edge `json:"edges,omitempty"`
// Context is the caller-supplied free-form metadata. ADR 0005 caps
// the effective size against MaxValueSize in concert with the other
// fields; this package only size-checks the encoded whole.
Context map[string]string `json:"context,omitempty"`
// CreatedAt is the wall-clock time of the initial Create.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is the wall-clock time of the most recent write.
UpdatedAt time.Time `json:"updated_at"`
// DeferUntil hides the task from Ready until this UTC wall-clock
// instant. Nil means immediately eligible subject to the other
// readiness gates. Added in schema v2.
DeferUntil *time.Time `json:"defer_until,omitempty"`
// ClosedAt is the wall-clock time of transition into StatusClosed.
// Nil when Status != StatusClosed; pointer makes the zero value
// observable (and distinct from a legitimate January-1-0001 write).
ClosedAt *time.Time `json:"closed_at,omitempty"`
// ClosedBy is the AgentID that closed the task; empty if not closed.
ClosedBy string `json:"closed_by,omitempty"`
// ClosedReason is the free-form close reason; empty if not closed.
ClosedReason string `json:"closed_reason,omitempty"`
// ClaimEpoch is the monotonic counter bumped on every successful
// Claim or Reclaim. Invariant 24 requires strict increase per Claim/
// Reclaim; Commit and CloseTask fence against it to refuse zombie
// writes after a Reclaim. Zero on records that never had a claim
// (legacy records decode to zero; first Claim bumps to 1). ADR 0007.
ClaimEpoch uint64 `json:"claim_epoch,omitempty"`
// OriginalSize is the pre-compaction canonical source size for the
// latest compaction level. Zero means the task has not been compacted.
OriginalSize uint64 `json:"original_size,omitempty"`
// CompactLevel is the number of compaction passes applied to this
// task. Zero means un-compacted. Added in schema v3.
CompactLevel uint8 `json:"compact_level,omitempty"`
// CompactedAt is the wall-clock time of the latest compaction pass.
// Nil means the task has not been compacted. Added in schema v3.
CompactedAt *time.Time `json:"compacted_at,omitempty"`
// SchemaVersion stamps the schema this record was written against.
SchemaVersion int `json:"schema_version"`
// LastEventSeq is the JetStream stream sequence of the most recent
// task event whose effect has been applied to this projection. The
// hub-side recovery loop on bones up reads the live KV record and
// the stream's latest sequence per task subject; if the stream
// sequence is greater, pending events are replayed and the
// projection is brought to parity. Added in schema v4 (ADR 0052).
LastEventSeq uint64 `json:"last_event_seq,omitempty"`
}
Task is the value stored at each TaskID key in the KV bucket. The struct is persisted as JSON; every timestamp is wall-clock UTC so that two processes reading the same entry reach the same verdict regardless of local clock skew. ADR 0005 is the canonical schema.
func FilterReady ¶ added in v0.11.0
FilterReady returns the subset of records that are eligible for an agent to claim right now: open, unclaimed, no incoming open blocks/supersedes/duplicates edges, no open child task naming the record as Parent, and not deferred into the future relative to now.
The DAG check mirrors coord.Ready (see internal/coord/ready.go) but operates directly on []Task records so consumers that already have a task list (the CLI's `tasks ready` verb) do not need to open a coord session. The two implementations stay in sync by sharing the same rule list — any new readiness gate must be added in both places.
discovered-from edges are intentionally ignored — they are audit metadata, not a ready-blocker.
type Tx ¶ added in v0.13.0
type Tx struct {
// contains filtered or unexported fields
}
Tx is the carrier passed to Manager.Tx's callback. Each tx.X method is one mutation. Tx is short-lived: its scope is the duration of one Manager.Tx call. Methods are not safe to call after the callback returns.
Tx is not concurrency-safe. The callback must serialize its tx.X calls; callers needing multiple parallel writes should call Manager.Tx once per task and let the substrate serialize.
func (*Tx) Claim ¶ added in v0.13.0
Claim publishes a claimed event and applies the supplied mutator inside a CAS loop. The mutator owns the conditional rejection semantics — coord.Claim's claimMutator, coord.Reclaim's reclaimMutator, and coord.HandoffClaim's handoffMutator all plug in here unchanged.
The publish-then-CAS order means: a mutator that rejects after publish leaves a "spurious claimed" event in the log. Recovery's LastEventSeq guard prevents replay from clobbering a later projection; the audit-trail correctness is what the closed-set event types and (field, old, new) tuples guarantee for replay (see ADR 0052 §"Recovery").
func (*Tx) Close ¶ added in v0.13.0
Close publishes a closed event and updates the KV record into the terminal closed state. The mutator-shaped logic (invariant 12, invariant 24, agent identity check) lives on the caller side; tx.Close accepts a mutate callback so coord-level guards stay where they belong.
func (*Tx) Create ¶ added in v0.13.0
Create publishes a created event and writes the initial KV record. t.ID must equal tx's taskID. The event Snapshot field is left unpopulated; readers reconstruct the projection from the live KV.
func (*Tx) Link ¶ added in v0.13.0
Link publishes a linked event and updates the KV record's Edges slice. Duplicate edges are caller-prevented (coord.Link enforces invariant 25); this method does not re-check.
func (*Tx) SlotChange ¶ added in v0.13.0
SlotChange publishes a slot_changed event. The KV projection does not store slot directly (slots are an external swarm concept); the event is purely audit. From may be empty for first slot bind.
func (*Tx) Unclaim ¶ added in v0.13.0
Unclaim publishes an unclaimed event and applies the supplied mutator inside a CAS loop. The mutator owns the conditional rejection semantics (e.g. coord's "is the claim still ours" check surfacing as errClaimCASNoOp); a nil mutator falls back to the default "set status=open, clear claimed_by" projection.
reason is the free-form audit string recorded on the event but NOT stamped on the KV — operators recover it from the log.
func (*Tx) Update ¶ added in v0.13.0
Update publishes an updated event carrying one or more (field, old, new) tuples and applies the mutator to the KV record. The mutator is responsible for actually writing the new field values; the FieldChange tuples are descriptive (for the log) and must agree with the projection the mutator produces. Callers compose the mutate closure to keep the legacy invariant-checking semantics; the FieldChange tuples are caller-provided too.
type UnclaimedPayload ¶ added in v0.13.0
type UnclaimedPayload struct {
Reason string `json:"reason"`
}
UnclaimedPayload carries the un-claim reason. The previously-claiming agent is recoverable from the prior claimed event in the log; we do not duplicate it here.
type UpdatedPayload ¶ added in v0.13.0
type UpdatedPayload struct {
Changes []FieldChange `json:"changes"`
}
UpdatedPayload aggregates one or more FieldChange tuples. A Tx that changes multiple fields in one tx.Update call emits one updated event with all changes; multiple sequential tx.Update calls emit one event each.