manager

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewExec

func NewExec(tracer trace.Tracer) *exec

Types

type Manager

type Manager struct {
	pg.PoolConn
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, pool pg.PoolConn, name, version string, opts ...Opt) (*Manager, error)

func (*Manager) CleanQueue

func (manager *Manager) CleanQueue(ctx context.Context, name string) (result []schema.Task, err error)

CleanQueue removes stale tasks from a queue, and returns the tasks removed.

func (*Manager) CreateNextPartition

func (manager *Manager) CreateNextPartition(ctx context.Context) (result string, err error)

func (*Manager) CreatePartition

func (manager *Manager) CreatePartition(ctx context.Context, meta schema.PartitionMeta) (err error)

func (*Manager) CreateTask

func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (result *schema.Task, err error)

CreateTask creates a new task in a queue, and returns it.

func (*Manager) DeletePartition

func (manager *Manager) DeletePartition(ctx context.Context, name string) (err error)

func (*Manager) DeleteQueue

func (manager *Manager) DeleteQueue(ctx context.Context, name string) (result *schema.Queue, err error)

DeleteQueue deletes an existing queue, and returns it.

func (*Manager) DeleteTicker

func (manager *Manager) DeleteTicker(ctx context.Context, name string) (result *schema.Ticker, err error)

DeleteTicker deletes an existing ticker, and returns the deleted ticker.

func (*Manager) DropDrainedPartition

func (manager *Manager) DropDrainedPartition(ctx context.Context) (result string, err error)

func (*Manager) GetPartitionSeq

func (manager *Manager) GetPartitionSeq(ctx context.Context) (result uint64, err error)

func (*Manager) GetQueue

func (manager *Manager) GetQueue(ctx context.Context, name string) (result *schema.Queue, err error)

GetQueue returns a queue by name.

func (*Manager) GetTicker

func (manager *Manager) GetTicker(ctx context.Context, name string) (result *schema.Ticker, err error)

GetTicker returns a ticker by name.

func (*Manager) ListPartitions

func (manager *Manager) ListPartitions(ctx context.Context) (result []schema.Partition, err error)

func (*Manager) ListQueueStatuses

func (manager *Manager) ListQueueStatuses(ctx context.Context) (result []schema.QueueStatus, err error)

ListQueueStatuses returns the status breakdown for all queues.

func (*Manager) ListQueues

func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (result *schema.QueueList, err error)

ListQueues returns all queues as a list.

func (*Manager) ListTasks

func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (result *schema.TaskList, err error)

ListTasks returns all tasks with optional queue and status filtering.

func (*Manager) ListTickers

func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (result *schema.TickerList, err error)

ListTickers returns all tickers as a list.

func (*Manager) NextTask

func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (result *schema.Task, err error)

NextTask retains a task from any of the specified queues, and returns it. If no task is available, nil is returned.

func (*Manager) NextTicker

func (manager *Manager) NextTicker(ctx context.Context) (result *schema.Ticker, err error)

NextTicker returns the next matured ticker, or nil.

func (*Manager) RegisterQueue

func (manager *Manager) RegisterQueue(ctx context.Context, name string, meta schema.QueueMeta, callback schema.TaskFunc) (result *schema.Queue, err error)

RegisterQueue creates a new queue, or updates an existing queue, and returns it.

func (*Manager) RegisterTicker

func (manager *Manager) RegisterTicker(ctx context.Context, name string, meta schema.TickerMeta, callback schema.TaskFunc) (resultPtr *schema.Ticker, err error)

RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.

func (*Manager) ReleaseTask

func (manager *Manager) ReleaseTask(ctx context.Context, taskId uint64, success bool, result json.RawMessage, status *string) (resp *schema.Task, err error)

ReleaseTask releases a task, optionally marking it as failed, and returns it.

func (*Manager) Run

func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error

func (*Manager) UpdateQueue

func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (result *schema.Queue, err error)

UpdateQueue updates an existing queue, and returns it.

func (*Manager) UpdateTicker

func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (result *schema.Ticker, err error)

UpdateTicker updates an existing ticker, and returns it.

type Opt

type Opt func(*opt) error

Opt configures a Manager during construction.

func WithMeter

func WithMeter(meter metric.Meter) Opt

WithMeter sets the OpenTelemetry meter used for manager metrics.

func WithSchema

func WithSchema(schemaName string) Opt

WithSchema sets the database schema names to use for all queries. If not set the default schemas are used.

func WithTracer

func WithTracer(tracer trace.Tracer) Opt

WithTracer sets the OpenTelemetry tracer used for manager spans.

func WithWorker

func WithWorker(workerName string) Opt

WithWorker sets the worker name used for manager tasks. If not set the hostname is used.

type Result

type Result struct {
	Queue  string            `json:"queue,omitempty"`
	Task   *schema.Task      `json:"task,omitempty"`
	Ticker string            `json:"ticker,omitempty"`
	Result json.RawMessage   `json:"result,omitempty"`
	Error  error             `json:"error,omitempty"`
	Trace  trace.SpanContext `json:"-"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL