Documentation ¶
Overview ¶
Package holds is the substrate layer that stores file-level holds in a NATS JetStream KV bucket. It exposes four primitives — Announce, Release, WhoHas, Subscribe — consumed exclusively by the coord package. See docs/adr/0007-claim-semantics.md for the composed closure/return-release model coord builds on top of these primitives.
This package is internal and unexported: callers outside github.com/danmestas/bones must not depend on it.
Index ¶
- Variables
- func EncodeForTest(h Hold) ([]byte, error)
- func KeyForTest(file wspath.Path) string
- func SetCASRetryHookForTest(fn func()) (restore func())
- type Config
- type Event
- type EventKind
- type Hold
- type Manager
- func (m *Manager) Announce(ctx context.Context, file wspath.Path, h Hold) error
- func (m *Manager) Close() error
- func (m *Manager) KVForTest() jetstream.KeyValue
- func (m *Manager) Probe(ctx context.Context) error
- func (m *Manager) Release(ctx context.Context, file wspath.Path, agent string) error
- func (m *Manager) Subscribe(ctx context.Context) (<-chan Event, error)
- func (m *Manager) WhoHas(ctx context.Context, file wspath.Path) (Hold, bool, error)
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("holds: manager is closed")
ErrClosed reports that a public method was called on a Manager whose Close has returned. A call that races with Close surfaces this error rather than a data race or nil dereference.
var ErrHeldByAnother = errors.New("holds: file held by another agent")
ErrHeldByAnother reports that Announce was called for a file already held by a different agent. Coord translates this into its own public sentinel when composing Claim.
Functions ¶
func EncodeForTest ¶
func EncodeForTest(h Hold) ([]byte, error)
EncodeForTest exposes the package's internal hold encoder so sibling test packages can produce wire-compatible bytes to Put directly into the KV bucket. It is not part of the supported public API.
func KeyForTest ¶
func KeyForTest(file wspath.Path) string
KeyForTest exposes the package's file-to-KV-key transform so sibling test packages can stage KV entries under the exact key Announce uses. Not part of the supported public API.
func SetCASRetryHookForTest ¶
func SetCASRetryHookForTest(fn func()) (restore func())
SetCASRetryHookForTest installs fn as the per-retry hook used by Announce's CAS loop, returning a restore function the test must call (typically via t.Cleanup) to reinstate the no-op default. Tests use this to observe the number of CAS retries without instrumenting the KV transport. The hook fires exactly once per retry — never on the first attempt or after a final verdict.
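The install-and-restore shape described above can be sketched without the real package. Everything below is a stand-in under stated assumptions: casRetryHook, setCASRetryHook, writeWithRetries, and countRetries are hypothetical names, and the simulated loop only imitates the documented firing rule (once per retry, never on the first attempt or after the final verdict). The sketch is also not concurrency-safe.

```go
package main

import "fmt"

// casRetryHook is a hypothetical package-level hook; the default is a no-op.
var casRetryHook = func() {}

// setCASRetryHook mirrors the documented shape: install fn and return a
// function that reinstates whatever hook was active before.
func setCASRetryHook(fn func()) (restore func()) {
	prev := casRetryHook
	casRetryHook = fn
	return func() { casRetryHook = prev }
}

// writeWithRetries simulates a CAS loop that hits `conflicts` revision
// mismatches before it succeeds.
func writeWithRetries(conflicts int) {
	for attempt := 0; ; attempt++ {
		if attempt > 0 {
			casRetryHook() // fires once per retry, never on the first attempt
		}
		if attempt >= conflicts {
			return // final verdict; the hook does not fire after this
		}
	}
}

// countRetries shows how a test might observe the retry count.
func countRetries(conflicts int) int {
	n := 0
	restore := setCASRetryHook(func() { n++ })
	defer restore() // a real test would typically use t.Cleanup(restore)
	writeWithRetries(conflicts)
	return n
}

func main() {
	fmt.Println(countRetries(0), countRetries(3))
}
```

Returning the restore function keeps the seam self-cleaning: a test cannot leave a counting hook installed for the next test as long as it registers the restore call.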
Types ¶
type Config ¶
type Config struct {
// Bucket is the name of the JetStream KV bucket backing holds.
Bucket string
// HoldTTLMax is the bucket-wide maximum age — the hard upper bound
// on any hold, enforced by NATS itself via the stream MaxAge. It
// is the substrate's last line of defense against a hold leaked by
// a crashed agent.
HoldTTLMax time.Duration
// ChanBuffer sets the channel buffer for Subscribe. If left zero,
// Open substitutes defaultChanBuffer.
ChanBuffer int
}
Config configures Open. Bucket and HoldTTLMax are required; ChanBuffer is the one field with a default and falls back to defaultChanBuffer when left zero. The bucket name must match JetStream's bucket-name regex ([A-Za-z0-9_-]+); a violation is surfaced by jetstream.CreateOrUpdateKeyValue.
type Event ¶
Event is delivered to Subscribe callers on each observed hold change. File is the absolute path the event concerns; Kind identifies the change shape; Hold is populated only for EventAnnounced.
type EventKind ¶
type EventKind int
EventKind identifies the shape of a hold state change delivered by Subscribe.
const (
// EventAnnounced is delivered when a Put on the bucket is observed.
// The accompanying Event.Hold is decoded from the entry value.
EventAnnounced EventKind = iota + 1
// EventReleased is delivered when a Delete (or Purge) on the bucket
// is observed. Event.Hold is the zero value on this kind.
EventReleased
)
EventAnnounced and EventReleased are the two kinds of hold state changes visible to Subscribe callers. Expired holds do not generate events in Phase 1; the KV watcher only observes explicit puts and deletes.
type Hold ¶
type Hold struct {
// AgentID identifies the agent that currently owns the hold. It
// must be non-empty per invariant 3.
AgentID string `json:"agent_id"`
// ClaimedAt is when the hold was most recently announced. Refreshed
// on every same-agent Announce (lease renewal).
ClaimedAt time.Time `json:"claimed_at"`
// ExpiresAt is the wall-clock moment past which WhoHas treats the
// entry as vacant. Computed as ClaimedAt + TTL at Announce time.
ExpiresAt time.Time `json:"expires_at"`
// CheckoutPath is the local checkout the agent holds the file from.
// Opaque to this package; stored verbatim.
CheckoutPath string `json:"checkout_path"`
// TTL is the original per-call lease the caller requested. Kept
// alongside ExpiresAt so a reader can distinguish a near-expiry
// hold from a long-lease hold without reconstructing arithmetic.
TTL time.Duration `json:"ttl"`
}
Hold is the value stored at each file key in the KV bucket. The struct is persisted as JSON; every timestamp is wall-clock UTC so that two processes reading the same entry reach the same expiry verdict.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager owns a JetStream KV bucket that stores current file holds. Every public method is safe to call concurrently. Close is idempotent.
func Open ¶
Open creates (or reattaches to) the holds KV bucket. Constructing a Manager does not consume a goroutine; Subscribe spawns one per call. Callers must invoke Close to release resources.
func (*Manager) Announce ¶
func (m *Manager) Announce(ctx context.Context, file wspath.Path, h Hold) error
Announce places a hold on file for the agent described by h. The operation is idempotent for the same AgentID: calling Announce twice with the same agent refreshes ClaimedAt/ExpiresAt in lease-renewal style. Returns ErrHeldByAnother if a different agent already owns a non-expired hold on the same file.
The write path is CAS-atomic. Announce reads the current KV entry, decides whether to Create (vacant) or Update (renew, or take over an expired hold), and attempts the write gated on the observed revision. On revision mismatch (another agent wrote between our Get and our write), Announce retries up to jskv.MaxRetries times before returning the exhaustion error. Invariant 6 (atomic claim) is preserved: every state transition is revision-gated and losers re-evaluate against the post-conflict state.
func (*Manager) Close ¶
func (m *Manager) Close() error
Close releases resources held by the Manager. It closes every live Subscribe channel and marks the Manager as closed so subsequent public calls return ErrClosed. Safe to call more than once.
func (*Manager) KVForTest ¶
func (m *Manager) KVForTest() jetstream.KeyValue
KVForTest returns the underlying JetStream KV handle so tests in sibling packages can stage CAS-conflict scenarios by writing directly to the bucket. Production code must not use this; the public API (Announce, Release, WhoHas, Subscribe) remains the sole supported surface. The export is intentionally verbose ("ForTest") so every call site reads as a test seam, not an accidental leak of the substrate.
func (*Manager) Probe ¶ added in v0.8.0
func (m *Manager) Probe(ctx context.Context) error
Probe verifies the bucket is reachable end-to-end by issuing a Get on a sentinel key that is never written. A healthy bucket returns ErrKeyNotFound (which Probe maps to nil); any other error is returned verbatim so callers see the underlying transport failure (commonly nats.ErrNoResponders when JetStream KV is unreachable).
Used by `bones swarm join` as a preflight: the alternative is accepting the join and failing later at commit time with the same underlying error but a worse failure surface (#155).
func (*Manager) Release ¶
func (m *Manager) Release(ctx context.Context, file wspath.Path, agent string) error
Release removes the hold on file if and only if it is owned by agent. The method is a no-op (nil error) when the hold is missing or held by a different agent — "releasing something you don't own" is defined away rather than errored on.
func (*Manager) Subscribe ¶
func (m *Manager) Subscribe(ctx context.Context) (<-chan Event, error)
Subscribe opens a watch over all hold state changes. The returned channel is closed when ctx is done, when Close is called on the Manager, or when the underlying JetStream watcher stops.
Subscribe emits the current bucket contents as Announced events before live updates begin. Callers that want only live changes can filter on timestamps or discard events until their own Announce round-trips. Delete markers in the initial snapshot are skipped.
Callers must drain the channel promptly. A blocked reader stalls the watcher-forwarding goroutine and delays every other subscriber for at most one send. Buffer size is Config.ChanBuffer (default 32).