barngo

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2024 License: MIT Imports: 13 Imported by: 0

README

Task pool and scheduler backed by a database

This is a simple task pool and scheduler backed by a database store (it has been tested only with PostgreSQL).

Using the task pool

Using the scheduler

package main

import (
	"context"
	"database/sql"
	"os"
	"os/signal"

	"github.com/bibenga/barn-go/scheduler"
)

// main opens a PostgreSQL connection, starts the scheduler with a cancellable
// context, blocks until the process receives an interrupt signal, and then
// cancels the context and stops the scheduler.
func main() {
	// NOTE(review): the "pgx" driver must be registered with database/sql by a
	// blank import of the driver package; that import is not shown in this
	// example — confirm which driver package is intended.
	db, err := sql.Open("pgx", "host=rds port=5432 user=rds password=sqlsql dbname=rds TimeZone=UTC sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// NOTE(review): the repository constructor and the Repository config field
	// are not listed in the package's current API index — confirm this example
	// still matches the published version.
	repository := scheduler.NewDefaultPostgresSchedulerRepository()

	ctx, cancel := context.WithCancel(context.Background())

	sched := scheduler.NewScheduler(db, &scheduler.SchedulerConfig{Repository: repository})
	sched.StartContext(ctx)

	// Buffered with capacity 1 so the signal is not dropped if it arrives
	// before the receive below is reached.
	osSignal := make(chan os.Signal, 1)
	signal.Notify(osSignal, os.Interrupt)

	// The signal value itself is not needed; binding it to a variable that is
	// never read would be a compile error.
	<-osSignal

	cancel()
	sched.Stop()
}

Documentation

Index

Constants

View Source
const IgnoreResult = "IgnoreResult"

Variables

This section is empty.

Functions

func CamelToSnake added in v0.5.0

func CamelToSnake(name string) string

func RunInTransaction

func RunInTransaction(db *sql.DB, f func(tx *sql.Tx) error) error

func SetFieldValue added in v0.5.0

func SetFieldValue(field reflect.Value, value any)

Types

type FieldMeta added in v0.5.0

// FieldMeta holds the three names associated with a single field: the name
// used by barn, the name of the field in the user's struct, and the name of
// the corresponding database column.
type FieldMeta struct {
	Name       string // a barn's name
	AttrName   string // a field name in a user structure
	ColumnName string // a name in a DB
}

type Schedule added in v0.5.0

// Schedule is a schedule record. Every field carries an empty `barn` struct
// tag. Cron, NextRunAt and LastRunAt are pointers and may therefore be nil.
//
// NOTE(review): Cron presumably holds a cron expression, and Func/Args
// presumably name a registered task function and its arguments — confirm
// against the implementation.
type Schedule struct {
	Id        int            `barn:""`
	Name      string         `barn:""`
	IsActive  bool           `barn:""`
	Cron      *string        `barn:""`
	NextRunAt *time.Time     `barn:""`
	LastRunAt *time.Time     `barn:""`
	Func      string         `barn:""`
	Args      map[string]any `barn:""`
}

func (Schedule) LogValue added in v0.5.0

func (e Schedule) LogValue() slog.Value

func (Schedule) TableName added in v0.5.0

func (e Schedule) TableName() string

type Scheduler added in v0.5.0

type Scheduler[S any] struct {
	// contains filtered or unexported fields
}

func NewScheduler added in v0.5.0

func NewScheduler[S any](db *sql.DB, config ...SchedulerConfig[S]) *Scheduler[S]

func (*Scheduler[S]) Create added in v0.5.0

func (w *Scheduler[S]) Create(tx *sql.Tx, schedule *S) error

func (*Scheduler[S]) Delete added in v0.5.0

func (w *Scheduler[S]) Delete(tx *sql.Tx, pk any) error

func (*Scheduler[S]) DeleteAll added in v0.5.0

func (w *Scheduler[S]) DeleteAll(tx *sql.Tx) error

func (*Scheduler[S]) FindAllActive added in v0.5.0

func (w *Scheduler[S]) FindAllActive(tx *sql.Tx, moment time.Time) ([]*S, error)

func (*Scheduler[S]) Save added in v0.5.0

func (w *Scheduler[S]) Save(tx *sql.Tx, schedule *S) error

func (*Scheduler[S]) Start added in v0.5.0

func (w *Scheduler[S]) Start()

func (*Scheduler[S]) StartContext added in v0.5.0

func (w *Scheduler[S]) StartContext(ctx context.Context)

func (*Scheduler[S]) Stop added in v0.5.0

func (w *Scheduler[S]) Stop()

type SchedulerConfig added in v0.5.0

// SchedulerConfig is the optional configuration accepted by NewScheduler. It
// bundles a logger, a Cron string and a SchedulerHandler for schedules of
// type S.
//
// NOTE(review): Cron presumably is a cron expression controlling how often
// the scheduler runs — confirm against the implementation.
type SchedulerConfig[S any] struct {
	Log     *slog.Logger
	Cron    string
	Handler SchedulerHandler[S]
}

type SchedulerHandler added in v0.5.0

// SchedulerHandler is a function that receives a transaction and a pointer to
// a schedule of type S and returns an error.
type SchedulerHandler[S any] func(tx *sql.Tx, schedule *S) error

func DefaultSchedulerHandler added in v0.5.0

func DefaultSchedulerHandler(worker *Worker[Task]) SchedulerHandler[Schedule]

type Status added in v0.5.0

// Status is a string-based status code; it is the type of the Task.Status
// field.
type Status string

// Status values, each represented by a single-letter string.
const (
	Queued Status = "Q"
	Done   Status = "D"
	Failed Status = "F"
)

type TableMeta added in v0.5.0

// TableMeta groups a table name with the metadata of its fields, held both as
// a slice and as a map keyed by a string.
//
// NOTE(review): FieldsByName is presumably keyed by FieldMeta.Name — confirm.
type TableMeta struct {
	TableName    string
	Fields       []*FieldMeta
	FieldsByName map[string]*FieldMeta
}

func GetTableMeta added in v0.5.0

func GetTableMeta(t interface{}) TableMeta

type Tabler added in v0.5.0

// Tabler is implemented by types that report a table name through a
// TableName method; Schedule and Task both declare such a method.
type Tabler interface {
	TableName() string
}

type Task added in v0.5.0

// Task is a task record. Every field carries an empty `barn` struct tag.
// StartedAt, FinishedAt and Error are pointers and may therefore be nil;
// Status takes one of the Status constants (Queued, Done, Failed).
//
// NOTE(review): Func/Args presumably name a registered task function and its
// arguments, and Result/Error presumably record the outcome of running it —
// confirm against the implementation.
type Task struct {
	Id         int            `barn:""`
	RunAt      time.Time      `barn:""`
	Func       string         `barn:""`
	Args       map[string]any `barn:""`
	Status     Status         `barn:""`
	StartedAt  *time.Time     `barn:""`
	FinishedAt *time.Time     `barn:""`
	Result     any            `barn:""`
	Error      *string        `barn:""`
}

func (Task) LogValue added in v0.5.0

func (e Task) LogValue() slog.Value

func (Task) TableName added in v0.5.0

func (e Task) TableName() string

type TaskFunc added in v0.5.0

// TaskFunc is a function that receives a transaction and arguments of any
// type and returns a result of any type together with an error. It is the
// function type accepted by TaskRegistry.Register.
type TaskFunc func(tx *sql.Tx, args any) (any, error)

type TaskHandler added in v0.5.0

// TaskHandler is a function that receives a transaction and a pointer to a
// task of type T and returns a result of any type together with an error.
type TaskHandler[T any] func(tx *sql.Tx, task *T) (any, error)

func DefaultRegistryTaskHandler added in v0.5.0

func DefaultRegistryTaskHandler(registry *TaskRegistry) TaskHandler[Task]

type TaskRegistry added in v0.5.0

type TaskRegistry struct {
	// contains filtered or unexported fields
}

func NewTaskRegistry added in v0.5.0

func NewTaskRegistry() *TaskRegistry

func (*TaskRegistry) ApplyAsync added in v0.5.0

func (r *TaskRegistry) ApplyAsync(tx *sql.Tx, name string, args any, countdown *int, eta *time.Time) error

func (*TaskRegistry) Call added in v0.5.0

func (r *TaskRegistry) Call(tx *sql.Tx, name string, args any) (any, error)

func (*TaskRegistry) Delay added in v0.5.0

func (r *TaskRegistry) Delay(tx *sql.Tx, name string, args any, countdown *int, eta *time.Time) error

func (*TaskRegistry) Register added in v0.5.0

func (r *TaskRegistry) Register(name string, f TaskFunc)

func (*TaskRegistry) Unregister added in v0.5.0

func (r *TaskRegistry) Unregister(name string)

type Worker added in v0.5.0

type Worker[T any] struct {
	// contains filtered or unexported fields
}

func NewWorker added in v0.5.0

func NewWorker[T any](db *sql.DB, config ...WorkerConfig[T]) *Worker[T]

func (*Worker[T]) Create added in v0.5.0

func (w *Worker[T]) Create(tx *sql.Tx, t *T) error

func (*Worker[T]) DeleteAll added in v0.5.0

func (w *Worker[T]) DeleteAll(tx *sql.Tx) error

func (*Worker[T]) DeleteOld added in v0.5.0

func (w *Worker[T]) DeleteOld(tx *sql.Tx, moment time.Time) (int, error)

func (*Worker[T]) FindNext added in v0.5.0

func (w *Worker[T]) FindNext(tx *sql.Tx) (*T, error)

func (*Worker[T]) Save added in v0.5.0

func (w *Worker[T]) Save(tx *sql.Tx, t *T) error

func (*Worker[T]) Start added in v0.5.0

func (w *Worker[T]) Start()

func (*Worker[T]) StartContext added in v0.5.0

func (w *Worker[T]) StartContext(ctx context.Context)

func (*Worker[T]) Stop added in v0.5.0

func (w *Worker[T]) Stop()

type WorkerConfig added in v0.5.0

// WorkerConfig is the optional configuration accepted by NewWorker. It
// bundles a logger, a Cron string and a TaskHandler for tasks of type T.
//
// NOTE(review): Cron presumably is a cron expression controlling how often
// the worker runs — confirm against the implementation.
type WorkerConfig[T any] struct {
	Log     *slog.Logger
	Cron    string
	Handler TaskHandler[T]
}

Directories

Path Synopsis
complex command
perf command
scheduler command
task command
task_model command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL