Documentation
Types
type BackgroundProcessor
type BackgroundProcessor interface {
// Schedule enqueues the job for background execution if it is not
// already scheduled.
//
// The `key` argument is used to deduplicate jobs, and `remoteBackoff` is
// how long that key should be held unschedulable in the remote cache.
// `arg` is passed as-is to the process function when it is time to
// process this entry.
//
// Returns true if the job was scheduled now, or false if it was already
// in the local queue for execution or held unschedulable in the remote
// cache.
Schedule(key string, remoteBackoff time.Duration, arg interface{}) bool
// QueueLength returns the current number of scheduled jobs in the local queue.
QueueLength() int
// BumpRemoteDedupKey increments the base key used for deduping jobs in
// the remote cache (Redis), so that all jobs can be rescheduled across
// all worker instances.
BumpRemoteDedupKey() error
}
BackgroundProcessor is a utility for running jobs in the background while avoiding rescheduling of the same job, identified by a caller-provided key. Rescheduling is prevented both locally and in a remote Redis cache, giving simple, best-effort global job deduplication across many worker instances.

Instead of accepting a separate function for each job, which could leak memory by capturing references (for example, to the current request), callers register a single constant process function and pass an argument when scheduling. There is currently no interface for stopping the background goroutine.
func NewBackgroundProcessor
func NewBackgroundProcessor(initialCapacity int, redis redis.Cache, processFunc JobFn) BackgroundProcessor
type SyncQueue
type SyncQueue struct {
// contains filtered or unexported fields
}
SyncQueue is a helper for a channel-like queue that allows multiple concurrent senders and receivers and never blocks indefinitely on enqueueing: when the internal channel is full, it is resized. Execution order is not guaranteed while resizes occur, but reordering should stop once the queue size stabilizes.
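Since SyncQueue's fields are unexported, here is a minimal sketch of the resize-on-full idea under assumed semantics: a full channel is swapped for one with double the capacity and drained into it. The names (`syncQueue`, `Enqueue`, `TryDequeue`, `Len`) are hypothetical, not the package's API.

```go
package main

import (
	"fmt"
	"sync"
)

// syncQueue sketches resize-on-full: Enqueue never blocks indefinitely
// because a full channel is replaced by a larger one under the mutex.
type syncQueue struct {
	mu sync.Mutex
	ch chan interface{}
}

func newSyncQueue(capacity int) *syncQueue {
	return &syncQueue{ch: make(chan interface{}, capacity)}
}

// Enqueue attempts a non-blocking send; when the channel is full it
// allocates a channel with twice the capacity, drains the old one into
// it, and retries. Draining while other goroutines receive is why
// ordering is not guaranteed during a resize.
func (q *syncQueue) Enqueue(v interface{}) {
	for {
		q.mu.Lock()
		ch := q.ch
		select {
		case ch <- v:
			q.mu.Unlock()
			return
		default:
			// Full: close the old channel so range terminates after the
			// buffered items, then move everything to the bigger one.
			bigger := make(chan interface{}, 2*cap(ch))
			close(ch)
			for item := range ch {
				bigger <- item
			}
			q.ch = bigger
			q.mu.Unlock()
		}
	}
}

// TryDequeue receives one item without blocking, reporting whether an
// item was available.
func (q *syncQueue) TryDequeue() (interface{}, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	select {
	case v := <-q.ch:
		return v, true
	default:
		return nil, false
	}
}

// Len reports the number of items currently buffered.
func (q *syncQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.ch)
}

func main() {
	q := newSyncQueue(2)
	for i := 0; i < 5; i++ {
		q.Enqueue(i) // triggers resizes once the channel fills
	}
	v, _ := q.TryDequeue()
	fmt.Println(v, q.Len()) // prints "0 4"
}
```

The trade-off this sketch makes explicit: enqueueing stays non-blocking at the cost of an occasional drain-and-copy, which is also the moment where FIFO order can be disturbed by concurrent receivers.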