Documentation
¶
Index ¶
- Constants
- Variables
- func ErrFluxParseError(err error) *errors.Error
- func ErrInternalTaskServiceError(err error) *errors.Error
- func ErrQueryError(err error) *errors.Error
- func ErrResultIteratorError(err error) *errors.Error
- func ErrRunExecutionError(err error) *errors.Error
- func ErrTaskConcurrencyLimitReached(runsInFront int) *errors.Error
- func ErrTaskOptionParse(err error) *errors.Error
- func ErrTaskTimeParse(err error) *errors.Error
- func ErrUnexpectedTaskBucketErr(err error) *errors.Error
- type Log
- type LogFilter
- type RequestStillQueuedError
- type Run
- type RunFilter
- type RunStatus
- type Task
- type TaskCreate
- type TaskFilter
- type TaskService
- type TaskStatus
- type TaskUpdate
Constants ¶
// Task page-size constants (default 100, maximum 500) and the string values of the two task statuses.
const ( TaskDefaultPageSize = 100 TaskMaxPageSize = 500 TaskStatusActive = "active" TaskStatusInactive = "inactive" )
Variables ¶
var ( // TaskSystemType is the type set in tasks for all CRUD requests. TaskSystemType = "system" // TaskBasicType is shorthand used by the UI to request a minimal subset of system task metadata. TaskBasicType = "basic" )
var ( // ErrRunCanceled is returned from the RunResult when a Run is canceled. It is used mostly internally. ErrRunCanceled = &errors.Error{ Code: errors.EInternal, Msg: "run canceled", } // ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not. ErrTaskNotClaimed = &errors.Error{ Code: errors.EConflict, Msg: "task not claimed", } // ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is. ErrTaskAlreadyClaimed = &errors.Error{ Code: errors.EConflict, Msg: "task already claimed", } // ErrNoRunsFound is returned when searching for a range of runs, but none are found. ErrNoRunsFound = &errors.Error{ Code: errors.ENotFound, Msg: "no matching runs found", } // ErrInvalidTaskID is the error object for bad IDs. ErrInvalidTaskID = &errors.Error{ Code: errors.EInvalid, Msg: "invalid id", } // ErrTaskNotFound indicates no task could be found for given parameters. ErrTaskNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "task not found", } // ErrRunNotFound is returned when searching for a single run that doesn't exist. ErrRunNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "run not found", } ErrRunKeyNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "run key not found", } ErrPageSizeTooSmall = &errors.Error{ Msg: "cannot have negative page limit", Code: errors.EInvalid, } // ErrPageSizeTooLarge indicates the page size is too large. This error is only // used in the kv task service implementation. The name of this error may lead it // to be used in a place that is not useful. The TaskMaxPageSize is the only one // at 500, the rest at 100. This would likely benefit from a more specific name // since those limits aren't shared globally. 
ErrPageSizeTooLarge = &errors.Error{ Msg: fmt.Sprintf("cannot use page size larger then %d", TaskMaxPageSize), Code: errors.EInvalid, } ErrOrgNotFound = &errors.Error{ Msg: "organization not found", Code: errors.ENotFound, } ErrTaskRunAlreadyQueued = &errors.Error{ Msg: "run already queued", Code: errors.EConflict, } // ErrOutOfBoundsLimit is returned when FindRuns is called with an invalid filter limit. ErrOutOfBoundsLimit = &errors.Error{ Code: errors.EUnprocessableEntity, Msg: "run limit is out of bounds, must be between 1 and 500", } // ErrInvalidOwnerID is returned when trying to create a task without a valid ownerID. ErrInvalidOwnerID = &errors.Error{ Code: errors.EInvalid, Msg: "cannot create task with invalid ownerID", } )
Functions ¶
func ErrFluxParseError ¶
ErrFluxParseError is returned when an error is thrown by Flux.Parse in the task executor
func ErrQueryError ¶
ErrQueryError is returned when an error is thrown by the Query service in the task executor
func ErrResultIteratorError ¶
ErrResultIteratorError is returned when an error is thrown by exhaustResultIterators in the executor
func ErrRunExecutionError ¶
func ErrTaskOptionParse ¶
func ErrTaskTimeParse ¶
ErrTaskTimeParse is an error for time parsing errors
func ErrUnexpectedTaskBucketErr ¶
ErrUnexpectedTaskBucketErr is a generic error we can use when we fail to retrieve a bucket
Types ¶
type Log ¶
// Log represents a link to a log resource.
// It carries one message together with the ID of the run it is attached to.
type Log struct {
// RunID is serialized as "runID" and is tagged omitempty.
RunID platform.ID `json:"runID,omitempty"`
// Time is carried as a string; the expected format is not specified here.
Time string `json:"time"`
// Message is the log text.
Message string `json:"message"`
}
Log represents a link to a log resource
type LogFilter ¶
// LogFilter represents a set of filters that restrict the returned log results.
type LogFilter struct {
// Task ID is required.
Task platform.ID
// The optional Run ID limits logs to a single run.
// A nil pointer means no run restriction is requested.
Run *platform.ID
}
LogFilter represents a set of filters that restrict the returned log results.
type RequestStillQueuedError ¶
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
func ParseRequestStillQueuedError ¶
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError
ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func (RequestStillQueuedError) Error ¶
func (e RequestStillQueuedError) Error() string
type Run ¶
// Run is a record created when a run of a task is scheduled.
//
// NOTE(review): encoding/json's omitempty never omits struct values, so the
// time.Time fields tagged omitempty below are still emitted when zero unless
// custom marshalling is applied elsewhere — confirm before relying on omission.
type Run struct {
ID platform.ID `json:"id,omitempty"`
TaskID platform.ID `json:"taskID"`
// Status is a plain string; the set of valid values is not defined in this struct.
Status string `json:"status"`
ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the Now time used in the task's query
RunAt time.Time `json:"runAt"` // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset
Flux string `json:"flux"` // Flux used in run
StartedAt time.Time `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task
FinishedAt time.Time `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task
RequestedAt time.Time `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task
// Log holds the log entries attached to this run; omitted from JSON when empty.
Log []Log `json:"log,omitempty"`
TraceID string `json:"traceID"` // TraceID preserves the trace id
IsSampled bool `json:"isSampled"` // IsSampled preserves whether this run was sampled
}
Run is a record created when a run of a task is scheduled.
type RunFilter ¶
// RunFilter represents a set of filters that restrict the returned results.
type RunFilter struct {
// Task ID is required for listing runs.
Task platform.ID
// After is an optional run ID; presumably a pagination cursor — TODO confirm against the implementation.
After *platform.ID
// Limit caps the number of runs returned. ErrOutOfBoundsLimit's message states
// that the run limit must be between 1 and 500.
Limit int
// AfterTime and BeforeTime are time bounds carried as strings; the expected
// format is not specified here (presumably RFC 3339 — TODO confirm).
AfterTime string
BeforeTime string
}
RunFilter represents a set of filters that restrict the returned results
type Task ¶
// Task describes a task: its identity, owning organization and owner, the Flux
// script it runs, its schedule options (Every, Cron, Offset), and bookkeeping
// about its most recent runs.
//
// NOTE(review): encoding/json's omitempty never omits struct values, so the
// time.Time fields tagged omitempty below are still emitted when zero unless
// custom marshalling is applied elsewhere — confirm before relying on omission.
type Task struct {
ID platform.ID `json:"id"`
Type string `json:"type,omitempty"`
// OrganizationID and Organization are serialized as "orgID" and "org".
OrganizationID platform.ID `json:"orgID"`
Organization string `json:"org"`
OwnerID platform.ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
// Status is a plain string; TaskStatusActive and TaskStatusInactive are the
// string constants declared alongside this type.
Status string `json:"status"`
Flux string `json:"flux"`
// Every and Cron are the schedule options consulted by EffectiveCron.
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
Offset time.Duration `json:"offset,omitempty"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LatestSuccess time.Time `json:"latestSuccess,omitempty"`
LatestFailure time.Time `json:"latestFailure,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt time.Time `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
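Task is a task: it carries the task's identity, organization and owner, Flux script, schedule options, and the status and timestamps of its most recent runs. 🎊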
func (*Task) EffectiveCron ¶
EffectiveCron returns the effective cron string of the options. If the cron option was specified, it is returned. If the every option was specified, it is converted into a cron string using "@every". Otherwise, the empty string is returned. The value of the offset option is not considered.
type TaskCreate ¶
// TaskCreate is the set of values to create a task.
type TaskCreate struct {
Type string `json:"type,omitempty"`
Flux string `json:"flux"`
Description string `json:"description,omitempty"`
Status string `json:"status,omitempty"`
OrganizationID platform.ID `json:"orgID,omitempty"`
Organization string `json:"org,omitempty"`
// OwnerID is tagged json:"-", so it is never read from or written to JSON.
OwnerID platform.ID `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
}
TaskCreate is the set of values to create a task.
func (TaskCreate) Validate ¶
func (t TaskCreate) Validate() error
type TaskFilter ¶
// TaskFilter represents a set of filters that restrict the returned results.
// Pointer fields are optional; presumably a nil pointer means "do not filter
// on this field" — TODO confirm against the service implementation.
type TaskFilter struct {
Type *string
Name *string
After *platform.ID
OrganizationID *platform.ID
Organization string
User *platform.ID
Limit int
Status *string
}
TaskFilter represents a set of filters that restrict the returned results
func (TaskFilter) QueryParams ¶
func (f TaskFilter) QueryParams() map[string][]string
QueryParams converts TaskFilter fields to URL query params.
type TaskService ¶
// TaskService represents a service for managing one-off and recurring tasks.
type TaskService interface {
// FindTaskByID returns a single task
FindTaskByID(ctx context.Context, id platform.ID) (*Task, error)
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
// of matching tasks.
FindTasks(ctx context.Context, filter TaskFilter) ([]*Task, int, error)
// CreateTask creates a new task.
// The owner of the task is inferred from the authorizer associated with ctx.
CreateTask(ctx context.Context, t TaskCreate) (*Task, error)
// UpdateTask updates a single task with changeset.
UpdateTask(ctx context.Context, id platform.ID, upd TaskUpdate) (*Task, error)
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
DeleteTask(ctx context.Context, id platform.ID) error
// FindLogs returns logs for a run.
// NOTE(review): the int result is presumably the number of logs returned — confirm.
FindLogs(ctx context.Context, filter LogFilter) ([]*Log, int, error)
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
FindRuns(ctx context.Context, filter RunFilter) ([]*Run, int, error)
// FindRunByID returns a single run.
FindRunByID(ctx context.Context, taskID, runID platform.ID) (*Run, error)
// CancelRun cancels a currently running run.
CancelRun(ctx context.Context, taskID, runID platform.ID) error
// RetryRun creates and returns a new run (which is a retry of another run).
RetryRun(ctx context.Context, taskID, runID platform.ID) (*Run, error)
// ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible.
// The value of scheduledFor may or may not align with the task's schedule.
ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*Run, error)
}
TaskService represents a service for managing one-off and recurring tasks.
type TaskStatus ¶
// TaskStatus is a string-typed task status. Its declared values are TaskActive
// ("active") and TaskInactive ("inactive"); DefaultTaskStatus equals TaskActive.
type TaskStatus string
const ( TaskActive TaskStatus = "active" TaskInactive TaskStatus = "inactive" DefaultTaskStatus TaskStatus = TaskActive )
type TaskUpdate ¶
// TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.
// Pointer fields are optional. Fields tagged json:"-" are excluded from the
// default JSON encoding; the type also declares its own MarshalJSON and
// UnmarshalJSON methods.
type TaskUpdate struct {
Flux *string `json:"flux,omitempty"`
Status *string `json:"status,omitempty"`
Description *string `json:"description,omitempty"`
// LatestCompleted is used to set the latest completed time on startup to skip task catchup.
LatestCompleted *time.Time `json:"-"`
LatestScheduled *time.Time `json:"-"`
LatestSuccess *time.Time `json:"-"`
LatestFailure *time.Time `json:"-"`
LastRunStatus *string `json:"-"`
LastRunError *string `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
}
TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.
func (*TaskUpdate) MarshalJSON ¶
func (t *TaskUpdate) MarshalJSON() ([]byte, error)
func (*TaskUpdate) UnmarshalJSON ¶
func (t *TaskUpdate) UnmarshalJSON(data []byte) error
func (*TaskUpdate) UpdateFlux ¶
func (t *TaskUpdate) UpdateFlux(parser fluxlang.FluxLanguageService, oldFlux string) error
UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it. It zeros the options in the TaskUpdate.
func (*TaskUpdate) Validate ¶
func (t *TaskUpdate) Validate() error