Documentation ¶
Overview ¶
Package celeriac provides a Go client for interacting with Celery.
It can place tasks on the task queue and monitor task and worker events.
Index ¶
- Constants
- Variables
- func Fail(err error, msg string)
- func Log(err error, msg string)
- type Event
- type PingCmd
- type RateLimitTaskCmd
- type RevokeTaskCmd
- type Task
- type TaskEvent
- type TaskEventsList
- type TaskMonitor
- type TaskQueueMgr
- func (taskQueueMgr *TaskQueueMgr) Close()
- func (taskQueueMgr *TaskQueueMgr) DispatchTask(taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)
- func (taskQueueMgr *TaskQueueMgr) DispatchTaskWithID(taskID string, taskName string, taskData map[string]interface{}, ...) (*Task, error)
- func (taskQueueMgr *TaskQueueMgr) Ping() error
- func (taskQueueMgr *TaskQueueMgr) RateLimitTask(taskName string, rateLimit string) error
- func (taskQueueMgr *TaskQueueMgr) RevokeTask(taskID string) error
- func (taskQueueMgr *TaskQueueMgr) TimeLimitTask(taskName string, hardLimit string, softLimit string) error
- type TimeLimitTaskCmd
- type WorkerEvent
Constants ¶
const (
	// ConstPublishTaskContentType is the content type of the task data to be published
	ConstPublishTaskContentType = "application/json"
	// ConstPublishTaskContentEncoding is the content encoding type of the task data to be published
	ConstPublishTaskContentEncoding = "utf-8"
	// ConstTaskDefaultExchangeName is the default exchange name to use when publishing a task
	ConstTaskDefaultExchangeName = ""
	// ConstTaskDefaultRoutingKey is the default routing key to use when publishing a task
	ConstTaskDefaultRoutingKey = "celery"
	// ConstTaskControlExchangeName is the exchange name for dispatching task control commands
	ConstTaskControlExchangeName = "celery.pidbox"
	// ConstEventsMonitorExchangeName is the exchange name used for Celery events
	ConstEventsMonitorExchangeName = "celeryev"
	// ConstEventsMonitorExchangeType is the exchange type for the events monitor
	ConstEventsMonitorExchangeType = "topic"
	// ConstEventsMonitorQueueName is the queue name of the events monitor
	ConstEventsMonitorQueueName = "celeriac-events-monitor-queue"
	// ConstEventsMonitorBindingKey is the binding key for the events monitor
	ConstEventsMonitorBindingKey = "*.*"
	// ConstEventsMonitorConsumerTag is the consumer tag name for the events monitor
	ConstEventsMonitorConsumerTag = "celeriac-events-monitor"
	// ConstTimeFormat is the general format for all timestamps
	ConstTimeFormat = "2006-01-02T15:04:05.999999"
	// ConstEventTypeWorkerOnline is the event type when a Celery worker comes online
	ConstEventTypeWorkerOnline = "worker-online"
	// ConstEventTypeWorkerOffline is the event type when a Celery worker goes offline
	ConstEventTypeWorkerOffline = "worker-offline"
	// ConstEventTypeWorkerHeartbeat is the event type when a Celery worker is online and "alive"
	ConstEventTypeWorkerHeartbeat = "worker-heartbeat"
	// ConstEventTypeTaskSent is the event type when a Celery task is sent
	ConstEventTypeTaskSent = "task-sent"
	// ConstEventTypeTaskReceived is the event type when a Celery worker receives a task
	ConstEventTypeTaskReceived = "task-received"
	// ConstEventTypeTaskStarted is the event type when a Celery worker starts a task
	ConstEventTypeTaskStarted = "task-started"
	// ConstEventTypeTaskSucceeded is the event type when a Celery worker completes a task
	ConstEventTypeTaskSucceeded = "task-succeeded"
	// ConstEventTypeTaskFailed is the event type when a Celery worker fails to complete a task
	ConstEventTypeTaskFailed = "task-failed"
	// ConstEventTypeTaskRevoked is the event type when a Celery worker has its task revoked
	ConstEventTypeTaskRevoked = "task-revoked"
	// ConstEventTypeTaskRetried is the event type when a Celery worker retries a task
	ConstEventTypeTaskRetried = "task-retried"
)
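The ConstTimeFormat value is an ordinary Go reference-time layout, so it can be passed straight to time.Format. The sketch below shows one way a Unix timestamp with fractional seconds could be rendered with that layout. The helper formatTimestamp and the local constant timeFormat are hypothetical stand-ins for illustration; this is not necessarily how the package's own TimestampFormatted method works.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// timeFormat mirrors the documented ConstTimeFormat value.
const timeFormat = "2006-01-02T15:04:05.999999"

// formatTimestamp is a hypothetical helper: it converts a Unix timestamp
// with fractional seconds into a UTC string using timeFormat.
func formatTimestamp(ts float64) string {
	sec, frac := math.Modf(ts)
	t := time.Unix(int64(sec), int64(frac*float64(time.Second))).UTC()
	return t.Format(timeFormat)
}

func main() {
	fmt.Println(formatTimestamp(0))   // whole second: no fractional part is printed
	fmt.Println(formatTimestamp(1.5)) // trailing zeros of the fraction are dropped
}
```

Because the layout uses nines for the fraction, trailing zeros are trimmed and a whole-second value is printed without a decimal point.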
Variables ¶
var (
	// ErrInvalidTaskID is raised when an invalid task ID has been detected
	ErrInvalidTaskID = errors.New("invalid task ID specified")
	// ErrInvalidTaskName is raised when an invalid task name has been detected
	ErrInvalidTaskName = errors.New("invalid task name specified")
)
Global Errors
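Since these are sentinel errors created with errors.New, callers can compare against them with errors.Is, which also works when the error has been wrapped. The following is a minimal sketch under assumptions: errInvalidTaskID is a local mirror of ErrInvalidTaskID, and validateTaskID is a hypothetical validator (the package's actual validation rules are not described on this page).

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidTaskID mirrors the documented ErrInvalidTaskID sentinel.
var errInvalidTaskID = errors.New("invalid task ID specified")

// validateTaskID is a hypothetical validator that rejects empty IDs.
// It wraps the sentinel so callers still match it with errors.Is.
func validateTaskID(id string) error {
	if id == "" {
		return fmt.Errorf("validate task: %w", errInvalidTaskID)
	}
	return nil
}

// isInvalidTaskID reports whether err is, or wraps, the sentinel.
func isInvalidTaskID(err error) bool {
	return errors.Is(err, errInvalidTaskID)
}

func main() {
	if err := validateTaskID(""); isInvalidTaskID(err) {
		fmt.Println("rejected:", err)
	}
}
```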
Functions ¶
Types ¶
type Event ¶
type Event struct {
// Type is the Celery event type. See supported events listed in "constants.go"
Type string `json:"type"`
// Hostname is the name of the host on which the Celery worker is operating
Hostname string `json:"hostname"`
// Timestamp is the current time of the event
Timestamp float32 `json:"timestamp"`
// PID is the process ID
PID int `json:"pid"`
// Clock is the current clock time
Clock int `json:"clock"`
// UTCOffset is the offset from UTC for the time when this event is valid
UTCOffset int `json:"utcoffset"`
// Data is a property allowing extra data to be sent through for custom events from a Celery worker
Data interface{} `json:"data, omitempty"`
}
Event defines a base event emitted by Celery workers.
func (Event) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (Event) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*Event) TimestampFormatted ¶
TimestampFormatted returns a formatted string representation of the task event timestamp
func (*Event) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Event) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
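One property of the Event struct that may matter in practice: Timestamp is declared as float32. A float32 has a 24-bit significand, so values around 1.8e9 (typical Unix timestamps) are spaced 128 seconds apart. The sketch below decodes the same payload into a float32 and a float64 field and reports the difference. The types event32 and event64 are local stand-ins using encoding/json; how the package's generated easyjson decoder handles the value is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// event32 and event64 are local mirrors of two documented Event fields,
// differing only in the width of Timestamp.
type event32 struct {
	Type      string  `json:"type"`
	Timestamp float32 `json:"timestamp"`
}

type event64 struct {
	Type      string  `json:"type"`
	Timestamp float64 `json:"timestamp"`
}

// timestampDrift returns the absolute difference, in seconds, between
// the float32 and float64 decodings of the same payload.
func timestampDrift(payload []byte) (float64, error) {
	var narrow event32
	var wide event64
	if err := json.Unmarshal(payload, &narrow); err != nil {
		return 0, err
	}
	if err := json.Unmarshal(payload, &wide); err != nil {
		return 0, err
	}
	return math.Abs(float64(narrow.Timestamp) - wide.Timestamp), nil
}

func main() {
	payload := []byte(`{"type":"worker-online","timestamp":1843965659.580637}`)
	drift, err := timestampDrift(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("float32 timestamp is off by %.1f seconds\n", drift)
}
```

If sub-minute accuracy is needed, it may be safer to treat the formatted timestamp as approximate.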
type PingCmd ¶
type PingCmd struct {
// contains filtered or unexported fields
}
PingCmd wraps the command used to ping Celery workers
type RateLimitTaskCmd ¶
type RateLimitTaskCmd struct {
Arguments rateLimitTaskArgs `json:"arguments"`
// contains filtered or unexported fields
}
RateLimitTaskCmd wraps the command used to change the rate limit of a task
func NewRateLimitTaskCmd ¶
func NewRateLimitTaskCmd(taskName string, rateLimit string) *RateLimitTaskCmd
NewRateLimitTaskCmd creates a new command for rate limiting a task
taskName: Name of the task to change the rate limit for
rateLimit: The rate limit as tasks per second, or a rate limit string ("100/m", etc.; see celery.task.base.Task.rate_limit for more information)
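To make the rateLimit format concrete: in Celery, a bare number means tasks per second, and a suffix of "/s", "/m" or "/h" selects seconds, minutes or hours. The sketch below normalizes such a string to tasks per second. ratePerSecond is a hypothetical helper written for this illustration; the package itself takes the string as-is and this page does not say whether it validates it.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ratePerSecond is a hypothetical helper that converts a Celery-style
// rate limit string ("10", "100/s", "100/m", "100/h") to tasks per second.
func ratePerSecond(limit string) (float64, error) {
	value, unit, hasUnit := strings.Cut(limit, "/")
	n, err := strconv.ParseFloat(value, 64)
	if err != nil {
		return 0, fmt.Errorf("rate limit %q: %w", limit, err)
	}
	if !hasUnit {
		return n, nil // bare number: already tasks per second
	}
	switch unit {
	case "s":
		return n, nil
	case "m":
		return n / 60, nil
	case "h":
		return n / 3600, nil
	}
	return 0, fmt.Errorf("rate limit %q: unknown unit %q", limit, unit)
}

func main() {
	for _, limit := range []string{"10", "120/m", "3600/h"} {
		rate, err := ratePerSecond(limit)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s = %g tasks per second\n", limit, rate)
	}
}
```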
type RevokeTaskCmd ¶
type RevokeTaskCmd struct {
Arguments revokeTaskArgs `json:"arguments"`
// contains filtered or unexported fields
}
RevokeTaskCmd wraps the command used to revoke a task
func NewRevokeTaskCmd ¶
func NewRevokeTaskCmd(taskID string, terminateProcess bool) *RevokeTaskCmd
NewRevokeTaskCmd creates a new command for revoking a task by given id
If a task is revoked, workers will ignore it and not execute it.
type Task ¶
type Task struct {
// TaskName is the name of the task
TaskName string `json:"task"`
// ID is the task UUID
ID string `json:"id"`
// Args are task arguments (optional)
Args []string `json:"args, omitempty"`
// KWArgs are keyword arguments (optional)
KWArgs map[string]interface{} `json:"kwargs, omitempty"`
// Retries is a number of retries to perform if an error occurs (optional)
Retries int `json:"retries, omitempty"`
// ETA is the estimated completion time (optional)
ETA *time.Time `json:"eta, omitempty"`
// Expires is the time when this task will expire (optional)
Expires *time.Time `json:"expires, omitempty"`
}
Task is a representation of a Celery task
func NewTask ¶
NewTask is a factory function that creates and returns a pointer to a new task object
func NewTaskWithID ¶
func NewTaskWithID(taskID string, taskName string, args []string, kwargs map[string]interface{}) (*Task, error)
NewTaskWithID is a factory function that creates and returns a pointer to a new task object, allowing caller to specify the task ID.
func (*Task) MarshalJSON ¶
MarshalJSON marshals a Task object into a JSON byte array
Time properties are converted to UTC and formatted as ISO 8601
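The UTC conversion described above can be sketched with a custom MarshalJSON on a small stand-in type. Everything here is an assumption made for illustration: the type task carries only two of the documented fields, and the layout string reuses the documented ConstTimeFormat value, which may or may not be what the package applies to ETA and Expires.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// task is a local stand-in with two of the documented Task fields.
type task struct {
	ID  string
	ETA *time.Time
}

// MarshalJSON converts ETA to UTC and formats it without a zone suffix.
// The layout is assumed, not taken from the package source.
func (t *task) MarshalJSON() ([]byte, error) {
	out := struct {
		ID  string  `json:"id"`
		ETA *string `json:"eta,omitempty"`
	}{ID: t.ID}
	if t.ETA != nil {
		s := t.ETA.UTC().Format("2006-01-02T15:04:05.999999")
		out.ETA = &s
	}
	return json.Marshal(out)
}

// encodeWithETA builds a task whose ETA is expressed in a UTC+2 zone
// and returns its JSON encoding.
func encodeWithETA(id string, unixSec int64) (string, error) {
	eta := time.Unix(unixSec, 0).In(time.FixedZone("UTC+2", 2*3600))
	b, err := json.Marshal(&task{ID: id, ETA: &eta})
	return string(b), err
}

func main() {
	s, err := encodeWithETA("abc", 0)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // the ETA is printed in UTC, not in UTC+2
}
```

Note that MarshalJSON has a pointer receiver here, as in the documented signature, so the value must be marshaled through a pointer for the method to be used.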
type TaskEvent ¶
type TaskEvent struct {
Type string `json:"type"`
Hostname string `json:"hostname"`
Timestamp float32 `json:"timestamp"`
PID int `json:"pid"`
Clock int `json:"clock"`
UTCOffset int `json:"utcoffset"`
// UUID is the id of the task
UUID string `json:"uuid"`
// Name is the textual name of the task executed
Name string `json:"name, omitempty"`
// Args is a string of the arguments passed to the task
Args string `json:"args, omitempty"`
// Kwargs is a string of the key-word arguments passed to the task
Kwargs string `json:"kwargs, omitempty"`
// Result is a string containing the result of a completed task
Result string `json:"result, omitempty"`
// Runtime is the execution time
Runtime float32 `json:"runtime, omitempty"`
// Retries is the number of re-tries this task has performed
Retries int `json:"retries, omitempty"`
// ETA is the explicit time and date to run the retry at.
ETA interface{} `json:"eta, omitempty"`
// Expires is the datetime, or the number of seconds in the future, at which the task should expire
Expires interface{} `json:"expires, omitempty"`
// Exception is a string containing error/exception information
Exception string `json:"exception, omitempty"`
// Traceback is a string containing extended error information
Traceback string `json:"traceback, omitempty"`
// Terminated is a flag indicating whether the task has been terminated
Terminated bool `json:"terminated, omitempty"`
// Signum is the signal number
Signum interface{} `json:"signum, omitempty"`
// Expired is a flag indicating whether the task has expired
Expired bool `json:"expired, omitempty"`
}
TaskEvent is the JSON schema for Celery task events
func NewTaskEvent ¶
func NewTaskEvent() *TaskEvent
NewTaskEvent is a factory function to create a new TaskEvent object
func (TaskEvent) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (TaskEvent) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*TaskEvent) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*TaskEvent) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type TaskMonitor ¶
type TaskMonitor struct {
// Public channel on which events are piped
EventsChannel chan interface{}
// contains filtered or unexported fields
}
TaskMonitor is a Celery task event consumer
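Because EventsChannel is a chan interface{}, a consumer has to inspect the dynamic type of each value it receives. A type switch is the usual way to do that. The sketch below uses local stand-in types and a locally created channel; it assumes the monitor sends pointers to event structs, which this page does not confirm, so the case types may need adjusting against the real package.

```go
package main

import "fmt"

// workerEvent and taskEvent are local stand-ins for the documented
// WorkerEvent and TaskEvent types, reduced to a few fields.
type workerEvent struct{ Type, Hostname string }
type taskEvent struct{ Type, UUID string }

// describe is a hypothetical handler that classifies one value
// received from an events channel.
func describe(ev interface{}) string {
	switch e := ev.(type) {
	case *workerEvent:
		return "worker " + e.Hostname + ": " + e.Type
	case *taskEvent:
		return "task " + e.UUID + ": " + e.Type
	default:
		return fmt.Sprintf("unhandled event %T", ev)
	}
}

func main() {
	// A buffered channel stands in for TaskMonitor.EventsChannel.
	events := make(chan interface{}, 2)
	events <- &workerEvent{Type: "worker-online", Hostname: "celery@worker1"}
	events <- &taskEvent{Type: "task-started", UUID: "42"}
	close(events)

	for ev := range events {
		fmt.Println(describe(ev))
	}
}
```

Keeping a default case is worthwhile with an interface{} channel, since unexpected types would otherwise be dropped silently.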
func NewTaskMonitor ¶
func NewTaskMonitor(connection *amqp.Connection, channel *amqp.Channel, exchangeName string, exchangeType string, queueName string, bindingKey string, ctag string) (*TaskMonitor, error)
NewTaskMonitor is a factory function that creates a new Celery consumer
func (*TaskMonitor) SetMonitorWorkerHeartbeatEvents ¶
func (monitor *TaskMonitor) SetMonitorWorkerHeartbeatEvents(processHeartbeatEvents bool)
SetMonitorWorkerHeartbeatEvents sets whether heartbeat events emitted by workers are processed.
NOTE: By default this is set to 'false' so as to minimize unnecessary "noisy heartbeat" events.
func (*TaskMonitor) Shutdown ¶
func (monitor *TaskMonitor) Shutdown() error
Shutdown stops all monitoring, cleaning up any open connections
type TaskQueueMgr ¶
type TaskQueueMgr struct {
Monitor *TaskMonitor
// contains filtered or unexported fields
}
TaskQueueMgr defines a manager for interacting with a Celery task queue
func NewTaskQueueMgr ¶
func NewTaskQueueMgr(brokerURI string) (*TaskQueueMgr, error)
NewTaskQueueMgr is a factory function that creates a new instance of the TaskQueueMgr
func (*TaskQueueMgr) Close ¶
func (taskQueueMgr *TaskQueueMgr) Close()
Close performs appropriate cleanup of any open task queue connections
func (*TaskQueueMgr) DispatchTask ¶
func (taskQueueMgr *TaskQueueMgr) DispatchTask(taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)
DispatchTask places a new task on the Celery task queue. It creates a new Task based on the supplied task name and data.
func (*TaskQueueMgr) DispatchTaskWithID ¶
func (taskQueueMgr *TaskQueueMgr) DispatchTaskWithID(taskID string, taskName string, taskData map[string]interface{}, routingKey string) (*Task, error)
DispatchTaskWithID places a new task with the specified ID on the Celery task queue. It creates a new Task based on the supplied task name and data.
func (*TaskQueueMgr) Ping ¶
func (taskQueueMgr *TaskQueueMgr) Ping() error
Ping attempts to ping Celery workers
func (*TaskQueueMgr) RateLimitTask ¶
func (taskQueueMgr *TaskQueueMgr) RateLimitTask(taskName string, rateLimit string) error
RateLimitTask attempts to set the rate limit for tasks of the given type
func (*TaskQueueMgr) RevokeTask ¶
func (taskQueueMgr *TaskQueueMgr) RevokeTask(taskID string) error
RevokeTask attempts to notify Celery workers that the specified task needs revoking
func (*TaskQueueMgr) TimeLimitTask ¶
func (taskQueueMgr *TaskQueueMgr) TimeLimitTask(taskName string, hardLimit string, softLimit string) error
TimeLimitTask attempts to set the time limits for tasks of the given type
type TimeLimitTaskCmd ¶
type TimeLimitTaskCmd struct {
Arguments timeLimitTaskArgs `json:"arguments"`
// contains filtered or unexported fields
}
TimeLimitTaskCmd wraps the command used to change the time limits of a task
func NewTimeLimitTaskCmd ¶
func NewTimeLimitTaskCmd(taskName string, hardLimit string, softLimit string) *TimeLimitTaskCmd
NewTimeLimitTaskCmd creates a new command for setting the time limits of a task
taskName: Name of the task to change the time limits for
hardLimit: New hard time limit (in seconds)
softLimit: New soft time limit (in seconds)
type WorkerEvent ¶
type WorkerEvent struct {
Type string `json:"type"`
Hostname string `json:"hostname"`
Timestamp float32 `json:"timestamp"`
PID int `json:"pid"`
Clock int `json:"clock"`
UTCOffset int `json:"utcoffset"`
// SWSystem is the software system being used
SWSystem string `json:"sw_sys"`
// SWVersion is the software version being used
SWVersion string `json:"sw_ver"`
// LoadAverage is an array of average CPU loadings for the worker
LoadAverage []float32 `json:"loadavg"`
// Freq is the worker heartbeat frequency in seconds
Freq float32 `json:"freq"`
// SWIdentity is the software identity
SWIdentity string `json:"sw_ident"`
// Processed is the number of items processed
Processed int `json:"processed, omitempt"`
// Active is the number of tasks the worker is currently executing
Active int `json:"active, omitempty"`
}
WorkerEvent defines an event emitted by workers, specific to its operation. Event "types" emitted are:
- "worker-online"
- "worker-offline"
- "worker-heartbeat"
Example worker event JSON:
{
    "sw_sys": "Darwin",
    "clock": 74,
    "timestamp": 1843965659.580637,
    "hostname": "celery@worker1.My-Mac.local",
    "pid": 10837,
    "sw_ver": "3.1.18",
    "utcoffset": -11,
    "loadavg": [2.0, 2.41, 2.54],
    "processed": 6,
    "active": 0,
    "freq": 2.0,
    "type": "worker-offline",
    "sw_ident": "py-celery"
}
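The example payload above can be decoded with encoding/json into a struct whose tags match the documented field names. This is a minimal sketch using a local stand-in type, workerEvent, that covers only a subset of the WorkerEvent fields; decodeWorkerEvent is a hypothetical helper, and the package's own UnmarshalJSON (generated by easyjson) may behave differently in edge cases.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workerEvent mirrors a subset of the documented WorkerEvent fields.
type workerEvent struct {
	Type        string    `json:"type"`
	Hostname    string    `json:"hostname"`
	LoadAverage []float32 `json:"loadavg"`
	Processed   int       `json:"processed,omitempty"`
	Active      int       `json:"active,omitempty"`
}

// sampleWorkerEvent is the example payload from the documentation.
const sampleWorkerEvent = `{
    "sw_sys": "Darwin", "clock": 74, "timestamp": 1843965659.580637,
    "hostname": "celery@worker1.My-Mac.local", "pid": 10837,
    "sw_ver": "3.1.18", "utcoffset": -11, "loadavg": [2.0, 2.41, 2.54],
    "processed": 6, "active": 0, "freq": 2.0,
    "type": "worker-offline", "sw_ident": "py-celery"
}`

// decodeWorkerEvent is a hypothetical helper. Keys that have no matching
// struct field are ignored by encoding/json.
func decodeWorkerEvent(payload []byte) (*workerEvent, error) {
	ev := &workerEvent{}
	if err := json.Unmarshal(payload, ev); err != nil {
		return nil, fmt.Errorf("decode worker event: %w", err)
	}
	return ev, nil
}

func main() {
	ev, err := decodeWorkerEvent([]byte(sampleWorkerEvent))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s from %s, processed=%d\n", ev.Type, ev.Hostname, ev.Processed)
}
```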
func NewWorkerEvent ¶
func NewWorkerEvent() *WorkerEvent
NewWorkerEvent is a factory function to create a new WorkerEvent object
func (WorkerEvent) MarshalEasyJSON ¶
func (v WorkerEvent) MarshalEasyJSON(w *jwriter.Writer)
MarshalEasyJSON supports easyjson.Marshaler interface
func (WorkerEvent) MarshalJSON ¶
func (v WorkerEvent) MarshalJSON() ([]byte, error)
MarshalJSON supports json.Marshaler interface
func (*WorkerEvent) UnmarshalEasyJSON ¶
func (v *WorkerEvent) UnmarshalEasyJSON(l *jlexer.Lexer)
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*WorkerEvent) UnmarshalJSON ¶
func (v *WorkerEvent) UnmarshalJSON(data []byte) error
UnmarshalJSON supports json.Unmarshaler interface