distributedtask

package
v1.34.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: BSD-3-Clause Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ListDistributedTasksResponse

// ListDistributedTasksResponse wraps a task listing so that it is emitted under the "tasks" JSON key.
type ListDistributedTasksResponse struct {
	// Tasks has the same map[string][]*Task shape that Manager.ListDistributedTasks returns.
	// NOTE(review): the map keys are presumably task namespaces — confirm.
	Tasks map[string][]*Task `json:"tasks"`
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is responsible for managing distributed tasks across the cluster.

func NewManager

func NewManager(params ManagerParameters) *Manager

func (*Manager) AddTask

func (m *Manager) AddTask(c *api.ApplyRequest, seqNum uint64) error

func (*Manager) CancelTask

func (m *Manager) CancelTask(a *api.ApplyRequest) error

func (*Manager) CleanUpTask

func (m *Manager) CleanUpTask(a *api.ApplyRequest) error

func (*Manager) ListDistributedTasks

func (m *Manager) ListDistributedTasks(_ context.Context) (map[string][]*Task, error)

func (*Manager) ListDistributedTasksPayload

func (m *Manager) ListDistributedTasksPayload(ctx context.Context) ([]byte, error)

func (*Manager) RecordNodeCompletion

func (m *Manager) RecordNodeCompletion(c *api.ApplyRequest, numberOfNodesInTheCluster int) error

func (*Manager) Restore

func (m *Manager) Restore(bytes []byte) error

func (*Manager) Snapshot

func (m *Manager) Snapshot() ([]byte, error)

type ManagerParameters

// ManagerParameters is the argument struct accepted by NewManager.
type ManagerParameters struct {
	// Clock is the clockwork.Clock handed to the Manager.
	// NOTE(review): presumably the Manager's time source — confirm.
	Clock clockwork.Clock

	// CompletedTaskTTL is a duration setting for completed tasks.
	// NOTE(review): presumably how long a task in a terminal status is kept before
	// it becomes eligible for clean-up — confirm against the Manager implementation.
	CompletedTaskTTL time.Duration
}

type MockTaskCleaner

type MockTaskCleaner struct {
	// mock.Mock is embedded by value, so its methods are promoted onto MockTaskCleaner.
	mock.Mock
}

MockTaskCleaner is an autogenerated mock type for the TaskCleaner type

func NewMockTaskCleaner

func NewMockTaskCleaner(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCleaner

NewMockTaskCleaner creates a new instance of MockTaskCleaner. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskCleaner) CleanUpDistributedTask

func (_m *MockTaskCleaner) CleanUpDistributedTask(ctx context.Context, namespace string, taskID string, taskVersion uint64) error

CleanUpDistributedTask provides a mock function with given fields: ctx, namespace, taskID, taskVersion

func (*MockTaskCleaner) EXPECT

type MockTaskCleaner_CleanUpDistributedTask_Call

type MockTaskCleaner_CleanUpDistributedTask_Call struct {
	// *mock.Call is embedded by pointer, so its methods are promoted onto this type.
	*mock.Call
}

MockTaskCleaner_CleanUpDistributedTask_Call is a *mock.Call that shadows the Run/Return methods with type-explicit versions for method 'CleanUpDistributedTask'

func (*MockTaskCleaner_CleanUpDistributedTask_Call) Return

func (*MockTaskCleaner_CleanUpDistributedTask_Call) Run

func (*MockTaskCleaner_CleanUpDistributedTask_Call) RunAndReturn

type MockTaskCleaner_Expecter

type MockTaskCleaner_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskCleaner_Expecter) CleanUpDistributedTask

func (_e *MockTaskCleaner_Expecter) CleanUpDistributedTask(ctx interface{}, namespace interface{}, taskID interface{}, taskVersion interface{}) *MockTaskCleaner_CleanUpDistributedTask_Call

CleanUpDistributedTask is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • taskVersion uint64

type MockTaskCompletionRecorder

type MockTaskCompletionRecorder struct {
	// mock.Mock is embedded by value, so its methods are promoted onto MockTaskCompletionRecorder.
	mock.Mock
}

MockTaskCompletionRecorder is an autogenerated mock type for the TaskCompletionRecorder type

func NewMockTaskCompletionRecorder

func NewMockTaskCompletionRecorder(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTaskCompletionRecorder

NewMockTaskCompletionRecorder creates a new instance of MockTaskCompletionRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTaskCompletionRecorder) EXPECT

func (*MockTaskCompletionRecorder) RecordDistributedTaskNodeCompletion

func (_m *MockTaskCompletionRecorder) RecordDistributedTaskNodeCompletion(ctx context.Context, namespace string, taskID string, version uint64) error

RecordDistributedTaskNodeCompletion provides a mock function with given fields: ctx, namespace, taskID, version

func (*MockTaskCompletionRecorder) RecordDistributedTaskNodeFailure

func (_m *MockTaskCompletionRecorder) RecordDistributedTaskNodeFailure(ctx context.Context, namespace string, taskID string, version uint64, errMsg string) error

RecordDistributedTaskNodeFailure provides a mock function with given fields: ctx, namespace, taskID, version, errMsg

type MockTaskCompletionRecorder_Expecter

type MockTaskCompletionRecorder_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskNodeCompletion

func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskNodeCompletion(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call

RecordDistributedTaskNodeCompletion is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64

func (*MockTaskCompletionRecorder_Expecter) RecordDistributedTaskNodeFailure

func (_e *MockTaskCompletionRecorder_Expecter) RecordDistributedTaskNodeFailure(ctx interface{}, namespace interface{}, taskID interface{}, version interface{}, errMsg interface{}) *MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call

RecordDistributedTaskNodeFailure is a helper method to define mock.On call

  • ctx context.Context
  • namespace string
  • taskID string
  • version uint64
  • errMsg string

type MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call

type MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call struct {
	// *mock.Call is embedded by pointer, so its methods are promoted onto this type.
	*mock.Call
}

MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call is a *mock.Call that shadows the Run/Return methods with type-explicit versions for method 'RecordDistributedTaskNodeCompletion'

func (*MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call) Return

func (*MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call) Run

func (*MockTaskCompletionRecorder_RecordDistributedTaskNodeCompletion_Call) RunAndReturn

type MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call

type MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call struct {
	// *mock.Call is embedded by pointer, so its methods are promoted onto this type.
	*mock.Call
}

MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call is a *mock.Call that shadows the Run/Return methods with type-explicit versions for method 'RecordDistributedTaskNodeFailure'

func (*MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call) Return

func (*MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call) Run

func (*MockTaskCompletionRecorder_RecordDistributedTaskNodeFailure_Call) RunAndReturn

type Provider

type Provider interface {
	// SetCompletionRecorder is invoked on node startup to register the TaskCompletionRecorder,
	// which should be passed to all launched tasks so that they can mark their completion.
	SetCompletionRecorder(recorder TaskCompletionRecorder)

	// GetLocalTasks returns the descriptors of the tasks that the provider is aware of
	// from the local node state.
	GetLocalTasks() []TaskDescriptor

	// CleanupTask is a signal to clean up the local state of the task identified by desc.
	CleanupTask(desc TaskDescriptor) error

	// StartTask is a signal to start executing the task in the background.
	// It returns a TaskHandle for the started task.
	StartTask(task *Task) (TaskHandle, error)
}

Provider is an interface for the management and execution of a group of tasks denoted by a namespace.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the component which is responsible for polling the active tasks in the cluster (via the Manager) and making sure that the tasks are running on the local node.

The general flow of a distributed task is as follows: 1. A Provider is registered with the Scheduler at startup to handle all tasks under a specific namespace. 2. A task is created and added to the cluster via Manager.AddTask. 3. The Scheduler regularly scans all available tasks in the cluster, picks up new ones and instructs the Provider to execute them locally. 4. A task is responsible for updating its status in the cluster via TaskCompletionRecorder. 5. The Scheduler polls the cluster for the task status and checks if it is still running. It cancels the local task if it is not marked as STARTED anymore. 6. After the completed task TTL has passed, the Scheduler issues the TaskCleaner.CleanUpDistributedTask request to remove the task from the cluster list. 7. After a task is removed from the cluster list, the Scheduler instructs the Provider to clean up the local task state.

func NewScheduler

func NewScheduler(params SchedulerParams) *Scheduler

func (*Scheduler) Close

func (s *Scheduler) Close()

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

type SchedulerParams

// SchedulerParams is the argument struct accepted by NewScheduler.
type SchedulerParams struct {
	// Collaborators handed to the Scheduler: the cluster-facing interfaces declared in this
	// package (TaskCompletionRecorder, TasksLister, TaskCleaner), the registered Providers,
	// and clock, logging and metrics dependencies.
	// NOTE(review): Providers is presumably keyed by task namespace — confirm.
	CompletionRecorder TaskCompletionRecorder
	TasksLister        TasksLister
	TaskCleaner        TaskCleaner
	Providers          map[string]Provider
	Clock              clockwork.Clock
	Logger             logrus.FieldLogger
	MetricsRegisterer  prometheus.Registerer

	// Scalar settings.
	// NOTE(review): LocalNode is presumably the identifier of the node the Scheduler runs on,
	// CompletedTaskTTL the delay before a completed task is cleaned up, and TickInterval the
	// period between Scheduler polls — confirm against the Scheduler implementation.
	LocalNode        string
	CompletedTaskTTL time.Duration
	TickInterval     time.Duration
}

type Task

// Task describes a single distributed task: the namespace it belongs to, its descriptor
// (ID and version), its payload, and its status, timing and per-node completion state.
type Task struct {
	// Namespace is the namespace of distributed tasks which are managed by different Provider implementations.
	Namespace string `json:"namespace"`

	// TaskDescriptor is embedded, so its ID and Version fields are promoted onto Task.
	// NOTE(review): encoding/json defines no "inline" tag option. If this struct is marshaled
	// with encoding/json, the embedded fields are flattened because the field is anonymous
	// and the tag gives it no name, not because of ",inline" — confirm whether another
	// encoder relies on the option before removing it.
	TaskDescriptor `json:",inline"`

	// Payload is arbitrary data that is needed to execute a task of Namespace.
	Payload []byte `json:"payload"`

	// Status is the current status of the task.
	Status TaskStatus `json:"status"`

	// StartedAt is the time at which the task was submitted to the cluster.
	StartedAt time.Time `json:"startedAt"`

	// FinishedAt is the time at which the task reached a terminal status.
	// Additionally, it is used to schedule task clean-up.
	FinishedAt time.Time `json:"finishedAt"`

	// Error is an optional field to store the error which moved the task to FAILED status.
	Error string `json:"error,omitempty"`

	// FinishedNodes is a map of nodeIDs that successfully finished the task.
	FinishedNodes map[string]bool `json:"finishedNodes"`
}

func (*Task) Clone

func (t *Task) Clone() *Task

type TaskCleaner

type TaskCleaner interface {
	// CleanUpDistributedTask issues a clean-up request for the task identified by
	// namespace, taskID and taskVersion.
	CleanUpDistributedTask(ctx context.Context, namespace, taskID string, taskVersion uint64) error
}

TaskCleaner is an interface for issuing a request to clean up a distributed task.

type TaskCompletionRecorder

type TaskCompletionRecorder interface {
	// RecordDistributedTaskNodeCompletion records completion for the task identified by
	// namespace, taskID and version.
	// NOTE(review): presumably recorded on behalf of the calling node — confirm.
	RecordDistributedTaskNodeCompletion(ctx context.Context, namespace, taskID string, version uint64) error
	// RecordDistributedTaskNodeFailure records a failure for the task identified by
	// namespace, taskID and version, together with the message errMsg.
	// NOTE(review): errMsg presumably ends up in Task.Error — confirm.
	RecordDistributedTaskNodeFailure(ctx context.Context, namespace, taskID string, version uint64, errMsg string) error
}

TaskCompletionRecorder is an interface for recording the completion of a distributed task.

type TaskDescriptor

type TaskDescriptor struct {
	// ID is the identifier of the task in the namespace.
	// NOTE(review): the JSON key is the upper-case "ID", unlike the lower-case keys used by
	// the other tagged fields (e.g. "version"). Renaming it would change the serialized
	// format, so it is left as is.
	ID string `json:"ID"`

	// Version is the version of the task with the given ID.
	// It is used to differentiate between multiple runs of the same task.
	Version uint64 `json:"version"`
}

TaskDescriptor is a struct identifying a task execution under a certain task namespace.

type TaskHandle

type TaskHandle interface {
	// Terminate is a signal to stop executing the task. If the task is no longer running
	// because it has already finished, the call should be a no-op.
	//
	// A terminated task may be started again later; therefore, Terminate must not remove
	// any local state.
	Terminate()
}

TaskHandle is an interface to control a locally running task.

type TaskStatus

// TaskStatus is the string-typed status of a Task; the values declared in this
// package are listed in the const group below.
type TaskStatus string
const (
	// TaskStatusStarted means that the task is still running on some of the nodes.
	TaskStatusStarted TaskStatus = "STARTED"
	// TaskStatusFinished means that the task was successfully executed by all nodes.
	TaskStatusFinished TaskStatus = "FINISHED"
	// TaskStatusCancelled means that the task was cancelled by a user.
	TaskStatusCancelled TaskStatus = "CANCELLED"
	// TaskStatusFailed means that one of the nodes got a non-retryable error and all other nodes
	// terminated the execution.
	TaskStatusFailed TaskStatus = "FAILED"
)

func (TaskStatus) String

func (t TaskStatus) String() string

type TasksLister

type TasksLister interface {
	// ListDistributedTasks returns the distributed tasks grouped into a map of task slices.
	// NOTE(review): the map keys are presumably task namespaces — confirm.
	ListDistributedTasks(ctx context.Context) (map[string][]*Task, error)
}

TasksLister is an interface for listing distributed tasks in the cluster.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL