tmanager

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2021 License: MIT Imports: 9 Imported by: 0

README

TManager

Golang task manager backed by a PostgreSQL database, with concurrency control.

Features

  • Thread-safe background task scanner
  • Compatible with the standard library database package
  • Task monitoring and task cleaner handlers
  • Middleware and context support

Example

package main

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"time"

	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/stdlib"

	"gitlab.com/so_literate/tmanager"
	"gitlab.com/so_literate/tmanager/storage/postgresql"
)

// Fixed settings of the example. They never change at run time, so they are
// declared as constants rather than mutable package-level variables.
const (
	// dsn is the PostgreSQL connection string parsed by pgx.ParseConfig in main.
	dsn = "host=127.0.0.1 user=postgres password=postgres dbname=postgres port=5432 sslmode=disable"

	// selfRepeatedTask is the handler name of the task whose callback schedules it again.
	selfRepeatedTask = "self_repeated_task"
	// onTimeTask is the handler name of the task that main creates a single time.
	onTimeTask = "one_time_task"
)

// taskHandler is the example handler registered for selfRepeatedTask.
// It keeps a reference to the task manager so that CallbackTask can
// restart the task or create the next one.
type taskHandler struct {
	tm *tmanager.TaskManager
}

// HandleTask logs that the handler was called and always returns nil.
// Neither ctx nor data is used by this example handler.
func (t *taskHandler) HandleTask(ctx context.Context, data json.RawMessage) error {
	log.Println("called a task handler")
	return nil
}

// CallbackTask schedules the next run of the task five seconds from now.
//
// If the handler returned an error, the error is logged and the same task is
// restarted through RestartTask. Otherwise a new task with the same handler
// name and no data is created through CreateTask. A failure of either call is
// only logged, because the callback has no error return value.
func (t *taskHandler) CallbackTask(ctx context.Context, task tmanager.Task, handlerErr error) {
	nextTime := time.Now().Add(time.Second * 5)

	if handlerErr != nil {
		log.Printf("error in handler: %s\n", handlerErr)

		if err := t.tm.RestartTask(ctx, task, nextTime); err != nil {
			log.Printf("failed to restart task: %s\n", err)
		}

		return
	}

	if err := t.tm.CreateTask(ctx, nextTime, task.GetHandler(), nil); err != nil {
		log.Printf("failed to create task: %s\n", err)
	}
}

// main wires the example together: it opens the database, builds the
// PostgreSQL storage and the task manager, registers both handlers, creates
// one task for each of them and then runs the manager.
func main() {
	cfg, err := pgx.ParseConfig(dsn)
	if err != nil {
		log.Fatalf("pgx.ParseConfig: %s", err)
	}

	store, err := postgresql.New(stdlib.OpenDB(*cfg), nil)
	if err != nil {
		log.Fatalf("postgresql.New: %s", err)
	}

	manager := tmanager.New(log.Writer(), store, 0, time.Second*10)
	ctx := context.Background()

	// Handler with a callback that keeps scheduling itself.
	if err := manager.RegisterHandlerCallback(selfRepeatedTask, &taskHandler{tm: manager}); err != nil {
		log.Fatalf("tm.RegisterHandlerCallback: %s\n", err)
	}

	// Function handler without a callback; it always fails.
	failing := func(ctx context.Context, data json.RawMessage) error {
		log.Println("called one time handler")
		return errors.New("onTimeTask error")
	}

	if err := manager.RegisterHandlerFunc(onTimeTask, failing, nil); err != nil {
		log.Fatalf("tm.RegisterHandlerFunc: %s\n", err)
	}

	// CreateOneActiveTask can be used instead to keep only one active task in storage.
	if err := manager.CreateTask(ctx, time.Now(), selfRepeatedTask, nil); err != nil {
		log.Fatalf("tm.CreateTask selfRepeatedTask: %s\n", err)
	}

	if err := manager.CreateTask(ctx, time.Now().Add(time.Second), onTimeTask, nil); err != nil {
		log.Fatalf("tm.CreateTask onTimeTask: %s\n", err)
	}

	log.Println("run task manager")

	// Cancel the context to stop the task manager gracefully.
	manager.Run(ctx)
}

Documentation

Overview

Package tmanager contains methods for controlling the execution of tasks from the storage. It allows you to synchronize workflow execution.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTaskNotFound is returned when the storage cannot find a free task.
	ErrTaskNotFound = errors.New("task not found")
	// ErrHandlerAlreadyExists is returned by a handler registration method.
	ErrHandlerAlreadyExists = errors.New("handler already exists")
	// ErrHandlerNotFound is returned when the task manager cannot find a handler for a task.
	ErrHandlerNotFound = errors.New("handler not found")
	// ErrHandlerPanic is returned when a handler panics.
	ErrHandlerPanic = errors.New("panic in the handler")
)

Functions

This section is empty.

Types

type Handler

// Handler is the task handler interface.
type Handler interface {
	// HandleTask is the handler of the task.
	HandleTask(ctx context.Context, data json.RawMessage) error
}

Handler is the task handler interface.

type HandlerCallback added in v0.0.3

// HandlerCallback is a task handler with a callback.
type HandlerCallback interface {
	Handler
	// CallbackTask is called after the task state has been saved.
	CallbackTask(ctx context.Context, task Task, handlerErr error)
}

HandlerCallback is a task handler with a callback.

type HandlerCallbackFunc added in v0.0.3

// HandlerCallbackFunc is a callback function with the same signature as
// HandlerCallback.CallbackTask: it receives a context, the task and the
// handler error.
type HandlerCallbackFunc func(ctx context.Context, task Task, handlerErr error)

HandlerCallbackFunc is a callback that is called after the state of the task has been saved. It takes the context of the task manager, the task, and the error returned by the HandlerFunc.

type HandlerFunc

// HandlerFunc is a function handler of the task with the same signature as
// Handler.HandleTask: it receives a context and the task data.
type HandlerFunc func(ctx context.Context, data json.RawMessage) error

HandlerFunc is a function handler of the task. It takes the context and the stored data of the task.

type Storage

// Storage is the task storage used by the task manager.
type Storage interface {
	// CreateTasks creates a list of tasks in the storage.
	// The ID field of each task must be empty.
	CreateTasks(ctx context.Context, tasks ...Task) error

	// CreateOneActiveTask checks whether the task already exists in the storage.
	// If the task does not exist in the storage, a new task is created.
	CreateOneActiveTask(ctx context.Context, task Task) error

	// GetNextTask returns the oldest task that a worker has not yet taken to work.
	// The StartAfter value of the task must be less than the "now" argument.
	// Returns ErrTaskNotFound when the storage cannot find a task.
	GetNextTask(ctx context.Context, now time.Time) (Task, error)

	// RestartTask makes the task free to be taken again by the GetNextTask method.
	// You have to set the time after which it can be taken again.
	RestartTask(ctx context.Context, task Task, restartAfter time.Time) error

	// SaveTaskWithSuccess saves a successfully completed task.
	// If the context is canceled, then ctx will be the default context.Background here
	// to be sure to save the task state. See ReplaceCanceledContext method in TaskManager.
	SaveTaskWithSuccess(ctx context.Context, task Task) error
	// SaveTaskWithError saves a task together with its error.
	// If the context is canceled, then ctx will be the default context.Background here
	// to be sure to save the task state. See ReplaceCanceledContext method in TaskManager.
	SaveTaskWithError(ctx context.Context, task Task, taskErr error) error
}

Storage is the task storage for the task manager. It contains CRU (create, read, update) operations.

type Task

// Task is an element of the storage with a handler name, a launch time and
// additional data.
type Task interface {
	GetID() string            // Unique identification of the task.
	GetStartAfter() time.Time // Time after which the task should be started.
	GetHandler() string       // Name of the task handler.
	GetData() json.RawMessage // Data of the handler.
}

Task is an element of the storage with a handler name, a launch time, and additional data.

type TaskImpl added in v0.0.2

// TaskImpl is a plain struct whose fields mirror the getters of the Task
// interface (ID, StartAfter, Handler, Data).
type TaskImpl struct {
	ID         string
	StartAfter time.Time
	Handler    string
	Data       json.RawMessage
}

TaskImpl implements task interface for helper function of task creation.

func (*TaskImpl) GetData added in v0.0.2

func (t *TaskImpl) GetData() json.RawMessage

GetData returns data of the handler.

func (*TaskImpl) GetHandler added in v0.0.2

func (t *TaskImpl) GetHandler() string

GetHandler returns name of the task handler.

func (*TaskImpl) GetID added in v0.0.2

func (t *TaskImpl) GetID() string

GetID returns unique identification of the task.

func (*TaskImpl) GetStartAfter added in v0.0.2

func (t *TaskImpl) GetStartAfter() time.Time

GetStartAfter returns time after which the task should be started.

type TaskManager

// TaskManager holds the storage, the logger and the worker settings used to
// process tasks, plus optional hooks (BeforeHandler, ReplaceCanceledContext).
type TaskManager struct {
	Logger  *log.Logger // Writes errors of interaction with the Storage.
	Storage Storage

	WorkersNum          int
	GetNextTaskInterval time.Duration

	// BeforeHandler is called before each task handler.
	// You can use it to add your data to the context or to stop handling the task.
	// Example:
	//
	//	tm.BeforeHandler = func(ctx context.Context, task tmanager.Task) (context.Context, error) {
	//		return context.WithValue(ctx, "task", task), nil
	//	}
	BeforeHandler func(ctx context.Context, task Task) (context.Context, error)

	// ReplaceCanceledContext is called before the SaveTaskWithSuccess, SaveTaskWithError
	// and CreateTasks methods if the current context is canceled.
	// By default it replaces the current context with context.Background to remove
	// the cancel function from the context.
	// This is necessary to be sure to save the state of tasks after handling them,
	// ignoring the cancellation of the context.
	// You can disable this feature by setting this field to nil.
	// Example:
	//
	//	tm.ReplaceCanceledContext = func(ctx context.Context) context.Context {
	//		return context.WithValue(context.Background(), "old_data", ctx.Value("old_data"))
	//	}
	ReplaceCanceledContext func(ctx context.Context) context.Context
	// contains filtered or unexported fields
}

TaskManager creates and handles tasks in multithreaded workers.

func New

func New(errLog io.Writer, s Storage, workersNum int, getNextTaskInterval time.Duration) *TaskManager

New creates a new task manager with the given settings. workersNum is the number of individual workers trying to get and handle tasks. getNextTaskInterval is the time to wait before trying to get the next task when the storage has not returned a task.

func (*TaskManager) CreateOneActiveTask added in v0.0.3

func (tm *TaskManager) CreateOneActiveTask(
	ctx context.Context,
	startAfter time.Time,
	handlerName string,
	data interface{},
) error

CreateOneActiveTask creates a task, checking that there are no other tasks in the store with the same handler. Do not use it for regular tasks.

func (*TaskManager) CreateTask

func (tm *TaskManager) CreateTask(
	ctx context.Context,
	startAfter time.Time,
	handlerName string,
	data interface{},
) error

CreateTask converts data to the JSON and creates task in the storage.

func (*TaskManager) CreateTasks

func (tm *TaskManager) CreateTasks(ctx context.Context, tasks ...Task) error

CreateTasks creates one or list of the tasks in the storage.

func (*TaskManager) RegisterHandler

func (tm *TaskManager) RegisterHandler(handlerName string, h Handler) error

RegisterHandler adds a new task handler to the manager.

func (*TaskManager) RegisterHandlerCallback added in v0.0.3

func (tm *TaskManager) RegisterHandlerCallback(handlerName string, h HandlerCallback) error

RegisterHandlerCallback adds a new task handler with callback to the manager.

func (*TaskManager) RegisterHandlerFunc

func (tm *TaskManager) RegisterHandlerFunc(handlerName string, h HandlerFunc, cb HandlerCallbackFunc) error

RegisterHandlerFunc adds a new task handler function to the manager. Takes name of the handler, handler and optional (may be nil) callback of the task.

func (*TaskManager) RestartTask added in v0.0.3

func (tm *TaskManager) RestartTask(ctx context.Context, task Task, restartAfter time.Time) error

RestartTask makes the task free to re-taking.

func (*TaskManager) Run

func (tm *TaskManager) Run(ctx context.Context)

Run runs workers of the manager. You have to cancel context if you want to stop manager and workers. Task Manager will wait for all workers to finish.

Directories

Path Synopsis
_example
tasks command
handler
cleaner
Package cleaner contains handler to clean completed task N time ago.
Package cleaner contains handler to clean completed task N time ago.
inspector
Package inspector contains handler to inspect stuck tasks in storage.
Package inspector contains handler to inspect stuck tasks in storage.
storage
postgresql
Package postgresql implements tmanager.Storage interface using Postgresql database.
Package postgresql implements tmanager.Storage interface using Postgresql database.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL