Documentation ¶
Overview ¶
Package tmanager provides methods for controlling the execution of tasks held in a storage. It lets you synchronize workflow execution.
Index ¶
- Variables
- type Handler
- type HandlerCallback
- type HandlerCallbackFunc
- type HandlerFunc
- type Storage
- type Task
- type TaskImpl
- type TaskManager
- func (tm *TaskManager) CreateOneActiveTask(ctx context.Context, startAfter time.Time, handlerName string, ...) error
- func (tm *TaskManager) CreateTask(ctx context.Context, startAfter time.Time, handlerName string, ...) error
- func (tm *TaskManager) CreateTasks(ctx context.Context, tasks ...Task) error
- func (tm *TaskManager) RegisterHandler(handlerName string, h Handler) error
- func (tm *TaskManager) RegisterHandlerCallback(handlerName string, h HandlerCallback) error
- func (tm *TaskManager) RegisterHandlerFunc(handlerName string, h HandlerFunc, cb HandlerCallbackFunc) error
- func (tm *TaskManager) RestartTask(ctx context.Context, task Task, restartAfter time.Time) error
- func (tm *TaskManager) Run(ctx context.Context)
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrTaskNotFound is returned when the storage can't find a free task.
	ErrTaskNotFound = errors.New("task not found")
	// ErrHandlerAlreadyExists is returned by the handler registration methods.
	ErrHandlerAlreadyExists = errors.New("handler already exists")
	// ErrHandlerNotFound is returned when the task manager can't find a handler for a task.
	ErrHandlerNotFound = errors.New("handler not found")
	// ErrHandlerPanic is returned when a handler panics.
	ErrHandlerPanic = errors.New("panic in the handler")
)
Functions ¶
This section is empty.
Types ¶
type Handler ¶
type Handler interface {
// HandleTask handles the task.
HandleTask(ctx context.Context, data json.RawMessage) error
}
Handler is the task handler interface.
type HandlerCallback ¶ added in v0.0.3
type HandlerCallback interface {
Handler
// CallbackTask is called after the task state has been saved.
CallbackTask(ctx context.Context, task Task, handlerErr error)
}
HandlerCallback is a task handler with a callback.
type HandlerCallbackFunc ¶ added in v0.0.3
HandlerCallbackFunc is a callback that is called after the task state has been saved. It takes the task manager's context and the error returned by HandlerFunc.
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, data json.RawMessage) error
HandlerFunc is a function that handles a task. It takes a context and the stored data of the task.
type Storage ¶
type Storage interface {
// CreateTasks creates the given tasks in the storage.
// The ID field of each task must be empty.
CreateTasks(ctx context.Context, tasks ...Task) error
// CreateOneActiveTask checks whether the task already exists in the storage.
// If it does not, a new task is created.
CreateOneActiveTask(ctx context.Context, task Task) error
// GetNextTask returns the oldest task that no worker has taken yet.
// The task's StartAfter value must be less than the "now" argument.
// Returns ErrTaskNotFound when the storage can't find a task.
GetNextTask(ctx context.Context, now time.Time) (Task, error)
// RestartTask makes the task available to be taken again by GetNextTask.
// restartAfter sets the time after which it can be taken again.
RestartTask(ctx context.Context, task Task, restartAfter time.Time) error
// SaveTaskWithSuccess saves a successfully completed task.
// If the context is canceled, ctx is replaced with context.Background by default
// to make sure the task state is saved. See the ReplaceCanceledContext field of TaskManager.
SaveTaskWithSuccess(ctx context.Context, task Task) error
// SaveTaskWithError saves a task that finished with an error.
// If the context is canceled, ctx is replaced with context.Background by default
// to make sure the task state is saved. See the ReplaceCanceledContext field of TaskManager.
SaveTaskWithError(ctx context.Context, task Task, taskErr error) error
}
Storage is the task storage used by the task manager. It provides create, read and update (CRU) operations.
type Task ¶
type Task interface {
GetID() string // Unique identification of the task.
GetStartAfter() time.Time // Time after which the task should be started.
GetHandler() string // Name of the task handler.
GetData() json.RawMessage // Data of the handler.
}
Task is an element of the storage with a handler name, a launch time and additional data.
type TaskImpl ¶ added in v0.0.2
TaskImpl implements the Task interface for the task-creation helper functions.
func (*TaskImpl) GetData ¶ added in v0.0.2
func (t *TaskImpl) GetData() json.RawMessage
GetData returns data of the handler.
func (*TaskImpl) GetHandler ¶ added in v0.0.2
GetHandler returns name of the task handler.
func (*TaskImpl) GetStartAfter ¶ added in v0.0.2
GetStartAfter returns time after which the task should be started.
type TaskManager ¶
type TaskManager struct {
Logger *log.Logger // Writes errors of interaction with the Storage.
Storage Storage
WorkersNum int
GetNextTaskInterval time.Duration
// BeforeHandler is called before each task handler.
// You can use it to add your data to the context or to stop handling the task.
// Example:
// tm.BeforeHandler = func(ctx context.Context, task tmanager.Task) (context.Context, error) {
// return context.WithValue(ctx, "task", task), nil
// }
BeforeHandler func(ctx context.Context, task Task) (context.Context, error)
// ReplaceCanceledContext is called before the SaveTaskWithSuccess, SaveTaskWithError and CreateTasks
// methods if the current context is canceled.
// By default it replaces the current context with context.Background to drop the cancellation.
// This is necessary to make sure the state of tasks is saved after handling them,
// regardless of the cancellation of the context.
// You can disable this feature by setting this field to nil.
// Example:
// tm.ReplaceCanceledContext = func(ctx context.Context) context.Context {
// return context.WithValue(context.Background(), "old_data", ctx.Value("old_data"))
// }
ReplaceCanceledContext func(ctx context.Context) context.Context
// contains filtered or unexported fields
}
TaskManager creates tasks and handles them in concurrent workers.
func New ¶
func New(errLog io.Writer, s Storage, workersNum int, getNextTaskInterval time.Duration) *TaskManager
New creates a new task manager with the given settings. workersNum is the number of independent workers that try to get and handle tasks. getNextTaskInterval is how long a worker waits before trying to get the next task when the storage has not returned one.
func (*TaskManager) CreateOneActiveTask ¶ added in v0.0.3
func (tm *TaskManager) CreateOneActiveTask(ctx context.Context, startAfter time.Time, handlerName string, data interface{}) error
CreateOneActiveTask creates a task after checking that the storage holds no other task with the same handler. Do not use it for regular tasks.
func (*TaskManager) CreateTask ¶
func (tm *TaskManager) CreateTask(ctx context.Context, startAfter time.Time, handlerName string, data interface{}) error
CreateTask converts data to JSON and creates a task in the storage.
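The conversion step can be pictured with encoding/json. The helper below (encodeTaskData) is hypothetical and only illustrates turning an arbitrary value into the json.RawMessage that a handler later receives; the package's actual encoding code is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeTaskData is a hypothetical helper that marshals any value into the
// raw JSON form stored with a task.
func encodeTaskData(data interface{}) (json.RawMessage, error) {
	raw, err := json.Marshal(data)
	if err != nil {
		return nil, fmt.Errorf("encode task data: %w", err)
	}
	return raw, nil
}

func main() {
	raw, err := encodeTaskData(struct {
		ID int `json:"id"`
	}{ID: 7})
	fmt.Println(string(raw), err)
}
```

Values that encoding/json cannot represent, such as channels or functions, make the conversion fail, so it is reasonable to expect an error in that case before anything reaches the storage.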
func (*TaskManager) CreateTasks ¶
func (tm *TaskManager) CreateTasks(ctx context.Context, tasks ...Task) error
CreateTasks creates one or more tasks in the storage.
func (*TaskManager) RegisterHandler ¶
func (tm *TaskManager) RegisterHandler(handlerName string, h Handler) error
RegisterHandler adds a new task handler to the manager.
func (*TaskManager) RegisterHandlerCallback ¶ added in v0.0.3
func (tm *TaskManager) RegisterHandlerCallback(handlerName string, h HandlerCallback) error
RegisterHandlerCallback adds a new task handler with callback to the manager.
func (*TaskManager) RegisterHandlerFunc ¶
func (tm *TaskManager) RegisterHandlerFunc(handlerName string, h HandlerFunc, cb HandlerCallbackFunc) error
RegisterHandlerFunc adds a new task handler function to the manager. It takes the handler name, the handler function and an optional callback, which may be nil.
func (*TaskManager) RestartTask ¶ added in v0.0.3
RestartTask makes the task available to be taken again.
func (*TaskManager) Run ¶
func (tm *TaskManager) Run(ctx context.Context)
Run starts the manager's workers. Cancel the context to stop the manager and its workers. The task manager waits for all workers to finish.
Directories ¶
| Path | Synopsis |
|---|---|
| _example | |
| tasks (command) | |
| tmanager_handlers (command) | |
| handler | |
| cleaner | Package cleaner contains a handler that cleans up tasks completed N time ago. |
| inspector | Package inspector contains a handler that inspects stuck tasks in the storage. |
| storage | |
| postgresql | Package postgresql implements the tmanager.Storage interface using a PostgreSQL database. |