tasks

package
v3.0.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2021 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Overview

Package tasks provides the workers that actually run instances of scheduled jobs.

Index

Constants

const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)

Variables

var (
	PubSub                  *pubsub.PubSub
	ContextJobParametersKey = struct{}{}
)
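
The name ContextJobParametersKey suggests a key for storing job parameters in a context.Context. The following is a minimal sketch of that usage, not code from the package: jobParametersKey is a local stand-in for the exported variable, and the map[string]string parameter type is an assumption, since the source does not state what value is stored under the key.

```go
package main

import (
	"context"
	"fmt"
)

// jobParametersKey is a local stand-in for tasks.ContextJobParametersKey.
var jobParametersKey = struct{}{}

// withJobParameters returns a child context carrying params.
// The map[string]string type is an assumption made for this sketch.
func withJobParameters(ctx context.Context, params map[string]string) context.Context {
	return context.WithValue(ctx, jobParametersKey, params)
}

// jobParameters reads the parameters back; ok is false when none were set.
func jobParameters(ctx context.Context) (map[string]string, bool) {
	params, ok := ctx.Value(jobParametersKey).(map[string]string)
	return params, ok
}

// targetOf stores params in a context and reads the "target" entry back.
func targetOf(params map[string]string) string {
	ctx := withJobParameters(context.Background(), params)
	got, ok := jobParameters(ctx)
	if !ok {
		return ""
	}
	return got["target"]
}

func main() {
	fmt.Println(targetOf(map[string]string{"target": "/tmp"}))
}
```

Because the key is a value of the unnamed type struct{}, any other struct{}{} value used as a context key compares equal to it. Callers that define their own keys may prefer a dedicated named type to avoid such collisions.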

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher orchestrates the jobs by dispatching work to available workers.

func NewDispatcher

func NewDispatcher(maxWorkers int, tags map[string]string) *Dispatcher

NewDispatcher creates and initialises a new Dispatcher with the given number of workers.

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run starts the N workers of this dispatcher.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop sends a quit signal to all workers and to the main dispatcher.

type ReconnectingClient added in v1.4.1

type ReconnectingClient struct {
	// contains filtered or unexported fields
}

func NewTaskReconnectingClient added in v1.4.1

func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient

func (*ReconnectingClient) StartListening added in v1.4.1

func (s *ReconnectingClient) StartListening(tasksChan chan interface{})

func (*ReconnectingClient) Stop added in v1.4.1

func (s *ReconnectingClient) Stop()
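
The package does not document ReconnectingClient, but its name and the StartListening/Stop pair suggest a listener that re-establishes its stream after a failure. The sketch below illustrates that general idea only. The names connectFunc, listenWithRetry and simulate are hypothetical, and the fixed retry delay is an assumption; none of it reflects the actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectFunc is a hypothetical stand-in for opening a stream and forwarding
// everything it receives to out. It returns when the stream breaks.
type connectFunc func(ctx context.Context, out chan<- interface{}) error

// listenWithRetry calls connect again and again until ctx is cancelled,
// pausing for delay between attempts.
func listenWithRetry(ctx context.Context, connect connectFunc, out chan<- interface{}, delay time.Duration) {
	for {
		_ = connect(ctx, out) // the error is dropped here; real code would log it
		select {
		case <-ctx.Done():
			return
		case <-time.After(delay):
		}
	}
}

// simulate runs the loop against a stream that fails twice before delivering
// one message, and returns the number of attempts together with the message.
func simulate() (int, interface{}) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // plays the role of Stop()

	out := make(chan interface{}, 1)
	attempts := 0
	connect := func(ctx context.Context, out chan<- interface{}) error {
		attempts++
		if attempts < 3 {
			return errors.New("stream unavailable")
		}
		out <- "task-update"
		<-ctx.Done() // keep the stream open until cancelled
		return ctx.Err()
	}

	go listenWithRetry(ctx, connect, out, time.Millisecond)
	msg := <-out
	return attempts, msg
}

func main() {
	attempts, msg := simulate()
	fmt.Println(attempts, msg)
}
```

Deriving the loop's lifetime from a parent context mirrors the parentCtx argument of NewTaskReconnectingClient: cancelling the context stops both the current stream and any further retries.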

type Runnable

type Runnable struct {
	jobs.Action
	Task           *Task
	Message        jobs.ActionMessage
	Client         client.Client
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
}

Runnable represents the runnable instance of a given task

func NewRunnable

func NewRunnable(ctx context.Context, parentPath string, chainIndex int, cl client.Client, task *Task, action *jobs.Action, message jobs.ActionMessage) Runnable

NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.

func RootRunnable

func RootRunnable(ctx context.Context, cl client.Client, task *Task) Runnable

func (*Runnable) CreateChild

func (r *Runnable) CreateChild(parentPath string, chainIndex int, action *jobs.Action, message jobs.ActionMessage) Runnable

CreateChild replicates a runnable for a child action.

func (*Runnable) Dispatch

func (r *Runnable) Dispatch(parentPath string, input jobs.ActionMessage, aa []*jobs.Action, Queue chan Runnable)

Dispatch gets the next runnable from Action and enqueues it to the Queue.

TODO: check that the done channel works correctly with chained actions.

func (*Runnable) RunAction

func (r *Runnable) RunAction(Queue chan Runnable) error

RunAction creates an action and calls Dispatch

type Subscriber

type Subscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriber handles incoming events, applies selectors if any and generates all ActionMessage to trigger actions

func NewSubscriber

func NewSubscriber(parentContext context.Context, client client.Client, srv server.Server) *Subscriber

NewSubscriber creates a multiplexer for task management and messages by maintaining a map of dispatchers, one for each job definition.
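
The description above, together with the embedded sync.RWMutex, points to a lock-protected map keyed by job. The following sketch shows one common way to write such a get-or-create lookup. The registry and dispatcher types are hypothetical stand-ins, and keying the map by a job ID string is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical stand-in for *tasks.Dispatcher.
type dispatcher struct{ jobID string }

// registry keeps one dispatcher per job ID behind a read-write lock.
type registry struct {
	sync.RWMutex
	dispatchers map[string]*dispatcher
}

func newRegistry() *registry {
	return &registry{dispatchers: make(map[string]*dispatcher)}
}

// get returns the dispatcher for jobID, creating it on first use.
func (r *registry) get(jobID string) *dispatcher {
	// Fast path: most lookups only need the read lock.
	r.RLock()
	d, ok := r.dispatchers[jobID]
	r.RUnlock()
	if ok {
		return d
	}

	r.Lock()
	defer r.Unlock()
	// Re-check: another goroutine may have created it between the two locks.
	if existing, ok := r.dispatchers[jobID]; ok {
		return existing
	}
	d = &dispatcher{jobID: jobID}
	r.dispatchers[jobID] = d
	return d
}

func main() {
	r := newRegistry()
	a, b := r.get("job-1"), r.get("job-1")
	fmt.Println(a == b, len(r.dispatchers))
}
```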

func (*Subscriber) Init

func (s *Subscriber) Init() error

Init initialises the subscriber with the current list of jobs from the Jobs service.

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop closes internal EventsBatcher

type Task

type Task struct {
	*jobs.Job
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTaskFromEvent

func NewTaskFromEvent(ctx context.Context, job *jobs.Job, event interface{}) *Task

NewTaskFromEvent creates a task based on incoming job and event

func (*Task) Add

func (t *Task) Add(delta int)

Add increments task internal retain counter

func (*Task) AppendLog

func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)

AppendLog appends logs from an action to the task OutputChain

func (*Task) Done

func (t *Task) Done(delta int)

Done decrements task internal retain counter
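
Add and Done together behave like a retain counter. Unlike sync.WaitGroup.Done, Done here takes an explicit delta. The sketch below shows the general shape of such a counter. The retainCounter type and the onIdle callback are hypothetical, and what the real Task does when its counter reaches zero is not stated in the source.

```go
package main

import (
	"fmt"
	"sync"
)

// retainCounter is a hypothetical stand-in for the counter inside tasks.Task.
type retainCounter struct {
	sync.RWMutex
	count  int
	onIdle func() // called when the counter drops back to zero (assumption)
}

// Add increments the counter by delta.
func (c *retainCounter) Add(delta int) {
	c.Lock()
	defer c.Unlock()
	c.count += delta
}

// Done decrements the counter by delta and fires onIdle when it reaches zero.
func (c *retainCounter) Done(delta int) {
	c.Lock()
	c.count -= delta
	idle := c.count == 0
	c.Unlock()
	// The callback runs outside the lock so that it may use the counter itself.
	if idle && c.onIdle != nil {
		c.onIdle()
	}
}

// idleCalls retains the counter for two actions, releases both, and reports
// how many times the idle callback fired.
func idleCalls() int {
	calls := 0
	c := &retainCounter{onIdle: func() { calls++ }}
	c.Add(2)  // two actions are about to run
	c.Done(1) // first action finished: counter is 1, no callback
	c.Done(1) // second action finished: counter is 0, callback fires
	return calls
}

func main() {
	fmt.Println(idleCalls())
}
```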

func (*Task) EnqueueRunnables

func (t *Task) EnqueueRunnables(c client.Client, output chan Runnable)

EnqueueRunnables appends chained actions to a running Runnable

func (*Task) GetJobTaskClone

func (t *Task) GetJobTaskClone() *jobs.Task

GetJobTaskClone creates a protobuf clone of this task

func (*Task) GetRunUUID

func (t *Task) GetRunUUID() string

GetRunUUID returns the task internal run UUID

func (*Task) GetRunnableChannels

func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)

GetRunnableChannels prepares a set of data channels for the action's actual Run method.

func (*Task) GlobalError added in v1.2.5

func (t *Task) GlobalError(e error)

GlobalError sets the task in error globally.

func (*Task) Save

func (t *Task) Save()

Save publishes the task to the PubSub topic.

func (*Task) SetControllable

func (t *Task) SetControllable(canStop bool, canPause bool)

SetControllable flags task as being able to be stopped or paused

func (*Task) SetEndTime

func (t *Task) SetEndTime(ti time.Time)

SetEndTime updates end time

func (*Task) SetHasProgress

func (t *Task) SetHasProgress()

SetHasProgress flags task as providing progress information

func (*Task) SetProgress

func (t *Task) SetProgress(progress float32)

SetProgress updates task internal progress

func (*Task) SetStartTime

func (t *Task) SetStartTime(ti time.Time)

SetStartTime updates start time

func (*Task) SetStatus

func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)

SetStatus updates task internal status

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents the worker that executes the jobs.

func NewWorker

func NewWorker(workerPool chan chan Runnable, requeue chan Runnable, activeChan chan int, tags map[string]string) Worker

NewWorker creates and configures a new worker.

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it.

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.
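
The workerPool chan chan Runnable parameter of NewWorker matches a common Go worker-pool idiom: each worker owns a private job channel and advertises it on a shared pool whenever it is idle, and the dispatcher picks an idle channel and sends the next job on it. The sketch below shows that idiom in isolation, under the assumption that the package works along these lines. All names are hypothetical stand-ins, and the requeue, activeChan and tags parameters are left out.

```go
package main

import "fmt"

// runnable is a hypothetical stand-in for tasks.Runnable.
type runnable struct{ id int }

// worker owns a private job channel and advertises it on the shared pool
// whenever it is ready for more work.
type worker struct {
	pool chan chan runnable
	jobs chan runnable
	quit chan struct{}
}

func newWorker(pool chan chan runnable) worker {
	return worker{pool: pool, jobs: make(chan runnable), quit: make(chan struct{})}
}

// start launches the run loop; each processed job ID is reported on results.
func (w worker) start(results chan<- int) {
	go func() {
		for {
			select {
			case w.pool <- w.jobs: // register as idle
			case <-w.quit:
				return
			}
			select {
			case r := <-w.jobs: // wait for the dispatcher to hand over a job
				results <- r.id
			case <-w.quit:
				return
			}
		}
	}()
}

// stop signals the run loop to exit.
func (w worker) stop() { close(w.quit) }

// runJobs plays the dispatcher: it hands each job to the next idle worker
// and returns how many jobs were processed.
func runJobs(maxWorkers, jobs int) int {
	pool := make(chan chan runnable, maxWorkers)
	results := make(chan int, jobs)

	workers := make([]worker, maxWorkers)
	for i := range workers {
		workers[i] = newWorker(pool)
		workers[i].start(results)
	}

	for i := 0; i < jobs; i++ {
		idle := <-pool          // wait for an idle worker
		idle <- runnable{id: i} // hand it the job
	}

	processed := 0
	for i := 0; i < jobs; i++ {
		<-results
		processed++
	}
	for _, w := range workers {
		w.stop()
	}
	return processed
}

func main() {
	fmt.Println(runJobs(3, 10))
}
```

The value receivers work here for the same reason they can work on Worker: a copy of the struct still refers to the same underlying channels.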

Source Files

  • dispatcher-worker.go
  • dispatcher.go
  • doc.go
  • reconnecting-client.go
  • runnable.go
  • subscriber.go
  • task.go

Directories

Path Synopsis
Package grpc provides a gRPC service to effectively run task instances on multiple workers.
