Documentation
¶
Index ¶
- func NewExec(tracer trace.Tracer) *exec
- type Manager
- func (manager *Manager) CleanQueue(ctx context.Context, name string) (result []schema.Task, err error)
- func (manager *Manager) CreateNextPartition(ctx context.Context) (result string, err error)
- func (manager *Manager) CreatePartition(ctx context.Context, meta schema.PartitionMeta) (err error)
- func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (result *schema.Task, err error)
- func (manager *Manager) DeletePartition(ctx context.Context, name string) (err error)
- func (manager *Manager) DeleteQueue(ctx context.Context, name string) (result *schema.Queue, err error)
- func (manager *Manager) DeleteTicker(ctx context.Context, name string) (result *schema.Ticker, err error)
- func (manager *Manager) DropDrainedPartition(ctx context.Context) (result string, err error)
- func (manager *Manager) GetPartitionSeq(ctx context.Context) (result uint64, err error)
- func (manager *Manager) GetQueue(ctx context.Context, name string) (result *schema.Queue, err error)
- func (manager *Manager) GetTicker(ctx context.Context, name string) (result *schema.Ticker, err error)
- func (manager *Manager) ListPartitions(ctx context.Context) (result []schema.Partition, err error)
- func (manager *Manager) ListQueueStatuses(ctx context.Context) (result []schema.QueueStatus, err error)
- func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (result *schema.QueueList, err error)
- func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (result *schema.TaskList, err error)
- func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (result *schema.TickerList, err error)
- func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (result *schema.Task, err error)
- func (manager *Manager) NextTicker(ctx context.Context) (result *schema.Ticker, err error)
- func (manager *Manager) RegisterQueue(ctx context.Context, name string, meta schema.QueueMeta, ...) (result *schema.Queue, err error)
- func (manager *Manager) RegisterTicker(ctx context.Context, name string, meta schema.TickerMeta, ...) (resultPtr *schema.Ticker, err error)
- func (manager *Manager) ReleaseTask(ctx context.Context, taskId uint64, success bool, result json.RawMessage, ...) (resp *schema.Task, err error)
- func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error
- func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (result *schema.Queue, err error)
- func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (result *schema.Ticker, err error)
- type Opt
- func WithMaintenancePeriod(period time.Duration) Opt
- func WithMeter(meter metric.Meter) Opt
- func WithPartitionAhead(ahead uint64) Opt
- func WithPartitionSize(size uint64) Opt
- func WithPartitionThreshold(threshold float64) Opt
- func WithSchema(schemaName string) Opt
- func WithTracer(tracer trace.Tracer) Opt
- func WithWorker(workerName string) Opt
- type Result
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Manager ¶
func (*Manager) CleanQueue ¶
func (manager *Manager) CleanQueue(ctx context.Context, name string) (result []schema.Task, err error)
CleanQueue removes stale tasks from a queue, and returns the tasks removed.
func (*Manager) CreateNextPartition ¶
func (*Manager) CreatePartition ¶
func (*Manager) CreateTask ¶
func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (result *schema.Task, err error)
CreateTask creates a new task in a queue, and returns it.
func (*Manager) DeletePartition ¶
func (*Manager) DeleteQueue ¶
func (manager *Manager) DeleteQueue(ctx context.Context, name string) (result *schema.Queue, err error)
DeleteQueue deletes an existing queue, and returns it.
func (*Manager) DeleteTicker ¶
func (manager *Manager) DeleteTicker(ctx context.Context, name string) (result *schema.Ticker, err error)
DeleteTicker deletes an existing ticker, and returns the deleted ticker.
func (*Manager) DropDrainedPartition ¶
func (*Manager) GetPartitionSeq ¶
func (*Manager) GetQueue ¶
func (manager *Manager) GetQueue(ctx context.Context, name string) (result *schema.Queue, err error)
GetQueue returns a queue by name.
func (*Manager) GetTicker ¶
func (manager *Manager) GetTicker(ctx context.Context, name string) (result *schema.Ticker, err error)
GetTicker returns a ticker by name.
func (*Manager) ListPartitions ¶
func (*Manager) ListQueueStatuses ¶
func (manager *Manager) ListQueueStatuses(ctx context.Context) (result []schema.QueueStatus, err error)
ListQueueStatuses returns the status breakdown for all queues.
func (*Manager) ListQueues ¶
func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (result *schema.QueueList, err error)
ListQueues returns all queues as a list.
func (*Manager) ListTasks ¶
func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (result *schema.TaskList, err error)
ListTasks returns all tasks with optional queue and status filtering.
func (*Manager) ListTickers ¶
func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (result *schema.TickerList, err error)
ListTickers returns all tickers as a list.
func (*Manager) NextTask ¶
func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (result *schema.Task, err error)
NextTask retains a task from any of the specified queues, and returns it. If no task is available, nil is returned.
func (*Manager) NextTicker ¶
NextTicker returns the next matured ticker, or nil.
func (*Manager) RegisterQueue ¶
func (manager *Manager) RegisterQueue(ctx context.Context, name string, meta schema.QueueMeta, callback schema.TaskFunc) (result *schema.Queue, err error)
RegisterQueue creates a new queue, or updates an existing queue, and returns it.
func (*Manager) RegisterTicker ¶
func (manager *Manager) RegisterTicker(ctx context.Context, name string, meta schema.TickerMeta, callback schema.TaskFunc) (resultPtr *schema.Ticker, err error)
RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.
func (*Manager) ReleaseTask ¶
func (manager *Manager) ReleaseTask(ctx context.Context, taskId uint64, success bool, result json.RawMessage, status *string) (resp *schema.Task, err error)
ReleaseTask releases a task, optionally marking it as failed, and returns it.
type Opt ¶
type Opt func(*opt) error
Opt configures a Manager during construction.
func WithMaintenancePeriod ¶ added in v1.3.3
WithMaintenancePeriod sets how often the maintenance ticker runs. Values less than 1 second are ignored.
func WithPartitionAhead ¶ added in v1.3.3
WithPartitionAhead sets how many partitions to create when the threshold is reached.
func WithPartitionSize ¶ added in v1.3.3
WithPartitionSize sets the task partition size. Values less than 1 are ignored.
func WithPartitionThreshold ¶ added in v1.3.3
WithPartitionThreshold sets the partition creation threshold in the range (0,1].
func WithSchema ¶
WithSchema sets the database schema names to use for all queries. If not set, the default schemas are used.
func WithTracer ¶
WithTracer sets the OpenTelemetry tracer used for manager spans.
func WithWorker ¶
WithWorker sets the worker name used for manager tasks. If not set, the hostname is used.