tasks

package
v2.1.0-rc1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2020 License: AGPL-3.0 Imports: 29 Imported by: 0

Documentation

Overview

Package tasks provides workers that effectively run the instances of the scheduled jobs.

Index

Constants

View Source
const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
View Source
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)

Variables

View Source
var (
	PubSub                  *pubsub.PubSub
	ContextJobParametersKey = struct{}{}
)
View Source
var (
	RC int
)

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	JobQueue   chan Runnable
	WorkerPool chan chan Runnable
	// contains filtered or unexported fields
}

Dispatcher orchestrates the jobs by dispatching work to available workers.

func NewDispatcher

func NewDispatcher(maxWorkers int, tags map[string]string) *Dispatcher

NewDispatcher creates and initialises a new Dispatcher with this amount of workers.

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run simply starts the N workers of this dispacher.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop sends a quit signal to all workers and the main dispatcher

type ReconnectingClient added in v1.4.1

type ReconnectingClient struct {
	// contains filtered or unexported fields
}

func NewTaskReconnectingClient added in v1.4.1

func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient

func (*ReconnectingClient) StartListening added in v1.4.1

func (s *ReconnectingClient) StartListening(tasksChan chan interface{})

func (*ReconnectingClient) Stop added in v1.4.1

func (s *ReconnectingClient) Stop()

type Runnable

type Runnable struct {
	jobs.Action
	Task           *Task
	Message        jobs.ActionMessage
	Client         client.Client
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
}

Runnable represents the runnable instance of a given task

func NewRunnable

func NewRunnable(ctx context.Context, parentPath string, chainIndex int, cl client.Client, task *Task, action *jobs.Action, message jobs.ActionMessage) Runnable

NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.

func RootRunnable

func RootRunnable(ctx context.Context, cl client.Client, task *Task) Runnable

func (*Runnable) CreateChild

func (r *Runnable) CreateChild(parentPath string, chainIndex int, action *jobs.Action, message jobs.ActionMessage) Runnable

CreateChild replicates a runnable for child action

func (*Runnable) Dispatch

func (r *Runnable) Dispatch(parentPath string, input jobs.ActionMessage, actions []*jobs.Action, Queue chan Runnable)

Dispatch gets next runnable from Action and enqueues it to the Queue Todo - Check that done channel is working correctly with chained actions

func (*Runnable) RunAction

func (r *Runnable) RunAction(Queue chan Runnable) error

RunAction creates an action and calls Dispatch

type Subscriber

type Subscriber struct {
	Client          client.Client
	MainQueue       chan Runnable
	UpdateTasksChan chan *jobs.Task

	JobsDefinitions map[string]*jobs.Job
	Dispatchers     map[string]*Dispatcher

	RootContext context.Context
	// contains filtered or unexported fields
}

Subscriber handles incoming events, applies selectors if any and generates all ActionMessage to trigger actions

func NewSubscriber

func NewSubscriber(parentContext context.Context, client client.Client, srv server.Server) *Subscriber

NewSubscriber creates a multiplexer for tasks managements and messages by maintaining a map of dispacher, one for each job definition.

func (*Subscriber) GetDispatcherForJob

func (s *Subscriber) GetDispatcherForJob(job *jobs.Job) *Dispatcher

GetDispatcherForJob creates a new dispatcher for a job

func (*Subscriber) Init

func (s *Subscriber) Init() error

Init subscriber with current list of jobs from Jobs service

func (*Subscriber) ListenToMainQueue

func (s *Subscriber) ListenToMainQueue()

ListenToMainQueue starts a go routine that listens to the Event Bus

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop closes internal EventsBatcher

func (*Subscriber) TaskChannelSubscription

func (s *Subscriber) TaskChannelSubscription()

TaskChannelSubscription uses PubSub library to receive update messages from tasks

type Task

type Task struct {
	*jobs.Job

	RC      int
	RunUUID string
	// contains filtered or unexported fields
}

func NewTaskFromEvent

func NewTaskFromEvent(ctx context.Context, job *jobs.Job, event interface{}) *Task

func (*Task) Add

func (t *Task) Add(delta int)

func (*Task) AppendLog

func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)

func (*Task) Done

func (t *Task) Done(delta int)

func (*Task) EnqueueRunnables

func (t *Task) EnqueueRunnables(c client.Client, output chan Runnable)

func (*Task) GetJobTaskClone

func (t *Task) GetJobTaskClone() *jobs.Task

func (*Task) GetRunnableChannels

func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)

func (*Task) GlobalError added in v1.2.5

func (t *Task) GlobalError(e error)

func (*Task) Save

func (t *Task) Save()

func (*Task) SetControllable

func (t *Task) SetControllable(canStop bool, canPause bool)

func (*Task) SetEndTime

func (t *Task) SetEndTime(ti time.Time)

func (*Task) SetHasProgress

func (t *Task) SetHasProgress()

func (*Task) SetProgress

func (t *Task) SetProgress(progress float32)

func (*Task) SetStartTime

func (t *Task) SetStartTime(ti time.Time)

func (*Task) SetStatus

func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)

type Worker

type Worker struct {
	WorkerPool chan chan Runnable
	JobChannel chan Runnable

	JobReQueue chan Runnable
	// contains filtered or unexported fields
}

Worker represents the worker that executes the jobs.

func NewWorker

func NewWorker(workerPool chan chan Runnable, requeue chan Runnable, activeChan chan int, tags map[string]string) Worker

NewWorker creates and configures a new worker.

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it.

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.

Source Files

  • dispatcher-worker.go
  • dispatcher.go
  • doc.go
  • reconnecting-client.go
  • runnable.go
  • subscriber.go
  • task.go

Directories

Path Synopsis
Package grpc provides a gRPC service to effectively run task instances on multiple workers.
Package grpc provides a gRPC service to effectively run task instances on multiple workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL