Documentation
¶
Index ¶
- Variables
- func DeleteTasks(ctx context.Context, tx pgx.Tx, taskIds []TaskID) error
- func ExtendTaskDeadline(conn *pgxpool.Conn, taskID TaskID, token PerformanceToken, ...) error
- func InsertTasks(ctx context.Context, conn *pgxpool.Conn, source pgx.CopyFromSource) error
- func ReturnTask(conn *pgxpool.Conn, taskID TaskID, token PerformanceToken, resultStatus string, ...) error
- type ActionManager
- type Actor
- type ActorID
- type ActorResult
- type ManagerProperties
- type OwnedTaskData
- type Parade
- type ParadeDB
- func (p *ParadeDB) DeleteTasks(ctx context.Context, taskIDs []TaskID) error
- func (p *ParadeDB) ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error
- func (p *ParadeDB) InsertTasks(ctx context.Context, tasks []TaskData) error
- func (p *ParadeDB) NewWaiter(ctx context.Context, taskID TaskID) (Waiter, error)
- func (p *ParadeDB) OwnTasks(actor ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)
- func (p *ParadeDB) PgxPool() *pgxpool.Pool
- func (p *ParadeDB) ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, ...) error
- type ParadePrefix
- func (pp *ParadePrefix) AddPrefix(s string) string
- func (pp *ParadePrefix) AddPrefixActor(actor ActorID) ActorID
- func (pp *ParadePrefix) AddPrefixTask(id TaskID) TaskID
- func (pp *ParadePrefix) DeleteTasks(ctx context.Context, ids []TaskID) error
- func (pp *ParadePrefix) ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error
- func (pp *ParadePrefix) InsertTasks(ctx context.Context, tasks []TaskData) error
- func (pp *ParadePrefix) NewWaiter(ctx context.Context, taskID TaskID) (Waiter, error)
- func (pp *ParadePrefix) OwnTasks(actorID ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)
- func (pp *ParadePrefix) ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, ...) error
- func (pp *ParadePrefix) StripPrefix(s string) string
- func (pp *ParadePrefix) StripPrefixActor(actor TaskID) ActorID
- func (pp *ParadePrefix) StripPrefixTask(id TaskID) TaskID
- type PerformanceToken
- type TaskDBWaiter
- type TaskData
- type TaskDataIterator
- type TaskID
- type TaskStatusCodeValue
- type Waiter
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidToken = errors.New("performance token invalid (action may have exceeded deadline)") ErrBadStatus = errors.New("bad status for task") ErrNoNotifyChannel = errors.New("task has no notify_channel_after") )
var ErrBadTypeConversion = errors.New("bad type")
var ErrNoMoreData = errors.New("no more data")
var TaskDataColumnNames = []string{
"id", "action", "body", "status", "status_code", "num_tries", "max_tries",
"num_signals", "total_dependencies",
"actor_id", "action_deadline", "performance_token",
"to_signal_after", "notify_channel_after",
}
TaskDataColumnNames holds the names of columns in tasks as they appear in the database.
Functions ¶
func DeleteTasks ¶
DeleteTasks deletes taskIds, removing dependencies and deleting (effectively recursively) any tasks that are left with no dependencies. It creates a temporary table on tx, so ideally close the transaction shortly after. The effect is easiest to analyze when all deleted tasks have been either completed or been aborted.
func ExtendTaskDeadline ¶
func ExtendTaskDeadline(conn *pgxpool.Conn, taskID TaskID, token PerformanceToken, maxDuration time.Duration) error
ExtendTaskDeadline extends the deadline for completing taskID which was acquired with the specified token, for maxDuration longer. It returns nil if the task is still owned and its deadline was extended, or an SQL error, or ErrInvalidToken.
func InsertTasks ¶
func ReturnTask ¶
func ReturnTask(conn *pgxpool.Conn, taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error
ReturnTask returns taskId which was acquired using the specified performanceToken, giving it resultStatus and resultStatusCode. It returns ErrInvalidToken if the performanceToken is invalid; this happens when ReturnTask is called after its deadline expires, or due to a logic error.
Types ¶
type ActionManager ¶ added in v0.15.0
type ActionManager struct {
// contains filtered or unexported fields
}
ActionManager manages the process of requesting and returning tasks for a specific Actor. The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it.
func NewActionManager ¶ added in v0.15.0
func NewActionManager(actor Actor, parade Parade, properties *ManagerProperties) *ActionManager
NewActionManager initiates an ActionManager with workers and returns a
func (*ActionManager) Close ¶ added in v0.15.0
func (a *ActionManager) Close()
type Actor ¶ added in v0.15.0
type Actor interface {
// Handle performs actions with the given body and return the ActorResult
Handle(action string, body *string, signalledErrors int) ActorResult
// Actions returns the list of actions that could be performed by the Actor
Actions() []string
// ActorID returns the ID of the actor
ActorID() ActorID
}
Actor handles an action or a group of actions
type ActorResult ¶ added in v0.15.0
type ActorResult struct {
Status string
StatusCode TaskStatusCodeValue
}
type ManagerProperties ¶ added in v0.15.0
type ManagerProperties struct {
Workers int // number of goroutines handling tasks
ChannelSize int // size of the channel containing tasks for workers
MaxTasks int // max tasks requested in every ownTasks request
WaitTime *time.Duration // time to wait if OwnTasks returned no tasks.
ErrWaitTime *time.Duration // time to wait if OwnTasks returned err.
MaxDuration *time.Duration // maxDuration passed to parade.OwnTasks
}
ManagerProperties defines the configuration properties of an ActionManager
type OwnedTaskData ¶
type OwnedTaskData struct {
ID TaskID `db:"task_id"`
Token PerformanceToken `db:"token"`
NumSignalledFailures int `db:"num_failures"`
Action string `db:"action"`
Body *string
}
OwnedTaskData is a row returned from "SELECT * FROM own_tasks(...)".
type Parade ¶
type Parade interface {
// InsertTasks adds tasks efficiently
InsertTasks(ctx context.Context, tasks []TaskData) error
// OwnTasks owns and returns up to maxTasks tasks for actor for performing any of
// actions. It will return tasks and for another OwnTasks call to acquire them after
// maxDuration (if specified).
OwnTasks(actor ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)
// ExtendTaskDeadline extends the deadline for completing taskID which was acquired with
// the specified token, for maxDuration longer. It returns nil if the task is still
// owned and its deadline was extended, or an SQL error, or ErrInvalidToken. deadline
// was extended.
ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error
// ReturnTask returns taskID which was acquired using the specified performanceToken,
// giving it resultStatus and resultStatusCode. It returns ErrInvalidToken if the
// performanceToken is invalid; this happens when ReturnTask is called after its
// deadline expires, or due to a logic error.
ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error
// NewWaiter returns TaskWaiter to wait for id on conn. conn is owned by the returned
// TaskWaiter until the waiter is done or cancelled.
NewWaiter(ctx context.Context, taskID TaskID) (Waiter, error)
// DeleteTasks deletes taskIDs, removing dependencies and deleting (effectively
// recursively) any tasks that are left with no dependencies. It creates a temporary
// table on tx, so ideally close the transaction shortly after. The effect is easiest
// to analyze when all deleted tasks have been either completed or been aborted.
DeleteTasks(ctx context.Context, taskIDs []TaskID) error
}
func NewParadeDB ¶
NewParadeDB returns a Parade that implements Parade on a database. The DDL should already be installed on that database.
type ParadeDB ¶
func (*ParadeDB) DeleteTasks ¶
func (*ParadeDB) ExtendTaskDeadline ¶
func (*ParadeDB) InsertTasks ¶
func (*ParadeDB) ReturnTask ¶
func (p *ParadeDB) ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error
type ParadePrefix ¶
ParadePrefix wraps a Parade and adds a prefix to all TaskIDs, action names, and ActorIDs.
func (*ParadePrefix) AddPrefix ¶
func (pp *ParadePrefix) AddPrefix(s string) string
func (*ParadePrefix) AddPrefixActor ¶
func (pp *ParadePrefix) AddPrefixActor(actor ActorID) ActorID
func (*ParadePrefix) AddPrefixTask ¶
func (pp *ParadePrefix) AddPrefixTask(id TaskID) TaskID
func (*ParadePrefix) DeleteTasks ¶
func (pp *ParadePrefix) DeleteTasks(ctx context.Context, ids []TaskID) error
func (*ParadePrefix) ExtendTaskDeadline ¶
func (pp *ParadePrefix) ExtendTaskDeadline(taskID TaskID, token PerformanceToken, maxDuration time.Duration) error
func (*ParadePrefix) InsertTasks ¶
func (pp *ParadePrefix) InsertTasks(ctx context.Context, tasks []TaskData) error
func (*ParadePrefix) OwnTasks ¶
func (pp *ParadePrefix) OwnTasks(actorID ActorID, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)
func (*ParadePrefix) ReturnTask ¶
func (pp *ParadePrefix) ReturnTask(taskID TaskID, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error
func (*ParadePrefix) StripPrefix ¶
func (pp *ParadePrefix) StripPrefix(s string) string
func (*ParadePrefix) StripPrefixActor ¶
func (pp *ParadePrefix) StripPrefixActor(actor TaskID) ActorID
func (*ParadePrefix) StripPrefixTask ¶
func (pp *ParadePrefix) StripPrefixTask(id TaskID) TaskID
type PerformanceToken ¶
func (*PerformanceToken) Scan ¶
func (dst *PerformanceToken) Scan(src interface{}) error
nolint: stylecheck
func (PerformanceToken) String ¶
func (src PerformanceToken) String() string
type TaskDBWaiter ¶
type TaskDBWaiter struct {
// contains filtered or unexported fields
}
TaskWaiter is used to wait for tasks. It is an object not a function to prevent race conditions by starting to wait before beginning to act. It owns a connection to the database and should not be copied.
func NewWaiter ¶
NewWaiter returns TaskWaiter to wait for id on conn. conn is owned by the returned TaskWaiter until the waiter is done or cancelled.
func (*TaskDBWaiter) Wait ¶
func (tw *TaskDBWaiter) Wait() (string, TaskStatusCodeValue, error)
Wait waits for the task to finish or the waiter to be cancelled and returns the task status and status code. It may safely be called from multiple goroutines.
type TaskData ¶
type TaskData struct {
ID TaskID `db:"task_id"`
Action string `db:"action"`
// Body is JSON-formatted
Body *string `db:"body"`
Status *string `db:"status"`
StatusCode TaskStatusCodeValue `db:"status_code"`
NumTries int `db:"num_tries"`
NumFailures int `db:"num_failures"`
MaxTries *int `db:"max_tries"`
NumSignals int `db:"num_signals"` // Internal; set (only) in tests
TotalDependencies *int `db:"total_dependencies"`
ActorID ActorID `db:"actor_id"`
ActionDeadline *time.Time `db:"action_deadline"`
PerformanceToken *PerformanceToken `db:"performance_token"`
ToSignalAfter []TaskID `db:"to_signal_after"`
NotifyChannelAfter *string `db:"notify_channel_after"`
}
TaskData is a row in table "tasks". It describes a task to perform.
type TaskDataIterator ¶
type TaskDataIterator struct {
Data []TaskData
// contains filtered or unexported fields
}
TaskDataIterator implements the pgx.CopyFromSource interface and allows using CopyFrom to insert multiple TaskData rapidly.
func (*TaskDataIterator) Err ¶
func (td *TaskDataIterator) Err() error
func (*TaskDataIterator) Next ¶
func (td *TaskDataIterator) Next() bool
func (*TaskDataIterator) Values ¶
func (td *TaskDataIterator) Values() ([]interface{}, error)
type TaskStatusCodeValue ¶
type TaskStatusCodeValue string
const ( // TaskPending indicates a task is waiting for an actor to perform it (new or being // retried) TaskPending TaskStatusCodeValue = "pending" // TaskInProgress indicates a task is being performed by an actor. TaskInProgress TaskStatusCodeValue = "in-progress" // TaskAborted indicates an actor has aborted this task with message, will not be reissued TaskAborted TaskStatusCodeValue = "aborted" // TaskCompleted indicates an actor has completed this task with message, will not reissued TaskCompleted TaskStatusCodeValue = "completed" // TaskInvalid is used by the API to report errors TaskInvalid TaskStatusCodeValue = "[invalid]" )
func (*TaskStatusCodeValue) Scan ¶
func (dst *TaskStatusCodeValue) Scan(src interface{}) error
nolint: stylecheck
type Waiter ¶
type Waiter interface {
// Wait waits for the task to finish or the waiter to be cancelled and returns the task
// status and status code. It may safely be called from multiple goroutines.
Wait() (string, TaskStatusCodeValue, error)
// Cancel cancels waiting.
Cancel()
}