worker

package
v0.4.2
Published: Apr 2, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BackgroundProcessor

type BackgroundProcessor interface {
	// Schedule enqueues the job to be executed in the background if it is not
	// already scheduled.
	//
	// The `key` argument is used to deduplicate jobs, and `remoteBackoff` is how
	// long that key should be held unschedulable in the remote cache. The `arg`
	// value is passed unchanged to the process function when it is time to
	// process this entry.
	//
	// Returns true if the job was scheduled now, or false if it was already in
	// the local queue for execution or held unschedulable in the remote cache.
	Schedule(key string, remoteBackoff time.Duration, arg interface{}) bool
	// QueueLength returns the current number of scheduled jobs in the local queue.
	QueueLength() int
	// BumpRemoteDedupKey increments the base key used for deduplicating jobs in
	// the remote cache (Redis), so that all jobs can be rescheduled across all
	// worker instances.
	BumpRemoteDedupKey() error
}

BackgroundProcessor is a utility for running jobs in the background that avoids scheduling the same job twice, where a job is identified by a caller-provided key. Deduplication is performed both locally and in a remote Redis cache, giving simple best-effort global job deduplication across many worker instances.

Rather than accepting a separate function for each job, the processor takes a single constant process function, and callers supply an argument when scheduling. This avoids memory leaks caused by per-job closures capturing references, for example to the current request. There is currently no interface for stopping the background goroutine.
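The key-based deduplication and the single process function can be illustrated with a small stand-in. This is a minimal sketch, not the package's implementation: `localProcessor`, `newLocalProcessor`, `runOne`, and `printJob` are hypothetical names, there is no Redis and no background goroutine, and `remoteBackoff` is ignored. Only the `Schedule` and `QueueLength` signatures and the `JobFn` type are taken from this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// JobFn mirrors the signature documented for this package.
type JobFn func(interface{}) time.Duration

// job pairs a deduplication key with the argument for the process function.
type job struct {
	key string
	arg interface{}
}

// localProcessor is a hypothetical, local-only stand-in for illustration.
type localProcessor struct {
	mu      sync.Mutex
	queued  map[string]bool
	queue   []job
	process JobFn
}

func newLocalProcessor(process JobFn) *localProcessor {
	return &localProcessor{queued: make(map[string]bool), process: process}
}

// Schedule enqueues arg under key unless that key is already queued.
// remoteBackoff is ignored because this sketch has no remote cache.
func (p *localProcessor) Schedule(key string, remoteBackoff time.Duration, arg interface{}) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.queued[key] {
		return false
	}
	p.queued[key] = true
	p.queue = append(p.queue, job{key: key, arg: arg})
	return true
}

// QueueLength returns the number of jobs waiting in the local queue.
func (p *localProcessor) QueueLength() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.queue)
}

// runOne processes the oldest queued job, standing in for the background loop.
// It reports whether a job was processed.
func (p *localProcessor) runOne() bool {
	p.mu.Lock()
	if len(p.queue) == 0 {
		p.mu.Unlock()
		return false
	}
	j := p.queue[0]
	p.queue = p.queue[1:]
	delete(p.queued, j.key)
	p.mu.Unlock()
	p.process(j.arg)
	return true
}

// printJob is the single process function shared by every job.
func printJob(arg interface{}) time.Duration {
	fmt.Println("processing", arg)
	return 0 // the meaning of the returned duration is not documented on this page
}

func main() {
	p := newLocalProcessor(printJob)
	fmt.Println(p.Schedule("user:42", time.Minute, 42)) // true
	fmt.Println(p.Schedule("user:42", time.Minute, 42)) // false: key already queued
	fmt.Println(p.QueueLength())                        // 1
	p.runOne()                                          // processing 42
}
```

In this sketch the key becomes schedulable again as soon as the job runs. With the documented remote cache, the key would presumably stay unschedulable until `remoteBackoff` expires.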

func NewBackgroundProcessor

func NewBackgroundProcessor(initialCapacity int, redis redis.Cache, processFunc JobFn) BackgroundProcessor

type JobFn

type JobFn func(interface{}) time.Duration
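Because a JobFn receives its argument as `interface{}`, the process function typically has to recover the concrete type itself. The following is a minimal sketch of one way to do that with a checked type assertion. `refreshArgs` and `refresher` are hypothetical names, and returning 0 is an assumption, since this page does not document what the returned `time.Duration` means.

```go
package main

import (
	"fmt"
	"time"
)

// JobFn mirrors the signature documented for this package.
type JobFn func(interface{}) time.Duration

// refreshArgs is a hypothetical per-job payload passed as the Schedule arg.
type refreshArgs struct {
	UserID int
}

// refresher holds long-lived state; per-job data arrives only through arg.
type refresher struct {
	refreshed []int
}

// process is the single process function. It validates the argument type
// instead of panicking on an unexpected value.
func (r *refresher) process(arg interface{}) time.Duration {
	args, ok := arg.(refreshArgs)
	if !ok {
		fmt.Printf("skipping job with unexpected argument type %T\n", arg)
		return 0
	}
	r.refreshed = append(r.refreshed, args.UserID)
	return 0 // assumption: the meaning of this value is not documented here
}

func main() {
	r := &refresher{}
	var fn JobFn = r.process // a method value satisfies JobFn

	fn(refreshArgs{UserID: 7})
	fn("not a refreshArgs") // skipping job with unexpected argument type string
	fmt.Println(r.refreshed) // [7]
}
```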

type SyncQueue

type SyncQueue struct {
	// contains filtered or unexported fields
}

SyncQueue is a helper for a channel-like queue that allows multiple concurrent senders and receivers. It never blocks indefinitely on enqueuing, because it resizes the internal channel when the channel is full. Execution order is not guaranteed while a resize is in progress, but resizes should stop once the queue size stabilizes.
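The grow-when-full idea can be sketched as follows. This is a simplified illustration under stated assumptions, not SyncQueue's actual code: `growQueue`, `newGrowQueue`, and `TryDequeue` are hypothetical names, every operation takes one mutex, and dequeuing is non-blocking. Because of the single lock, this version happens to preserve order across resizes, which the real type does not promise.

```go
package main

import (
	"fmt"
	"sync"
)

// growQueue is a hypothetical buffered-channel queue that doubles its
// capacity instead of blocking when the channel is full.
type growQueue struct {
	mu sync.Mutex
	ch chan interface{}
}

func newGrowQueue(initialCapacity int) *growQueue {
	if initialCapacity < 1 {
		initialCapacity = 1
	}
	return &growQueue{ch: make(chan interface{}, initialCapacity)}
}

// Enqueue never blocks: if the channel is full it is replaced by a larger one.
func (q *growQueue) Enqueue(item interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()
	select {
	case q.ch <- item:
		return
	default: // channel is full, fall through to resize
	}
	bigger := make(chan interface{}, 2*cap(q.ch))
	// All access happens under q.mu, so these transfers cannot block.
	for len(q.ch) > 0 {
		bigger <- <-q.ch
	}
	bigger <- item
	q.ch = bigger
}

// TryDequeue returns the oldest item, or false if the queue is empty.
func (q *growQueue) TryDequeue() (interface{}, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	select {
	case item := <-q.ch:
		return item, true
	default:
		return nil, false
	}
}

// Len returns the number of queued items.
func (q *growQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.ch)
}

func main() {
	q := newGrowQueue(2)
	for i := 1; i <= 5; i++ {
		q.Enqueue(i) // grows from capacity 2 to 4 to 8 without blocking
	}
	fmt.Println(q.Len()) // 5
	for {
		item, ok := q.TryDequeue()
		if !ok {
			break
		}
		fmt.Println(item)
	}
}
```

A blocking Dequeue, as in the documented API, is harder to get right: a receiver waiting on the old channel must be redirected after a resize, which may be the source of the ordering caveat above.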

func NewSyncQueue

func NewSyncQueue(initialCapacity int) *SyncQueue

func (*SyncQueue) Dequeue

func (q *SyncQueue) Dequeue() interface{}

func (*SyncQueue) Enqueue

func (q *SyncQueue) Enqueue(item interface{})

func (*SyncQueue) Len

func (q *SyncQueue) Len() int
