manager

package
v1.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewExec

func NewExec(tracer trace.Tracer) *exec

Types

type Manager

type Manager struct {
	pg.PoolConn
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, pool pg.PoolConn, opts ...Opt) (*Manager, error)

func (*Manager) CleanQueue

func (manager *Manager) CleanQueue(ctx context.Context, name string) (result []schema.Task, err error)

CleanQueue removes stale tasks from a queue, and returns the tasks removed.

func (*Manager) CreateNextPartition

func (manager *Manager) CreateNextPartition(ctx context.Context) (result string, err error)

func (*Manager) CreatePartition

func (manager *Manager) CreatePartition(ctx context.Context, meta schema.PartitionMeta) (err error)

func (*Manager) CreateTask

func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (result *schema.Task, err error)

CreateTask creates a new task in a queue, and returns it.

func (*Manager) DeletePartition

func (manager *Manager) DeletePartition(ctx context.Context, name string) (err error)

func (*Manager) DeleteQueue

func (manager *Manager) DeleteQueue(ctx context.Context, name string) (result *schema.Queue, err error)

DeleteQueue deletes an existing queue, and returns it.

func (*Manager) DeleteTicker

func (manager *Manager) DeleteTicker(ctx context.Context, name string) (result *schema.Ticker, err error)

DeleteTicker deletes an existing ticker, and returns the deleted ticker.

func (*Manager) DropDrainedPartition

func (manager *Manager) DropDrainedPartition(ctx context.Context) (result string, err error)

func (*Manager) GetPartitionSeq

func (manager *Manager) GetPartitionSeq(ctx context.Context) (result uint64, err error)

func (*Manager) GetQueue

func (manager *Manager) GetQueue(ctx context.Context, name string) (result *schema.Queue, err error)

GetQueue returns a queue by name.

func (*Manager) GetTicker

func (manager *Manager) GetTicker(ctx context.Context, name string) (result *schema.Ticker, err error)

GetTicker returns a ticker by name.

func (*Manager) ListPartitions

func (manager *Manager) ListPartitions(ctx context.Context) (result []schema.Partition, err error)

func (*Manager) ListQueueStatuses

func (manager *Manager) ListQueueStatuses(ctx context.Context) (result []schema.QueueStatus, err error)

ListQueueStatuses returns the status breakdown for all queues.

func (*Manager) ListQueues

func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (result *schema.QueueList, err error)

ListQueues returns all queues as a list.

func (*Manager) ListTasks

func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (result *schema.TaskList, err error)

ListTasks returns all tasks with optional queue and status filtering.

func (*Manager) ListTickers

func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (result *schema.TickerList, err error)

ListTickers returns all tickers as a list.

func (*Manager) NextTask

func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (result *schema.Task, err error)

NextTask retains a task from any of the specified queues, and returns it. If no task is available, nil is returned.

func (*Manager) NextTicker

func (manager *Manager) NextTicker(ctx context.Context) (result *schema.Ticker, err error)

NextTicker returns the next matured ticker, or nil.

func (*Manager) RegisterQueue

func (manager *Manager) RegisterQueue(ctx context.Context, name string, meta schema.QueueMeta, callback schema.TaskFunc) (result *schema.Queue, err error)

RegisterQueue creates a new queue, or updates an existing queue, and returns it.

func (*Manager) RegisterTicker

func (manager *Manager) RegisterTicker(ctx context.Context, name string, meta schema.TickerMeta, callback schema.TaskFunc) (resultPtr *schema.Ticker, err error)

RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.

func (*Manager) ReleaseTask

func (manager *Manager) ReleaseTask(ctx context.Context, taskId uint64, success bool, result json.RawMessage, status *string) (resp *schema.Task, err error)

ReleaseTask releases a task, optionally marking it as failed, and returns it.

func (*Manager) Run

func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error

func (*Manager) UpdateQueue

func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (result *schema.Queue, err error)

UpdateQueue updates an existing queue, and returns it.

func (*Manager) UpdateTicker

func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (result *schema.Ticker, err error)

UpdateTicker updates an existing ticker, and returns it.

type Opt

type Opt func(*opt) error

Opt configures a Manager during construction.

func WithMaintenancePeriod added in v1.3.3

func WithMaintenancePeriod(period time.Duration) Opt

WithMaintenancePeriod sets how often the maintenance ticker runs. Values less than 1 second are ignored.

func WithMeter

func WithMeter(meter metric.Meter) Opt

WithMeter sets the OpenTelemetry meter used for manager metrics.

func WithPartitionAhead added in v1.3.3

func WithPartitionAhead(ahead uint64) Opt

WithPartitionAhead sets how many partitions to create when threshold is reached.

func WithPartitionSize added in v1.3.3

func WithPartitionSize(size uint64) Opt

WithPartitionSize sets the task partition size. Values less than 1 are ignored.

func WithPartitionThreshold added in v1.3.3

func WithPartitionThreshold(threshold float64) Opt

WithPartitionThreshold sets the partition creation threshold in the range (0,1].

func WithSchema

func WithSchema(schemaName string) Opt

WithSchema sets the database schema names to use for all queries. If not set the default schemas are used.

func WithTracer

func WithTracer(tracer trace.Tracer) Opt

WithTracer sets the OpenTelemetry tracer used for manager spans.

func WithWorker

func WithWorker(workerName string) Opt

WithWorker sets the worker name used for manager tasks. If not set the hostname is used.

type Result

type Result struct {
	Queue  string            `json:"queue,omitempty"`
	Task   *schema.Task      `json:"task,omitempty"`
	Ticker string            `json:"ticker,omitempty"`
	Result json.RawMessage   `json:"result,omitempty"`
	Error  error             `json:"error,omitempty"`
	Trace  trace.SpanContext `json:"-"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL