distributedtask

package
v1.38.0-rc.0
Warning

This package is not in the latest version of its module.

Published: May 27, 2026 License: BSD-3-Clause Imports: 21 Imported by: 0

Documentation

Overview

Package distributedtask coordinates long-running operations (e.g. reindexing) across a Weaviate cluster. All task state lives in the Raft log, so it survives node restarts and leader elections without external storage.

Architecture

Three components work together:

  • Manager is the Raft state machine. It owns the canonical task state and is the only component that mutates it. All writes go through Raft Apply (see cluster/store_apply.go).

  • Scheduler runs on every node. It polls the Manager for the current task list, starts and stops local work via a Provider, and submits cleanup requests when tasks expire.

  • Provider is the extension point. Each task namespace (e.g. "reindex", "compaction") registers a Provider that knows how to execute that type of work locally.

Unit tracking

Every task declares a set of named Unit items (e.g. one per shard). Each unit progresses independently through PENDING → IN_PROGRESS → COMPLETED/FAILED. The task finishes when all units reach a terminal state. Units are always required when creating a task.
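A minimal sketch of the lifecycle rule above, assuming a hypothetical `UnitStatus` string type whose values mirror the documented states (these are not the package's own declarations). It shows the completion check: a task is done only once every unit is COMPLETED or FAILED.

```go
package main

// UnitStatus is a hypothetical stand-in for the package's unit state.
type UnitStatus string

const (
	UnitPending    UnitStatus = "PENDING"
	UnitInProgress UnitStatus = "IN_PROGRESS"
	UnitCompleted  UnitStatus = "COMPLETED"
	UnitFailed     UnitStatus = "FAILED"
)

// isTerminal reports whether a unit can no longer change state.
func isTerminal(s UnitStatus) bool {
	return s == UnitCompleted || s == UnitFailed
}

// allUnitsTerminal reports whether every unit reached COMPLETED or FAILED.
// An empty unit set counts as not finished (an assumption here; tasks
// without units are rejected at creation anyway).
func allUnitsTerminal(units map[string]UnitStatus) bool {
	if len(units) == 0 {
		return false
	}
	for _, s := range units {
		if !isTerminal(s) {
			return false
		}
	}
	return true
}
```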

Unit assignment

Units start unassigned (empty NodeID). The Scheduler treats unassigned units as belonging to any node, so all nodes will start the task. The first node to report progress for a unit claims it — subsequent updates from other nodes are rejected. This means assignment is implicit and driven by the Provider implementation, not prescribed by the framework.
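The claim rule can be sketched as follows. The `unit` struct and `reportProgress` function are hypothetical simplifications of what the apply path does; the error text echoes the ErrUnitWrongNode phrasing documented below.

```go
package main

import "fmt"

// unit is a hypothetical, simplified view of a task unit.
type unit struct {
	NodeID   string // empty means unassigned
	Progress float32
}

// reportProgress sketches implicit assignment: the first node to report
// progress for an unassigned unit claims it; later reports from any other
// node are rejected and leave the unit untouched.
func reportProgress(u *unit, nodeID string, progress float32) error {
	if u.NodeID == "" {
		u.NodeID = nodeID // first reporter claims the unit
	} else if u.NodeID != nodeID {
		return fmt.Errorf("unit belongs to node %s, not %s", u.NodeID, nodeID)
	}
	u.Progress = progress
	return nil
}
```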

Failure semantics

When any unit fails, the entire task immediately transitions to FAILED. In-flight units on other nodes are NOT waited for — their subsequent completion reports are rejected. This fail-fast approach avoids wasting cluster resources on doomed work.

Group-level finalization

If the Provider implements UnitAwareProvider, the Scheduler fires per-group callbacks as groups complete — even while the task is still STARTED:

  1. OnGroupCompleted: fires per group when all units in that group reach a terminal state. It receives the groupID and only those unit IDs in the group that are assigned to the local node. For tasks with explicit groups it fires mid-flight, enabling per-tenant atomicity for multi-tenant (MT) reindex. When no explicit GroupID is set, all units belong to the default group "" and OnGroupCompleted fires once, when all units are terminal (identical to the behavior before groups were introduced).

  2. OnTaskCompleted: fires once per node after ALL units reach a terminal state. Use this for global operations (e.g. Raft schema update). Since Raft deduplicates, the schema update happens exactly once even though OnTaskCompleted fires on every node.

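Both callbacks fire on FINISHED and FAILED tasks so providers can finalize (success) or roll back (failure) based on task.Status. Neither fires more than once per task lifecycle: OnGroupCompleted fires at most once per group on each node, and OnTaskCompleted fires once on each node.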
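The group-completion condition and the "local unit IDs only" rule can be sketched like this. `unitState` and `completedGroupsForNode` are hypothetical; a real Scheduler would also remember which groups it has already finalized so each callback fires at most once, which this sketch omits.

```go
package main

import "sort"

// unitState is a hypothetical, simplified unit record.
type unitState struct {
	GroupID  string // "" is the default group
	NodeID   string // node that claimed the unit
	Terminal bool   // COMPLETED or FAILED
}

// completedGroupsForNode returns, for each group whose units are all
// terminal, the sorted IDs of that group's units owned by nodeID. Groups in
// which nodeID owns no units are omitted, matching the rule that
// OnGroupCompleted does not fire on a node with nothing to finalize.
func completedGroupsForNode(units map[string]unitState, nodeID string) map[string][]string {
	complete := map[string]bool{}
	for _, u := range units {
		if _, seen := complete[u.GroupID]; !seen {
			complete[u.GroupID] = true
		}
		if !u.Terminal {
			complete[u.GroupID] = false
		}
	}
	out := map[string][]string{}
	for id, u := range units {
		if complete[u.GroupID] && u.NodeID == nodeID {
			out[u.GroupID] = append(out[u.GroupID], id)
		}
	}
	for g := range out {
		sort.Strings(out[g])
	}
	return out
}
```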

Four journey examples

Journey 1: Spread work across any node (no finalization needed).

A data-cleanup provider distributes 1000 files as units. Any node can claim any file. No finalization needed — each unit is independent.

unitIDs := []string{"file-001", "file-002", /* ..., */ "file-1000"}
raft.AddDistributedTask(ctx, "cleanup", taskID, payload, unitIDs)

The Provider's StartTask iterates units, processes unclaimed ones, and reports completion. No UnitAwareProvider needed.
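A runnable version of the Journey 1 setup, assuming the provider only needs the unit IDs plus a view of current ownership (`owner`, a hypothetical unitID -> NodeID map read from the task). `claimable` keeps unassigned units, which the node may or may not win, and units it already owns, since those can be re-delivered after a restart.

```go
package main

import "fmt"

// cleanupUnitIDs builds "file-001" ... "file-<n>"; %03d pads to at least
// three digits, so 1000 renders as "file-1000".
func cleanupUnitIDs(n int) []string {
	ids := make([]string, n)
	for i := range ids {
		ids[i] = fmt.Sprintf("file-%03d", i+1)
	}
	return ids
}

// claimable returns the units nodeID may work on: unassigned ones and ones
// it already owns. owner maps unitID -> NodeID ("" when unassigned).
func claimable(ids []string, owner map[string]string, nodeID string) []string {
	var out []string
	for _, id := range ids {
		if o := owner[id]; o == "" || o == nodeID {
			out = append(out, id)
		}
	}
	return out
}
```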

Journey 2: Per-shard work, global finalize (behavior unchanged).

A repair/rebuild provider recreates an index with the same configuration (e.g., fixing a corrupted HNSW or rebuilding blockmax segments). Because the behavior is unchanged, shards can swap independently as they finish — there's no need to wait for all shards before swapping, since queries produce the same results regardless of which format a shard is currently serving from.

Unit IDs are opaque strings. The Provider defines them at task creation time and stores any shard→unit mapping in the task payload:

payload := ReindexPayload{
    ShardMap: map[string]string{  // unitID → shardName
        "u-0": "shard-S1",       // nodeA's replica of S1
        "u-1": "shard-S1",       // nodeB's replica of S1
        "u-2": "shard-S2",       // ...
    },
}
unitIDs := []string{"u-0", "u-1", "u-2" /* , ... */}

Node assignment is automatic: the first node to report progress for a unit claims it (Unit.NodeID is set). The Provider's StartTask iterates units, checks which local shards it owns, and claims the corresponding units.
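One caveat with the payload above: u-0 and u-1 both map to shard-S1, so "which local shards do I own" alone does not tell nodeA and nodeB which replica unit is theirs, and the first claimer could take both. The sketch below assumes a hypothetical UnitToNode field (the per-replica assignment mentioned under the idempotency contract) to resolve this; the type and function names are illustrative.

```go
package main

import "sort"

// reindexPayload is hypothetical: ShardMap follows the example above and
// UnitToNode is an assumed per-replica assignment fixed at creation time.
type reindexPayload struct {
	ShardMap   map[string]string // unitID -> shardName
	UnitToNode map[string]string // unitID -> nodeID (assumed field)
}

// localUnits returns, sorted, the units pinned to nodeID whose shard this
// node actually hosts.
func localUnits(p reindexPayload, nodeID string, hosted map[string]bool) []string {
	var ids []string
	for unitID, shard := range p.ShardMap {
		if p.UnitToNode[unitID] == nodeID && hosted[shard] {
			ids = append(ids, unitID)
		}
	}
	sort.Strings(ids)
	return ids
}
```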

Each shard swaps its bucket pointers immediately upon completing its own reindex (inside the StartTask goroutine, before calling RecordUnitCompletion).

OnGroupCompleted: no-op — each shard already swapped during its own processing. OnTaskCompleted: optional — e.g., log completion or flip a cosmetic schema flag.

Journey 3: Per-shard work, per-shard finalize after barrier (behavior changes).

A tokenization-change provider reindexes every shard with new tokenization config (e.g., WORD → TRIGRAM). Because the behavior changes, consistency matters: if some shards serve old tokenization while others serve new, queries return mixed results. ALL shards must finish reindexing before ANY shard swaps to the new format.

Unit IDs and shard mapping work the same way as Journey 2 — the Provider defines IDs at creation time and stores the mapping in the task payload. The framework only cares about Unit.NodeID for ownership tracking.

During StartTask, each shard reindexes into new segments but does NOT swap yet. It reports progress and completion, but the old segments remain active for queries.

OnGroupCompleted: fires on each node AFTER all units across all nodes finish (since all units share the default group ""). Receives localGroupUnitIDs — which contains ONLY units assigned to THIS node (not all units in the group). The Provider looks up the shard mapping from task.Payload to know which local shards to swap. Atomically swaps bucket pointers for each local shard. This is a local operation, no Raft needed. If a node has no units in the group, OnGroupCompleted does not fire on that node.

OnTaskCompleted: submits a Raft schema update to change the tokenization config. Because Raft deduplicates, the schema update happens exactly once even though OnTaskCompleted fires on every node.

The barrier guarantee ensures NO shard swaps until ALL shards finish reindexing.

Journey 4: Per-tenant work with per-tenant finalize (MT reindex with groups).

A multi-tenant reindex provider creates one group per tenant. Each tenant's replicas are units in that group. As each tenant's group completes, OnGroupCompleted fires mid-flight — the provider atomically swaps that tenant's bucket pointers without waiting for other tenants. This provides per-tenant atomicity: if tenant A's group completes while tenant B is still reindexing, tenant A starts serving new data immediately.

specs := []UnitSpec{
    {ID: "t1__nodeA", GroupID: "tenant-1"},
    {ID: "t1__nodeB", GroupID: "tenant-1"},
    {ID: "t2__nodeA", GroupID: "tenant-2"},
    // ...
}
raft.AddDistributedTaskWithGroups(ctx, "reindex", taskID, payload, specs)

OnGroupCompleted: fires per-tenant as each tenant's replicas all finish. Atomically swaps bucket pointers for the local replicas of that tenant.

OnTaskCompleted: fires once on each node when ALL tenants finish. Updates the schema with the new tokenization config. If any tenant failed, task.Status == FAILED: the provider skips the schema update, but already-swapped tenants remain valid because each tenant was swapped independently.
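Building the Journey 4 unit specs can be sketched as below. It assumes tenants are identified by name, uses the tenant name as GroupID and `<tenant>__<node>` as the unit ID (a variation on the `t1__nodeA` style above), and sorts tenants so the Raft command is identical regardless of map iteration order. `UnitSpec` is redeclared locally with only the two fields the example uses.

```go
package main

import (
	"fmt"
	"sort"
)

// UnitSpec is redeclared here with only the fields used in the example.
type UnitSpec struct {
	ID      string
	GroupID string
}

// tenantUnitSpecs creates one group per tenant and one unit per replica
// node of that tenant. replicas maps tenant name -> nodes holding a replica.
func tenantUnitSpecs(replicas map[string][]string) []UnitSpec {
	tenants := make([]string, 0, len(replicas))
	for tenant := range replicas {
		tenants = append(tenants, tenant)
	}
	sort.Strings(tenants) // deterministic order regardless of map iteration
	var specs []UnitSpec
	for _, tenant := range tenants {
		for _, node := range replicas[tenant] {
			specs = append(specs, UnitSpec{
				ID:      fmt.Sprintf("%s__%s", tenant, node), // hypothetical ID format
				GroupID: tenant,
			})
		}
	}
	return specs
}
```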

Progress throttling

Unit progress updates go through Raft consensus. To prevent flooding the log, the Scheduler wraps the TaskCompletionRecorder in a ThrottledRecorder that forwards progress for each unit at most once per DefaultThrottleInterval (3 seconds, see the constant's godoc for the rationale). Completion and failure calls are never throttled.

Adding a new task type

To add a new kind of distributed task:

  1. Define a namespace constant (e.g. "my-reindex").
  2. Implement Provider (or UnitAwareProvider if you need group-level callbacks).
  3. Register the provider in configure_api.go's MakeAppState, keyed by your namespace.
  4. Create tasks via the Raft endpoint [cluster.Raft.AddDistributedTask], passing unit IDs (at least one unit is always required).

See ShardNoopProvider for a complete working example used by acceptance tests.

Provider idempotency contract

After a node crash, the Scheduler re-launches tasks that still have non-terminal units. The Provider MUST handle re-invocation idempotently:

  • Units in IN_PROGRESS state will be re-delivered to the same node that claimed them. The provider must detect partially-completed work (e.g. via sentinel files) and either resume or restart the unit safely.

  • Units in PENDING state (unclaimed) may be delivered to any node. The provider must tolerate being asked to process a unit that another node is also attempting to claim — only one node's first progress update will succeed. Providers that use per-replica assignment (UnitToNode metadata) avoid this race entirely, since each unit is deterministically assigned to exactly one node at creation time.

  • The framework does NOT re-assign units claimed by a crashed node to other nodes. The crashed node must eventually restart for its IN_PROGRESS units to complete. If a node is permanently lost, the task must be cancelled manually.

Typical idempotency patterns:

  • Check for a completion sentinel file before starting work
  • Use atomic file operations (write-to-temp + rename) for crash safety
  • Store progress checkpoints that allow resuming from the last known good state

Index

Constants

const DefaultThrottleInterval = 3 * time.Second

DefaultThrottleInterval is the production cap on per-unit progress writes to the RAFT log. Scheduler.Start passes this constant to NewThrottledRecorder. Pinned by `TestThrottledRecorder_DefaultInterval_*` — if you change the value, update Scheduler.Start's rationale comment and the matching prose in `doc.go` ("Progress throttling" section) too.

const PermanentRejectionRPCCode = codes.FailedPrecondition

PermanentRejectionRPCCode is the gRPC status code used to discriminate permanent FSM rejections from generic Internal errors on the wire. It is intentionally codes.FailedPrecondition: the FSM is internally consistent but the current request cannot be satisfied.

const ShardNoopProviderNamespace = "shard-noop"

Variables

var (
	// ErrPermanentRejection is the umbrella sentinel matched by every
	// permanent FSM rejection. Classifiers should errors.Is against this.
	ErrPermanentRejection = errors.New("permanent FSM rejection")

	// ErrTaskNotRunning matches "task ... is no longer running".
	ErrTaskNotRunning = errors.New("task is no longer running")

	// ErrTaskDoesNotExist matches "task ... does not exist".
	ErrTaskDoesNotExist = errors.New("task does not exist")

	// ErrUnitAlreadyTerminal matches "unit ... is already terminal".
	ErrUnitAlreadyTerminal = errors.New("unit is already terminal")

	// ErrUnitWrongNode matches "unit ... belongs to node X, not Y".
	ErrUnitWrongNode = errors.New("unit belongs to a different node")

	// ErrTaskNotInFinalizingState matches "task ... cannot be finalized
	// from status ...". Returned by [Manager.MarkTaskFinalized] when a
	// stale RAFT command for a task that has already been cancelled or
	// failed arrives — refusing here avoids overwriting a terminal
	// status the operator (or fail-fast path) committed in the meantime.
	ErrTaskNotInFinalizingState = errors.New("task is not in finalizing state")
)

Sentinel errors describing stable, non-retryable FSM rejections from Manager.RecordUnitCompletion, Manager.UpdateUnitProgress and similar apply paths.

Classifiers (e.g. reindex_provider.isPermanentRecorderRejection) should use errors.Is against ErrPermanentRejection to decide whether to retry. The specific sentinels (ErrTaskNotRunning, ErrTaskDoesNotExist, ...) are kept so callers that want to log a precise reason can still test for them individually.

Wire-format / mixed-version note

In a cluster, the FSM error originates on the leader and reaches the caller via gRPC (cluster/rpc). The gRPC transport collapses Go errors into a (status.Code, message string) pair, so the sentinel chain is lost in transit and wrapping with %w on the receiving side cannot restore it.

To survive the round-trip we encode the sentinel identity into the gRPC error in two ways (both additive — no proto change required):

  1. A stable PermanentRejectionRPCCode = codes.FailedPrecondition on the google.golang.org/grpc/status.Status. This is the primary discriminator.
  2. A stable, machine-readable prefix marker on the error message, e.g. "[dtm-perm/task-not-running] ...". The marker is the per-sentinel fidelity carrier and survives any later %w wrapping because it lives in the string. The marker is NOT user-facing; the human-readable portion of the message follows the closing bracket.

On the receiving side, RehydratePermanentRejection inspects an error from a gRPC apply call and re-attaches the appropriate sentinel via errors.Join so errors.Is keeps working end-to-end.

Cross-version compatibility:

  • Old leader → new follower: the leader returns the legacy fmt.Errorf-formatted message without the marker and without codes.FailedPrecondition. The follower's classifier falls back to substring matching of the legacy phrases (with a Warn log so we notice when the fallback fires in prod).
  • New leader → old follower: the old follower still substring-matches the legacy phrases, which we keep intact inside the new prefixed messages. Both new and old phrasings coexist in the same string.

Functions

func RehydratePermanentRejection added in v1.38.0

func RehydratePermanentRejection(err error) error

RehydratePermanentRejection inspects an error from a gRPC apply call and, if it carries the permanent-rejection signal, returns a new error that satisfies errors.Is for the appropriate sentinel(s).

Detection order:

  1. gRPC status code == PermanentRejectionRPCCode and the message prefix matches "[dtm-perm/<id>] ..." → re-attach the specific sentinel via errors.Join.
  2. gRPC status code == PermanentRejectionRPCCode but the marker is missing or unknown → re-attach only the umbrella sentinel (forward-compat: a future sentinel id we don't recognise should still be classified as permanent).

If the error is not a recognised permanent-rejection signal, the input is returned unchanged.

func ToRPCError added in v1.38.0

func ToRPCError(err error) error

ToRPCError converts an FSM error into a gRPC error that preserves the permanent-rejection classification across the wire. Generic (non-permanent) errors are returned as codes.Internal, matching the prior behavior in cluster/rpc.

Callers in cluster/rpc/server.go should prefer this helper for any FSM path that may return permanent sentinels.

Types

type CollectionExtractor added in v1.38.0

type CollectionExtractor func(payload []byte) (collection string, ok bool)

CollectionExtractor returns the schema-collection a task's payload is bound to ("", false for non-scoped / unparseable). Register via Manager.RegisterCollectionExtractor to opt a namespace into Manager.DeleteTasksForCollection — closes weaviate/0-weaviate-issues#231.
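An extractor for a hypothetical JSON payload of the form `{"collection": "..."}` might look like this. Refusing empty names on the extractor side complements DeleteTasksForCollection's own rejection of an empty `collection`; the extractor is pure and non-blocking, as RegisterCollectionExtractor requires.

```go
package main

import "encoding/json"

// CollectionExtractor matches the signature documented above.
type CollectionExtractor func(payload []byte) (collection string, ok bool)

// jsonCollectionExtractor reports ok=false for unparseable payloads and for
// an empty name, so stray bytes can never bind a task to "".
var jsonCollectionExtractor CollectionExtractor = func(payload []byte) (string, bool) {
	var p struct {
		Collection string `json:"collection"`
	}
	if err := json.Unmarshal(payload, &p); err != nil || p.Collection == "" {
		return "", false
	}
	return p.Collection, true
}
```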

type ConcurrencyLimiter added in v1.37.0

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter is a channel-based semaphore that limits the number of concurrent operations. Acquire blocks until a slot is available or the context is cancelled. Release returns a slot to the pool.

func NewConcurrencyLimiter added in v1.37.0

func NewConcurrencyLimiter(maxConcurrency int) *ConcurrencyLimiter

NewConcurrencyLimiter creates a limiter that allows up to maxConcurrency concurrent operations. Values < 1 are clamped to 1.

func (*ConcurrencyLimiter) Acquire added in v1.37.0

func (l *ConcurrencyLimiter) Acquire(ctx context.Context) error

Acquire blocks until a slot is available or ctx is cancelled.

func (*ConcurrencyLimiter) Release added in v1.37.0

func (l *ConcurrencyLimiter) Release()

Release returns a slot to the pool. Must be called exactly once for each successful Acquire.

type ConflictDetector added in v1.38.0

type ConflictDetector interface {
	Provider

	// CheckConflict is called under [Manager.mu] before a new task is
	// appended to the FSM-stored task list. existingTasks is the full
	// namespace-scoped task list at apply time. Return a non-nil error
	// to reject the new task; the error propagates back to the
	// AddDistributedTask caller.
	CheckConflict(newPayload []byte, existingTasks []*Task) error
}

ConflictDetector is an optional interface providers implement so the Manager.AddTask RAFT-apply path can reject a new task whose payload conflicts with an already-running task in the same namespace.

Motivation: the REST handler holds a per-(collection, property) in-memory lock and runs [checkReindexConflict] before submitting, which closes the same-node race. But two parallel PUT /indexes/{prop} requests served by *different* nodes both pass the per-node lock + check (neither has called AddDistributedTask yet at the moment they each query the cluster task list) and both submit a RAFT task. At that point two reindex migrations race on shared on-disk state for the property and one of them ends up FAILED — the multi-node face of https://github.com/weaviate/weaviate/issues/10675 (issue tracked as "parallel-migration bug #54").

Putting the conflict check inside Manager.AddTask under m.mu makes it RAFT-deterministic: every node consults the same FSM-stored task list at apply time and rejects the duplicate identically, returning the conflict error to the originating client.

FSM-determinism contract: CheckConflict MUST be a pure function of (newPayload, existingTasks). It must not read mutable process state (clocks, network, schema, RNG) — different nodes applying the same log entry must reach the same accept/reject decision.

type ListDistributedTasksResponse

type ListDistributedTasksResponse struct {
	Tasks map[string][]*Task `json:"tasks"`
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is responsible for managing distributed tasks across the cluster.

func NewManager

func NewManager(params ManagerParameters) *Manager

func (*Manager) AddTask

func (m *Manager) AddTask(c *api.ApplyRequest, seqNum uint64) error

AddTask registers a new distributed task from a Raft apply. The seqNum becomes the task's Version, used to distinguish re-runs of the same task ID. Returns an error if a task with the same namespace/ID is already running, or if no units are provided.

func (*Manager) CancelTask

func (m *Manager) CancelTask(a *api.ApplyRequest) error

CancelTask transitions a running task to CANCELLED. In-flight units are not waited for — the Scheduler will terminate their local handles on the next tick.

func (*Manager) CheckClassMutation added in v1.38.0

func (m *Manager) CheckClassMutation(className string) error

CheckClassMutation consults every registered SchemaMutationDetector for class-wide destructive mutations (e.g. DeleteClass). Stricter than CheckPropertyUpdate — any in-flight reindex on the class blocks the mutation.

Same RAFT-determinism contract as CheckPropertyUpdate.

func (*Manager) CheckPropertyUpdate added in v1.38.0

func (m *Manager) CheckPropertyUpdate(className, propertyName string) error

CheckPropertyUpdate consults every registered SchemaMutationDetector against the current FSM-stored task list and returns the first conflict reported. Called by the schema FSM's UpdateProperty apply path BEFORE the merge is applied; returning a non-nil error causes the apply to reject with that error.

RAFT-deterministic by construction: under m.mu (write lock to match the apply paths that mutate m.tasks), every node sees the same task list at the same applyIndex, and each detector is contractually a pure function of its arguments. So every node reaches the same accept/reject decision.

Returns nil when no detectors are registered or no task in any namespace flags the update. Empty fast-path keeps the schema apply path free of allocations in the common case.

func (*Manager) CheckTenantMutation added in v1.38.0

func (m *Manager) CheckTenantMutation(className string, tenants []string) error

CheckTenantMutation consults every registered SchemaMutationDetector for tenant-level mutations that would make the named tenants' shards locally unavailable (DeleteTenants, UpdateTenants toward OFFLOADED / FROZEN / transitional).

Same RAFT-determinism contract as CheckPropertyUpdate.

func (*Manager) CleanUpTask

func (m *Manager) CleanUpTask(a *api.ApplyRequest) error

CleanUpTask removes a terminal task from the Manager's state. It refuses to clean up tasks that are still running or whose completedTaskTTL has not yet elapsed, preventing premature removal of status information that other nodes may still need to observe.

func (*Manager) DeleteTasksForCollection added in v1.38.0

func (m *Manager) DeleteTasksForCollection(collection string) []TaskDescriptor

DeleteTasksForCollection drops tasks whose payload binds to `collection`. Called from the schema FSM on DELETE_CLASS so a drop+recreate of the same class name starts with a clean task slate. Empty `collection` is rejected (an extractor emitting ("", true) on stray bytes would otherwise wipe the cluster). See weaviate/0-weaviate-issues#231.

func (*Manager) ListDistributedTasks

func (m *Manager) ListDistributedTasks(_ context.Context) (map[string][]*Task, error)

ListDistributedTasks returns a snapshot of all tasks grouped by namespace. Each Task is cloned, so callers may read the returned values without holding the Manager's lock.

Tasks within each namespace are sorted deterministically so adjacent polls return the same slice order regardless of Go's randomized map iteration. Sort key:

  1. STARTED tasks first (the currently-running work matters most).
  2. Within priority, by activity-time DESC (newest first). Activity-time is FinishedAt for terminal tasks, StartedAt otherwise.
  3. Tiebreak by ID ASC for full stability.

func (*Manager) ListDistributedTasksPayload

func (m *Manager) ListDistributedTasksPayload(ctx context.Context) ([]byte, error)

func (*Manager) MarkTaskFinalized added in v1.38.0

func (m *Manager) MarkTaskFinalized(c *api.ApplyRequest) error

MarkTaskFinalized transitions a task from SWAPPING to FINISHED. It is issued by the scheduler once OnGroupCompleted (per-node swap) and OnTaskCompleted (cluster-wide schema flip for semantic migrations) have both succeeded.

Idempotent at the FSM layer: every node's scheduler fires this command after its local callbacks succeed. The first commit flips the status; subsequent commits hit the "already FINISHED" short-circuit and return without error.

func (*Manager) RecordPostCompletionAck added in v1.38.0

func (m *Manager) RecordPostCompletionAck(c *api.ApplyRequest) error

RecordPostCompletionAck records one node's SWAP-phase ack on the task. Gates SWAPPING → FINISHED on every expected ack landing successfully; any Success=false flips to FAILED, which skips the cluster-wide schema flip in OnTaskCompleted. Idempotent: first ack per (task, node) wins; late acks against terminal states are silently dropped.

func (*Manager) RecordPreparationCompleteAck added in v1.38.0

func (m *Manager) RecordPreparationCompleteAck(c *api.ApplyRequest) error

RecordPreparationCompleteAck records one node's PREP-phase ack on the task. Gates PREPARING → SWAPPING on every expected ack landing successfully; any Success=false flips the task to FAILED, holding the barrier so no node proceeds to the atomic swap. Idempotent: first ack per (task, node) wins; late acks against terminal states are silently dropped.

Specifically:

  • Ack arrives for an idempotently-already-acked (task, node): no-op, the first ack wins.
  • Ack arrives for a task no longer in a state that can use it (FAILED / FINISHED / CANCELLED, or SWAPPING/FINISHED after the barrier has already lifted): no-op.
  • Ack with Success==false arrives while the task is PREPARING: records the ack AND transitions the task to FAILED.
  • Ack with Success==true arrives while the task is STARTED or PREPARING: records the ack. If every expected node (i.e. every node that owns at least one local unit on this task) has now ack'd with Success=true, transitions the task PREPARING → SWAPPING. The scheduler tick on each node observes SWAPPING and fires the per-node atomic swap (OnSwapRequested).

Idempotent: every node's scheduler may re-fire this on tick / wake retries until the apply commits. The first ack per (task, node) sticks; later acks for the same node are silently discarded.

FSM-determinism: the PREPARING → SWAPPING transition is computed purely from the task's Units → NodeID map (which is RAFT-replicated and identical on every node) plus the PreparationCompletionAcks state — so every node's Manager arrives at the transition on the same apply.
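The barrier logic can be sketched as a pure function of the replicated unit -> NodeID map and the recorded acks. The function names and string return values are hypothetical; `recordAck` models the first-ack-wins rule.

```go
package main

// recordAck keeps only the first ack per node; later acks are ignored.
func recordAck(acks map[string]bool, nodeID string, success bool) {
	if _, seen := acks[nodeID]; !seen {
		acks[nodeID] = success
	}
}

// prepDecision returns the status implied by the acks so far. Expected nodes
// are those owning at least one unit, derived only from replicated state.
func prepDecision(unitNode map[string]string, acks map[string]bool) string {
	for _, success := range acks {
		if !success {
			return "FAILED" // any Success=false fails the task
		}
	}
	expected := map[string]bool{}
	for _, nodeID := range unitNode {
		if nodeID != "" {
			expected[nodeID] = true
		}
	}
	for nodeID := range expected {
		if _, ok := acks[nodeID]; !ok {
			return "PREPARING" // still waiting for this node
		}
	}
	return "SWAPPING"
}
```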

func (*Manager) RecordUnitCompletion added in v1.37.0

func (m *Manager) RecordUnitCompletion(c *api.ApplyRequest) error

RecordUnitCompletion handles both success and failure (distinguished by a non-empty error field in the request). On failure, the task transitions to FAILED immediately — remaining in-flight units are NOT waited for, and their subsequent completion reports will be rejected with "task is no longer running". This fail-fast behavior is intentional: it avoids wasting cluster resources on a task that is already doomed.

func (*Manager) RegisterCollectionExtractor added in v1.38.0

func (m *Manager) RegisterCollectionExtractor(namespace string, extractor CollectionExtractor)

RegisterCollectionExtractor opts a task namespace into DeleteTasksForCollection's cascade. Extractor runs under the Manager lock — must not block or recurse. Last write wins per namespace; nil / empty arguments are silently dropped.

func (*Manager) Restore

func (m *Manager) Restore(bytes []byte) error

Restore replaces the Manager's in-memory state from a Raft snapshot produced by Manager.Snapshot. It is called during Raft leader election or when a follower installs a snapshot from the leader.

func (*Manager) SetConflictDetectors added in v1.38.0

func (m *Manager) SetConflictDetectors(detectors map[string]ConflictDetector)

SetConflictDetectors installs the per-namespace conflict-detection hook called by Manager.AddTask. Safe to call once at startup after both the Manager and the providers exist (see configure_api.go wiring). Subsequent calls overwrite the previous registration.

Pass nil to disable conflict checking (e.g. unit tests that exercise AddTask in isolation).

func (*Manager) SetSchedulerNotifier added in v1.38.0

func (m *Manager) SetSchedulerNotifier(notifier SchedulerNotifier)

SetSchedulerNotifier installs the scheduler wake-up notifier. Safe to call once at startup after both the Manager and the Scheduler exist (see configure_api.go wiring). Subsequent calls overwrite the previous notifier.

notifier may be nil to disable reactive firing (e.g. in unit tests that exercise the periodic tick path in isolation).

func (*Manager) SetSchemaMutationDetectors added in v1.38.0

func (m *Manager) SetSchemaMutationDetectors(detectors map[string]SchemaMutationDetector)

SetSchemaMutationDetectors installs the per-namespace registry consulted by Manager.CheckPropertyUpdate from the schema FSM's UpdateProperty apply path. Safe to call once at startup after both the Manager and the providers exist (configure_api.go wiring). Subsequent calls overwrite the previous registration.

Pass nil to disable the schema-mutation guard (e.g. unit tests that exercise schema applies in isolation).

func (*Manager) Snapshot

func (m *Manager) Snapshot() ([]byte, error)

Snapshot serialises the full task state to JSON for Raft snapshotting. The inverse operation is Manager.Restore.

func (*Manager) UpdateUnitProgress added in v1.37.0

func (m *Manager) UpdateUnitProgress(c *api.ApplyRequest) error

UpdateUnitProgress records a unit's progress and also handles initial node assignment: the first progress update for an unassigned unit sets its NodeID, claiming it for that node. After assignment, updates from other nodes are rejected. Progress updates to terminal units are silently ignored (no error) because in-flight Raft commands may arrive after a unit has already completed.

Stored Progress is monotonic per task version: when the requested Progress regresses, only NodeID and UpdatedAt are applied. This is a receiver-side defence against sender-side miscomputation. See weaviate/0-weaviate-issues#232.

type ManagerParameters

type ManagerParameters struct {
	Clock clockwork.Clock

	CompletedTaskTTL time.Duration

	Logger logrus.FieldLogger
}

type MockTaskCleaner

type MockTaskCleaner struct {
	mock.Mock
}

MockTaskCleaner is an autogenerated mock type for the TaskCleaner type

func NewMockTaskCleaner

func NewMockTaskCleaner(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCleaner

NewMockTaskCleaner creates a new instance of MockTaskCleaner. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskCleaner) CleanUpDistributedTask

func (_m *MockTaskCleaner) CleanUpDistributedTask(ctx context.Context, namespace string, taskID string, taskVersion uint64) error

CleanUpDistributedTask provides a mock function with given fields: ctx, namespace, taskID, taskVersion

func (*MockTaskCleaner) EXPECT

type MockTaskCleaner_CleanUpDistributedTask_Call

type MockTaskCleaner_CleanUpDistributedTask_Call struct {
	*mock.Call
}

MockTaskCleaner_CleanUpDistributedTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanUpDistributedTask'

func (*MockTaskCleaner_CleanUpDistributedTask_Call) Return

func (*MockTaskCleaner_CleanUpDistributedTask_Call) Run

func (*MockTaskCleaner_CleanUpDistributedTask_Call) RunAndReturn

type MockTaskCleaner_Expecter

type MockTaskCleaner_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskCleaner_Expecter) CleanUpDistributedTask

func (_e *MockTaskCleaner_Expecter) CleanUpDistributedTask(ctx interface{}, namespace interface{}, taskID interface{}, taskVersion interface{}) *MockTaskCleaner_CleanUpDistributedTask_Call

CleanUpDistributedTask is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • taskVersion uint64

type MockTaskCompletionRecorder

type MockTaskCompletionRecorder struct {
	mock.Mock
}

MockTaskCompletionRecorder is an autogenerated mock type for the TaskCompletionRecorder type

func NewMockTaskCompletionRecorder

func NewMockTaskCompletionRecorder(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCompletionRecorder

NewMockTaskCompletionRecorder creates a new instance of MockTaskCompletionRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskCompletionRecorder) EXPECT

func (*MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion added in v1.37.0

func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string) error

RecordDistributedTaskUnitCompletion provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID

func (*MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure added in v1.37.0

func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, errMsg string) error

RecordDistributedTaskUnitFailure provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID, errMsg

func (*MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress added in v1.37.0

func (_m *MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, progress float32) error

UpdateDistributedTaskUnitProgress provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID, progress

type MockTaskCompletionRecorder_Expecter

type MockTaskCompletionRecorder_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion added in v1.37.0

func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call

RecordDistributedTaskUnitCompletion is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • nodeID string
  • unitID string

func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure added in v1.37.0

func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}, errMsg interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call

RecordDistributedTaskUnitFailure is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • nodeID string
  • unitID string
  • errMsg string

func (*MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress added in v1.37.0

func (_e *MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}, progress interface{}) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call

UpdateDistributedTaskUnitProgress is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • nodeID string
  • unitID string
  • progress float32

type MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call added in v1.37.0

type MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call struct {
	*mock.Call
}

MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDistributedTaskUnitCompletion'

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Return added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Run added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) RunAndReturn added in v1.37.0

type MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call added in v1.37.0

type MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call struct {
	*mock.Call
}

MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDistributedTaskUnitFailure'

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Return added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Run added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) RunAndReturn added in v1.37.0

type MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call added in v1.37.0

type MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call struct {
	*mock.Call
}

MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateDistributedTaskUnitProgress'

func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Return added in v1.37.0

func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Run added in v1.37.0

func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) RunAndReturn added in v1.37.0

type PostCompletionAck added in v1.38.0

type PostCompletionAck struct {
	// Success is true iff the node's OnGroupCompleted (i.e. the per-shard
	// runtime swap / finalize) returned no error for every local unit.
	Success bool `json:"success"`
	// Error captures the aggregated error message when Success==false.
	// Empty when Success==true.
	Error string `json:"error,omitempty"`
	// AckedAt is the wall-clock time the ack was applied on the FSM
	// (set on the apply path, not from the scheduler). Useful for
	// forensics — the gap between AllUnitsTerminal's FinishedAt and the
	// last AckedAt is the SWAPPING window's wall-clock duration on this
	// cluster.
	AckedAt time.Time `json:"ackedAt"`
}

PostCompletionAck records one node's OnGroupCompleted result for a task. Persisted on the Task under the per-node-ID key of [Task.PostCompletionAcks] and survives RAFT snapshot/restore so the cluster-wide ack barrier is durable across restarts.

type PostCompletionAckRecorder added in v1.38.0

type PostCompletionAckRecorder interface {
	RecordDistributedTaskPostCompletionAck(
		ctx context.Context,
		namespace, taskID string,
		taskVersion uint64,
		nodeID string,
		success bool,
		errMsg string,
	) error

	// RecordDistributedTaskPreparationCompleteAck commits one node's PREP result via
	// RAFT and gates the cluster-wide PREPARING → SWAPPING transition. Any
	// success=false flips the task to FAILED so no node fires its swap.
	// Idempotent: first ack per (task, node) wins.
	RecordDistributedTaskPreparationCompleteAck(
		ctx context.Context,
		namespace, taskID string,
		taskVersion uint64,
		nodeID string,
		success bool,
		errMsg string,
	) error
}

PostCompletionAckRecorder is the RAFT-apply hook the Scheduler uses to publish one node's OnGroupCompleted result (success or failure) after its callbacks have returned for every local group in a task. The scheduler gates [TaskFinalizer.MarkDistributedTaskFinalized] on having an ack from every node that has local units in the task, and transitions the task to FAILED if any ack reports failure — which makes [UnitAwareProvider.OnTaskCompleted] skip the cluster-wide schema flip on that path.

The recorded state is stored on the Task (see [Task.PostCompletionAcks]) and survives RAFT snapshot/restore so a node restart during the SWAPPING window does not lose the cluster's collected acks.

Crash-safety contract: without this hook, a node whose RunSwapOnShard silently failed could still let the cluster-wide schema flip commit, leaving that replica serving wrong-tokenization data with no operator signal. The recorder closes that gap by gating [TaskFinalizer.MarkDistributedTaskFinalized] on every per-node ack landing.
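The barrier rule described above can be sketched as a pure decision function. This is a minimal illustration under stated assumptions, not the Manager's code: `ack`, `barrierDecision`, and `decide` are hypothetical local names, and the node list stands in for the nodes that own local units. Any failed ack fails the task immediately; otherwise finalization waits until every expected node has acked.

```go
package main

import "fmt"

// ack is a trimmed local mirror of PostCompletionAck (AckedAt omitted).
type ack struct {
	Success bool
	Error   string
}

// barrierDecision is a hypothetical label for what the apply path does next.
type barrierDecision string

const (
	waitForAcks barrierDecision = "WAIT"
	markFailed  barrierDecision = "FAILED"
	finalize    barrierDecision = "FINISHED"
)

// decide fails the task on any Success=false ack, and otherwise allows
// finalization only once every node with local units has acked.
func decide(nodesWithUnits []string, acks map[string]ack) barrierDecision {
	for _, a := range acks {
		if !a.Success {
			return markFailed
		}
	}
	for _, n := range nodesWithUnits {
		if _, ok := acks[n]; !ok {
			return waitForAcks
		}
	}
	return finalize
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	acks := map[string]ack{"node-1": {Success: true}, "node-2": {Success: true}}
	fmt.Println(decide(nodes, acks)) // WAIT: node-3 has not acked yet
	acks["node-3"] = ack{Success: false, Error: "RunSwapOnShard failed"}
	fmt.Println(decide(nodes, acks)) // FAILED: the schema flip is skipped
}
```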

type Provider

type Provider interface {
	// SetCompletionRecorder is invoked on node startup to register the TaskCompletionRecorder,
	// which should be passed to every launched task so it can record its completion.
	SetCompletionRecorder(recorder TaskCompletionRecorder)

	// GetLocalTasks returns the tasks that the provider is aware of from the local node state.
	GetLocalTasks() []TaskDescriptor

	// CleanupTask is a signal to clean up the task local state.
	CleanupTask(desc TaskDescriptor) error

	// StartTask is a signal to start executing the task in the background.
	StartTask(task *Task) (TaskHandle, error)
}

Provider is an interface for the management and execution of a group of tasks denoted by a namespace.

type RecoveryAwareProvider added in v1.38.0

type RecoveryAwareProvider interface {
	Provider

	// LocalCallbacksDone returns true iff this provider has verified,
	// from durable local state, that OnGroupCompleted (and any
	// follow-up local recovery) has completed successfully for every
	// unit assigned to localNode. Returning false means "the
	// bootstrap pre-mark should NOT suppress callback replay for
	// this task — let OnGroupCompleted re-fire on next tick so the
	// provider can finish recovery."
	//
	// Called from [Scheduler.preMarkTerminalCallbacksLocked] under
	// s.mu, ONCE per terminal task at bootstrap. Implementations
	// should treat this as a cheap on-disk check.
	LocalCallbacksDone(task *Task, localNode string) bool
}

RecoveryAwareProvider is an optional interface providers implement to participate in post-restart callback retry. The Scheduler's bootstrap pre-mark (which normally suppresses replay of callbacks that fired pre-restart) calls into this hook for every terminal task; if the provider reports the local-side callback as NOT yet durably complete, the scheduler skips the pre-mark for that task so the next tick re-fires OnGroupCompleted and the provider's recovery path can finish the half-applied work.

Motivating scenario (RollingRestartMidMigration): a node's OnGroupCompleted started running, completed swap for 2 of 3 local shards, then context-cancelled mid-shutdown of the 3rd shard's reindex bucket because the rolling restart began. The task is FINISHED in RAFT (the unit-completion was recorded before OnGroupCompleted fired), so without this hook the bootstrap pre-mark silently suppresses the retry and the 3rd shard stays at the old tokenization forever — per-replica divergence (#10675 family).

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the component which is responsible for polling the active tasks in the cluster (via the Manager) and making sure that the tasks are running on the local node.

Backup precheck scope (coordinator-local fast-fail)

The scheduler keeps the local node's view of which tasks are in flight; consumers like the backup coordinator's Backupable precheck (adapters/repos/db/backup.go) consult that view to refuse a backup whose target class has a runtime-reindex still in flight.

That precheck is intentionally coordinator-local: it only walks shards on the node that received the backup request. Remote refusals are NOT escalated through the scheduler; they happen later in the backup state machine, at every replica's canCommit step. The asymmetry is by design. A coordinator-local hit is a fast-fail optimization (most backup attempts will hit a remote shard's canCommit anyway, so paying for a cross-node RAFT round-trip in the precheck path would be wasted latency), and any reindex that exists on a remote shard but not locally is still caught by the global canCommit barrier before the backup writes anything durable.

If a future change pushes the precheck down into a cluster-wide query, this paragraph and the matching comment in db/backup.go:Backupable should be updated together.

The general flow of a distributed task is as follows:

  1. A Provider is registered with the Scheduler at startup to handle all tasks under a specific namespace.
  2. A task is created and added to the cluster via Manager.AddTask.
  3. The Scheduler regularly scans all tasks in the cluster, picks up new ones, and instructs the Provider to execute them locally.
  4. The running task is responsible for updating its status in the cluster via the TaskCompletionRecorder.
  5. The Scheduler polls the cluster for the task status and checks whether it is still running. It cancels the local task once the task is no longer marked as STARTED.
  6. After the completed-task TTL has passed, the Scheduler issues a Manager.CleanUpDistributedTask request to remove the task from the cluster list.
  7. After the task is removed from the cluster list, the Scheduler instructs the Provider to clean up the local task state.

func NewScheduler

func NewScheduler(params SchedulerParams) *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close()

Close stops the background tick loop and terminates all running task handles. It blocks until all handles have been signalled. After Close returns, no new ticks will fire.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start wires up providers with a ThrottledRecorder, performs an initial task listing to bootstrap any already-active tasks, and spawns the background tick loop. It is safe to call exactly once. Use Scheduler.Close to stop the loop and terminate all running tasks.

func (*Scheduler) Wake added in v1.38.0

func (s *Scheduler) Wake()

Wake requests an immediate scheduling cycle. Non-blocking: a pending wake-up coalesces additional calls (the next loop iteration sees the latest cluster-wide task state regardless of how many wakes accumulated). Safe to call from any goroutine, including RAFT-apply paths.

Wake is a no-op after Scheduler.Close returns — the run loop has already exited and won't observe the signal.

type SchedulerNotifier added in v1.38.0

type SchedulerNotifier interface {
	Wake()
}

SchedulerNotifier is implemented by the Scheduler and consumed by the Manager to request an immediate scheduling cycle after a task-state change applies via Raft. Without this hook the scheduler only reacts on its periodic tick (default 1 minute, see DefaultDistributedTasksSchedulerTickInterval), so a barrier opening from the last unit's terminal transition is staggered across nodes by up to one tick interval. Wake() is non-blocking and may be called from performance-sensitive RAFT-apply paths; implementations must coalesce rapid-fire calls and never block the caller.
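A common Go idiom that satisfies this contract (non-blocking, coalescing) is a one-slot buffered channel written with `select`/`default`. The sketch below is a hypothetical implementation of the technique, not the Scheduler's code; `wakeNotifier` and `newWakeNotifier` are illustrative names. A run loop would select on the channel alongside its periodic ticker.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeNotifier is a hypothetical SchedulerNotifier implementation.
type wakeNotifier struct {
	wake chan struct{}
}

func newWakeNotifier() *wakeNotifier {
	// Capacity 1: at most one wake-up can be pending at a time.
	return &wakeNotifier{wake: make(chan struct{}, 1)}
}

// Wake never blocks. If a signal is already pending the new one is dropped,
// because the next cycle reads the latest task state anyway.
func (n *wakeNotifier) Wake() {
	select {
	case n.wake <- struct{}{}:
	default:
	}
}

func main() {
	n := newWakeNotifier()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.Wake() // e.g. many RAFT-apply paths firing at once
		}()
	}
	wg.Wait()
	fmt.Println("pending wake-ups:", len(n.wake)) // pending wake-ups: 1
}
```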

type SchedulerParams

type SchedulerParams struct {
	CompletionRecorder TaskCompletionRecorder
	TaskLister         TaskLister
	TaskCleaner        TaskCleaner
	TaskFinalizer      TaskFinalizer
	// AckRecorder publishes per-node phase results via RAFT. nil in unit
	// tests; production wiring in configure_api.go always sets this.
	AckRecorder       PostCompletionAckRecorder
	Providers         map[string]Provider
	Clock             clockwork.Clock
	Logger            logrus.FieldLogger
	MetricsRegisterer prometheus.Registerer

	LocalNode        string
	CompletedTaskTTL time.Duration
	TickInterval     time.Duration
}

type SchemaMutationDetector added in v1.38.0

type SchemaMutationDetector interface {
	Provider

	// CheckPropertyUpdate is called under [Manager.mu] from the
	// schema FSM's UpdateProperty apply path. existingTasks is the
	// full namespace-scoped task list at apply time. Return a
	// non-nil error to reject the property update; the error
	// propagates back to the UpdateProperty caller.
	CheckPropertyUpdate(className, propertyName string, existingTasks []*Task) error

	// CheckClassMutation is called under [Manager.mu] from the schema FSM's
	// destructive class-wide apply paths (DeleteClass etc.). Stricter than
	// CheckPropertyUpdate: any in-flight reindex on the class is a conflict,
	// because dropping the class wipes every in-flight migration's working dirs.
	CheckClassMutation(className string, existingTasks []*Task) error

	// CheckTenantMutation is called under [Manager.mu] from the
	// schema FSM's DeleteTenants / UpdateTenants apply paths when a
	// transition would make one or more tenants' shards locally
	// unavailable (DeleteTenants, or UpdateTenants toward OFFLOADED
	// / FROZEN / OFFLOADING / FREEZING). existingTasks is the full
	// namespace-scoped task list at apply time. Return a non-nil
	// error to reject the mutation.
	//
	// Today's reindex tasks are class-scoped (their payload names a
	// collection but not a specific tenant — they apply to whatever
	// shards exist), so the conservative implementation is "block
	// every tenant mutation on a class with any in-flight reindex".
	// A future per-tenant reindex payload could narrow this.
	CheckTenantMutation(className string, tenants []string, existingTasks []*Task) error
}

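SchemaMutationDetector is an optional interface providers implement so the schema FSM's apply paths (UpdateProperty, destructive class-wide mutations such as DeleteClass, and tenant mutations via DeleteTenants / UpdateTenants) can reject external schema mutations that would race with one of the provider's in-flight tasks.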

Motivating failure mode: a `change-tokenization` reindex spawns separate per-shard sub-tasks for the searchable and filterable indexes. A DELETE `/index/searchable` arriving mid-flight applies `cleanStaleMigrationDirs("<prop>", "searchable")`, which wipes the `searchable_retokenize_<prop>_<gen>/` working dir under the still-running sub-task. That sub-unit FAILs; the sibling filterable sub-unit keeps going and commits its local bucket swap; the per-shard ack barrier sees mixed acks → task FAILED → `flipSemanticMigrationSchema` skipped → schema stays at OLD tokenization while the filterable bucket on disk now holds NEW-tokenized data. This is a bucket↔schema inversion: the same family as the ack-barrier failure mode, but triggered by an external schema mutation instead of a crash.

Putting the check inside the schema FSM's UpdateProperty apply makes it RAFT-deterministic: every node sees the same distributed-task FSM snapshot at the same applyIndex and reaches the same accept/reject decision. Symmetric to ConflictDetector (which protects in-flight tasks from new conflicting tasks); this protects them from out-of-band schema mutations during their flight.

FSM-determinism contract: CheckPropertyUpdate MUST be a pure function of (className, propertyName, existingTasks). It must not read mutable process state (clocks, network, schema, RNG) — different nodes applying the same log entry must reach the same accept/reject decision.
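A CheckClassMutation-style check that honors this contract reads nothing but its arguments. The sketch below is illustrative only: `task` is a trimmed local stand-in, and `reindexPayload` with a `collection` field is a hypothetical payload shape, since the real reindex payload is not shown here. Terminal tasks are skipped, and any active task (STARTED, PREPARING, SWAPPING) on the class causes a rejection.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// task is a trimmed local stand-in for Task.
type task struct {
	Status  string
	Payload []byte
}

// reindexPayload is a hypothetical payload shape for this sketch.
type reindexPayload struct {
	Collection string `json:"collection"`
}

func isActive(status string) bool {
	return status == "STARTED" || status == "PREPARING" || status == "SWAPPING"
}

// checkClassMutation depends only on (className, existing): no clock,
// network, RNG or live schema, so every node applying the same log entry
// reaches the same accept/reject decision.
func checkClassMutation(className string, existing []*task) error {
	for _, t := range existing {
		if !isActive(t.Status) {
			continue
		}
		var p reindexPayload
		if err := json.Unmarshal(t.Payload, &p); err != nil {
			return fmt.Errorf("decode task payload: %w", err)
		}
		if p.Collection == className {
			return fmt.Errorf("class %q has an in-flight reindex (status %s)", className, t.Status)
		}
	}
	return nil
}

func main() {
	tasks := []*task{
		{Status: "SWAPPING", Payload: []byte(`{"collection":"Articles"}`)},
		{Status: "FINISHED", Payload: []byte(`{"collection":"Authors"}`)},
	}
	fmt.Println(checkClassMutation("Articles", tasks)) // rejected
	fmt.Println(checkClassMutation("Authors", tasks))  // <nil>: that task already finished
}
```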

type ShardLister added in v1.37.0

type ShardLister interface {
	GetLocalShardNames(collection string) ([]string, error)
}

ShardLister provides local shard names for a collection, allowing the ShardNoopProvider to determine unit ownership based on real shard topology without importing the db package.

type ShardNoopProvider added in v1.37.0

type ShardNoopProvider struct {
	// contains filtered or unexported fields
}

ShardNoopProvider is a test-only UnitAwareProvider used by acceptance tests to exercise the unit lifecycle end-to-end. It is registered in configure_api.go behind the SHARD_NOOP_PROVIDER_ENABLED env var and exposed via a debug HTTP endpoint on port 6060.

On StartTask, it spawns a goroutine that iterates units sequentially, reports 50% progress, then completes each one. Set FailUnitID in the payload to make one unit fail instead, which triggers the task-level fail-fast behavior.

When a Collection is specified in the payload and a ShardLister is provided, the provider only claims units whose IDs match local shard names. This validates that unit ownership aligns with actual shard placement.

Marker files are written as hidden dot-files inside shard directories (when a collection is specified) or under {dataRoot}/.dtm/ (for synthetic units). This avoids writing to /tmp and keeps side effects scoped to the Weaviate data directory.

func NewShardNoopProvider added in v1.37.0

func NewShardNoopProvider(nodeID string, logger logrus.FieldLogger, shardLister ShardLister, dataRoot string) *ShardNoopProvider

NewShardNoopProvider creates a new ShardNoopProvider. Pass nil for shardLister when real shard topology is not needed (e.g. unit tests with synthetic unit IDs). dataRoot is the Weaviate persistence data path; marker files are written as hidden dot-files inside shard directories (collection-aware mode) or under {dataRoot}/.dtm/ (synthetic mode).

func (*ShardNoopProvider) CleanupTask added in v1.37.0

func (p *ShardNoopProvider) CleanupTask(desc TaskDescriptor) error

func (*ShardNoopProvider) GetFinalizedGroups added in v1.37.0

func (p *ShardNoopProvider) GetFinalizedGroups(desc TaskDescriptor) map[string][]string

GetFinalizedGroups returns the per-group finalized unit IDs for a task.

func (*ShardNoopProvider) GetFinalizedUnits added in v1.37.0

func (p *ShardNoopProvider) GetFinalizedUnits(desc TaskDescriptor) []string

GetFinalizedUnits returns all finalized unit IDs across all groups for a task.

func (*ShardNoopProvider) GetLocalTasks added in v1.37.0

func (p *ShardNoopProvider) GetLocalTasks() []TaskDescriptor

func (*ShardNoopProvider) IsTaskCompleted added in v1.37.0

func (p *ShardNoopProvider) IsTaskCompleted(desc TaskDescriptor) bool

func (*ShardNoopProvider) OnGroupCompleted added in v1.37.0

func (p *ShardNoopProvider) OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string) error

func (*ShardNoopProvider) OnSwapRequested added in v1.38.0

func (p *ShardNoopProvider) OnSwapRequested(_ *Task, _ string, _ []string) error

OnSwapRequested is a no-op for ShardNoopProvider. This test provider is the canonical NeedsPreparationBarrier=false path (format-only shape), so the scheduler never calls it for ShardNoopProvider tasks; the method exists only to satisfy the interface contract.

func (*ShardNoopProvider) OnTaskCompleted added in v1.37.0

func (p *ShardNoopProvider) OnTaskCompleted(task *Task)

func (*ShardNoopProvider) SetCompletionRecorder added in v1.37.0

func (p *ShardNoopProvider) SetCompletionRecorder(recorder TaskCompletionRecorder)

func (*ShardNoopProvider) StartTask added in v1.37.0

func (p *ShardNoopProvider) StartTask(task *Task) (TaskHandle, error)

type ShardNoopProviderPayload added in v1.37.0

type ShardNoopProviderPayload struct {
	FailUnitID        string            `json:"failUnitId,omitempty"`
	Collection        string            `json:"collection,omitempty"`
	UnitToShard       map[string]string `json:"unitToShard,omitempty"`
	UnitToNode        map[string]string `json:"unitToNode,omitempty"`
	SlowUnitID        string            `json:"slowUnitId,omitempty"`
	SlowUnitDelayMs   int               `json:"slowUnitDelayMs,omitempty"`
	ProcessingDelayMs int               `json:"processingDelayMs,omitempty"`
	MaxConcurrency    int               `json:"maxConcurrency,omitempty"`
}

ShardNoopProviderPayload is the JSON payload for tasks created with the ShardNoopProvider. When Collection is set and a ShardLister is available, the provider uses real shard placement for unit ownership instead of the synthetic NodeID-based assignment.

UnitToShard maps unit IDs to shard names, allowing multiple units per shard (one per replica). UnitToNode maps unit IDs to the node that should process them, providing deterministic ownership when RF > 1 (where multiple nodes have the same shard locally). Both maps are required when Collection is set.

MaxConcurrency controls how many units are processed in parallel on each node. When > 1, processUnits fans out with a ConcurrencyLimiter instead of sequential iteration. Default 0 = sequential.

type Task

type Task struct {
	// Namespace is the namespace of distributed tasks which are managed by different Provider implementations
	Namespace string `json:"namespace"`

	TaskDescriptor `json:",inline"`

	// Payload is arbitrary data that is needed to execute a task of Namespace.
	Payload []byte `json:"payload"`

	// NeedsPreparationBarrier opts the task into the two-phase RAFT swap barrier:
	// AllUnitsTerminal transitions to PREPARING (not SWAPPING), and the
	// FSM gates PREPARING → SWAPPING on every node's PreparationCompleteAck.
	// Set at AddTask time; immutable thereafter.
	NeedsPreparationBarrier bool `json:"needsPreparationBarrier"`

	// Status is the current status of the task.
	Status TaskStatus `json:"status"`

	// StartedAt is the time that a task was submitted to the cluster.
	StartedAt time.Time `json:"startedAt"`

	// FinishedAt is the time that the task reached a terminal status.
	// It is also used to schedule task cleanup.
	FinishedAt time.Time `json:"finishedAt"`

	// Error is an optional field to store the error which moved the task to FAILED status.
	Error string `json:"error,omitempty"`

	// Units tracks per-unit progress. Always non-nil for valid tasks.
	Units map[string]*Unit `json:"units,omitempty"`

	// PreparationCompletionAcks records per-node PREP-phase confirmations during
	// PREPARING. The FSM gates PREPARING → SWAPPING on every expected ack
	// landing with Success=true; any Success=false flips to FAILED. Nil
	// for tasks that don't opt into the PREP barrier.
	PreparationCompletionAcks map[string]PostCompletionAck `json:"preparationCompletionAcks,omitempty"`

	// PostCompletionAcks records per-node confirmations that the node's
	// SWAP phase (the second half of the split OnGroupCompleted —
	// per-shard SwapBucketPointer tight loop + post-atomic tidy +
	// per-strategy OnMigrationComplete) completed successfully. Keys are
	// node IDs. Populated only after the task transitions to SWAPPING
	// and only by the [Scheduler] tick firing
	// [PostCompletionAckRecorder.RecordDistributedTaskPostCompletionAck]
	// once the local SWAP has returned. The Manager FSM gates the
	// SWAPPING → FINISHED transition (via
	// [TaskFinalizer.MarkDistributedTaskFinalized]) on having an ack
	// from every node with local units; if any ack is
	// [PostCompletionAck.Success]==false the FSM transitions the task
	// to FAILED instead.
	//
	// Nil for tasks whose provider does not implement [UnitAwareProvider]
	// (no OnGroupCompleted to ack).
	PostCompletionAcks map[string]PostCompletionAck `json:"postCompletionAcks,omitempty"`
}

Task represents a distributed task tracked across the cluster via Raft consensus.

Completion is tracked per-unit. The task finishes when all units reach a terminal state. A single unit failure immediately fails the entire task — remaining in-flight units are NOT waited for.

Units are always required when creating a task.
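The fail-fast rule can be sketched as a small state update over unit statuses. This illustration uses trimmed local stand-ins, not the Manager's apply code. `UNITS_DONE` is a hypothetical label for "every unit terminal"; the real follow-up status depends on the task's barrier configuration and is not modeled. A single FAILED unit fails the task at once, and a report that arrives afterwards is rejected instead of being waited for.

```go
package main

import "fmt"

// Local stand-ins for the unit statuses named in the package overview.
const (
	pending    = "PENDING"
	inProgress = "IN_PROGRESS"
	completed  = "COMPLETED"
	failed     = "FAILED"
)

// task is a trimmed local stand-in for Task.
type task struct {
	Status string
	Units  map[string]string // unit ID -> unit status
}

func allUnitsTerminal(units map[string]string) bool {
	for _, s := range units {
		if s != completed && s != failed {
			return false
		}
	}
	return true
}

// recordUnit applies one unit report under the fail-fast rule.
func (tk *task) recordUnit(unitID, status string) error {
	if tk.Status == failed {
		return fmt.Errorf("task already FAILED; rejecting %s for unit %s", status, unitID)
	}
	if _, ok := tk.Units[unitID]; !ok {
		return fmt.Errorf("unknown unit %s", unitID)
	}
	tk.Units[unitID] = status
	switch {
	case status == failed:
		tk.Status = failed
	case allUnitsTerminal(tk.Units):
		tk.Status = "UNITS_DONE" // hypothetical label
	}
	return nil
}

func main() {
	tk := &task{Status: "STARTED", Units: map[string]string{"s1": pending, "s2": inProgress, "s3": inProgress}}
	err := tk.recordUnit("s1", completed)
	fmt.Println(err, tk.Status) // <nil> STARTED
	err = tk.recordUnit("s2", failed)
	fmt.Println(err, tk.Status) // <nil> FAILED
	err = tk.recordUnit("s3", completed)
	fmt.Println(err != nil) // true: the late report is rejected
}
```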

func (*Task) AllGroupUnitsTerminal added in v1.37.0

func (t *Task) AllGroupUnitsTerminal(groupID string) bool

AllGroupUnitsTerminal returns true if all units in the given group are terminal.

func (*Task) AllUnitsTerminal added in v1.37.0

func (t *Task) AllUnitsTerminal() bool

AllUnitsTerminal returns true if all units are in a terminal state (COMPLETED or FAILED).

func (*Task) AnyPostCompletionAckFailed added in v1.38.0

func (t *Task) AnyPostCompletionAckFailed() bool

AnyPostCompletionAckFailed returns true iff any node has recorded a post-completion ack with [PostCompletionAck.Success]==false. The FSM uses this on the apply path to flip the task to FAILED — once any node reports failure, the schema flip must be skipped and the task must not progress to FINISHED.

func (*Task) AnyUnitFailed added in v1.37.0

func (t *Task) AnyUnitFailed() bool

AnyUnitFailed returns true if any unit has FAILED status.

func (*Task) Clone

func (t *Task) Clone() *Task

func (*Task) Groups added in v1.37.0

func (t *Task) Groups() []string

Groups returns the distinct GroupIDs across all units (includes "" for ungrouped).

func (*Task) LocalGroupUnitIDs added in v1.37.0

func (t *Task) LocalGroupUnitIDs(groupID, nodeID string) []string

LocalGroupUnitIDs returns the IDs of units in the given group assigned to the given node.

func (*Task) LocalUnitIDs added in v1.37.0

func (t *Task) LocalUnitIDs(nodeID string) []string

LocalUnitIDs returns the IDs of units assigned to the given node.

func (*Task) MissingPostCompletionAckNodes added in v1.38.0

func (t *Task) MissingPostCompletionAckNodes() []string

MissingPostCompletionAckNodes returns the node IDs that have local units in this task but have NOT yet recorded a post-completion ack. The scheduler uses this as the gating predicate for [TaskFinalizer.MarkDistributedTaskFinalized] — the SWAPPING → FINISHED transition must wait until this returns empty, so a node whose RunSwapOnShard silently failed cannot let the cluster-wide schema flip commit before its ack is recorded as a failure (which transitions the task to FAILED and skips the flip).

Returns nil when all expected nodes have acked. Returned slice is unsorted.

func (*Task) NodeHasNonTerminalUnits added in v1.37.0

func (t *Task) NodeHasNonTerminalUnits(nodeID string) bool

NodeHasNonTerminalUnits returns true if the given node has units that are not yet terminal. Unassigned units (empty NodeID) are considered as belonging to any node.

func (*Task) NodesWithLocalUnits added in v1.38.0

func (t *Task) NodesWithLocalUnits() []string

NodesWithLocalUnits returns the set of node IDs that own at least one unit assigned to them in this task. Used by the Scheduler tick to compute the ack-barrier predicate: every such node must record a post-completion ack before the task can transition SWAPPING → FINISHED. Units with empty NodeID (still PENDING / never claimed) are skipped — they cannot have a node-side OnGroupCompleted result yet.

Returned slice is unsorted; callers that need determinism must sort it themselves.

type TaskCleaner

type TaskCleaner interface {
	CleanUpDistributedTask(ctx context.Context, namespace, taskID string, taskVersion uint64) error
}

TaskCleaner is an interface for issuing a request to clean up a distributed task.

type TaskCompletionRecorder

type TaskCompletionRecorder interface {
	RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string) error
	RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID, errMsg string) error
	UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string, progress float32) error
}

TaskCompletionRecorder is an interface for recording per-unit progress, completion, and failure of a distributed task.
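The package overview says unit assignment is implicit: the first node to report on an unassigned unit claims it, and reports from other nodes are rejected. The in-memory sketch below illustrates that rule for two of the recorder's methods. `memRecorder`, `unitState`, and `claim` are hypothetical. The real recorder goes through Raft and also handles namespace, task ID and version, which are omitted here.

```go
package main

import "fmt"

// unitState is a trimmed local stand-in for Unit.
type unitState struct {
	NodeID   string
	Status   string
	Progress float32
}

// memRecorder is a hypothetical in-memory recorder for a single task.
type memRecorder struct {
	units map[string]*unitState
}

// claim lets the first reporting node take an unassigned unit and rejects
// reports from any other node afterwards.
func (r *memRecorder) claim(nodeID, unitID string) (*unitState, error) {
	u, ok := r.units[unitID]
	if !ok {
		return nil, fmt.Errorf("unknown unit %q", unitID)
	}
	switch u.NodeID {
	case "":
		u.NodeID = nodeID // first reporter claims the unit
	case nodeID:
		// already owned by this node
	default:
		return nil, fmt.Errorf("unit %q is owned by %s, rejecting report from %s", unitID, u.NodeID, nodeID)
	}
	return u, nil
}

func (r *memRecorder) UpdateProgress(nodeID, unitID string, progress float32) error {
	u, err := r.claim(nodeID, unitID)
	if err != nil {
		return err
	}
	u.Status, u.Progress = "IN_PROGRESS", progress
	return nil
}

func (r *memRecorder) RecordCompletion(nodeID, unitID string) error {
	u, err := r.claim(nodeID, unitID)
	if err != nil {
		return err
	}
	u.Status = "COMPLETED"
	return nil
}

func main() {
	r := &memRecorder{units: map[string]*unitState{"s1": {Status: "PENDING"}}}
	fmt.Println(r.UpdateProgress("node-1", "s1", 0.5)) // <nil>: node-1 claims s1
	fmt.Println(r.UpdateProgress("node-2", "s1", 0.7)) // error: s1 is owned by node-1
	fmt.Println(r.RecordCompletion("node-1", "s1"))    // <nil>
}
```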

type TaskDescriptor

type TaskDescriptor struct {
	// ID is the identifier of the task in the namespace.
	ID string `json:"ID"`

	// Version is the version of the task with task ID.
	// It is used to differentiate between multiple runs of the same task.
	Version uint64 `json:"version"`
}

TaskDescriptor is a struct identifying a task execution under a certain task namespace.

type TaskFinalizer added in v1.38.0

type TaskFinalizer interface {
	MarkDistributedTaskFinalized(ctx context.Context, namespace, taskID string, taskVersion uint64) error
}

TaskFinalizer is an interface for issuing a request to transition a task from TaskStatusSwapping to TaskStatusFinished. The Scheduler calls this from its tick after [UnitAwareProvider.OnTaskCompleted] returns successfully, so the FSM-level FINISHED state lines up with "every post-completion callback committed cluster-wide" (not just "every unit terminal"). The call is idempotent at the FSM layer: every node's scheduler issues it independently after its local OnTaskCompleted returns, and only the first commit actually flips the status. See the godoc on TaskStatusSwapping for the underlying race this discipline fixes.
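The transitions around finalization can be sketched as a tiny state machine. This is an illustration with a trimmed local task type, not the FSM's code. `onAllUnitsTerminal` follows the NeedsPreparationBarrier field comment (PREPARING for barrier tasks, SWAPPING otherwise), and `markFinalized` shows the idempotence described above: only the call that finds the task in SWAPPING flips it, and every later call from another node's scheduler is a no-op. The ack-barrier check is deliberately omitted.

```go
package main

import "fmt"

// task is a trimmed local stand-in for Task.
type task struct {
	Status                  string
	NeedsPreparationBarrier bool
}

// onAllUnitsTerminal forks on the barrier flag once every unit is terminal.
func (tk *task) onAllUnitsTerminal() {
	if tk.Status != "STARTED" {
		return
	}
	if tk.NeedsPreparationBarrier {
		tk.Status = "PREPARING"
	} else {
		tk.Status = "SWAPPING"
	}
}

// markFinalized reports whether this call performed SWAPPING -> FINISHED.
func (tk *task) markFinalized() bool {
	if tk.Status != "SWAPPING" {
		return false
	}
	tk.Status = "FINISHED"
	return true
}

func main() {
	tk := &task{Status: "STARTED"}
	tk.onAllUnitsTerminal()
	fmt.Println(tk.Status) // SWAPPING
	for _, node := range []string{"node-1", "node-2", "node-3"} {
		fmt.Println(node, tk.markFinalized()) // only node-1 performs the flip
	}
	fmt.Println(tk.Status) // FINISHED
}
```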

type TaskHandle

type TaskHandle interface {
	// Terminate is a signal to stop executing the task. If the task is no longer running because it already finished,
	// the method call should be a no-op.
	//
	// A terminated task can be started again later, so no local state may be removed.
	Terminate()

	// Done returns a channel that is closed when the task's goroutine exits, whether due to
	// completion, failure, or termination. The scheduler uses this to detect dead handles
	// and allow re-launch of tasks that still have pending work.
	Done() <-chan struct{}
}

TaskHandle is an interface to control a locally running task.

type TaskLister added in v1.38.0

type TaskLister interface {
	ListDistributedTasks(ctx context.Context) (map[string][]*Task, error)
}

TaskLister is an interface for listing distributed tasks in the cluster.

type TaskStatus

type TaskStatus string

const (
	// TaskStatusStarted means that the task is still running on some of
	// the nodes — at least one unit has not yet reached a terminal status
	// ([UnitStatusCompleted] or [UnitStatusFailed]).
	TaskStatusStarted TaskStatus = "STARTED"

	// TaskStatusPreparing means every unit terminal AND the task uses the
	// PREP barrier (semantic migrations). Each node is running PREP; the
	// task transitions to SWAPPING only after every node's PreparationCompleteAck
	// lands successfully — any failure flips the task to FAILED. Non-barrier
	// tasks skip this status entirely.
	TaskStatusPreparing TaskStatus = "PREPARING"

	// TaskStatusSwapping means every node is firing its per-shard atomic
	// swap. The cross-replica stagger window in this phase is bounded by
	// Raft propagation latency (sub-second), not PREP duration. The task
	// transitions to FINISHED only after every node's PostCompletionAck
	// lands successfully.
	TaskStatusSwapping TaskStatus = "SWAPPING"

	// TaskStatusFinished means the task succeeded on every node AND every
	// per-node post-completion callback has run. Wait for this — not
	// SWAPPING — when polling for "fully done".
	TaskStatusFinished TaskStatus = "FINISHED"

	// TaskStatusCancelled means that the task was cancelled by a user.
	TaskStatusCancelled TaskStatus = "CANCELLED"

	// TaskStatusFailed means that one of the nodes got a non-retryable
	// error and all other nodes terminated the execution.
	TaskStatusFailed TaskStatus = "FAILED"
)

func (TaskStatus) IsActive added in v1.38.0

func (t TaskStatus) IsActive() bool

IsActive is true for non-terminal in-flight states (STARTED, PREPARING, SWAPPING) — used by conflict detection and the schema MutationGuard.

func (TaskStatus) IsCoordinationPhase added in v1.38.0

func (t TaskStatus) IsCoordinationPhase() bool

IsCoordinationPhase is true for the post-units, pre-terminal phases (PREPARING, SWAPPING) — i.e. the scheduler-driven callback states.

func (TaskStatus) IsTerminal added in v1.38.0

func (t TaskStatus) IsTerminal() bool

IsTerminal reports whether this status is a terminal state. A task in a terminal state is not transitioning further: it has reached TaskStatusFinished, TaskStatusFailed, or TaskStatusCancelled and the scheduler will not invoke any more per-unit or per-task callbacks for it via the normal in-flight path. Recovery replays on startup can still observe terminal tasks — providers that own destructive side-effects (per-shard swaps, file moves) should short-circuit on terminal status to avoid running them again.
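
The three predicates partition the statuses as described above. The sketch below reimplements them on a local copy of the type purely for illustration; the package's own method bodies are not shown in this documentation. swapShard is a hypothetical provider helper showing the recommended short-circuit on terminal status during recovery replays.

```go
package main

import "fmt"

type TaskStatus string

const (
	TaskStatusStarted   TaskStatus = "STARTED"
	TaskStatusPreparing TaskStatus = "PREPARING"
	TaskStatusSwapping  TaskStatus = "SWAPPING"
	TaskStatusFinished  TaskStatus = "FINISHED"
	TaskStatusCancelled TaskStatus = "CANCELLED"
	TaskStatusFailed    TaskStatus = "FAILED"
)

// IsActive: non-terminal, in-flight states.
func (t TaskStatus) IsActive() bool {
	switch t {
	case TaskStatusStarted, TaskStatusPreparing, TaskStatusSwapping:
		return true
	}
	return false
}

// IsCoordinationPhase: every unit is terminal, but the scheduler-driven
// callbacks have not all completed yet.
func (t TaskStatus) IsCoordinationPhase() bool {
	return t == TaskStatusPreparing || t == TaskStatusSwapping
}

// IsTerminal: no further transitions.
func (t TaskStatus) IsTerminal() bool {
	switch t {
	case TaskStatusFinished, TaskStatusFailed, TaskStatusCancelled:
		return true
	}
	return false
}

// swapShard is a hypothetical provider helper: a recovery replay that
// observes a terminal task skips the destructive step entirely.
func swapShard(status TaskStatus, swap func()) bool {
	if status.IsTerminal() {
		return false
	}
	swap()
	return true
}

func main() {
	for _, s := range []TaskStatus{TaskStatusStarted, TaskStatusSwapping, TaskStatusFinished} {
		fmt.Println(s, s.IsActive(), s.IsCoordinationPhase(), s.IsTerminal())
	}
}
```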

func (TaskStatus) String

func (t TaskStatus) String() string

type ThrottledRecorder added in v1.37.0

type ThrottledRecorder struct {
	// contains filtered or unexported fields
}

ThrottledRecorder wraps a TaskCompletionRecorder to prevent progress updates from flooding Raft consensus. Each unit's progress is forwarded at most once per interval, where the interval is the one passed to NewThrottledRecorder (DefaultThrottleInterval in production); intermediate updates are silently dropped. Completion and failure calls always pass through immediately and are never throttled.

Throttle entries are cleaned up when a unit reaches a terminal state (completion or failure), so the internal map does not grow beyond the number of active units.

Two non-negotiable carve-outs (weaviate/0-weaviate-issues#240 Symptom B):

  • progress == 0.0 is never throttled — it is the per-unit worker's first call (the "claim") and the path that lands Unit.NodeID.
  • lastSent is updated only after a successful forward; a failed forward leaves no entry so the retry isn't blocked.

func NewThrottledRecorder added in v1.37.0

func NewThrottledRecorder(inner TaskCompletionRecorder, interval time.Duration, clock clockwork.Clock) *ThrottledRecorder

func (*ThrottledRecorder) RecordDistributedTaskUnitCompletion added in v1.37.0

func (r *ThrottledRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string) error

func (*ThrottledRecorder) RecordDistributedTaskUnitFailure added in v1.37.0

func (r *ThrottledRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID, errMsg string) error

func (*ThrottledRecorder) UpdateDistributedTaskUnitProgress added in v1.37.0

func (r *ThrottledRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string, progress float32) error

type Unit added in v1.37.0

type Unit struct {
	ID         string     `json:"id"`
	GroupID    string     `json:"groupId,omitempty"`
	NodeID     string     `json:"nodeId"`
	Status     UnitStatus `json:"status"`
	Progress   float32    `json:"progress"`
	Error      string     `json:"error,omitempty"`
	UpdatedAt  time.Time  `json:"updatedAt"`
	FinishedAt time.Time  `json:"finishedAt,omitempty"`
}

Unit represents a trackable work unit within a distributed task (e.g. a single shard in a reindex operation). Units follow the lifecycle PENDING → IN_PROGRESS → COMPLETED/FAILED.

NodeID starts empty (unassigned) and is set on the first progress update. The Scheduler treats unassigned units as belonging to any node, which is how initial assignment happens: the first node to report progress claims the unit.

Unit values are owned by the Manager and mutated under its lock. Callers outside the Manager should only access units via cloned Task snapshots from ListDistributedTasks.

type UnitAwareProvider added in v1.37.0

type UnitAwareProvider interface {
	Provider
	// OnGroupCompleted fires when all units in a group reach terminal state.
	// localGroupUnitIDs contains ONLY units on this node; no-op for nodes
	// without local units in the group. Non-nil errors feed
	// RecordPreparationCompleteAck (barrier tasks) or RecordPostCompletionAck.
	OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string) error

	// OnSwapRequested fires after the cluster-wide PREP barrier lifts
	// (PREPARING → SWAPPING). Barrier tasks only. Non-nil errors feed
	// RecordPostCompletionAck (failure → FAILED, schema flip skipped).
	OnSwapRequested(task *Task, groupID string, localGroupUnitIDs []string) error

	// OnTaskCompleted is invoked by the [Scheduler] once per task that has
	// reached a terminal status, after every local unit terminated and
	// (for unit-aware providers) every per-node post-completion ack
	// landed. The scheduler MAY re-invoke this method for the same task
	// if a downstream finalize-record write fails — concretely, when
	// [TaskFinalizer.MarkDistributedTaskFinalized] returns an error the
	// rollback path clears the per-task fired-marker so the next tick
	// re-fires OnTaskCompleted before retrying the finalize. Implementations
	// MUST therefore be idempotent against repeat calls with the same
	// (TaskDescriptor, Status): re-running must not double-apply a
	// destructive side effect (a schema flip reverted, a marker emitted
	// twice, etc.). Today's concrete provider (db/reindex_provider.go's
	// OnTaskCompleted → autoCleanupAfterTerminal) already is; new
	// implementations MUST preserve this contract.
	OnTaskCompleted(task *Task)
}

UnitAwareProvider fires per-group callbacks as groups complete (mid-flight), then a global OnTaskCompleted when the task reaches terminal state.
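
The idempotency contract on OnTaskCompleted can be sketched with an in-memory marker. This is a simplification: the real method takes a *Task, whereas the sketch keys on a hypothetical taskKey plus a status string to stand in for the (TaskDescriptor, Status) pair. An in-memory marker does not survive a restart, so a production provider would more likely make the side effect itself idempotent (for example, check-before-act on disk).

```go
package main

import (
	"fmt"
	"sync"
)

// taskKey is a hypothetical stand-in for the task descriptor.
type taskKey struct {
	ID      string
	Version uint64
}

// cleanupProvider sketches an idempotent OnTaskCompleted: the destructive
// step runs at most once per (descriptor, status), even if the scheduler
// re-fires the callback after a failed finalize write.
type cleanupProvider struct {
	mu      sync.Mutex
	applied map[taskKey]string // descriptor -> status already handled
	cleanup func(taskKey)      // the side effect that must not repeat
}

func (p *cleanupProvider) OnTaskCompleted(k taskKey, status string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.applied[k] == status {
		return // already handled: a repeat call is a no-op
	}
	p.cleanup(k)
	p.applied[k] = status
}

func main() {
	calls := 0
	p := &cleanupProvider{
		applied: map[taskKey]string{},
		cleanup: func(k taskKey) { calls++; fmt.Println("cleaning up", k.ID) },
	}
	k := taskKey{ID: "reindex-1", Version: 3}
	p.OnTaskCompleted(k, "FINISHED")
	p.OnTaskCompleted(k, "FINISHED") // re-fired after a failed finalize write
	fmt.Println("cleanup calls:", calls)
}
```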

Every unit belongs to a group. If no explicit GroupID is set, all units belong to a single implicit default group (""). This means:

  • Tasks without groups: OnGroupCompleted fires once with all local units when all units reach terminal state (same effect as having a single group).
  • Tasks with groups: OnGroupCompleted fires per group as each completes, even while the task is still STARTED.

Callback phases:

  1. OnGroupCompleted — fires per group as each group's units reach terminal. PREP-only for barrier tasks; PREP+OVERLAY+SWAP inline otherwise.
  2. OnSwapRequested — fires after the cluster-wide PREPARING → SWAPPING transition. Barrier tasks only; carries OVERLAY+SWAP.
  3. OnTaskCompleted — fires once on every node after ALL units are terminal.
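
The grouping rules above can be sketched as a pure function: a group is complete once all of its units are terminal, units without a GroupID fall into the default group "", and a node only receives its own unit IDs (no entry means the callback is a no-op there). The unit type and completedGroups are hypothetical; a real scheduler would also need to remember which groups have already fired.

```go
package main

import (
	"fmt"
	"sort"
)

// unit is a hypothetical, minimal view of a task unit.
type unit struct {
	ID, GroupID, NodeID string
	Terminal            bool
}

// completedGroups returns, for every group whose units are all terminal,
// the sorted IDs of the units owned by nodeID. Groups without local units
// are omitted, mirroring the no-op rule for OnGroupCompleted.
func completedGroups(units []unit, nodeID string) map[string][]string {
	done := map[string]bool{}
	for _, u := range units {
		prev, seen := done[u.GroupID]
		done[u.GroupID] = (!seen || prev) && u.Terminal
	}
	out := map[string][]string{}
	for _, u := range units {
		if done[u.GroupID] && u.NodeID == nodeID {
			out[u.GroupID] = append(out[u.GroupID], u.ID)
		}
	}
	for g := range out {
		sort.Strings(out[g])
	}
	return out
}

func main() {
	units := []unit{
		{ID: "s1", GroupID: "tenantA", NodeID: "n1", Terminal: true},
		{ID: "s2", GroupID: "tenantA", NodeID: "n2", Terminal: true},
		{ID: "s3", GroupID: "tenantB", NodeID: "n1", Terminal: false},
	}
	fmt.Println(completedGroups(units, "n1")) // map[tenantA:[s1]]
	fmt.Println(completedGroups(units, "n2")) // map[tenantA:[s2]]
}
```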

type UnitSpec added in v1.37.0

type UnitSpec struct {
	ID      string
	GroupID string
}

UnitSpec defines a unit with an optional group assignment. It is used at task creation time when units need group membership (e.g. one group per tenant for a multi-tenant reindex).
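
A short sketch of building UnitSpecs with one group per tenant. The input shape (tenant name mapped to that tenant's unit IDs) and the helper specsPerTenant are hypothetical; only the UnitSpec fields come from the type above.

```go
package main

import (
	"fmt"
	"sort"
)

type UnitSpec struct {
	ID      string
	GroupID string
}

// specsPerTenant emits one UnitSpec per unit, grouped by tenant, in a
// deterministic order (tenants sorted by name).
func specsPerTenant(unitsByTenant map[string][]string) []UnitSpec {
	tenants := make([]string, 0, len(unitsByTenant))
	for t := range unitsByTenant {
		tenants = append(tenants, t)
	}
	sort.Strings(tenants)
	var specs []UnitSpec
	for _, t := range tenants {
		for _, id := range unitsByTenant[t] {
			specs = append(specs, UnitSpec{ID: id, GroupID: t})
		}
	}
	return specs
}

func main() {
	fmt.Println(specsPerTenant(map[string][]string{
		"tenantB": {"u3"},
		"tenantA": {"u1", "u2"},
	}))
}
```

Sorting the tenants keeps the spec list stable across calls, which makes test assertions and log output easier to compare.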

type UnitStatus added in v1.37.0

type UnitStatus string
const (
	UnitStatusPending    UnitStatus = "PENDING"
	UnitStatusInProgress UnitStatus = "IN_PROGRESS"
	UnitStatusCompleted  UnitStatus = "COMPLETED"
	UnitStatusFailed     UnitStatus = "FAILED"
)
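
The fail-fast rule from the package overview can be expressed as a roll-up over unit statuses: any FAILED unit fails the task immediately, without waiting for in-flight units, and otherwise the task stays STARTED until every unit is terminal. taskOutcome is a hypothetical helper; which status follows "all terminal" (PREPARING, SWAPPING, or FINISHED) depends on the task and is deliberately left out.

```go
package main

import "fmt"

type UnitStatus string

const (
	UnitStatusPending    UnitStatus = "PENDING"
	UnitStatusInProgress UnitStatus = "IN_PROGRESS"
	UnitStatusCompleted  UnitStatus = "COMPLETED"
	UnitStatusFailed     UnitStatus = "FAILED"
)

// taskOutcome reports whether the task must fail right away and whether
// every unit has reached a terminal status.
func taskOutcome(units []UnitStatus) (failed, allTerminal bool) {
	allTerminal = true
	for _, s := range units {
		switch s {
		case UnitStatusFailed:
			failed = true // fail fast: other units are not waited for
		case UnitStatusCompleted:
			// terminal; keep scanning
		default:
			allTerminal = false // PENDING or IN_PROGRESS
		}
	}
	return failed, allTerminal
}

func main() {
	fmt.Println(taskOutcome([]UnitStatus{UnitStatusCompleted, UnitStatusInProgress})) // false false
	fmt.Println(taskOutcome([]UnitStatus{UnitStatusFailed, UnitStatusInProgress}))    // true false
	fmt.Println(taskOutcome([]UnitStatus{UnitStatusCompleted, UnitStatusCompleted}))  // false true
}
```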
