stepper

v0.0.1
Published: Nov 10, 2022 License: MIT Imports: 10 Imported by: 3

README

Stepper

A simple, efficient, concurrent task runner.

  • Simple. Run tasks and schedule jobs with Go.
  • Database agnostic. Stepper supports MongoDB; PostgreSQL support is coming soon.
  • Concurrent. Stepper can be used in an unlimited number of instances.
  • Scalable. Split one task into small subtasks which will run on different nodes.

Install

go get github.com/matroskin13/stepper

Getting started

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/matroskin13/stepper"
    "github.com/matroskin13/stepper/engines/mongo"
)

func main() {
    // Connect the MongoDB engine that stores tasks and jobs.
    mongoEngine, err := mongo.NewMongo("mongodb://localhost:27017", "example_database")
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    service := stepper.NewService(mongoEngine)

    // Publish a task on startup.
    if err := service.Publish(ctx, "example-task", []byte("Hello world")); err != nil {
        log.Fatal(err)
    }

    // Register the handler that runs for every "example-task" task.
    service.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
        fmt.Println(string(data))

        return nil
    })

    // Start listening for published tasks.
    if err := service.Listen(ctx); err != nil {
        log.Fatal(err)
    }
}

Publish task

Publishing and executing tasks are the two core operations of Stepper. This section covers publishing.

Simple way

service.Publish(context.Background(), "example-task", []byte("hello"))

This is the simplest way to publish a task: it publishes a task named example-task with the content hello.

Stepper also accepts additional publish options.

Publish with delay

If you don't want to execute a task immediately you can set up a delay.

service.Publish(
    context.Background(),
    "example-task",
    []byte("hello"),
    stepper.SetDelay(time.Minute * 1),
)

Or you can schedule the task for a particular date:

service.Publish(
    context.Background(),
    "example-task",
    []byte("hello"),
    stepper.LaunchAt(time.Now().Add(time.Minute * 10)),
)
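
The delay and date options follow the functional-options pattern: per the package docs, a PublishOption is a func(c *CreateTask). The sketch below mirrors the documented CreateTask fields and option signatures in a standalone program. The bodies of SetDelay and LaunchAt, and the buildTask helper, are assumptions made for illustration, not the library's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// CreateTask mirrors the fields of stepper.CreateTask listed in the docs.
type CreateTask struct {
	Name        string
	Data        []byte
	CustomId    string
	LaunchAfter time.Duration
	LaunchAt    time.Time
}

// PublishOption has the same shape as stepper.PublishOption.
type PublishOption func(c *CreateTask)

// SetDelay is an assumed implementation: it records a relative delay.
func SetDelay(d time.Duration) PublishOption {
	return func(c *CreateTask) { c.LaunchAfter = d }
}

// LaunchAt is an assumed implementation: it records an absolute launch time.
func LaunchAt(t time.Time) PublishOption {
	return func(c *CreateTask) { c.LaunchAt = t }
}

// buildTask is a hypothetical helper that applies the options in order.
func buildTask(name string, data []byte, options ...PublishOption) CreateTask {
	task := CreateTask{Name: name, Data: data}
	for _, option := range options {
		option(&task)
	}
	return task
}

func main() {
	delayed := buildTask("example-task", []byte("hello"), SetDelay(time.Minute))
	fmt.Println(delayed.Name, delayed.LaunchAfter)
}
```

Because each option only mutates the fields it cares about, new options can be added without changing the Publish signature.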

Execute a task

The second part of Stepper is executing the tasks in the queue.

Simple way

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    fmt.Println(string(data))

    return nil
})

This is the simplest way to execute a task: the handler receives the task data and prints it.

Error handling

If your handler returns an error, the task is returned to the queue and held there for 10 seconds before it runs again. You can also set the delay manually.

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    ctx.SetRetryAfter(time.Minute) // will be returned in 1 minute
    return fmt.Errorf("some error")
})
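
The retry rule described above (a 10-second hold unless the handler sets its own delay) can be sketched without a database. Here retryContext, nextAttemptDelay, and errExample are hypothetical names introduced for illustration; only SetRetryAfter and the 10-second default come from the README.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultRetryDelay is the 10-second hold described in the README.
const defaultRetryDelay = 10 * time.Second

// errExample is a sample error used by the demo handler.
var errExample = errors.New("some error")

// retryContext is a hypothetical stand-in for the part of stepper.Context
// that records a custom retry delay.
type retryContext struct {
	retryAfter time.Duration
}

// SetRetryAfter stores the delay requested by the handler.
func (c *retryContext) SetRetryAfter(timeout time.Duration) {
	c.retryAfter = timeout
}

// nextAttemptDelay runs the handler and reports how long a failed task
// would wait before its next attempt. A successful handler yields zero.
func nextAttemptDelay(handler func(ctx *retryContext) error) (time.Duration, error) {
	ctx := &retryContext{}
	err := handler(ctx)
	if err == nil {
		return 0, nil
	}
	if ctx.retryAfter > 0 {
		return ctx.retryAfter, err
	}
	return defaultRetryDelay, err
}

func main() {
	delay, err := nextAttemptDelay(func(ctx *retryContext) error {
		ctx.SetRetryAfter(time.Minute) // ask for a retry in 1 minute
		return errExample
	})
	fmt.Println(delay, err)
}
```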

Bind a state

If you have a long-running task, you can bind state to it (a cursor, for example). If the task fails, the next attempt can continue from the last saved state.

s.TaskHandler("example-task", func(ctx stepper.Context, data []byte) error {
    var lastId int

    if err := ctx.BindState(&lastId); err != nil {
        return err
    }

    iter := getSomethingFromId(lastId) // something like a mongodb iterator or anything else

    for iter.Next() {
        lastId = ... // do something

        if err := ctx.SetState(lastId); err != nil {
            return err
        }
    }

    return nil
})
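
To make the resume behavior concrete, here is a self-contained sketch that simulates a failure and a retry. stateContext is a hypothetical in-memory stand-in for stepper.Context, and storing the state as JSON is an assumption (the docs only show that Task.State is a []byte). processFrom and its failAt parameter exist purely for the demonstration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stateContext is a hypothetical in-memory stand-in for stepper.Context.
// It assumes the state is kept as JSON bytes.
type stateContext struct {
	state []byte
}

// BindState decodes the saved state into target.
// With no saved state, target is left unchanged.
func (c *stateContext) BindState(target any) error {
	if len(c.state) == 0 {
		return nil
	}
	return json.Unmarshal(c.state, target)
}

// SetState encodes and saves the given state.
func (c *stateContext) SetState(state any) error {
	encoded, err := json.Marshal(state)
	if err != nil {
		return err
	}
	c.state = encoded
	return nil
}

// processFrom handles every item after the saved cursor and returns the
// items it handled. It fails when it reaches failAt, simulating a crash.
func processFrom(ctx *stateContext, items []int, failAt int) ([]int, error) {
	var lastID int
	if err := ctx.BindState(&lastID); err != nil {
		return nil, err
	}

	var handled []int
	for _, id := range items {
		if id <= lastID {
			continue // already processed in an earlier attempt
		}
		if id == failAt {
			return handled, fmt.Errorf("failed at item %d", id)
		}
		handled = append(handled, id)
		if err := ctx.SetState(id); err != nil {
			return handled, err
		}
	}
	return handled, nil
}

func main() {
	ctx := &stateContext{}
	items := []int{1, 2, 3, 4, 5}

	first, err := processFrom(ctx, items, 3) // first attempt fails at item 3
	fmt.Println(first, err)

	second, err := processFrom(ctx, items, 0) // retry resumes after item 2
	fmt.Println(second, err)
}
```

The second attempt skips items 1 and 2 because the cursor was saved after each successful item.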

Subtasks

The most powerful feature of Stepper is subtasks. They let you split a long-running task into separate tasks that run on different nodes. When all subtasks are completed, Stepper calls the OnFinish hook of the parent task.

Create a subtask

The following example shows how to spawn subtasks within a main task.

s.TaskHandler("task-with-threads", func(ctx stepper.Context, data []byte) error {
    fmt.Println("have received the word for splitting: ", string(data))

    for _, symbol := range strings.Split(string(data), "") {
        ctx.CreateSubtask(stepper.CreateTask{
            Data: []byte(symbol),
        })
    }

    return nil
}).Subtask(func(ctx stepper.Context, data []byte) error {
    fmt.Printf("[letter-subtask]: have received symbol: %s\r\n", data)
    return nil
}).OnFinish(func(ctx stepper.Context, data []byte) error {
    fmt.Println("subtasks are over")
    return nil
})

Or you can use an existing task as a subtask:

ctx.CreateSubtask(stepper.CreateTask{
    Name: "some-task",
    Data: []byte(symbol),
})
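
The rule that OnFinish runs only after the last subtask completes can be modeled with a simple pending counter. The sketch below is an in-memory illustration only: parentTask, spawn, complete, and runWord are hypothetical names, and the real library tracks subtasks through its storage engine rather than a local counter.

```go
package main

import (
	"fmt"
	"strings"
)

// parentTask is a hypothetical in-memory model of a task that waits
// for its subtasks.
type parentTask struct {
	pending  int
	finished bool
	onFinish func()
}

// spawn registers one more subtask that must complete before onFinish runs.
func (p *parentTask) spawn() {
	p.pending++
}

// complete marks one subtask as done and fires onFinish after the last one.
func (p *parentTask) complete() {
	p.pending--
	if p.pending == 0 && !p.finished {
		p.finished = true
		p.onFinish()
	}
}

// runWord splits word into symbols, treats each symbol as a subtask, and
// returns the processed symbols plus whether onFinish fired.
func runWord(word string) ([]string, bool) {
	done := false
	parent := &parentTask{onFinish: func() { done = true }}

	symbols := strings.Split(word, "")
	for range symbols {
		parent.spawn() // corresponds to ctx.CreateSubtask in the README
	}

	var processed []string
	for _, symbol := range symbols {
		processed = append(processed, symbol) // the subtask handler's work
		parent.complete()
	}
	return processed, done
}

func main() {
	processed, done := runWord("abc")
	fmt.Println(processed, done)
}
```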

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Apply

func Apply[T any](initial *T, callbacks []func(*T)) *T

func Or

func Or[T comparable](first, second T) T
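
The docs list only the signatures of Apply and Or, without descriptions. One plausible reading, offered as a guess rather than the package's confirmed behavior, is that Apply runs each callback against the initial value and that Or returns the first argument unless it is the zero value. The sketch uses the hypothetical names apply, firstNonZero, and config to keep that distinction clear.

```go
package main

import "fmt"

// config is a hypothetical type used only for this demonstration.
type config struct {
	retries int
	name    string
}

// apply is an assumed reading of Apply: it runs every callback against
// initial, in order, and returns the same pointer.
func apply[T any](initial *T, callbacks []func(*T)) *T {
	for _, callback := range callbacks {
		callback(initial)
	}
	return initial
}

// firstNonZero is an assumed reading of Or: it returns first unless first
// is the zero value of T, in which case it returns second.
func firstNonZero[T comparable](first, second T) T {
	var zero T
	if first != zero {
		return first
	}
	return second
}

func main() {
	cfg := apply(&config{}, []func(*config){
		func(c *config) { c.retries = 3 },
		func(c *config) { c.name = "example-task" },
	})
	fmt.Println(cfg.retries, cfg.name)
	fmt.Println(firstNonZero("", "fallback"))
}
```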

func Pool

func Pool[T any](ctx context.Context, count int, consumer func(*T)) chan *T

Types

type Context

type Context interface {
	Task() *Task
	Context() context.Context
	CreateSubtask(sub CreateTask)
	BindState(state any) error
	SetState(state any) error
	SetRetryAfter(timeout time.Duration)
}

type CreateTask

type CreateTask struct {
	Name        string
	Data        []byte
	CustomId    string
	LaunchAfter time.Duration
	LaunchAt    time.Time
}

type Engine

type Engine interface {
	TaskEngine
	JobEngine
}

type Handler

type Handler func(ctx Context, data []byte) error

type HandlerStruct

type HandlerStruct interface {
	OnFinish(h Handler) HandlerStruct
	Subtask(handler Handler) HandlerStruct
	UseMiddleware(middlewares ...MiddlewareHandler)
	DependOnCustomId() HandlerStruct
}

type Job

type Job struct {
	Status       string    `json:"status"`
	Name         string    `json:"name"`
	Pattern      string    `json:"pattern"`
	NextLaunchAt time.Time `json:"naxtLaunchAt"`
}

func (*Job) CalculateNextLaunch

func (j *Job) CalculateNextLaunch() error

type JobConfig

type JobConfig struct {
	Tags    []string
	Name    string
	Pattern string
}

func (*JobConfig) NextLaunch

func (c *JobConfig) NextLaunch() (time.Time, error)

type JobEngine

type JobEngine interface {
	FindNextJob(ctx context.Context, statuses []string) (*Job, error)
	GetUnreleasedJobChildren(ctx context.Context, jobName string) (*Task, error)
	Release(ctx context.Context, jobName string, nextLaunchAt time.Time) error
	WaitForSubtasks(ctx context.Context, jobName string) error
	RegisterJob(ctx context.Context, cfg *JobConfig) error
}

type JobHandler

type JobHandler func(ctx Context) error

type MiddlewareFunc

type MiddlewareFunc func(ctx Context, t *Task) error

type MiddlewareHandler

type MiddlewareHandler func(t MiddlewareFunc) MiddlewareFunc
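
A MiddlewareHandler takes a MiddlewareFunc and returns a new one, which is the usual shape for wrapping a handler with extra behavior. The sketch below shows that wrapping in isolation. Task and Context are trimmed stand-ins for the package types, and trace and chain are hypothetical helpers. The order in which Stepper itself applies middlewares is not stated in the docs, so the outermost-first order here is an assumption.

```go
package main

import "fmt"

// Task and Context are trimmed, hypothetical stand-ins for the package types.
type Task struct{ Name string }
type Context interface{}

// These two types have the same shape as the documented ones.
type MiddlewareFunc func(ctx Context, t *Task) error
type MiddlewareHandler func(next MiddlewareFunc) MiddlewareFunc

// trace returns a middleware that records when it is entered and left.
func trace(label string, events *[]string) MiddlewareHandler {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx Context, t *Task) error {
			*events = append(*events, label+":before")
			err := next(ctx, t)
			*events = append(*events, label+":after")
			return err
		}
	}
}

// chain wraps final so that the first middleware in the list runs outermost.
func chain(final MiddlewareFunc, middlewares ...MiddlewareHandler) MiddlewareFunc {
	wrapped := final
	for i := len(middlewares) - 1; i >= 0; i-- {
		wrapped = middlewares[i](wrapped)
	}
	return wrapped
}

func main() {
	var events []string
	handler := chain(func(ctx Context, t *Task) error {
		events = append(events, "handle:"+t.Name)
		return nil
	}, trace("outer", &events), trace("inner", &events))

	if err := handler(nil, &Task{Name: "example-task"}); err != nil {
		fmt.Println("handler failed:", err)
	}
	fmt.Println(events)
}
```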

type PublishOption

type PublishOption func(c *CreateTask)

func LaunchAt

func LaunchAt(t time.Time) PublishOption

func SetDelay

func SetDelay(d time.Duration) PublishOption

type Service

type Service struct {
	// contains filtered or unexported fields
}

func (*Service) Listen

func (s *Service) Listen(ctx context.Context) error

func (*Service) ListenJobs

func (s *Service) ListenJobs(ctx context.Context) error

func (*Service) ListenTasks

func (s *Service) ListenTasks(ctx context.Context) error

func (*Service) ListenWaitingJobs

func (s *Service) ListenWaitingJobs(ctx context.Context) error

func (*Service) ListenWaitingTasks

func (s *Service) ListenWaitingTasks(ctx context.Context) error

func (*Service) Publish

func (s *Service) Publish(ctx context.Context, name string, data []byte, options ...PublishOption) error

func (*Service) RegisterJob

func (s *Service) RegisterJob(ctx context.Context, config *JobConfig, h JobHandler) HandlerStruct

func (*Service) TaskHandler

func (s *Service) TaskHandler(name string, h Handler) HandlerStruct

func (*Service) UseMiddleware

func (s *Service) UseMiddleware(h MiddlewareHandler)

type Stepper

type Stepper interface {
	TaskHandler(name string, handler Handler) HandlerStruct
	Listen(ctx context.Context) error
	Publish(ctx context.Context, name string, data []byte, options ...PublishOption) error
	RegisterJob(ctx context.Context, config *JobConfig, h JobHandler) HandlerStruct
	UseMiddleware(h MiddlewareHandler)
}

func NewService

func NewService(engine Engine) Stepper

type Task

type Task struct {
	ID               string            `json:"_id"`
	CustomId         string            `bson:"custom_id"`
	Name             string            `json:"name"`
	Data             []byte            `json:"data"`
	JobId            string            `json:"jobId"`
	Parent           string            `json:"parent"`
	LaunchAt         time.Time         `json:"launchAt"`
	Status           string            `json:"status"`
	LockAt           *time.Time        `json:"lock_at"`
	State            []byte            `json:"state"`
	MiddlewaresState map[string][]byte `json:"middlewares_state"`
}

func (*Task) IsWaiting

func (t *Task) IsWaiting() bool

type TaskEngine

type TaskEngine interface {
	GetRelatedTask(ctx context.Context, task string, id string) (*Task, error)
	FindNextTask(ctx context.Context, statuses []string) (*Task, error)
	ReleaseTask(ctx context.Context, id string) error
	WaitTaskForSubtasks(ctx context.Context, id string) error
	FailTask(ctx context.Context, task *Task, err error, timeout time.Duration) error
	CreateTask(ctx context.Context, task *Task) error
	GetUnreleasedTaskChildren(ctx context.Context, id string) (*Task, error)
	SetState(ctx context.Context, id string, state []byte) error
}

Directories

Path Synopsis
engines
cron command
middleware command
simple command
subtasks command
thread command
