tasks

package
v0.7.3 Latest
Warning

This package is not in the latest version of its module.

Published: May 3, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package tasks is the substrate layer that stores task records in a NATS JetStream KV bucket. It exposes CAS-atomic primitives — Create, Get, Update, List, Watch — consumed exclusively by the coord package. See docs/adr/0005-tasks-in-nats-kv.md for the schema, bucket, and retention rules this package enforces.

This package is internal and unexported: callers outside github.com/danmestas/bones must not depend on it.


Constants

const DefaultBucketName = "bones-tasks"

DefaultBucketName is the canonical JetStream KV bucket name for task records, pinned by ADR 0005. Both the CLI's openManager helper and internal/coord must point at this same bucket; otherwise create-then-link/prime/autoclaim sequences read a different store and silently miss the just-created task. Tests may override via Config.BucketName for isolation.

const SchemaVersion = 3

SchemaVersion is the currently-written task record schema. Every Create stamps this on the record so future migrations can fan out on observed version. v3 adds closed-task compaction metadata.

Variables

var ErrAlreadyExists = errors.New("tasks: record already exists")

ErrAlreadyExists reports that Create was called for a TaskID that already has a record in the bucket. Collisions are programmer errors under the ADR 0005 ID generator, but the sentinel exists so callers can distinguish a duplicate Create from an unrelated substrate error.

var ErrCASConflict = errors.New(
	"tasks: CAS conflict exceeded retries",
)

ErrCASConflict reports that Update exhausted jskv.MaxRetries without converging. Under normal contention this should never surface; its presence in a call site's error handling is the explicit surrender boundary for the CAS retry loop.

var ErrClosed = errors.New("tasks: manager is closed")

ErrClosed reports that a public method was called on a Manager whose Close has returned. A Close that races with an in-flight call surfaces this error rather than a data race or nil dereference.

var ErrInvalidStatus = errors.New("tasks: invalid status value")

ErrInvalidStatus reports that a Task's Status field was outside the fixed {open, claimed, closed} enum. Any mutation that would write an unknown status returns this error before the CAS call.

var ErrInvalidTransition = errors.New(
	"tasks: invalid status transition",
)

ErrInvalidTransition reports that a mutation attempted a status edge outside the ADR 0005 DAG as amended by ADR 0007. The legal edges are open→claimed, open→closed, claimed→closed, and claimed→open (the release-side un-claim edge). Any edge leading out of closed, including the closed→closed self-loop, is rejected.
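The edge list above can be sketched as a small lookup. This is an illustration only, not the package's validator: legalEdges and legalTransition are hypothetical names, and the sketch assumes that a write leaving an open or claimed task in its current status (for example, a title edit) is permitted, which the text does not state outright.

```go
package main

import "fmt"

// Status mirrors the package's three-value lifecycle enum.
type Status string

const (
	StatusOpen    Status = "open"
	StatusClaimed Status = "claimed"
	StatusClosed  Status = "closed"
)

// legalEdges lists the four edges named in the documentation.
// StatusClosed has no entry because it is a sink.
var legalEdges = map[Status][]Status{
	StatusOpen:    {StatusClaimed, StatusClosed},
	StatusClaimed: {StatusClosed, StatusOpen},
}

// legalTransition is a hypothetical helper. Assumption: same-status
// writes are allowed for open and claimed, and rejected for closed.
func legalTransition(from, to Status) bool {
	if from == to {
		return from != StatusClosed
	}
	for _, next := range legalEdges[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(legalTransition(StatusOpen, StatusClaimed))  // true
	fmt.Println(legalTransition(StatusClaimed, StatusOpen))  // true
	fmt.Println(legalTransition(StatusClosed, StatusOpen))   // false
	fmt.Println(legalTransition(StatusClosed, StatusClosed)) // false
}
```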

var ErrInvariant11 = errors.New(
	"tasks: claimed_by/status mismatch (invariant 11)",
)

ErrInvariant11 reports that a mutation violated invariant 11: the claimed_by field must be non-empty iff status == claimed. Rejected before the CAS call so bucket state never observes the inconsistent pair.
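The "iff" in invariant 11 reduces to comparing two booleans. A minimal sketch, using a hypothetical helper name (checkInvariant11) rather than the package's internal check:

```go
package main

import "fmt"

// Status mirrors the package's lifecycle enum.
type Status string

const (
	StatusOpen    Status = "open"
	StatusClaimed Status = "claimed"
)

// checkInvariant11 is a hypothetical helper: it reports whether
// claimedBy is non-empty if and only if status is claimed.
func checkInvariant11(status Status, claimedBy string) bool {
	return (status == StatusClaimed) == (claimedBy != "")
}

func main() {
	fmt.Println(checkInvariant11(StatusClaimed, "agent-1")) // true
	fmt.Println(checkInvariant11(StatusOpen, ""))           // true
	fmt.Println(checkInvariant11(StatusOpen, "agent-1"))    // false
	fmt.Println(checkInvariant11(StatusClaimed, ""))        // false
}
```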

var ErrNotFound = errors.New("tasks: record not found")

ErrNotFound reports that Get or Update was called for a TaskID absent from the bucket. Coord translates this into its own public sentinel when composing higher-level verbs.

var ErrValueTooLarge = errors.New(
	"tasks: encoded value exceeds max size",
)

ErrValueTooLarge reports that a Task, after JSON encoding, exceeded Config.MaxValueSize. Enforced at every write per invariant 14.

Functions

func EncodeForTest

func EncodeForTest(t Task) ([]byte, error)

EncodeForTest exposes the package's internal task encoder so sibling test packages can produce wire-compatible bytes to Put directly into the KV bucket. Not part of the supported public API.

func SetCASRetryHookForTest

func SetCASRetryHookForTest(fn func()) (restore func())

SetCASRetryHookForTest installs fn as the per-retry hook used by Update's CAS loop, returning a restore function the test must call (typically via t.Cleanup) to reinstate the no-op default. Tests use this to observe the number of CAS retries without instrumenting the KV transport. The hook fires exactly once per retry — never on the first attempt or after a final verdict.
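The install-and-restore shape described above is a common Go test seam. The sketch below shows the pattern in isolation with hypothetical names (retryHook, setRetryHookForTest); it is not the package's implementation, and it is not goroutine-safe as written.

```go
package main

import "fmt"

// retryHook is a package-level seam; the default does nothing.
var retryHook = func() {}

// setRetryHookForTest installs fn and returns a function that
// reinstates the no-op default.
func setRetryHookForTest(fn func()) (restore func()) {
	retryHook = fn
	return func() { retryHook = func() {} }
}

func main() {
	retries := 0
	restore := setRetryHookForTest(func() { retries++ })

	retryHook() // stands in for one CAS retry
	retryHook() // and another
	restore()
	retryHook() // back to the no-op default, so the counter is unchanged

	fmt.Println(retries) // 2
}
```

In a real test the restore function would typically be registered with t.Cleanup(restore) immediately after installation, so the default comes back even if the test fails early.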

func SetUpdatePreWriteHookForTest

func SetUpdatePreWriteHookForTest(fn func()) (restore func())

SetUpdatePreWriteHookForTest installs fn as the per-attempt hook used by Update's CAS loop between the Get and the Update calls. Tests use this seam to deterministically force CAS conflicts by performing a direct Put while fn runs — every attempt reliably fails under such a hook, making the retry-exhaustion path reachable in a single test. Restore must be called (typically via t.Cleanup) to reinstate the no-op default.

Types

type Config

type Config struct {
	// BucketName is the JetStream KV bucket backing the task records.
	// ADR 0005 pins the coord-visible name to "bones-tasks"; this
	// package takes the name as input so tests can isolate by bucket.
	BucketName string

	// HistoryDepth is the per-key JetStream KV history depth. ADR 0005
	// recommends 8; Validate-equivalent rejection happens at Open.
	HistoryDepth uint8

	// MaxValueSize is the upper bound on an encoded task record value,
	// in bytes. Enforced at every Create and Update per invariant 14.
	MaxValueSize int32

	// ChanBuffer sets the channel buffer for Watch. Zero yields the
	// package default (defaultChanBuffer).
	ChanBuffer int
}

Config configures Open. BucketName, HistoryDepth, and MaxValueSize are required and have no silent defaults; ChanBuffer alone falls back to the package default (defaultChanBuffer) when zero. ADR 0005 fixes the recommended values (HistoryDepth 8, MaxValueSize 8 KB), and coord.Config is the enforcement surface for operator input. BucketName must match the JetStream KV regex ([A-Za-z0-9_-]+); a violation is surfaced by the underlying CreateOrUpdateKeyValue call.
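Because the bucket-name rule is only surfaced by the underlying JetStream call, a caller may want to check a name locally first. The sketch below is such a pre-check, not part of this package: validBucketName is a hypothetical helper, and anchoring the quoted character class to the whole string is an assumption about the intended semantics.

```go
package main

import (
	"fmt"
	"regexp"
)

// bucketNameRE applies the character class quoted in the docs to the
// entire name. The anchors are an assumption of this sketch.
var bucketNameRE = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)

// validBucketName is a hypothetical local pre-check.
func validBucketName(name string) bool {
	return bucketNameRE.MatchString(name)
}

func main() {
	fmt.Println(validBucketName("bones-tasks")) // true
	fmt.Println(validBucketName("bones.tasks")) // false
	fmt.Println(validBucketName(""))            // false
}
```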

type Edge

type Edge struct {
	Type   EdgeType `json:"type"`
	Target string   `json:"target"`
}

Edge is an outgoing directed relationship carried on the source task. Storage is outgoing-only (ADR 0014); reverse lookups require a scan.

type EdgeType

type EdgeType string

EdgeType names a typed outgoing relationship from one task to another. See ADR 0014. Unknown string values decoded from storage are preserved as-is (invariant 26) so a future phase adding a new type stays round-trip compatible with records this version writes.

const (
	EdgeBlocks         EdgeType = "blocks"
	EdgeDiscoveredFrom EdgeType = "discovered-from"
	EdgeSupersedes     EdgeType = "supersedes"
	EdgeDuplicates     EdgeType = "duplicates"
)
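Because EdgeType is a plain string type, encoding/json carries unknown values through a decode and re-encode without any special handling. A small sketch using copies of the Edge and EdgeType declarations; the "future-kind" value and the roundTrip helper are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EdgeType and Edge are copied from the declarations in this package.
type EdgeType string

type Edge struct {
	Type   EdgeType `json:"type"`
	Target string   `json:"target"`
}

// roundTrip decodes an edge and encodes it again.
func roundTrip(in []byte) ([]byte, error) {
	var e Edge
	if err := json.Unmarshal(in, &e); err != nil {
		return nil, err
	}
	return json.Marshal(e)
}

func main() {
	// "future-kind" is not one of the four declared constants.
	out, err := roundTrip([]byte(`{"type":"future-kind","target":"task-2"}`))
	fmt.Println(string(out), err) // {"type":"future-kind","target":"task-2"} <nil>
}
```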

type Event

type Event struct {
	ID   string
	Kind EventKind
	Task Task
}

Event is delivered to Watch callers on each observed task change. ID is the TaskID the event concerns; Kind identifies the shape; Task is populated for EventCreated and EventUpdated.

type EventKind

type EventKind int

EventKind identifies the shape of a task state change delivered by Watch.

const (
	// EventCreated fires on the first Put for a key. The accompanying
	// Event.Task holds the decoded record.
	EventCreated EventKind = iota + 1

	// EventUpdated fires on subsequent Puts for an existing key. The
	// accompanying Event.Task holds the decoded post-update record.
	EventUpdated

	// EventDeleted fires on Delete or Purge. Event.Task is the zero
	// value; Event.ID carries the affected key.
	EventDeleted
)

EventCreated, EventUpdated, and EventDeleted are the three shapes of task state changes a Watch subscriber observes.

func (EventKind) String

func (k EventKind) String() string

String returns a human-readable name for the EventKind.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns a JetStream KV bucket that stores task records. Every public method is safe to call concurrently. Close is idempotent.

func Open

func Open(ctx context.Context, nc *nats.Conn, cfg Config) (*Manager, error)

Open creates (or reattaches to) the tasks KV bucket on the supplied NATS connection and returns a Manager. The caller owns nc's lifecycle; Open does not close it. Constructing a Manager does not consume a goroutine; Watch spawns one per call. Callers must invoke Close to release every live subscriber channel.

func (*Manager) Close

func (m *Manager) Close() error

Close releases resources held by the Manager. It closes every live Watch channel and marks the Manager as closed so subsequent public calls return ErrClosed. The NATS connection is owned by the caller and is not closed here. Safe to call more than once.

func (*Manager) Create

func (m *Manager) Create(ctx context.Context, t Task) error

Create writes a new task record. It uses jetstream.KeyValue.Create (revision 0), so a key that already exists is rejected with ErrAlreadyExists. Under the ADR 0005 ID generator a Create collision is a programmer error at the caller, but the sentinel lets the caller distinguish the mistake from an unrelated substrate failure. Invariants 11 and 13 are checked against the record's own fields before the CAS call; invariant 14 is checked after encoding.

func (*Manager) Get

func (m *Manager) Get(
	ctx context.Context, id string,
) (Task, uint64, error)

Get reads a task record by ID. The second return value is the KV revision at which the record was read. Note that Update's mutate closure, as declared in this version, receives only the Task, so this revision is not passed through it. Returns ErrNotFound when the key is absent or carries a delete marker.

func (*Manager) KVForTest

func (m *Manager) KVForTest() jetstream.KeyValue

KVForTest returns the underlying JetStream KV handle so tests in sibling packages can stage CAS-conflict scenarios by writing directly to the bucket. Production code must not use this; the public API (Open, Close, Create, Get, Update, List, Watch) remains the sole supported surface. The export is intentionally verbose ("ForTest") so every call site reads as a test seam, not an accidental leak of the substrate.

func (*Manager) List

func (m *Manager) List(ctx context.Context) ([]Task, error)

List returns every task record currently readable in the bucket. Coord.Ready filters the slice client-side; this package performs no status filtering. Delete markers are skipped; malformed entries are skipped (they would indicate a corrupted write, which the watcher path would also drop).

func (*Manager) Purge

func (m *Manager) Purge(ctx context.Context, id string) error

Purge permanently removes a task key from the bucket so future Get/List calls do not observe it. Returns ErrNotFound when the key is absent.

func (*Manager) Update

func (m *Manager) Update(
	ctx context.Context,
	id string,
	mutate func(Task) (Task, error),
) error

Update performs a revision-gated CAS update. The mutate function receives the current Task value and returns the desired new value; returning a non-nil error aborts the update and propagates the error unwrapped so callers can switch on mutate's own sentinels. On revision conflict the loop re-reads the record and re-invokes mutate, up to jskv.MaxRetries times, before surfacing ErrCASConflict. Invariants 11, 13, and 14 are checked on each attempt against the value mutate returned.

func (*Manager) Watch

func (m *Manager) Watch(
	ctx context.Context,
) (<-chan Event, error)

Watch opens a subscription over all task state changes. The returned channel is closed when ctx is done, when Close is called on the Manager, or when the underlying JetStream watcher stops.

Watch emits the current bucket contents before live updates begin: each surviving key arrives as an EventCreated event. Callers that want only live changes can discard events until their first post-subscribe write round-trips. Delete markers in the initial snapshot are skipped.

Callers must drain the channel promptly. A blocked reader stalls the watcher-forwarding goroutine and delays every other subscriber for at most one send. Buffer size is Config.ChanBuffer (default 32).
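One way to drain promptly is to do nothing in the receive loop except fold each event into local state. The sketch below uses copies of the Event and EventKind declarations, a trimmed Task, and a plain channel in place of a real Watch subscription; fold is a hypothetical helper. The loop ends when the channel is closed, which the documentation says happens on context cancellation, on Close, or when the watcher stops.

```go
package main

import "fmt"

type EventKind int

const (
	EventCreated EventKind = iota + 1
	EventUpdated
	EventDeleted
)

// Task is trimmed to two fields for the sketch.
type Task struct {
	ID    string
	Title string
}

type Event struct {
	ID   string
	Kind EventKind
	Task Task
}

// fold consumes events until the channel closes and returns the
// resulting view of the bucket, keyed by TaskID.
func fold(events <-chan Event) map[string]Task {
	state := make(map[string]Task)
	for ev := range events {
		switch ev.Kind {
		case EventCreated, EventUpdated:
			state[ev.ID] = ev.Task
		case EventDeleted:
			delete(state, ev.ID) // Task is the zero value here
		}
	}
	return state
}

func main() {
	events := make(chan Event, 4)
	events <- Event{ID: "a", Kind: EventCreated, Task: Task{ID: "a", Title: "first"}}
	events <- Event{ID: "b", Kind: EventCreated, Task: Task{ID: "b", Title: "second"}}
	events <- Event{ID: "a", Kind: EventUpdated, Task: Task{ID: "a", Title: "first, edited"}}
	events <- Event{ID: "b", Kind: EventDeleted}
	close(events)

	state := fold(events)
	fmt.Println(len(state), state["a"].Title) // 1 first, edited
}
```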

type Status

type Status string

Status is the task lifecycle state. ADR 0005 fixes the enum to exactly these three values; invariant 13 (amended by ADR 0007) enforces the transition DAG (open→claimed, open→closed, claimed→closed, claimed→open) at write time.

const (
	StatusOpen    Status = "open"
	StatusClaimed Status = "claimed"
	StatusClosed  Status = "closed"
)

StatusOpen marks a task that has been declared but not yet claimed. StatusClaimed marks a task currently held by one agent. StatusClosed marks a terminal task; closed is a sink state.

type Task

type Task struct {
	// ID is the TaskID. It must equal the KV key at Create time; the
	// duplication is deliberate — any future migration that walks the
	// bucket can use the value-side ID without parsing the key.
	ID string `json:"id"`

	// Title is the human-readable task summary. Bounded by the caller;
	// no enforcement here beyond the MaxValueSize cap.
	Title string `json:"title"`

	// Status is the lifecycle state; see Status/validStatus.
	Status Status `json:"status"`

	// ClaimedBy is the AgentID currently holding the claim. Invariant 11
	// requires non-empty iff Status == StatusClaimed; Update enforces.
	ClaimedBy string `json:"claimed_by,omitempty"`

	// Files is the absolute-path list of files this task touches. Sorted
	// by the writer (coord.OpenTask); this package stores it verbatim.
	Files []string `json:"files"`

	// Parent is the optional parent TaskID — empty when this is a root
	// task in a decomposition chain.
	Parent string `json:"parent,omitempty"`

	// Edges are outgoing typed relationships to other tasks. Invariant 25
	// forbids duplicate (Type, Target) pairs; coord.Link enforces this on
	// write. Additive in ADR 0014; nil in records written before that ADR.
	Edges []Edge `json:"edges,omitempty"`

	// Context is the caller-supplied free-form metadata. ADR 0005 caps
	// the effective size against MaxValueSize in concert with the other
	// fields; this package only size-checks the encoded whole.
	Context map[string]string `json:"context,omitempty"`

	// CreatedAt is the wall-clock time of the initial Create.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is the wall-clock time of the most recent write.
	UpdatedAt time.Time `json:"updated_at"`

	// DeferUntil hides the task from Ready until this UTC wall-clock
	// instant. Nil means immediately eligible subject to the other
	// readiness gates. Added in schema v2.
	DeferUntil *time.Time `json:"defer_until,omitempty"`

	// ClosedAt is the wall-clock time of transition into StatusClosed.
	// Nil when Status != StatusClosed; pointer makes the zero value
	// observable (and distinct from a legitimate January-1-0001 write).
	ClosedAt *time.Time `json:"closed_at,omitempty"`

	// ClosedBy is the AgentID that closed the task; empty if not closed.
	ClosedBy string `json:"closed_by,omitempty"`

	// ClosedReason is the free-form close reason; empty if not closed.
	ClosedReason string `json:"closed_reason,omitempty"`

	// ClaimEpoch is the monotonic counter bumped on every successful
	// Claim or Reclaim. Invariant 24 requires strict increase per Claim/
	// Reclaim; Commit and CloseTask fence against it to refuse zombie
	// writes after a Reclaim. Zero on records that never had a claim
	// (legacy records decode to zero; first Claim bumps to 1). ADR 0007.
	ClaimEpoch uint64 `json:"claim_epoch,omitempty"`

	// OriginalSize is the pre-compaction canonical source size for the
	// latest compaction level. Zero means the task has not been compacted.
	OriginalSize uint64 `json:"original_size,omitempty"`

	// CompactLevel is the number of compaction passes applied to this
	// task. Zero means un-compacted. Added in schema v3.
	CompactLevel uint8 `json:"compact_level,omitempty"`

	// CompactedAt is the wall-clock time of the latest compaction pass.
	// Nil means the task has not been compacted. Added in schema v3.
	CompactedAt *time.Time `json:"compacted_at,omitempty"`

	// SchemaVersion stamps the schema this record was written against.
	SchemaVersion int `json:"schema_version"`
}

Task is the value stored at each TaskID key in the KV bucket. The struct is persisted as JSON; every timestamp is wall-clock UTC so that two processes reading the same entry reach the same verdict regardless of local clock skew. ADR 0005 is the canonical schema.
