parade

package
v0.21.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2020 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidToken       = errors.New("performance token invalid (action may have exceeded deadline)")
	ErrBadStatus          = errors.New("bad status for task")
	ErrNoNotifyChannel    = errors.New("task has no notify_channel_after")
	ErrServiceUnavailable = errors.New("service unavailable")
)
View Source
var ErrBadTypeConversion = errors.New("bad type")
View Source
var ErrNoMoreData = errors.New("no more data")
View Source
var TaskDataColumnNames = []string{
	"id", "action", "body", "status", "status_code", "num_tries", "max_tries",
	"num_signals", "total_dependencies",
	"actor_id", "action_deadline", "performance_token",
	"to_signal_after", "notify_channel_after",
}

TaskDataColumnNames holds the names of columns in tasks as they appear in the database.

Functions

func DeleteTasks

func DeleteTasks(ctx context.Context, tx pgx.Tx, taskIds []TaskID) error

DeleteTasks deletes taskIds, removing dependencies and deleting (effectively recursively) any tasks that are left with no dependencies. It creates a temporary table on tx, so ideally close the transaction shortly after. The effect is easiest to analyze when all deleted tasks have been either completed or been aborted.

func ExtendTaskDeadline

func ExtendTaskDeadline(conn *pgxpool.Conn, taskID TaskID, token PerformanceToken, maxDuration time.Duration) error

ExtendTaskDeadline extends the deadline for completing taskID which was acquired with the specified token, for maxDuration longer. It returns nil if the task is still owned and its deadline was extended, or an SQL error, or ErrInvalidToken.

func InsertTasks

func InsertTasks(ctx context.Context, conn *pgxpool.Conn, source pgx.CopyFromSource) error

func ReturnTask

func ReturnTask(conn *pgxpool.Conn, taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error

ReturnTask returns taskId which was acquired using the specified performanceToken, giving it resultStatus and resultStatusCode. It returns ErrInvalidToken if the performanceToken is invalid; this happens when ReturnTask is called after its deadline expires, or due to a logic error.

Types

type ActionManager added in v0.15.0

type ActionManager struct {
	// contains filtered or unexported fields
}

ActionManager manages the process of requesting and returning tasks for a specific Actor. The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it.

func NewActionManager added in v0.15.0

func NewActionManager(actor Actor, parade Parade, properties *ManagerProperties) *ActionManager

NewActionManager initiates an ActionManager with workers and returns a

func (*ActionManager) Close added in v0.15.0

func (a *ActionManager) Close()

type Actor added in v0.15.0

type Actor interface {
	// Handle performs actions with the given body and return the ActorResult
	Handle(action string, body *string, signalledErrors int) ActorResult
	// Actions returns the list of actions that could be performed by the Actor
	Actions() []string
	// ActorID returns the ID of the actor
	ActorID() ActorID
}

Actor handles an action or a group of actions

type ActorID

type ActorID string

type ActorResult added in v0.15.0

type ActorResult struct {
	Status     string
	StatusCode TaskStatusCodeValue
}

type ManagerProperties added in v0.15.0

type ManagerProperties struct {
	Workers     int            // number of goroutines handling tasks
	ChannelSize int            // size of the channel containing tasks for workers
	MaxTasks    int            // max tasks requested in every ownTasks request
	WaitTime    *time.Duration // time to wait if OwnTasks returned no tasks.
	ErrWaitTime *time.Duration // time to wait if OwnTasks returned err.
	MaxDuration *time.Duration // maxDuration passed to parade.OwnTasks
}

ManagerProperties defines the configuration properties of an ActionManager

type OwnedTaskData

type OwnedTaskData struct {
	ID                   TaskID           `db:"task_id"`
	Token                PerformanceToken `db:"token"`
	NumSignalledFailures int              `db:"num_failures"`
	Action               string           `db:"action"`
	Body                 *string
}

OwnedTaskData is a row returned from "SELECT * FROM own_tasks(...)".

func OwnTasks

func OwnTasks(conn pgxscan.Querier, actor ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)

OwnTasks owns for actor and returns up to maxTasks tasks for performing any of actions.

type Parade

type Parade interface {
	// InsertTasks adds tasks efficiently
	InsertTasks(ctx context.Context, tasks []TaskData) error

	// OwnTasks owns and returns up to maxTasks tasks for actor for performing any of
	// actions.  It will return tasks and for another OwnTasks call to acquire them after
	// maxDuration (if specified).
	OwnTasks(actor ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)

	// ExtendTaskDeadline extends the deadline for completing taskID which was acquired with
	// the specified token, for maxDuration longer.  It returns nil if the task is still
	// owned and its deadline was extended, or an SQL error, or ErrInvalidToken.  deadline
	// was extended.
	ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error

	// ReturnTask returns taskID which was acquired using the specified performanceToken,
	// giving it resultStatus and resultStatusCode.  It returns ErrInvalidToken if the
	// performanceToken is invalid; this happens when ReturnTask is called after its
	// deadline expires, or due to a logic error.
	ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error

	// NewWaiter returns TaskWaiter to wait for id on conn.  conn is owned by the returned
	// TaskWaiter until the waiter is done or cancelled.
	NewWaiter(ctx context.Context, taskID TaskID) (Waiter, error)

	// DeleteTasks deletes taskIDs, removing dependencies and deleting (effectively
	// recursively) any tasks that are left with no dependencies.  It creates a temporary
	// table on tx, so ideally close the transaction shortly after.  The effect is easiest
	// to analyze when all deleted tasks have been either completed or been aborted.
	DeleteTasks(ctx context.Context, taskIDs []TaskID) error
}

func NewParadeDB

func NewParadeDB(pool *pgxpool.Pool) Parade

NewParadeDB returns a Parade that implements Parade on a database. The DDL should already be installed on that database.

type ParadeDB

type ParadeDB pgxpool.Pool

func (*ParadeDB) DeleteTasks

func (p *ParadeDB) DeleteTasks(ctx context.Context, taskIDs []TaskID) error

func (*ParadeDB) ExtendTaskDeadline

func (p *ParadeDB) ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error

func (*ParadeDB) InsertTasks

func (p *ParadeDB) InsertTasks(ctx context.Context, tasks []TaskData) error

func (*ParadeDB) NewWaiter

func (p *ParadeDB) NewWaiter(ctx context.Context, taskID TaskID) (Waiter, error)

func (*ParadeDB) OwnTasks

func (p *ParadeDB) OwnTasks(actor ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)

func (*ParadeDB) PgxPool added in v0.16.0

func (p *ParadeDB) PgxPool() *pgxpool.Pool

func (*ParadeDB) ReturnTask

func (p *ParadeDB) ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error

type ParadePrefix

type ParadePrefix struct {
	Base   Parade
	Prefix string
}

ParadePrefix wraps a Parade and adds a prefix to all TaskIDs, action names, and ActorIDs.

func (*ParadePrefix) AddPrefix

func (pp *ParadePrefix) AddPrefix(s string) string

func (*ParadePrefix) AddPrefixActor

func (pp *ParadePrefix) AddPrefixActor(actor ActorID) ActorID

func (*ParadePrefix) AddPrefixTask

func (pp *ParadePrefix) AddPrefixTask(id TaskID) TaskID

func (*ParadePrefix) DeleteTasks

func (pp *ParadePrefix) DeleteTasks(ctx context.Context, ids []TaskID) error

func (*ParadePrefix) ExtendTaskDeadline

func (pp *ParadePrefix) ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error

func (*ParadePrefix) InsertTasks

func (pp *ParadePrefix) InsertTasks(ctx context.Context, tasks []TaskData) error

func (*ParadePrefix) NewWaiter

func (pp *ParadePrefix) NewWaiter(ctx context.Context, taskID TaskID) (Waiter, error)

func (*ParadePrefix) OwnTasks

func (pp *ParadePrefix) OwnTasks(actorID ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)

func (*ParadePrefix) ReturnTask

func (pp *ParadePrefix) ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error

func (*ParadePrefix) StripPrefix

func (pp *ParadePrefix) StripPrefix(s string) string

func (*ParadePrefix) StripPrefixActor

func (pp *ParadePrefix) StripPrefixActor(actor TaskID) ActorID

func (*ParadePrefix) StripPrefixTask

func (pp *ParadePrefix) StripPrefixTask(id TaskID) TaskID

type PerformanceToken

type PerformanceToken pgtype.UUID

func (*PerformanceToken) Scan

func (dst *PerformanceToken) Scan(src interface{}) error

nolint: stylecheck

func (PerformanceToken) String

func (src PerformanceToken) String() string

func (PerformanceToken) Value

func (src PerformanceToken) Value() (driver.Value, error)

type TaskDBWaiter

type TaskDBWaiter struct {
	// contains filtered or unexported fields
}

TaskWaiter is used to wait for tasks. It is an object not a function to prevent race conditions by starting to wait before beginning to act. It owns a connection to the database and should not be copied.

func NewWaiter

func NewWaiter(ctx context.Context, conn *pgxpool.Conn, taskID TaskID) (*TaskDBWaiter, error)

NewWaiter returns TaskWaiter to wait for id on conn. conn is owned by the returned TaskWaiter until the waiter is done or cancelled.

func (*TaskDBWaiter) Cancel

func (tw *TaskDBWaiter) Cancel()

Cancel cancels waiting.

func (*TaskDBWaiter) Wait

Wait waits for the task to finish or the waiter to be cancelled and returns the task status and status code. It may safely be called from multiple goroutines.

type TaskData

type TaskData struct {
	ID     TaskID `db:"task_id"`
	Action string `db:"action"`
	// Body is JSON-formatted
	Body               *string             `db:"body"`
	Status             *string             `db:"status"`
	StatusCode         TaskStatusCodeValue `db:"status_code"`
	NumTries           int                 `db:"num_tries"`
	NumFailures        int                 `db:"num_failures"`
	MaxTries           *int                `db:"max_tries"`
	NumSignals         int                 `db:"num_signals"` // Internal; set (only) in tests
	TotalDependencies  *int                `db:"total_dependencies"`
	ActorID            ActorID             `db:"actor_id"`
	ActionDeadline     *time.Time          `db:"action_deadline"`
	PerformanceToken   *PerformanceToken   `db:"performance_token"`
	ToSignalAfter      []TaskID            `db:"to_signal_after"`
	NotifyChannelAfter *string             `db:"notify_channel_after"`
}

TaskData is a row in table "tasks". It describes a task to perform.

type TaskDataIterator

type TaskDataIterator struct {
	Data []TaskData
	// contains filtered or unexported fields
}

TaskDataIterator implements the pgx.CopyFromSource interface and allows using CopyFrom to insert multiple TaskData rapidly.

func (*TaskDataIterator) Err

func (td *TaskDataIterator) Err() error

func (*TaskDataIterator) Next

func (td *TaskDataIterator) Next() bool

func (*TaskDataIterator) Values

func (td *TaskDataIterator) Values() ([]interface{}, error)

type TaskID

type TaskID string

type TaskStatusCodeValue

type TaskStatusCodeValue string
const (
	// TaskPending indicates a task is waiting for an actor to perform it (new or being
	// retried)
	TaskPending TaskStatusCodeValue = "pending"
	// TaskInProgress indicates a task is being performed by an actor.
	TaskInProgress TaskStatusCodeValue = "in-progress"
	// TaskAborted indicates an actor has aborted this task with message, will not be reissued
	TaskAborted TaskStatusCodeValue = "aborted"
	// TaskCompleted indicates an actor has completed this task with message, will not reissued
	TaskCompleted TaskStatusCodeValue = "completed"
	// TaskInvalid is used by the API to report errors
	TaskInvalid TaskStatusCodeValue = "[invalid]"
)

func (*TaskStatusCodeValue) Scan

func (dst *TaskStatusCodeValue) Scan(src interface{}) error

nolint: stylecheck

func (TaskStatusCodeValue) Value

func (src TaskStatusCodeValue) Value() (driver.Value, error)

type Waiter

type Waiter interface {
	// Wait waits for the task to finish or the waiter to be cancelled and returns the task
	// status and status code.  It may safely be called from multiple goroutines.
	Wait() (string, TaskStatusCodeValue, error)

	// Cancel cancels waiting.
	Cancel()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL