distributedtask

package
v1.37.0-rc.0
Published: Apr 2, 2026 License: BSD-3-Clause Imports: 17 Imported by: 0

Documentation

Overview

Package distributedtask coordinates long-running operations (e.g. reindexing) across a Weaviate cluster. All task state lives in the Raft log, so it survives node restarts and leader elections without external storage.

Architecture

Three components work together:

  • Manager is the Raft state machine. It owns the canonical task state and is the only component that mutates it. All writes go through Raft Apply (see cluster/store_apply.go).

  • Scheduler runs on every node. It polls the Manager for the current task list, starts and stops local work via a Provider, and submits cleanup requests when tasks expire.

  • Provider is the extension point. Each task namespace (e.g. "reindex", "compaction") registers a Provider that knows how to execute that type of work locally.

Unit tracking

Every task declares a set of named Unit items (e.g. one per shard). Each unit progresses independently through PENDING → IN_PROGRESS → COMPLETED/FAILED. The task finishes when all units reach a terminal state. Units are always required when creating a task.

Unit assignment

Units start unassigned (empty NodeID). The Scheduler treats unassigned units as belonging to any node, so all nodes will start the task. The first node to report progress for a unit claims it — subsequent updates from other nodes are rejected. This means assignment is implicit and driven by the Provider implementation, not prescribed by the framework.

Failure semantics

When any unit fails, the entire task immediately transitions to FAILED. In-flight units on other nodes are NOT waited for — their subsequent completion reports are rejected. This fail-fast approach avoids wasting cluster resources on doomed work.

Group-level finalization

If the Provider implements UnitAwareProvider, the Scheduler fires per-group callbacks as groups complete — even while the task is still STARTED:

  1. OnGroupCompleted — fires per group when all units in that group reach a terminal state. Receives the groupID and only the local unit IDs in that group. Fires mid-flight for tasks with explicit groups, enabling per-tenant atomicity for MT reindex. When no explicit GroupID is set, all units belong to default group "" and OnGroupCompleted fires once when all units are terminal (identical to old behavior).

  2. OnTaskCompleted — fires once per node after ALL units reach terminal state. Use this for global operations (e.g. Raft schema update). Since Raft deduplicates, the schema update happens exactly once even though OnTaskCompleted fires on every node.

Both callbacks fire on FINISHED and FAILED tasks so providers can finalize (success) or rollback (failure) based on task.Status. Both fire exactly once per task lifecycle.

Four journey examples

Journey 1: Spread work across any node (no finalization needed).

A data-cleanup provider distributes 1000 files as units. Any node can claim any file. No finalization needed — each unit is independent.

unitIDs := []string{"file-001", "file-002", ..., "file-1000"}
raft.AddDistributedTask(ctx, "cleanup", taskID, payload, unitIDs)

The Provider's StartTask iterates units, processes unclaimed ones, and reports completion. No UnitAwareProvider needed.

Journey 2: Per-shard work, global finalize (behavior unchanged).

A repair/rebuild provider recreates an index with the same configuration (e.g., fixing a corrupted HNSW or rebuilding blockmax segments). Because the behavior is unchanged, shards can swap independently as they finish — there's no need to wait for all shards before swapping, since queries produce the same results regardless of which format a shard is currently serving from.

Unit IDs are opaque strings. The Provider defines them at task creation time and stores any shard→unit mapping in the task payload:

payload := ReindexPayload{
    ShardMap: map[string]string{  // unitID → shardName
        "u-0": "shard-S1",       // nodeA's replica of S1
        "u-1": "shard-S1",       // nodeB's replica of S1
        "u-2": "shard-S2",       // ...
    },
}
unitIDs := []string{"u-0", "u-1", "u-2", ...}

Node assignment is automatic: the first node to report progress for a unit claims it (Unit.NodeID is set). The Provider's StartTask iterates units, checks which local shards it owns, and claims the corresponding units.

Each shard swaps its bucket pointers immediately upon completing its own reindex (inside the StartTask goroutine, before calling RecordUnitCompletion).

OnGroupCompleted: no-op — each shard already swapped during its own processing. OnTaskCompleted: optional — e.g., log completion or flip a cosmetic schema flag.

Journey 3: Per-shard work, per-shard finalize after barrier (behavior changes).

A tokenization-change provider reindexes every shard with new tokenization config (e.g., WORD → TRIGRAM). Because the behavior changes, consistency matters: if some shards serve old tokenization while others serve new, queries return mixed results. ALL shards must finish reindexing before ANY shard swaps to the new format.

Unit IDs and shard mapping work the same way as Journey 2 — the Provider defines IDs at creation time and stores the mapping in the task payload. The framework only cares about Unit.NodeID for ownership tracking.

During StartTask, each shard reindexes into new segments but does NOT swap yet. It reports progress and completion, but the old segments remain active for queries.

OnGroupCompleted: fires on each node AFTER all units across all nodes finish (since all units share the default group ""). Receives localGroupUnitIDs — the Provider looks up the shard mapping from task.Payload to know which local shards to swap. Atomically swaps bucket pointers for each local shard. This is a local operation, no Raft needed.

OnTaskCompleted: submits a Raft schema update to change the tokenization config. Because Raft deduplicates, the schema update happens exactly once even though OnTaskCompleted fires on every node.

The barrier guarantee ensures NO shard swaps until ALL shards finish reindexing.

Journey 4: Per-tenant work with per-tenant finalize (MT reindex with groups).

A multi-tenant reindex provider creates one group per tenant. Each tenant's replicas are units in that group. As each tenant's group completes, OnGroupCompleted fires mid-flight — the provider atomically swaps that tenant's bucket pointers without waiting for other tenants. This provides per-tenant atomicity: if tenant A's group completes while tenant B is still reindexing, tenant A starts serving new data immediately.

specs := []UnitSpec{
    {ID: "t1__nodeA", GroupID: "tenant-1"},
    {ID: "t1__nodeB", GroupID: "tenant-1"},
    {ID: "t2__nodeA", GroupID: "tenant-2"},
    ...
}
raft.AddDistributedTaskWithGroups(ctx, "reindex", taskID, payload, specs)

OnGroupCompleted: fires per-tenant as each tenant's replicas all finish. Atomically swaps bucket pointers for the local replicas of that tenant.

OnTaskCompleted: fires once when ALL tenants finish. Updates schema with new tokenization config. If any tenant failed, task.Status == FAILED — provider skips schema update but already-swapped tenants remain valid (independent).

Progress throttling

Unit progress updates go through Raft consensus. To prevent flooding the log, the Scheduler wraps the TaskCompletionRecorder in a ThrottledRecorder that forwards progress for each unit at most once per 30 seconds. Completion and failure calls are never throttled.

Adding a new task type

To add a new kind of distributed task:

  1. Define a namespace constant (e.g. "my-reindex").
  2. Implement Provider (or UnitAwareProvider if you need group-level callbacks).
  3. Register the provider in configure_api.go's MakeAppState, keyed by your namespace.
  4. Create tasks via the Raft endpoint [cluster.Raft.AddDistributedTask], passing unit IDs (at least one unit is always required).

See ShardNoopProvider for a complete working example used by acceptance tests.

Provider idempotency contract

After a node crash, the Scheduler re-launches tasks that still have non-terminal units. The Provider MUST handle re-invocation idempotently:

  • Units in IN_PROGRESS state will be re-delivered to the same node that claimed them. The provider must detect partially-completed work (e.g. via sentinel files) and either resume or restart the unit safely.

  • Units in PENDING state (unclaimed) may be delivered to any node. The provider must tolerate being asked to process a unit that another node is also attempting to claim — only one node's first progress update will succeed. Providers that use per-replica assignment (UnitToNode metadata) avoid this race entirely, since each unit is deterministically assigned to exactly one node at creation time.

  • The framework does NOT re-assign units claimed by a crashed node to other nodes. The crashed node must eventually restart for its IN_PROGRESS units to complete. If a node is permanently lost, the task must be cancelled manually.

Typical idempotency patterns:

  • Check for a completion sentinel file before starting work
  • Use atomic file operations (write-to-temp + rename) for crash safety
  • Store progress checkpoints that allow resuming from the last known good state

Index

Constants

const ShardNoopProviderNamespace = "shard-noop"

Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrencyLimiter added in v1.37.0

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter is a channel-based semaphore that limits the number of concurrent operations. Acquire blocks until a slot is available or the context is cancelled. Release returns a slot to the pool.

func NewConcurrencyLimiter added in v1.37.0

func NewConcurrencyLimiter(maxConcurrency int) *ConcurrencyLimiter

NewConcurrencyLimiter creates a limiter that allows up to maxConcurrency concurrent operations. Values < 1 are clamped to 1.

func (*ConcurrencyLimiter) Acquire added in v1.37.0

func (l *ConcurrencyLimiter) Acquire(ctx context.Context) error

Acquire blocks until a slot is available or ctx is cancelled.

func (*ConcurrencyLimiter) Release added in v1.37.0

func (l *ConcurrencyLimiter) Release()

Release returns a slot to the pool. Must be called exactly once for each successful Acquire.

type ListDistributedTasksResponse

type ListDistributedTasksResponse struct {
	Tasks map[string][]*Task `json:"tasks"`
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is responsible for managing distributed tasks across the cluster.

func NewManager

func NewManager(params ManagerParameters) *Manager

func (*Manager) AddTask

func (m *Manager) AddTask(c *api.ApplyRequest, seqNum uint64) error

AddTask registers a new distributed task from a Raft apply. The seqNum becomes the task's Version, used to distinguish re-runs of the same task ID. Returns an error if a task with the same namespace/ID is already running, or if no units are provided.

func (*Manager) CancelTask

func (m *Manager) CancelTask(a *api.ApplyRequest) error

CancelTask transitions a running task to CANCELLED. In-flight units are not waited for — the Scheduler will terminate their local handles on the next tick.

func (*Manager) CleanUpTask

func (m *Manager) CleanUpTask(a *api.ApplyRequest) error

CleanUpTask removes a terminal task from the Manager's state. It refuses to clean up tasks that are still running or whose completedTaskTTL has not yet elapsed, preventing premature removal of status information that other nodes may still need to observe.

func (*Manager) ListDistributedTasks

func (m *Manager) ListDistributedTasks(_ context.Context) (map[string][]*Task, error)

ListDistributedTasks returns a snapshot of all tasks grouped by namespace. Each Task is cloned, so callers may read the returned values without holding the Manager's lock.

func (*Manager) ListDistributedTasksPayload

func (m *Manager) ListDistributedTasksPayload(ctx context.Context) ([]byte, error)

func (*Manager) RecordUnitCompletion added in v1.37.0

func (m *Manager) RecordUnitCompletion(c *api.ApplyRequest) error

RecordUnitCompletion handles both success and failure (distinguished by a non-empty error field in the request). On failure, the task transitions to FAILED immediately — remaining in-flight units are NOT waited for, and their subsequent completion reports will be rejected with "task is no longer running". This fail-fast behavior is intentional: it avoids wasting cluster resources on a task that is already doomed.

func (*Manager) Restore

func (m *Manager) Restore(bytes []byte) error

Restore replaces the Manager's in-memory state from a Raft snapshot produced by Manager.Snapshot. It is called during Raft leader election or when a follower installs a snapshot from the leader.

func (*Manager) Snapshot

func (m *Manager) Snapshot() ([]byte, error)

Snapshot serialises the full task state to JSON for Raft snapshotting. The inverse operation is Manager.Restore.

func (*Manager) UpdateUnitProgress added in v1.37.0

func (m *Manager) UpdateUnitProgress(c *api.ApplyRequest) error

UpdateUnitProgress also handles initial node assignment: the first progress update for an unassigned unit sets its NodeID, claiming it for that node. After assignment, updates from other nodes are rejected. Progress updates to terminal units are silently ignored (no error) because in-flight Raft commands may arrive after a unit has already completed.

type ManagerParameters

type ManagerParameters struct {
	Clock clockwork.Clock

	CompletedTaskTTL time.Duration
}

type MockTaskCleaner

type MockTaskCleaner struct {
	mock.Mock
}

MockTaskCleaner is an autogenerated mock type for the TaskCleaner type

func NewMockTaskCleaner

func NewMockTaskCleaner(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCleaner

NewMockTaskCleaner creates a new instance of MockTaskCleaner. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskCleaner) CleanUpDistributedTask

func (_m *MockTaskCleaner) CleanUpDistributedTask(ctx context.Context, namespace string, taskID string, taskVersion uint64) error

CleanUpDistributedTask provides a mock function with given fields: ctx, namespace, taskID, taskVersion

func (*MockTaskCleaner) EXPECT

type MockTaskCleaner_CleanUpDistributedTask_Call

type MockTaskCleaner_CleanUpDistributedTask_Call struct {
	*mock.Call
}

MockTaskCleaner_CleanUpDistributedTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanUpDistributedTask'

func (*MockTaskCleaner_CleanUpDistributedTask_Call) Return

func (*MockTaskCleaner_CleanUpDistributedTask_Call) Run

func (*MockTaskCleaner_CleanUpDistributedTask_Call) RunAndReturn

type MockTaskCleaner_Expecter

type MockTaskCleaner_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskCleaner_Expecter) CleanUpDistributedTask

func (_e *MockTaskCleaner_Expecter) CleanUpDistributedTask(ctx interface{}, namespace interface{}, taskID interface{}, taskVersion interface{}) *MockTaskCleaner_CleanUpDistributedTask_Call

CleanUpDistributedTask is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • taskVersion uint64

type MockTaskCompletionRecorder

type MockTaskCompletionRecorder struct {
	mock.Mock
}

MockTaskCompletionRecorder is an autogenerated mock type for the TaskCompletionRecorder type

func NewMockTaskCompletionRecorder

func NewMockTaskCompletionRecorder(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCompletionRecorder

NewMockTaskCompletionRecorder creates a new instance of MockTaskCompletionRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskCompletionRecorder) EXPECT

func (*MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion added in v1.37.0

func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string) error

RecordDistributedTaskUnitCompletion provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID

func (*MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure added in v1.37.0

func (_m *MockTaskCompletionRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, errMsg string) error

RecordDistributedTaskUnitFailure provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID, errMsg

func (*MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress added in v1.37.0

func (_m *MockTaskCompletionRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace string, taskID string, version uint64, nodeID string, unitID string, progress float32) error

UpdateDistributedTaskUnitProgress provides a mock function with given fields: ctx, namespace, taskID, version, nodeID, unitID, progress

type MockTaskCompletionRecorder_Expecter

type MockTaskCompletionRecorder_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion added in v1.37.0

func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitCompletion(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call

RecordDistributedTaskUnitCompletion is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • nodeID string
  • unitID string

func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure added in v1.37.0

func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskUnitFailure(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}, errMsg interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call

RecordDistributedTaskUnitFailure is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • nodeID string
  • unitID string
  • errMsg string

func (*MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress added in v1.37.0

func (_e *MockTaskCompletionRecorder_Expecter) UpdateDistributedTaskUnitProgress(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, nodeID interface{}, unitID interface{}, progress interface{}) *MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call

UpdateDistributedTaskUnitProgress is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • nodeID string
  • unitID string
  • progress float32

type MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call added in v1.37.0

type MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call struct {
	*mock.Call
}

MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDistributedTaskUnitCompletion'

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Return added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) Run added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitCompletion_Call) RunAndReturn added in v1.37.0

type MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call added in v1.37.0

type MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call struct {
	*mock.Call
}

MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDistributedTaskUnitFailure'

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Return added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) Run added in v1.37.0

func (*MockTaskCompletionRecorder_RecordDistributedTaskUnitFailure_Call) RunAndReturn added in v1.37.0

type MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call added in v1.37.0

type MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call struct {
	*mock.Call
}

MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateDistributedTaskUnitProgress'

func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Return added in v1.37.0

func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) Run added in v1.37.0

func (*MockTaskCompletionRecorder_UpdateDistributedTaskUnitProgress_Call) RunAndReturn added in v1.37.0

type Provider

type Provider interface {
	// SetCompletionRecorder is invoked on node startup to register the TaskCompletionRecorder,
	// which should be passed to all launched tasks so they can mark their completion.
	SetCompletionRecorder(recorder TaskCompletionRecorder)

	// GetLocalTasks returns a list of tasks that the provider is aware of from the local node state.
	GetLocalTasks() []TaskDescriptor

	// CleanupTask is a signal to clean up the task local state.
	CleanupTask(desc TaskDescriptor) error

	// StartTask is a signal to start executing the task in the background.
	StartTask(task *Task) (TaskHandle, error)
}

Provider is an interface for the management and execution of a group of tasks denoted by a namespace.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the component responsible for polling the active tasks in the cluster (via the Manager) and ensuring that those tasks are running on the local node.

The general flow of a distributed task is as follows:

  1. A Provider is registered with the Scheduler at startup to handle all tasks under a specific namespace.
  2. A task is created and added to the cluster via Manager.AddTask.
  3. The Scheduler regularly scans all available tasks in the cluster, picks up new ones, and instructs the Provider to execute them locally.
  4. A task is responsible for updating its status in the cluster via the TaskCompletionRecorder.
  5. The Scheduler polls the cluster for the task status and checks whether it is still running. It cancels the local task if it is no longer marked as STARTED.
  6. After the completed-task TTL has passed, the Scheduler issues a CleanUpDistributedTask request to remove the task from the cluster list.
  7. After a task is removed from the cluster list, the Scheduler instructs the Provider to clean up the local task state.

func NewScheduler

func NewScheduler(params SchedulerParams) *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close()

Close stops the background tick loop and terminates all running task handles. It blocks until all handles have been signalled. After Close returns, no new ticks will fire.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start wires up providers with a ThrottledRecorder, performs an initial task listing to bootstrap any already-active tasks, and spawns the background tick loop. It is safe to call exactly once. Use Scheduler.Close to stop the loop and terminate all running tasks.

type SchedulerParams

type SchedulerParams struct {
	CompletionRecorder TaskCompletionRecorder
	TasksLister        TasksLister
	TaskCleaner        TaskCleaner
	Providers          map[string]Provider
	Clock              clockwork.Clock
	Logger             logrus.FieldLogger
	MetricsRegisterer  prometheus.Registerer

	LocalNode        string
	CompletedTaskTTL time.Duration
	TickInterval     time.Duration
}

type ShardLister added in v1.37.0

type ShardLister interface {
	GetLocalShardNames(collection string) ([]string, error)
}

ShardLister provides local shard names for a collection, allowing the ShardNoopProvider to determine unit ownership based on real shard topology without importing the db package.

type ShardNoopProvider added in v1.37.0

type ShardNoopProvider struct {
	// contains filtered or unexported fields
}

ShardNoopProvider is a test-only UnitAwareProvider used by acceptance tests to exercise the unit lifecycle end-to-end. It is registered in configure_api.go behind the SHARD_NOOP_PROVIDER_ENABLED env var and exposed via a debug HTTP endpoint on port 6060.

On StartTask, it spawns a goroutine that iterates units sequentially, reports 50% progress, then completes each one. Set FailUnitID in the payload to make one unit fail instead, which triggers the task-level fail-fast behavior.

When a Collection is specified in the payload and a ShardLister is provided, the provider only claims units whose IDs match local shard names. This validates that unit ownership aligns with actual shard placement.

Marker files are written as hidden dot-files inside shard directories (when a collection is specified) or under {dataRoot}/.dtm/ (for synthetic units). This avoids writing to /tmp and keeps side effects scoped to the Weaviate data directory.

func NewShardNoopProvider added in v1.37.0

func NewShardNoopProvider(nodeID string, logger logrus.FieldLogger, shardLister ShardLister, dataRoot string) *ShardNoopProvider

NewShardNoopProvider creates a new ShardNoopProvider. Pass nil for shardLister when real shard topology is not needed (e.g. unit tests with synthetic unit IDs). dataRoot is the Weaviate persistence data path; marker files are written as hidden dot-files inside shard directories (collection-aware mode) or under {dataRoot}/.dtm/ (synthetic mode).

func (*ShardNoopProvider) CleanupTask added in v1.37.0

func (p *ShardNoopProvider) CleanupTask(desc TaskDescriptor) error

func (*ShardNoopProvider) GetFinalizedGroups added in v1.37.0

func (p *ShardNoopProvider) GetFinalizedGroups(desc TaskDescriptor) map[string][]string

GetFinalizedGroups returns the per-group finalized unit IDs for a task.

func (*ShardNoopProvider) GetFinalizedUnits added in v1.37.0

func (p *ShardNoopProvider) GetFinalizedUnits(desc TaskDescriptor) []string

GetFinalizedUnits returns all finalized unit IDs across all groups for a task.

func (*ShardNoopProvider) GetLocalTasks added in v1.37.0

func (p *ShardNoopProvider) GetLocalTasks() []TaskDescriptor

func (*ShardNoopProvider) IsTaskCompleted added in v1.37.0

func (p *ShardNoopProvider) IsTaskCompleted(desc TaskDescriptor) bool

func (*ShardNoopProvider) OnGroupCompleted added in v1.37.0

func (p *ShardNoopProvider) OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string)

func (*ShardNoopProvider) OnTaskCompleted added in v1.37.0

func (p *ShardNoopProvider) OnTaskCompleted(task *Task)

func (*ShardNoopProvider) SetCompletionRecorder added in v1.37.0

func (p *ShardNoopProvider) SetCompletionRecorder(recorder TaskCompletionRecorder)

func (*ShardNoopProvider) StartTask added in v1.37.0

func (p *ShardNoopProvider) StartTask(task *Task) (TaskHandle, error)

type ShardNoopProviderPayload added in v1.37.0

type ShardNoopProviderPayload struct {
	FailUnitID        string            `json:"failUnitId,omitempty"`
	Collection        string            `json:"collection,omitempty"`
	UnitToShard       map[string]string `json:"unitToShard,omitempty"`
	UnitToNode        map[string]string `json:"unitToNode,omitempty"`
	SlowUnitID        string            `json:"slowUnitId,omitempty"`
	SlowUnitDelayMs   int               `json:"slowUnitDelayMs,omitempty"`
	ProcessingDelayMs int               `json:"processingDelayMs,omitempty"`
	MaxConcurrency    int               `json:"maxConcurrency,omitempty"`
}

ShardNoopProviderPayload is the JSON payload for tasks created with the ShardNoopProvider. When Collection is set and a ShardLister is available, the provider uses real shard placement for unit ownership instead of the synthetic NodeID-based assignment.

UnitToShard maps unit IDs to shard names, allowing multiple units per shard (one per replica). UnitToNode maps unit IDs to the node that should process them, providing deterministic ownership when RF > 1 (where multiple nodes have the same shard locally). Both maps are required when Collection is set.

MaxConcurrency controls how many units are processed in parallel on each node. When > 1, processUnits fans out with a ConcurrencyLimiter instead of sequential iteration. Default 0 = sequential (existing behavior).

type Task

type Task struct {
	// Namespace is the namespace of distributed tasks, which are managed by different Provider implementations.
	Namespace string `json:"namespace"`

	TaskDescriptor `json:",inline"`

	// Payload is arbitrary data that is needed to execute a task of Namespace.
	Payload []byte `json:"payload"`

	// Status is the current status of the task.
	Status TaskStatus `json:"status"`

	// StartedAt is the time that a task was submitted to the cluster.
	StartedAt time.Time `json:"startedAt"`

	// FinishedAt is the time at which the task reached a terminal status.
	// Additionally, it is used to schedule task clean up.
	FinishedAt time.Time `json:"finishedAt"`

	// Error is an optional field to store the error which moved the task to FAILED status.
	Error string `json:"error,omitempty"`

	// Units tracks per-unit progress. Always non-nil for valid tasks.
	Units map[string]*Unit `json:"units,omitempty"`
}

Task represents a distributed task tracked across the cluster via Raft consensus.

Completion is tracked per-unit. The task finishes when all units reach a terminal state. A single unit failure immediately fails the entire task — remaining in-flight units are NOT waited for.

Units are always required when creating a task.

func (*Task) AllGroupUnitsTerminal added in v1.37.0

func (t *Task) AllGroupUnitsTerminal(groupID string) bool

AllGroupUnitsTerminal returns true if all units in the given group are terminal.

func (*Task) AllUnitsTerminal added in v1.37.0

func (t *Task) AllUnitsTerminal() bool

AllUnitsTerminal returns true if all units are in a terminal state (COMPLETED or FAILED).

func (*Task) AnyUnitFailed added in v1.37.0

func (t *Task) AnyUnitFailed() bool

AnyUnitFailed returns true if any unit has FAILED status.

func (*Task) Clone

func (t *Task) Clone() *Task

func (*Task) Groups added in v1.37.0

func (t *Task) Groups() []string

Groups returns the distinct GroupIDs across all units (includes "" for ungrouped).

func (*Task) LocalGroupUnitIDs added in v1.37.0

func (t *Task) LocalGroupUnitIDs(groupID, nodeID string) []string

LocalGroupUnitIDs returns the IDs of units in the given group assigned to the given node.

func (*Task) LocalUnitIDs added in v1.37.0

func (t *Task) LocalUnitIDs(nodeID string) []string

LocalUnitIDs returns the IDs of units assigned to the given node.

func (*Task) NodeHasNonTerminalUnits added in v1.37.0

func (t *Task) NodeHasNonTerminalUnits(nodeID string) bool

NodeHasNonTerminalUnits returns true if the given node has units that are not yet terminal. Unassigned units (empty NodeID) are considered as belonging to any node.

type TaskCleaner

type TaskCleaner interface {
	CleanUpDistributedTask(ctx context.Context, namespace, taskID string, taskVersion uint64) error
}

TaskCleaner is an interface for issuing a request to clean up a distributed task.

type TaskCompletionRecorder

type TaskCompletionRecorder interface {
	RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string) error
	RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID, errMsg string) error
	UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string, progress float32) error
}

TaskCompletionRecorder is an interface for recording unit completions, failures, and progress updates of a distributed task.

type TaskDescriptor

type TaskDescriptor struct {
	// ID is the identifier of the task in the namespace.
	ID string `json:"ID"`

	// Version is the version of the task with the given ID.
	// It is used to differentiate between multiple runs of the same task.
	Version uint64 `json:"version"`
}

TaskDescriptor is a struct identifying a task execution under a certain task namespace.

type TaskHandle

type TaskHandle interface {
	// Terminate is a signal to stop executing the task. If the task is no longer running because it already finished,
	// the method call should be a no-op.
	//
	// A terminated task may be started again later, therefore no local state should be removed.
	Terminate()

	// Done returns a channel that is closed when the task's goroutine exits, whether due to
	// completion, failure, or termination. The scheduler uses this to detect dead handles
	// and allow re-launch of tasks that still have pending work.
	Done() <-chan struct{}
}

TaskHandle is an interface to control a locally running task.
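A typical implementation pairs a stop channel with a done channel around a worker goroutine. A minimal sketch (the localHandle and startWork names are illustrative, not part of this package):

```go
package main

import (
	"fmt"
	"sync"
)

// localHandle implements the TaskHandle contract: Terminate signals the
// worker goroutine to stop, Done is closed when the goroutine exits.
type localHandle struct {
	stopOnce sync.Once
	stop     chan struct{}
	done     chan struct{}
}

// startWork launches the work function in a goroutine and returns a handle
// controlling it. The work function should return promptly once stop fires.
func startWork(work func(stop <-chan struct{})) *localHandle {
	h := &localHandle{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(h.done) // Done() fires on completion, failure, or termination
		work(h.stop)
	}()
	return h
}

// Terminate is idempotent via sync.Once: a no-op if already signalled,
// matching the "no-op if the task already finished" requirement.
func (h *localHandle) Terminate() { h.stopOnce.Do(func() { close(h.stop) }) }

// Done lets the scheduler detect a dead handle and re-launch the task if it
// still has pending work.
func (h *localHandle) Done() <-chan struct{} { return h.done }

func main() {
	h := startWork(func(stop <-chan struct{}) {
		<-stop // stand-in for real work; exit when asked to stop
	})
	h.Terminate()
	h.Terminate() // safe to call repeatedly
	<-h.Done()
	fmt.Println("worker exited")
}
```

The sync.Once guard is what makes repeated Terminate calls safe: closing an already-closed channel would panic.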

type TaskStatus

type TaskStatus string
const (
	// TaskStatusStarted means that the task is still running on some of the nodes.
	TaskStatusStarted TaskStatus = "STARTED"
	// TaskStatusFinished means that the task was successfully executed by all nodes.
	TaskStatusFinished TaskStatus = "FINISHED"
	// TaskStatusCancelled means that the task was cancelled by user.
	TaskStatusCancelled TaskStatus = "CANCELLED"
	// TaskStatusFailed means that one of the nodes got a non-retryable error and all other nodes
	// terminated the execution.
	TaskStatusFailed TaskStatus = "FAILED"
)

func (TaskStatus) String

func (t TaskStatus) String() string

type TasksLister

type TasksLister interface {
	ListDistributedTasks(ctx context.Context) (map[string][]*Task, error)
}

TasksLister is an interface for listing distributed tasks in the cluster.

type ThrottledRecorder added in v1.37.0

type ThrottledRecorder struct {
	// contains filtered or unexported fields
}

ThrottledRecorder wraps a TaskCompletionRecorder to prevent progress updates from flooding Raft consensus. Each unit's progress is forwarded at most once per interval (default 30s); intermediate updates are silently dropped. Completion and failure calls always pass through immediately — they are never throttled.

Throttle entries are cleaned up when a unit reaches a terminal state (completion or failure), so the internal map does not grow beyond the number of active units.

func NewThrottledRecorder added in v1.37.0

func NewThrottledRecorder(inner TaskCompletionRecorder, interval time.Duration, clock clockwork.Clock) *ThrottledRecorder

func (*ThrottledRecorder) RecordDistributedTaskUnitCompletion added in v1.37.0

func (r *ThrottledRecorder) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string) error

func (*ThrottledRecorder) RecordDistributedTaskUnitFailure added in v1.37.0

func (r *ThrottledRecorder) RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID, errMsg string) error

func (*ThrottledRecorder) UpdateDistributedTaskUnitProgress added in v1.37.0

func (r *ThrottledRecorder) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string, progress float32) error

type Unit added in v1.37.0

type Unit struct {
	ID         string     `json:"id"`
	GroupID    string     `json:"groupId,omitempty"`
	NodeID     string     `json:"nodeId"`
	Status     UnitStatus `json:"status"`
	Progress   float32    `json:"progress"`
	Error      string     `json:"error,omitempty"`
	UpdatedAt  time.Time  `json:"updatedAt"`
	FinishedAt time.Time  `json:"finishedAt,omitempty"`
}

Unit represents a trackable work unit within a distributed task (e.g. a single shard in a reindex operation). Units follow the lifecycle PENDING → IN_PROGRESS → COMPLETED/FAILED.

NodeID starts empty (unassigned) and is set on the first progress update. The Scheduler treats unassigned units as belonging to any node, which is how initial assignment happens: the first node to report progress claims the unit.

Unit values are owned by the Manager and mutated under its lock. Callers outside the Manager should only access units via cloned Task snapshots from ListDistributedTasks.

type UnitAwareProvider added in v1.37.0

type UnitAwareProvider interface {
	Provider
	OnGroupCompleted(task *Task, groupID string, localGroupUnitIDs []string)
	OnTaskCompleted(task *Task)
}

UnitAwareProvider fires per-group callbacks as groups complete (mid-flight), then a global OnTaskCompleted when the task reaches terminal state.

Every task's units have groups. If no explicit GroupID is set, all units belong to a single implicit default group (""). This means:

  • Tasks without groups: OnGroupCompleted fires once with all local units when all units reach a terminal state (same effect as having a single group).
  • Tasks with groups: OnGroupCompleted fires per group as each completes, even while the task is still STARTED.

Callback phases:

  1. OnGroupCompleted — per group, fires once all of that group's units reach a terminal state
  2. OnTaskCompleted — fires once on every node after ALL units are terminal
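The callback ordering can be simulated with a small local model (a sketch — simUnit, fireCallbacks, and the finish order are illustrative, not part of this package):

```go
package main

import "fmt"

type simUnit struct {
	id, groupID string
	terminal    bool
}

// fireCallbacks marks units terminal in the given order; when a group's
// units are all terminal it fires onGroup (phase 1, possibly while the task
// is still STARTED), then fires onTask once after ALL units are terminal.
func fireCallbacks(units []simUnit, finishOrder []string,
	onGroup func(groupID string), onTask func()) {
	byID := map[string]*simUnit{}
	for i := range units {
		byID[units[i].id] = &units[i]
	}
	fired := map[string]bool{}
	for _, id := range finishOrder {
		byID[id].terminal = true
		g := byID[id].groupID
		if !fired[g] && groupTerminal(units, g) {
			fired[g] = true
			onGroup(g) // phase 1: per group, at most once
		}
	}
	if allTerminal(units) {
		onTask() // phase 2: once, after every unit is terminal
	}
}

func groupTerminal(units []simUnit, groupID string) bool {
	for _, u := range units {
		if u.groupID == groupID && !u.terminal {
			return false
		}
	}
	return true
}

func allTerminal(units []simUnit) bool {
	for _, u := range units {
		if !u.terminal {
			return false
		}
	}
	return true
}

func main() {
	units := []simUnit{
		{id: "t1-s1", groupID: "tenant-1"},
		{id: "t1-s2", groupID: "tenant-1"},
		{id: "t2-s1", groupID: "tenant-2"},
	}
	fireCallbacks(units, []string{"t1-s1", "t1-s2", "t2-s1"},
		func(g string) { fmt.Println("group done:", g) },
		func() { fmt.Println("task done") })
	// Output: tenant-1 completes before the task does, then tenant-2, then the task.
}
```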

type UnitSpec added in v1.37.0

type UnitSpec struct {
	ID      string
	GroupID string
}

UnitSpec defines a unit with an optional group assignment. Used at task creation time when units need group membership (e.g. one group per tenant for MT reindex).
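For the one-group-per-tenant case mentioned above, specs might be built like this (a sketch — UnitSpec is copied locally so the example compiles on its own, and the specsForTenants helper and ID naming scheme are hypothetical):

```go
package main

import "fmt"

// Local copy of the package's UnitSpec for illustration.
type UnitSpec struct {
	ID      string
	GroupID string
}

// specsForTenants builds one unit per (tenant, shard) pair, grouping the
// units by tenant so OnGroupCompleted fires as each tenant finishes.
func specsForTenants(tenants map[string][]string) []UnitSpec {
	var specs []UnitSpec
	for tenant, shards := range tenants {
		for _, shard := range shards {
			specs = append(specs, UnitSpec{
				ID:      tenant + "/" + shard, // hypothetical naming scheme
				GroupID: tenant,               // one group per tenant
			})
		}
	}
	return specs
}

func main() {
	specs := specsForTenants(map[string][]string{
		"tenant-a": {"shard-0", "shard-1"},
	})
	for _, s := range specs {
		fmt.Printf("%s (group %s)\n", s.ID, s.GroupID)
	}
}
```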

type UnitStatus added in v1.37.0

type UnitStatus string
const (
	UnitStatusPending    UnitStatus = "PENDING"
	UnitStatusInProgress UnitStatus = "IN_PROGRESS"
	UnitStatusCompleted  UnitStatus = "COMPLETED"
	UnitStatusFailed     UnitStatus = "FAILED"
)
