Documentation ¶
Overview ¶
Package distributedtask coordinates long-running operations (e.g. reindexing) across a Weaviate cluster. All task state lives in the Raft log, so it survives node restarts and leader elections without external storage.
Architecture ¶
Three components work together:
Manager is the Raft state machine. It owns the canonical task state and is the only component that mutates it. All writes go through Raft Apply (see cluster/store_apply.go).
Scheduler runs on every node. It polls the Manager for the current task list, starts and stops local work via a Provider, and submits cleanup requests when tasks expire.
Provider is the extension point. Each task namespace (e.g. "reindex", "compaction") registers a Provider that knows how to execute that type of work locally.
Unit tracking ¶
Every task declares a set of named Unit items (e.g. one per shard). Each unit progresses independently through PENDING → IN_PROGRESS → COMPLETED/FAILED. The task finishes when all units reach a terminal state. Units are always required when creating a task.
Unit assignment ¶
Units start unassigned (empty NodeID). The Scheduler treats unassigned units as belonging to any node, so all nodes will start the task. The first node to report progress for a unit claims it — subsequent updates from other nodes are rejected. This means assignment is implicit and driven by the Provider implementation, not prescribed by the framework.
Failure semantics ¶
When any unit fails, the entire task immediately transitions to FAILED. In-flight units on other nodes are NOT waited for — their subsequent completion reports are rejected. This fail-fast approach avoids wasting cluster resources on doomed work.
Group-level finalization ¶
If the Provider implements UnitAwareProvider, the Scheduler fires per-group callbacks as groups complete — even while the task is still STARTED:
OnGroupCompleted — fires per group when all units in that group reach a terminal state. It receives the groupID and only the local unit IDs in that group. For tasks with explicit groups it fires mid-flight, enabling per-tenant atomicity for MT reindex. When no explicit GroupID is set, all units belong to the default group "" and OnGroupCompleted fires once when all units are terminal (identical to the previous behavior).
OnTaskCompleted — fires once per node after ALL units reach terminal state. Use this for global operations (e.g. Raft schema update). Since Raft deduplicates, the schema update happens exactly once even though OnTaskCompleted fires on every node.
Both callbacks fire on FINISHED and FAILED tasks so providers can finalize (success) or rollback (failure) based on task.Status. Both fire exactly once per task lifecycle.
Four journey examples ¶
Journey 1: Spread work across any node (no finalization needed).
A data-cleanup provider distributes 1000 files as units. Any node can claim any file. No finalization needed — each unit is independent.
unitIDs := []string{"file-001", "file-002", ..., "file-1000"}
raft.AddDistributedTask(ctx, "cleanup", taskID, payload, unitIDs)
The Provider's StartTask iterates units, processes unclaimed ones, and reports completion. No UnitAwareProvider needed.
Journey 2: Per-shard work, global finalize (behavior unchanged).
A repair/rebuild provider recreates an index with the same configuration (e.g., fixing a corrupted HNSW index or rebuilding blockmax segments). Because the behavior is unchanged, shards can swap independently as they finish: there is no need to wait for all shards before swapping, since queries produce the same results regardless of which format a shard is currently serving from.
Unit IDs are opaque strings. The Provider defines them at task creation time and stores any shard→unit mapping in the task payload:
payload := ReindexPayload{
	ShardMap: map[string]string{ // unitID → shardName
		"u-0": "shard-S1", // nodeA's replica of S1
		"u-1": "shard-S1", // nodeB's replica of S1
		"u-2": "shard-S2", // ...
	},
}
unitIDs := []string{"u-0", "u-1", "u-2", ...}
Node assignment is automatic: the first node to report progress for a unit claims it (Unit.NodeID is set). The Provider's StartTask iterates units, checks which local shards it owns, and claims the corresponding units.
Each shard swaps its bucket pointers immediately upon completing its own reindex (inside the StartTask goroutine, before calling RecordUnitCompletion).
OnGroupCompleted: no-op — each shard already swapped during its own processing. OnTaskCompleted: optional — e.g., log completion or flip a cosmetic schema flag.
Journey 3: Per-shard work, per-shard finalize after barrier (behavior changes).
A tokenization-change provider reindexes every shard with new tokenization config (e.g., WORD → TRIGRAM). Because the behavior changes, consistency matters: if some shards serve old tokenization while others serve new, queries return mixed results. ALL shards must finish reindexing before ANY shard swaps to the new format.
Unit IDs and shard mapping work the same way as Journey 2 — the Provider defines IDs at creation time and stores the mapping in the task payload. The framework only cares about Unit.NodeID for ownership tracking.
During StartTask, each shard reindexes into new segments but does NOT swap yet. It reports progress and completion, but the old segments remain active for queries.
OnGroupCompleted: fires on each node AFTER all units across all nodes finish (since all units share the default group ""). Receives localGroupUnitIDs — the Provider looks up the shard mapping from task.Payload to know which local shards to swap. Atomically swaps bucket pointers for each local shard. This is a local operation, no Raft needed.
OnTaskCompleted: submits a Raft schema update to change the tokenization config. Because Raft deduplicates, the schema update happens exactly once even though OnTaskCompleted fires on every node.
The barrier guarantee ensures NO shard swaps until ALL shards finish reindexing.
Journey 4: Per-tenant work with per-tenant finalize (MT reindex with groups).
A multi-tenant reindex provider creates one group per tenant. Each tenant's replicas are units in that group. As each tenant's group completes, OnGroupCompleted fires mid-flight — the provider atomically swaps that tenant's bucket pointers without waiting for other tenants. This provides per-tenant atomicity: if tenant A's group completes while tenant B is still reindexing, tenant A starts serving new data immediately.
specs := []UnitSpec{
	{ID: "t1__nodeA", GroupID: "tenant-1"},
	{ID: "t1__nodeB", GroupID: "tenant-1"},
	{ID: "t2__nodeA", GroupID: "tenant-2"},
	...
}
raft.AddDistributedTaskWithGroups(ctx, "reindex", taskID, payload, specs)
OnGroupCompleted: fires per-tenant as each tenant's replicas all finish. Atomically swaps bucket pointers for the local replicas of that tenant.
OnTaskCompleted: fires once when ALL tenants finish. Updates schema with new tokenization config. If any tenant failed, task.Status == FAILED — provider skips schema update but already-swapped tenants remain valid (independent).
Progress throttling ¶
Unit progress updates go through Raft consensus. To prevent flooding the log, the Scheduler wraps the TaskCompletionRecorder in a ThrottledRecorder that forwards progress for each unit at most once per 30 seconds. Completion and failure calls are never throttled.
Adding a new task type ¶
To add a new kind of distributed task:
- Define a namespace constant (e.g. "my-reindex").
- Implement Provider (or UnitAwareProvider if you need group-level callbacks).
- Register the provider in configure_api.go's MakeAppState, keyed by your namespace.
- Create tasks via the Raft endpoint [cluster.Raft.AddDistributedTask], passing unit IDs (at least one unit is always required).
See ShardNoopProvider for a complete working example used by acceptance tests.
Provider idempotency contract ¶
After a node crash, the Scheduler re-launches tasks that still have non-terminal units. The Provider MUST handle re-invocation idempotently:
Units in IN_PROGRESS state will be re-delivered to the same node that claimed them. The provider must detect partially-completed work (e.g. via sentinel files) and either resume or restart the unit safely.
Units in PENDING state (unclaimed) may be delivered to any node. The provider must tolerate being asked to process a unit that another node is also attempting to claim — only one node's first progress update will succeed. Providers that use per-replica assignment (UnitToNode metadata) avoid this race entirely, since each unit is deterministically assigned to exactly one node at creation time.
The framework does NOT re-assign units claimed by a crashed node to other nodes. The crashed node must eventually restart for its IN_PROGRESS units to complete. If a node is permanently lost, the task must be cancelled manually.
Typical idempotency patterns:
- Check for a completion sentinel file before starting work
- Use atomic file operations (write-to-temp + rename) for crash safety
- Store progress checkpoints that allow resuming from the last known good state
Index ¶
- Constants
- type ConcurrencyLimiter
- type ListDistributedTasksResponse
- type Manager
- func (m *Manager) AddTask(c *api.ApplyRequest, seqNum uint64) error
- func (m *Manager) CancelTask(a *api.ApplyRequest) error
- func (m *Manager) CleanUpTask(a *api.ApplyRequest) error
- func (m *Manager) ListDistributedTasks(_ context.Context) (map[string][]*Task, error)
- func (m *Manager) ListDistributedTasksPayload(ctx context.Context) ([]byte, error)
- func (m *Manager) RecordUnitCompletion(c *api.ApplyRequest) error
- func (m *Manager) Restore(bytes []byte) error
- func (m *Manager) Snapshot() ([]byte, error)
- func (m *Manager) UpdateUnitProgress(c *api.ApplyRequest) error
- type ManagerParameters
- type MockTaskCleaner
- type MockTaskCleaner_CleanUpDistributedTask_Call
- func (_c *MockTaskCleaner_CleanUpDistributedTask_Call) Return(_a0 error) *MockTaskCleaner_CleanUpDistributedTask_Call
- func (_c *MockTaskCleaner_CleanUpDistributedTask_Call) Run(...) *MockTaskCleaner_CleanUpDistributedTask_Call
- func (_c *MockTaskCleaner_CleanUpDistributedTask_Call) RunAndReturn(run func(context.Context, string, string, uint64) error) *MockTaskCleaner_CleanUpDistributedTask_Call
- type MockTaskCleaner_Expecter
- type MockTaskCompletionRecorder
- func (_m *MockTaskCompletionRecorder) EXPECT() *MockTaskCompletionRecorder_Expecter
- func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace string, taskID string, version uint64, ...) error
- func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace string, taskID string, version uint64, ...) error
- func (_m *MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace string, taskID string, version uint64, ...) error
- type MockTaskCompletionRecorder_Expecter
- func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion(ctx interface{}, namespace interface{}, taskID interface{}, ...) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
- func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure(ctx interface{}, namespace interface{}, taskID interface{}, ...) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
- func (_e *MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress(ctx interface{}, namespace interface{}, taskID interface{}, ...) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
- type MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
- func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Return(_a0 error) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
- func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Run(...) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
- func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) RunAndReturn(run func(context.Context, string, string, uint64, string, string) error) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
- type MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
- func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Return(_a0 error) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
- func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Run(...) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
- func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) RunAndReturn(...) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
- type MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
- func (_c *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Return(_a0 error) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
- func (_c *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Run(...) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
- func (_c *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) RunAndReturn(...) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
- type Provider
- type Scheduler
- type SchedulerParams
- type ShardLister
- type ShardNoopProvider
- func (p *ShardNoopProvider) CleanupTask(desc TaskDescriptor) error
- func (p *ShardNoopProvider) GetFinalizedGroups(desc TaskDescriptor) map[string][]string
- func (p *ShardNoopProvider) GetFinalizedUnits(desc TaskDescriptor) []string
- func (p *ShardNoopProvider) GetLocalTasks() []TaskDescriptor
- func (p *ShardNoopProvider) IsTaskCompleted(desc TaskDescriptor) bool
- func (p *ShardNoopProvider) OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string)
- func (p *ShardNoopProvider) OnTaskCompleted(task *Task)
- func (p *ShardNoopProvider) SetCompletionRecorder(recorder TaskCompletionRecorder)
- func (p *ShardNoopProvider) StartTask(task *Task) (TaskHandle, error)
- type ShardNoopProviderPayload
- type Task
- func (t *Task) AllGroupUnitsTerminal(groupID string) bool
- func (t *Task) AllUnitsTerminal() bool
- func (t *Task) AnyUnitFailed() bool
- func (t *Task) Clone() *Task
- func (t *Task) Groups() []string
- func (t *Task) LocalGroupUnitIDs(groupID, nodeID string) []string
- func (t *Task) LocalUnitIDs(nodeID string) []string
- func (t *Task) NodeHasNonTerminalUnits(nodeID string) bool
- type TaskCleaner
- type TaskCompletionRecorder
- type TaskDescriptor
- type TaskHandle
- type TaskStatus
- type TasksLister
- type ThrottledRecorder
- func (r *ThrottledRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, ...) error
- func (r *ThrottledRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, ...) error
- func (r *ThrottledRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, ...) error
- type Unit
- type UnitAwareProvider
- type UnitSpec
- type UnitStatus
Constants ¶
const ShardNoopProviderNamespace = "shard-noop"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConcurrencyLimiter ¶ added in v1.37.0
type ConcurrencyLimiter struct {
// contains filtered or unexported fields
}
ConcurrencyLimiter is a channel-based semaphore that limits the number of concurrent operations. Acquire blocks until a slot is available or the context is cancelled. Release returns a slot to the pool.
func NewConcurrencyLimiter ¶ added in v1.37.0
func NewConcurrencyLimiter(maxConcurrency int) *ConcurrencyLimiter
NewConcurrencyLimiter creates a limiter that allows up to maxConcurrency concurrent operations. Values < 1 are clamped to 1.
func (*ConcurrencyLimiter) Acquire ¶ added in v1.37.0
func (l *ConcurrencyLimiter) Acquire(ctx context.Context) error
Acquire blocks until a slot is available or ctx is cancelled.
func (*ConcurrencyLimiter) Release ¶ added in v1.37.0
func (l *ConcurrencyLimiter) Release()
Release returns a slot to the pool. Must be called exactly once for each successful Acquire.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is responsible for managing distributed tasks across the cluster.
func NewManager ¶
func NewManager(params ManagerParameters) *Manager
func (*Manager) AddTask ¶
func (m *Manager) AddTask(c *api.ApplyRequest, seqNum uint64) error
AddTask registers a new distributed task from a Raft apply. The seqNum becomes the task's Version, used to distinguish re-runs of the same task ID. Returns an error if a task with the same namespace/ID is already running, or if no units are provided.
func (*Manager) CancelTask ¶
func (m *Manager) CancelTask(a *api.ApplyRequest) error
CancelTask transitions a running task to CANCELLED. In-flight units are not waited for — the Scheduler will terminate their local handles on the next tick.
func (*Manager) CleanUpTask ¶
func (m *Manager) CleanUpTask(a *api.ApplyRequest) error
CleanUpTask removes a terminal task from the Manager's state. It refuses to clean up tasks that are still running or whose completedTaskTTL has not yet elapsed, preventing premature removal of status information that other nodes may still need to observe.
func (*Manager) ListDistributedTasks ¶
ListDistributedTasks returns a snapshot of all tasks grouped by namespace. Each Task is cloned, so callers may read the returned values without holding the Manager's lock.
func (*Manager) ListDistributedTasksPayload ¶
func (*Manager) RecordUnitCompletion ¶ added in v1.37.0
func (m *Manager) RecordUnitCompletion(c *api.ApplyRequest) error
RecordUnitCompletion handles both success and failure (distinguished by a non-empty error field in the request). On failure, the task transitions to FAILED immediately — remaining in-flight units are NOT waited for, and their subsequent completion reports will be rejected with "task is no longer running". This fail-fast behavior is intentional: it avoids wasting cluster resources on a task that is already doomed.
func (*Manager) Restore ¶
Restore replaces the Manager's in-memory state from a Raft snapshot produced by Manager.Snapshot. It is called during Raft leader election or when a follower installs a snapshot from the leader.
func (*Manager) Snapshot ¶
Snapshot serialises the full task state to JSON for Raft snapshotting. The inverse operation is Manager.Restore.
func (*Manager) UpdateUnitProgress ¶ added in v1.37.0
func (m *Manager) UpdateUnitProgress(c *api.ApplyRequest) error
UpdateUnitProgress also handles initial node assignment: the first progress update for an unassigned unit sets its NodeID, claiming it for that node. After assignment, updates from other nodes are rejected. Progress updates to terminal units are silently ignored (no error) because in-flight Raft commands may arrive after a unit has already completed.
type ManagerParameters ¶
type MockTaskCleaner ¶
MockTaskCleaner is an autogenerated mock type for the TaskCleaner type
func NewMockTaskCleaner ¶
func NewMockTaskCleaner(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCleaner
NewMockTaskCleaner creates a new instance of MockTaskCleaner. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockTaskCleaner) CleanUpDistributedTask ¶
func (_m *MockTaskCleaner) CleanUpDistributedTask(ctx context.Context, namespace string, taskID string, taskVersion uint64) error
CleanUpDistributedTask provides a mock function with given fields: ctx, namespace, taskID, taskVersion
func (*MockTaskCleaner) EXPECT ¶
func (_m *MockTaskCleaner) EXPECT() *MockTaskCleaner_Expecter
type MockTaskCleaner_CleanUpDistributedTask_Call ¶
MockTaskCleaner_CleanUpDistributedTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanUpDistributedTask'
func (*MockTaskCleaner_CleanUpDistributedTask_Call) Return ¶
func (_c *MockTaskCleaner_CleanUpDistributedTask_Call) Return(_a0 error) *MockTaskCleaner_CleanUpDistributedTask_Call
func (*MockTaskCleaner_CleanUpDistributedTask_Call) Run ¶
func (_c *MockTaskCleaner_CleanUpDistributedTask_Call) Run(run func(ctx context.Context, namespace string, taskID string, taskVersion uint64)) *MockTaskCleaner_CleanUpDistributedTask_Call
func (*MockTaskCleaner_CleanUpDistributedTask_Call) RunAndReturn ¶
func (_c *MockTaskCleaner_CleanUpDistributedTask_Call) RunAndReturn(run func(context.Context, string, string, uint64) error) *MockTaskCleaner_CleanUpDistributedTask_Call
type MockTaskCleaner_Expecter ¶
type MockTaskCleaner_Expecter struct {
// contains filtered or unexported fields
}
func (*MockTaskCleaner_Expecter) CleanUpDistributedTask ¶
func (_e *MockTaskCleaner_Expecter) CleanUpDistributedTask(ctx interface{}, namespace interface{}, taskID interface{}, taskVersion interface{}) *MockTaskCleaner_CleanUpDistributedTask_Call
CleanUpDistributedTask is a helper method to define mock.On call
- ctx context.Context
- namespace string
- taskID string
- taskVersion uint64
type MockTaskCompletionRecorder ¶
MockTaskCompletionRecorder is an autogenerated mock type for the TaskCompletionRecorder type
func NewMockTaskCompletionRecorder ¶
func NewMockTaskCompletionRecorder(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCompletionRecorder
NewMockTaskCompletionRecorder creates a new instance of MockTaskCompletionRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockTaskCompletionRecorder) EXPECT ¶
func (_m *MockTaskCompletionRecorder) EXPECT() *MockTaskCompletionRecorder_Expecter
func (*MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion ¶ added in v1.37.0
func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string) error
RecordDistributedTaskUnitCompletion provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID
func (*MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure ¶ added in v1.37.0
func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, errMsg string) error
RecordDistributedTaskUnitFailure provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID, errMsg
func (*MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress ¶ added in v1.37.0
func (_m *MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, progress float32) error
UpdateDistributedTaskUnitProgress provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID, progress
type MockTaskCompletionRecorder_Expecter ¶
type MockTaskCompletionRecorder_Expecter struct {
// contains filtered or unexported fields
}
func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion ¶ added in v1.37.0
func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
RecordDistributedTaskUnitCompletion is a helper method to define mock.On call
- ctx context.Context
- namespace string
- taskID string
- version uint64
- nodeID string
- unitID string
func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure ¶ added in v1.37.0
func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}, errMsg interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
RecordDistributedTaskUnitFailure is a helper method to define mock.On call
- ctx context.Context
- namespace string
- taskID string
- version uint64
- nodeID string
- unitID string
- errMsg string
func (*MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress ¶ added in v1.37.0
func (_e *MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}, progress interface{}) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
UpdateDistributedTaskUnitProgress is a helper method to define mock.On call
- ctx context.Context
- namespace string
- taskID string
- version uint64
- nodeID string
- unitID string
- progress float32
type MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call ¶ added in v1.37.0
MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDistributedTaskUnitCompletion'
func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Return ¶ added in v1.37.0
func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Run ¶ added in v1.37.0
func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Run(run func(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string)) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) RunAndReturn ¶ added in v1.37.0
func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) RunAndReturn(run func(context.Context, string, string, uint64, string, string) error) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call
type MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call ¶ added in v1.37.0
MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDistributedTaskUnitFailure'
func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Run ¶ added in v1.37.0
func (_c *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Run(run func(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, errMsg string)) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call
func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) RunAndReturn ¶ added in v1.37.0
type MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call ¶ added in v1.37.0
MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateDistributedTaskUnitProgress'
func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Run ¶ added in v1.37.0
func (_c *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Run(run func(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, progress float32)) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call
func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) RunAndReturn ¶ added in v1.37.0
type Provider ¶
type Provider interface {
	// SetCompletionRecorder is invoked on node startup to register the
	// TaskCompletionRecorder, which should be passed to all launched tasks
	// so they can record their completion.
	SetCompletionRecorder(recorder TaskCompletionRecorder)
	// GetLocalTasks returns a list of tasks that the provider is aware of
	// from the local node state.
	GetLocalTasks() []TaskDescriptor
	// CleanupTask is a signal to clean up the task's local state.
	CleanupTask(desc TaskDescriptor) error
	// StartTask is a signal to start executing the task in the background.
	StartTask(task *Task) (TaskHandle, error)
}
Provider is an interface for the management and execution of a group of tasks denoted by a namespace.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is the component responsible for polling the active tasks in the cluster (via the Manager) and ensuring that those tasks are running on the local node.
The general flow of a distributed task is as follows:
1. A Provider is registered with the Scheduler at startup to handle all tasks under a specific namespace.
2. A task is created and added to the cluster via Manager.AddTask.
3. The Scheduler regularly scans all available tasks in the cluster, picks up new ones, and instructs the Provider to execute them locally.
4. A task is responsible for updating its status in the cluster via TaskCompletionRecorder.
5. The Scheduler polls the cluster for the task status and checks if it is still running. It cancels the local task if it is not marked as STARTED anymore.
6. After the completed-task TTL has passed, the Scheduler issues the Manager.CleanUpDistributedTask request to remove the task from the cluster list.
7. After a task is removed from the cluster list, the Scheduler instructs the Provider to clean up the local task state.
func NewScheduler ¶
func NewScheduler(params SchedulerParams) *Scheduler
func (*Scheduler) Close ¶
func (s *Scheduler) Close()
Close stops the background tick loop and terminates all running task handles. It blocks until all handles have been signalled. After Close returns, no new ticks will fire.
func (*Scheduler) Start ¶
Start wires up providers with a ThrottledRecorder, performs an initial task listing to bootstrap any already-active tasks, and spawns the background tick loop. It is safe to call exactly once. Use Scheduler.Close to stop the loop and terminate all running tasks.
type SchedulerParams ¶
type SchedulerParams struct {
	CompletionRecorder TaskCompletionRecorder
	TasksLister        TasksLister
	TaskCleaner        TaskCleaner
	Providers          map[string]Provider
	Clock              clockwork.Clock
	Logger             logrus.FieldLogger
	MetricsRegisterer  prometheus.Registerer
	LocalNode          string
	CompletedTaskTTL   time.Duration
	TickInterval       time.Duration
}
type ShardLister ¶ added in v1.37.0
ShardLister provides local shard names for a collection, allowing the ShardNoopProvider to determine unit ownership based on real shard topology without importing the db package.
type ShardNoopProvider ¶ added in v1.37.0
type ShardNoopProvider struct {
// contains filtered or unexported fields
}
ShardNoopProvider is a test-only UnitAwareProvider used by acceptance tests to exercise the unit lifecycle end-to-end. It is registered in configure_api.go behind the SHARD_NOOP_PROVIDER_ENABLED env var and exposed via a debug HTTP endpoint on port 6060.
On StartTask, it spawns a goroutine that iterates units sequentially, reports 50% progress, then completes each one. Set FailUnitID in the payload to make one unit fail instead, which triggers the task-level fail-fast behavior.
When a Collection is specified in the payload and a ShardLister is provided, the provider only claims units whose IDs match local shard names. This validates that unit ownership aligns with actual shard placement.
Marker files are written as hidden dot-files inside shard directories (when a collection is specified) or under {dataRoot}/.dtm/ (for synthetic units). This avoids writing to /tmp and keeps side effects scoped to the Weaviate data directory.
func NewShardNoopProvider ¶ added in v1.37.0
func NewShardNoopProvider(nodeID string, logger logrus.FieldLogger, shardLister ShardLister, dataRoot string) *ShardNoopProvider
NewShardNoopProvider creates a new ShardNoopProvider. Pass nil for shardLister when real shard topology is not needed (e.g. unit tests with synthetic unit IDs). dataRoot is the Weaviate persistence data path; marker files are written as hidden dot-files inside shard directories (collection-aware mode) or under {dataRoot}/.dtm/ (synthetic mode).
func (*ShardNoopProvider) CleanupTask ¶ added in v1.37.0
func (p *ShardNoopProvider) CleanupTask(desc TaskDescriptor) error
func (*ShardNoopProvider) GetFinalizedGroups ¶ added in v1.37.0
func (p *ShardNoopProvider) GetFinalizedGroups(desc TaskDescriptor) map[string][]string
GetFinalizedGroups returns the per-group finalized unit IDs for a task.
func (*ShardNoopProvider) GetFinalizedUnits ¶ added in v1.37.0
func (p *ShardNoopProvider) GetFinalizedUnits(desc TaskDescriptor) []string
GetFinalizedUnits returns all finalized unit IDs across all groups for a task.
func (*ShardNoopProvider) GetLocalTasks ¶ added in v1.37.0
func (p *ShardNoopProvider) GetLocalTasks() []TaskDescriptor
func (*ShardNoopProvider) IsTaskCompleted ¶ added in v1.37.0
func (p *ShardNoopProvider) IsTaskCompleted(desc TaskDescriptor) bool
func (*ShardNoopProvider) OnGroupCompleted ¶ added in v1.37.0
func (p *ShardNoopProvider) OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string)
func (*ShardNoopProvider) OnTaskCompleted ¶ added in v1.37.0
func (p *ShardNoopProvider) OnTaskCompleted(task *Task)
func (*ShardNoopProvider) SetCompletionRecorder ¶ added in v1.37.0
func (p *ShardNoopProvider) SetCompletionRecorder(recorder TaskCompletionRecorder)
func (*ShardNoopProvider) StartTask ¶ added in v1.37.0
func (p *ShardNoopProvider) StartTask(task *Task) (TaskHandle, error)
type ShardNoopProviderPayload ¶ added in v1.37.0
type ShardNoopProviderPayload struct {
FailUnitID string `json:"failUnitId,omitempty"`
Collection string `json:"collection,omitempty"`
UnitToShard map[string]string `json:"unitToShard,omitempty"`
UnitToNode map[string]string `json:"unitToNode,omitempty"`
SlowUnitID string `json:"slowUnitId,omitempty"`
SlowUnitDelayMs int `json:"slowUnitDelayMs,omitempty"`
ProcessingDelayMs int `json:"processingDelayMs,omitempty"`
MaxConcurrency int `json:"maxConcurrency,omitempty"`
}
ShardNoopProviderPayload is the JSON payload for tasks created with the ShardNoopProvider. When Collection is set and a ShardLister is available, the provider uses real shard placement for unit ownership instead of the synthetic NodeID-based assignment.
UnitToShard maps unit IDs to shard names, allowing multiple units per shard (one per replica). UnitToNode maps unit IDs to the node that should process them, providing deterministic ownership when RF > 1 (where multiple nodes have the same shard locally). Both maps are required when Collection is set.
MaxConcurrency controls how many units are processed in parallel on each node. When > 1, processUnits fans out with a ConcurrencyLimiter instead of iterating sequentially. The default (0) preserves the existing sequential behavior.
type Task ¶
type Task struct {
// Namespace groups distributed tasks; each namespace is managed by its own Provider implementation.
Namespace string `json:"namespace"`
TaskDescriptor `json:",inline"`
// Payload is arbitrary data needed to execute a task in this Namespace.
Payload []byte `json:"payload"`
// Status is the current status of the task.
Status TaskStatus `json:"status"`
// StartedAt is the time that a task was submitted to the cluster.
StartedAt time.Time `json:"startedAt"`
// FinishedAt is the time at which the task reached a terminal status.
// It is also used to schedule task cleanup.
FinishedAt time.Time `json:"finishedAt"`
// Error is an optional field to store the error which moved the task to FAILED status.
Error string `json:"error,omitempty"`
// Units tracks per-unit progress. Always non-nil for valid tasks.
Units map[string]*Unit `json:"units,omitempty"`
}
Task represents a distributed task tracked across the cluster via Raft consensus.
Completion is tracked per-unit. The task finishes when all units reach a terminal state. A single unit failure immediately fails the entire task — remaining in-flight units are NOT waited for.
Units are always required when creating a task.
func (*Task) AllGroupUnitsTerminal ¶ added in v1.37.0
AllGroupUnitsTerminal returns true if all units in the given group are terminal.
func (*Task) AllUnitsTerminal ¶ added in v1.37.0
AllUnitsTerminal returns true if all units are in a terminal state (COMPLETED or FAILED).
func (*Task) AnyUnitFailed ¶ added in v1.37.0
AnyUnitFailed returns true if any unit has FAILED status.
func (*Task) Groups ¶ added in v1.37.0
Groups returns the distinct GroupIDs across all units (includes "" for ungrouped).
func (*Task) LocalGroupUnitIDs ¶ added in v1.37.0
LocalGroupUnitIDs returns the IDs of units in the given group assigned to the given node.
func (*Task) LocalUnitIDs ¶ added in v1.37.0
LocalUnitIDs returns the IDs of units assigned to the given node.
func (*Task) NodeHasNonTerminalUnits ¶ added in v1.37.0
NodeHasNonTerminalUnits returns true if the given node has units that are not yet terminal. Unassigned units (empty NodeID) are considered as belonging to any node.
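The per-unit completion semantics can be sketched in self-contained form. This mirrors the documented behavior of AllUnitsTerminal and AnyUnitFailed; the local `unit` type and lowercase helper names are stand-ins, since the methods' full signatures are not reproduced here.

```go
package main

import "fmt"

type UnitStatus string

const (
	UnitStatusPending    UnitStatus = "PENDING"
	UnitStatusInProgress UnitStatus = "IN_PROGRESS"
	UnitStatusCompleted  UnitStatus = "COMPLETED"
	UnitStatusFailed     UnitStatus = "FAILED"
)

type unit struct{ Status UnitStatus }

// allUnitsTerminal: every unit is COMPLETED or FAILED.
func allUnitsTerminal(units map[string]*unit) bool {
	for _, u := range units {
		if u.Status != UnitStatusCompleted && u.Status != UnitStatusFailed {
			return false
		}
	}
	return true
}

// anyUnitFailed: a single FAILED unit fails the whole task (fail-fast).
func anyUnitFailed(units map[string]*unit) bool {
	for _, u := range units {
		if u.Status == UnitStatusFailed {
			return true
		}
	}
	return false
}

func main() {
	units := map[string]*unit{
		"shard-a": {Status: UnitStatusCompleted},
		"shard-b": {Status: UnitStatusInProgress},
	}
	fmt.Println(allUnitsTerminal(units), anyUnitFailed(units)) // false false

	units["shard-b"].Status = UnitStatusFailed
	// The task now transitions to FAILED immediately; it does not wait
	// for other in-flight units to finish.
	fmt.Println(allUnitsTerminal(units), anyUnitFailed(units)) // true true
}
```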
type TaskCleaner ¶
type TaskCleaner interface {
CleanUpDistributedTask(ctx context.Context, namespace, taskID string, taskVersion uint64) error
}
TaskCleaner is an interface for issuing a request to clean up a distributed task.
type TaskCompletionRecorder ¶
type TaskCompletionRecorder interface {
RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string) error
RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID, errMsg string) error
UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string, progress float32) error
}
TaskCompletionRecorder is an interface for recording unit-level completion, failure, and progress of a distributed task.
type TaskDescriptor ¶
type TaskDescriptor struct {
// ID is the identifier of the task in the namespace.
ID string `json:"ID"`
// Version is the version of the task with task ID.
// It is used to differentiate between multiple runs of the same task.
Version uint64 `json:"version"`
}
TaskDescriptor is a struct identifying a task execution under a certain task namespace.
type TaskHandle ¶
type TaskHandle interface {
// Terminate is a signal to stop executing the task. If the task is no longer running because it already finished,
// the method call should be a no-op.
//
// A terminated task can be started again later; therefore, no local state may be removed.
Terminate()
// Done returns a channel that is closed when the task's goroutine exits, whether due to
// completion, failure, or termination. The scheduler uses this to detect dead handles
// and allow re-launch of tasks that still have pending work.
Done() <-chan struct{}
}
TaskHandle is an interface to control a locally running task.
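A minimal implementation of this contract might look like the following sketch. The `noopHandle` type and `startWork` function are hypothetical; in the real package, handles are returned by Provider.StartTask.

```go
package main

import (
	"fmt"
	"sync"
)

// TaskHandle mirrors the interface defined above.
type TaskHandle interface {
	Terminate()
	Done() <-chan struct{}
}

// noopHandle is a hypothetical handle: Terminate signals the worker
// goroutine to stop, and done is closed exactly once when it exits.
type noopHandle struct {
	stop     chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

func startWork() *noopHandle {
	h := &noopHandle{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		// Closed on completion, failure, or termination, so the
		// Scheduler can detect dead handles via Done().
		defer close(h.done)
		for i := 0; i < 3; i++ {
			select {
			case <-h.stop:
				// Terminated early; keep local state so the task
				// can be started again later.
				return
			default:
				// ... process one unit of work ...
			}
		}
	}()
	return h
}

// Terminate is a no-op if called again or after the work already finished.
func (h *noopHandle) Terminate() { h.stopOnce.Do(func() { close(h.stop) }) }

func (h *noopHandle) Done() <-chan struct{} { return h.done }

func main() {
	h := startWork()
	h.Terminate()
	<-h.Done()
	fmt.Println("handle exited")
}
```

The sync.Once guard is one way to satisfy the "no-op if already finished" requirement: Terminate stays safe to call any number of times, from any goroutine.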
type TaskStatus ¶
type TaskStatus string
const (
	// TaskStatusStarted means that the task is still running on some of the nodes.
	TaskStatusStarted TaskStatus = "STARTED"
	// TaskStatusFinished means that the task was successfully executed by all nodes.
	TaskStatusFinished TaskStatus = "FINISHED"
	// TaskStatusCancelled means that the task was cancelled by user.
	TaskStatusCancelled TaskStatus = "CANCELLED"
	// TaskStatusFailed means that one of the nodes got a non-retryable error and all other nodes
	// terminated the execution.
	TaskStatusFailed TaskStatus = "FAILED"
)
func (TaskStatus) String ¶
func (t TaskStatus) String() string
type TasksLister ¶
type TasksLister interface {
ListDistributedTasks(ctx context.Context) (map[string][]*Task, error)
}
TasksLister is an interface for listing distributed tasks in the cluster.
type ThrottledRecorder ¶ added in v1.37.0
type ThrottledRecorder struct {
// contains filtered or unexported fields
}
ThrottledRecorder wraps a TaskCompletionRecorder to prevent progress updates from flooding Raft consensus. Each unit's progress is forwarded at most once per interval (default 30s); intermediate updates are silently dropped. Completion and failure calls always pass through immediately — they are never throttled.
Throttle entries are cleaned up when a unit reaches a terminal state (completion or failure), so the internal map does not grow beyond the number of active units.
func NewThrottledRecorder ¶ added in v1.37.0
func NewThrottledRecorder(inner TaskCompletionRecorder, interval time.Duration, clock clockwork.Clock) *ThrottledRecorder
func (*ThrottledRecorder) RecordDistributedTaskUnitCompletion ¶ added in v1.37.0
func (*ThrottledRecorder) RecordDistributedTaskUnitFailure ¶ added in v1.37.0
func (*ThrottledRecorder) UpdateDistributedTaskUnitProgress ¶ added in v1.37.0
type Unit ¶ added in v1.37.0
type Unit struct {
ID string `json:"id"`
GroupID string `json:"groupId,omitempty"`
NodeID string `json:"nodeId"`
Status UnitStatus `json:"status"`
Progress float32 `json:"progress"`
Error string `json:"error,omitempty"`
UpdatedAt time.Time `json:"updatedAt"`
FinishedAt time.Time `json:"finishedAt,omitempty"`
}
Unit represents a trackable work unit within a distributed task (e.g. a single shard in a reindex operation). Units follow the lifecycle PENDING → IN_PROGRESS → COMPLETED/FAILED.
NodeID starts empty (unassigned) and is set on the first progress update. The Scheduler treats unassigned units as belonging to any node, which is how initial assignment happens: the first node to report progress claims the unit.
Unit values are owned by the Manager and mutated under its lock. Callers outside the Manager should only access units via cloned Task snapshots from ListDistributedTasks.
type UnitAwareProvider ¶ added in v1.37.0
type UnitAwareProvider interface {
Provider
OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string)
OnTaskCompleted(task *Task)
}
UnitAwareProvider fires per-group callbacks as groups complete (mid-flight), then a global OnTaskCompleted when the task reaches terminal state.
Every task has groups. If no explicit GroupID is set, all units belong to a single implicit default group (""). This means:
- Tasks without explicit groups: OnGroupCompleted fires once with all local units when every unit reaches a terminal state (same effect as having a single group).
- Tasks with explicit groups: OnGroupCompleted fires per group as each one completes, even while the task is still STARTED.
Callback phases:
- OnGroupCompleted — fires per group when all of that group's units reach a terminal state
- OnTaskCompleted — fires once on every node after ALL units are terminal
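The implicit default group can be illustrated with a self-contained sketch of the grouping rule. The `unit` type and `distinctGroups` helper are hypothetical; they mirror the semantics of Task.Groups described earlier, where ungrouped units share group "".

```go
package main

import (
	"fmt"
	"sort"
)

// unit carries only the field the grouping rule needs.
type unit struct {
	ID      string
	GroupID string // empty means the implicit default group ""
}

// distinctGroups returns the distinct group IDs across all units,
// including "" for ungrouped units.
func distinctGroups(units []unit) []string {
	seen := map[string]struct{}{}
	for _, u := range units {
		seen[u.GroupID] = struct{}{}
	}
	groups := make([]string, 0, len(seen))
	for g := range seen {
		groups = append(groups, g)
	}
	sort.Strings(groups)
	return groups
}

func main() {
	// No explicit GroupID: everything falls into the single default group "",
	// so OnGroupCompleted fires once, when all units are terminal.
	fmt.Printf("%q\n", distinctGroups([]unit{{ID: "u1"}, {ID: "u2"}}))

	// Explicit groups (e.g. one per tenant): OnGroupCompleted fires per
	// group as each one completes, while the task may still be STARTED.
	fmt.Printf("%q\n", distinctGroups([]unit{
		{ID: "u1", GroupID: "tenantA"},
		{ID: "u2", GroupID: "tenantB"},
	}))
}
```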
type UnitSpec ¶ added in v1.37.0
UnitSpec defines a unit with an optional group assignment. Used at task creation time when units need group membership (e.g. one group per tenant for MT reindex).
type UnitStatus ¶ added in v1.37.0
type UnitStatus string
const (
	UnitStatusPending    UnitStatus = "PENDING"
	UnitStatusInProgress UnitStatus = "IN_PROGRESS"
	UnitStatusCompleted  UnitStatus = "COMPLETED"
	UnitStatusFailed     UnitStatus = "FAILED"
)