indexwatch

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package indexwatch provides a robust, self-healing abstraction around the entity server's WatchIndex API.

WatchIndex streams entity changes (create/update/delete) for an attribute based index query, but the underlying watch can stop at any time (etcd errors, RPC disconnects, server restarts). Consumers that use WatchIndex directly must re-establish the watch on failure, and even when they do, any changes that occur while the watch is down are silently lost.

A Watcher closes both gaps while keeping only O(1) state — a single etcd revision cursor. It owns one goroutine that:

  1. Takes an initial snapshot via List, delivering it as a single EventSync carrying the complete current set of entities and the revision it was read at. The cursor is set to that revision.
  2. Watches from cursor+1 (etcd WithRev), forwarding live create/update/delete events and advancing the cursor from each event's revision and from idle progress watermarks.
  3. On a transient disconnect, re-watches from cursor+1 — etcd replays every put and delete since then, so nothing is missed and no re-list is needed.
  4. Only if the cursor has been compacted away does it take a fresh snapshot (another EventSync) and reset the cursor.

Because the watcher holds no per-entity state, deletes that happen during a snapshot gap (initial sync or post-compaction) are not emitted as individual EventDeleted events. Instead an EventSync hands the consumer the full current set, and the consumer reconciles it against its own state in its own business domain (replace its cache; treat any id no longer present as removed) — work these consumers already do today. Live deletes (the common case) arrive as ordinary EventDeleted events.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event struct {
	// Type is the kind of change.
	Type EventType
	// Id is the entity's id. Set for Added/Updated/Deleted; empty for EventSync.
	Id entity.Id
	// Entity is the full entity for Added/Updated events. It is nil for Deleted
	// events and for EventSync.
	Entity *entity.Entity
	// Entities is the complete current set for EventSync; nil for live events.
	Entities []*entity.Entity
	// Rev is the entity's revision for Added/Updated/Deleted, and the snapshot
	// revision for EventSync.
	Rev int64
}

Event is a single change delivered on the Updates channel.

type EventType

type EventType int

EventType describes the kind of change an Event represents.

const (
	// EventSync delivers a full snapshot of the index: Entities holds the
	// complete current set and Rev is the revision it was read at. Emitted on
	// initial sync and again after a compaction or resync. Consumers reconcile
	// their own state against Entities (replace their cache; treat any id no
	// longer present as removed).
	EventSync EventType = iota
	// EventAdded indicates a live create.
	EventAdded
	// EventUpdated indicates a watched entity changed (live).
	EventUpdated
	// EventDeleted indicates an entity left the watched index (live). The Entity
	// field may be nil; rely on Id.
	EventDeleted
)

func (EventType) String

func (t EventType) String() string

String renders the EventType for logging.

type Options

type Options struct {
	// Logger receives operational logs. Defaults to slog.Default().
	Logger *slog.Logger

	// ResyncPeriod, when > 0, forces a periodic fresh snapshot even while the
	// watch is healthy, as belt-and-suspenders drift correction. Defaults to 0
	// (disabled); revision-resume makes it unnecessary in normal operation.
	ResyncPeriod time.Duration

	// MinBackoff is the initial delay between reconnect attempts. Defaults to
	// 1 second.
	MinBackoff time.Duration

	// MaxBackoff caps the exponential reconnect delay. Defaults to 5 minutes.
	MaxBackoff time.Duration

	// BufferSize is the capacity of the Updates channel. When the buffer is
	// full the watch goroutine blocks (applying backpressure to the entity
	// server) rather than dropping events, so no change is ever lost. Defaults
	// to 1000.
	BufferSize int
}

Options configures a Watcher. The zero value is usable; New applies defaults for any unset field.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher delivers gap-free change events for a single attribute based index query while holding only a revision cursor. Create one with New, then call Start. Consume events from Updates and optionally wait for the first snapshot via Synced. Call Stop to shut down.

func New

New creates a Watcher for the given index query against the entity server. It does not start watching until Start is called.

func (*Watcher) Start

func (w *Watcher) Start(ctx context.Context) error

Start launches the background watch goroutine. It is safe to call once; later calls are no-ops. The watcher runs until ctx is cancelled or Stop is called.

func (*Watcher) Stop

func (w *Watcher) Stop()

Stop cancels the watch goroutine and waits for it to exit. It is safe to call multiple times and from multiple goroutines. After Stop returns, the Updates channel is closed.

func (*Watcher) Synced

func (w *Watcher) Synced() <-chan struct{}

Synced returns a channel that is closed once the first snapshot has been delivered (the first EventSync). Consumers that build an in-memory cache from the stream can wait on it to know when their cache reflects the current state of the index.

func (*Watcher) Updates

func (w *Watcher) Updates() <-chan Event

Updates returns the channel on which change events are delivered. The channel is closed after Stop completes and the watch goroutine has exited.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL