jobqueue

package
v0.43.3 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 22, 2025 License: AGPL-3.0 Imports: 12 Imported by: 2

README

JobQueue Design Goal

The jobqueue package implements a reusable job queue system for asynchronous message processing.

The most common use case is to process each finalized block asynchronously.

For instance, verification nodes must verify each finalized block. This needs to happen asynchronously; otherwise a verification node might get overwhelmed when a large number of blocks are finalized in quick succession (e.g. when a node comes back online and is catching up from other peers).

So the goals of the jobqueue system are:

  1. guarantee that each job (i.e. finalized block) will eventually be processed
  2. persist the jobqueue state so that, in the event of a crash failure, workers can be rescheduled and no job is skipped
  3. allow concurrent processing of multiple jobs
  4. make the number of concurrent workers configurable so that the node won't get overwhelmed when too many jobs are created (i.e. too many blocks are finalized in a short period of time)

JobQueue components

To achieve the above goals, the jobqueue system contains the following components/interfaces:

  1. A Jobs module to find jobs by job index
  2. A storage.ConsumerProgress to store job processing progress
  3. A Worker module to process jobs and report job completion.
  4. A Consumer that orchestrates the job processing by finding new jobs, creating workers for each job using the above modules, and managing job processing status internally.

Using module.Jobs to find jobs

There is no JobProducer in the jobqueue design. The job queue assumes each job can be indexed by a uint64 value, just like each finalized block (or sealed block) can be indexed by block height.

Let's just call this uint64 value "Job Index" or index.

So if we iterate through each index from low to high, and find each job by index, then we are able to iterate through each job.

Therefore, the module.Jobs interface abstracts this into a method: AtIndex.

The AtIndex method returns the job at any given index.

The job consumer relies on module.Jobs to find jobs. However, module.Jobs doesn't provide a way to notify the consumer as soon as a new job is available. So it's the consumer's job to keep track of the values returned by module.Jobs's Head method and find the jobs that are new.
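
The idea of finding jobs by iterating over indexes can be sketched as follows. This is only an illustration: JobID, Job and Jobs are local stand-ins that mirror the shape described above, and errNotFound, heightJob, memoryJobs and collectFrom are hypothetical names that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// JobID, Job and Jobs are local stand-ins for illustration only;
// they are assumptions, not the real module package.
type JobID string

type Job interface {
	ID() JobID
}

type Jobs interface {
	AtIndex(index uint64) (Job, error)
	Head() (uint64, error)
}

// errNotFound is a hypothetical sentinel for "no job at this index".
var errNotFound = errors.New("job not found")

// heightJob is a toy job identified by a block height.
type heightJob struct{ height uint64 }

func (j heightJob) ID() JobID { return JobID(fmt.Sprintf("height-%d", j.height)) }

// memoryJobs serves a job for every index from first up to and including head.
type memoryJobs struct{ first, head uint64 }

func (m memoryJobs) AtIndex(index uint64) (Job, error) {
	if index < m.first || index > m.head {
		return nil, errNotFound
	}
	return heightJob{height: index}, nil
}

func (m memoryJobs) Head() (uint64, error) { return m.head, nil }

// collectFrom walks the indexes from start upward and stops at the first
// index that has no job.
func collectFrom(jobs Jobs, start uint64) ([]JobID, error) {
	var ids []JobID
	for index := start; ; index++ {
		job, err := jobs.AtIndex(index)
		if errors.Is(err, errNotFound) {
			return ids, nil
		}
		if err != nil {
			return nil, err
		}
		ids = append(ids, job.ID())
	}
}

func main() {
	ids, err := collectFrom(memoryJobs{first: 10, head: 12}, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids) // [height-10 height-11 height-12]
}
```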

Using Check method to notify job consumer for checking new jobs

The job consumer provides the Check method so that users can notify it that new jobs are available.

Once called, the job consumer iterates through each index with the AtIndex method. It stops when one of the following conditions is true:

  1. no job was found at an index
  2. there are no more workers to work on the jobs; the number of workers is limited by the config item maxProcessing

The Check method is safe for concurrent use: even if the job consumer is notified concurrently about new jobs, at most one check for new jobs runs at a time.

Whenever a worker finishes a job, the job consumer also calls Check internally.
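
A minimal sketch of this checking behaviour, under stated assumptions: toyConsumer and all of its fields are hypothetical and simplified, and the real consumer reads jobs through AtIndex rather than comparing against a stored head value. The sketch only shows the two stop conditions and the "one check at a time" guard.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyConsumer is a hypothetical stand-in for the job consumer.
type toyConsumer struct {
	isChecking atomic.Bool // true while a check is running

	mu            sync.Mutex
	head          uint64   // highest index that currently has a job
	next          uint64   // next index to hand to a worker
	processing    uint64   // jobs currently assigned to workers
	maxProcessing uint64   // upper bound on concurrently processed jobs
	assigned      []uint64 // indexes handed out so far, for inspection
}

// Check assigns jobs until an index has no job or all workers are busy.
// It returns false when another check is already running.
func (c *toyConsumer) Check() bool {
	if !c.isChecking.CompareAndSwap(false, true) {
		return false
	}
	defer c.isChecking.Store(false)

	c.mu.Lock()
	defer c.mu.Unlock()
	for c.next <= c.head && c.processing < c.maxProcessing {
		c.assigned = append(c.assigned, c.next)
		c.processing++
		c.next++
	}
	return true
}

// finish frees one worker and triggers another check, mirroring the
// internal Check call after a job completes.
func (c *toyConsumer) finish() {
	c.mu.Lock()
	c.processing--
	c.mu.Unlock()
	c.Check()
}

func main() {
	c := &toyConsumer{head: 5, next: 1, maxProcessing: 2}
	c.Check()
	fmt.Println(c.assigned) // [1 2]
	c.finish()
	fmt.Println(c.assigned) // [1 2 3]
}
```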

Storing job consuming progress in storage.ConsumerProgress

The job consumer stores the last processed job index in storage.ConsumerProgress, so that on startup it can read the last processed job index from storage and compare it with the last available job index from module.Jobs's Head method to resume job processing.

This ensures each job will be processed at least once. Note: given the at-least-once execution, the Worker should gracefully handle duplicate runs of the same job.
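
The resume logic and the need for idempotent workers can be sketched like this. Everything here is hypothetical: progressStore stands in for a persisted progress record, resumeRange assumes that the default index is treated as already processed, and idempotentWorker is just one possible way to tolerate duplicate runs.

```go
package main

import "fmt"

// progressStore is a hypothetical stand-in for a persisted progress record.
type progressStore struct {
	initialized bool
	processed   uint64
}

// resumeRange returns the first and last index that still need processing.
// Assumption: on first start, defaultIndex counts as already processed.
func resumeRange(p *progressStore, defaultIndex, head uint64) (from, to uint64, ok bool) {
	if !p.initialized {
		p.processed = defaultIndex
		p.initialized = true
	}
	if p.processed >= head {
		return 0, 0, false // nothing new to process
	}
	return p.processed + 1, head, true
}

// idempotentWorker remembers finished jobs so that a repeated run is a no-op.
type idempotentWorker struct {
	done    map[uint64]bool
	effects int // counts how often the real work was performed
}

func (w *idempotentWorker) run(index uint64) {
	if w.done[index] {
		return
	}
	w.effects++
	w.done[index] = true
}

func main() {
	p := &progressStore{}
	from, to, ok := resumeRange(p, 7, 10)
	fmt.Println(from, to, ok) // 8 10 true

	// Job 8 runs twice, as it could after a crash that happened before
	// the progress was persisted.
	w := &idempotentWorker{done: map[uint64]bool{}}
	for _, index := range []uint64{8, 8, 9, 10} {
		w.run(index)
	}
	fmt.Println(w.effects) // 3
}
```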

Using Workers to work on each job

When the job consumer finds a new job, it uses an implementation of the Worker interface to process it. The Worker's Run method accepts a module.Job interface, so it's the user's responsibility to handle the conversion between module.Job and the underlying data type.

In the scenario of processing finalized blocks, implementing symmetric functions like BlockToJob and JobToBlock is recommended for this conversion.

In order to report job completion, the worker needs to call the job consumer's NotifyJobIsDone method.
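
A sketch of such a symmetric conversion pair, assuming a toy Block type with a Hash field. The types below are local stand-ins written for this example; they only imitate the shape of the BlockJob, BlockToJob and JobToBlock items documented further down and are not their actual implementation.

```go
package main

import "fmt"

// JobID and Job are local stand-ins for illustration only.
type JobID string

type Job interface {
	ID() JobID
}

// Block is a toy stand-in for a finalized block (hypothetical fields).
type Block struct {
	Height uint64
	Hash   string
}

// BlockJob wraps a block so that it can travel through a job queue.
type BlockJob struct {
	Block *Block
}

// ID derives the job id from the block hash, which is unique per block.
func (j BlockJob) ID() JobID { return JobID(j.Block.Hash) }

// BlockToJob wraps a block into a job.
func BlockToJob(block *Block) *BlockJob {
	return &BlockJob{Block: block}
}

// JobToBlock unwraps a job and fails if the job is not a block job.
func JobToBlock(job Job) (*Block, error) {
	blockJob, ok := job.(*BlockJob)
	if !ok {
		return nil, fmt.Errorf("could not convert job of type %T to a block job", job)
	}
	return blockJob.Block, nil
}

// otherJob is an unrelated job type used to show the failure path.
type otherJob struct{}

func (otherJob) ID() JobID { return "other" }

func main() {
	job := BlockToJob(&Block{Height: 42, Hash: "abc"})
	block, err := JobToBlock(job)
	fmt.Println(block.Height, err) // 42 <nil>

	_, err = JobToBlock(otherJob{})
	fmt.Println(err != nil) // true
}
```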

Error handling

The job queue doesn't allow a job to fail, because it has to guarantee that every job at or below the last processed job index has finished successfully. Leaving a gap is not accepted.

Therefore, if a worker fails to process a job, it should retry by itself, or just crash.

Note: a Worker should not log the error and then report the job as completed, because that would advance the last processed job index and the job would never get a chance to be retried.

Pipeline Pattern

Multiple jobqueues can be combined to form a pipeline. This is useful when the first job queue processes each finalized block and creates jobs for data that depends on the block, and a second job queue processes each job created by the workers of the first job queue.

For instance, the verification node uses a 2-jobqueue pipeline: the first job queue finds the chunks in each block and creates a job for each chunk that the node needs to verify, and the second job queue lets the verification node verify each chunk with a bounded number of workers.
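
The two-stage idea can be sketched in a few lines. This is a deliberately sequential toy with hypothetical names (block, chunkQueue, firstStage, secondStage); it leaves out persistence and concurrency and only shows how the output of the first stage becomes an index-addressable job source for the second stage.

```go
package main

import "fmt"

// block is a toy finalized block carrying chunk identifiers.
type block struct {
	height uint64
	chunks []string
}

// chunkQueue is the job source of the second stage. Every appended chunk
// gets the next sequential index, starting at 1.
type chunkQueue struct{ chunks []string }

func (q *chunkQueue) push(chunk string) uint64 {
	q.chunks = append(q.chunks, chunk)
	return uint64(len(q.chunks))
}

func (q *chunkQueue) atIndex(index uint64) (string, bool) {
	if index == 0 || index > uint64(len(q.chunks)) {
		return "", false
	}
	return q.chunks[index-1], true
}

// firstStage handles one block job by creating a chunk job for every chunk
// that the assigned callback selects.
func firstStage(b block, assigned func(string) bool, out *chunkQueue) {
	for _, chunk := range b.chunks {
		if assigned(chunk) {
			out.push(chunk)
		}
	}
}

// secondStage processes chunk jobs after lastProcessed until an index has
// no job, and returns the new last processed index.
func secondStage(q *chunkQueue, lastProcessed uint64, verify func(string)) uint64 {
	for {
		chunk, ok := q.atIndex(lastProcessed + 1)
		if !ok {
			return lastProcessed
		}
		verify(chunk)
		lastProcessed++
	}
}

func main() {
	queue := &chunkQueue{}
	mine := map[string]bool{"b1-c0": true, "b2-c1": true}
	assigned := func(chunk string) bool { return mine[chunk] }

	firstStage(block{height: 1, chunks: []string{"b1-c0", "b1-c1"}}, assigned, queue)
	firstStage(block{height: 2, chunks: []string{"b2-c0", "b2-c1"}}, assigned, queue)

	var verified []string
	last := secondStage(queue, 0, func(chunk string) { verified = append(verified, chunk) })
	fmt.Println(last, verified) // 2 [b1-c0 b2-c1]
}
```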

Considerations

Push vs Pull

The jobqueue architecture is optimized for "pull" style processes, where the job producer simply notifies the job consumer about new jobs without creating any job, and the job consumer pulls jobs from a source when workers are available. All current implementations use this pull style, since it lends itself well to asynchronously processing jobs based on block heights.

Some use cases might require "push" style jobs, where there is a job producer that creates new jobs and a consumer that processes work from the producer. This is possible with the jobqueue, but requires the producer to persist the jobs to a database and then implement the Head and AtIndex methods so that jobs can be accessed by sequential uint64 indexes.

TODOs

  1. Jobs at different indexes are processed in parallel, so it's possible that one job takes a long time and causes too many completed jobs to be cached in memory before they can be used to update the last processed job index. maxSearchAhead allows the job consumer to stop consuming more blocks if too many jobs are completed while the job at index lastProcessed + 1 has not been processed yet. The difference between maxSearchAhead and maxProcessing is that maxProcessing allows at most maxProcessing workers to process jobs at a time. However, even if a worker is available, it might not be assigned a job: while the job at index lastProcessed + 1 is not done, no worker will start a job with an index higher than lastProcessed + maxSearchAhead.
  2. accept callback to get notified when the consecutive job index is finished.
  3. implement ReadyDoneAware interface

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func JobID added in v0.26.2

func JobID(blockID flow.Identifier) module.JobID

JobID returns the corresponding unique job id of the BlockJob for this job.

func JobToBlock added in v0.26.2

func JobToBlock(job module.Job) (*flow.Block, error)

JobToBlock converts a block job into its corresponding block.

func JobToBlockHeader added in v0.26.2

func JobToBlockHeader(job module.Job) (*flow.Header, error)

JobToBlockHeader converts a block job into its corresponding block header.

Types

type BlockHeaderJob added in v0.26.2

type BlockHeaderJob struct {
	Header *flow.Header
}

BlockHeaderJob implements the Job interface. It converts a Block Header into a Job to be used by job queue.

In the current architecture, BlockHeaderJob represents a finalized block enqueued to be processed by a consumer that implements the JobQueue interface.

func BlockHeaderToJob added in v0.26.2

func BlockHeaderToJob(header *flow.Header) *BlockHeaderJob

BlockHeaderToJob converts the block header to a BlockHeaderJob.

func (BlockHeaderJob) ID added in v0.26.2

func (j BlockHeaderJob) ID() module.JobID

ID converts block id into job id, which guarantees uniqueness.

type BlockJob added in v0.26.2

type BlockJob struct {
	Block *flow.Block
}

BlockJob implements the Job interface. It converts a Block into a Job to be used by job queue.

In the current architecture, BlockJob represents a finalized block enqueued to be processed by the BlockConsumer that implements the JobQueue interface.

func BlockToJob added in v0.26.2

func BlockToJob(block *flow.Block) *BlockJob

BlockToJob converts the block to a BlockJob.

func (BlockJob) ID added in v0.26.2

func (j BlockJob) ID() module.JobID

ID converts block id into job id, which guarantees uniqueness.

type ComponentConsumer added in v0.26.2

type ComponentConsumer struct {
	component.Component
	// contains filtered or unexported fields
}

func NewComponentConsumer added in v0.26.2

func NewComponentConsumer(
	log zerolog.Logger,
	workSignal <-chan struct{},
	progressInitializer storage.ConsumerProgressInitializer,
	jobs module.Jobs,
	defaultIndex uint64,
	processor JobProcessor,
	maxProcessing uint64,
	maxSearchAhead uint64,
) (*ComponentConsumer, error)

NewComponentConsumer creates a new ComponentConsumer.

func (*ComponentConsumer) Head added in v0.26.2

func (c *ComponentConsumer) Head() (uint64, error)

Head returns the highest job index available

func (*ComponentConsumer) LastProcessedIndex added in v0.26.2

func (c *ComponentConsumer) LastProcessedIndex() uint64

LastProcessedIndex returns the last processed job index

func (*ComponentConsumer) NotifyJobIsDone added in v0.26.2

func (c *ComponentConsumer) NotifyJobIsDone(jobID module.JobID) uint64

NotifyJobIsDone is invoked by the worker to let the consumer know that it is done processing a (block) job.

func (*ComponentConsumer) SetPostNotifier added in v0.26.2

func (c *ComponentConsumer) SetPostNotifier(fn NotifyDone)

SetPostNotifier sets a notification function that is invoked after marking a job as done in the consumer.

Note: This guarantees that the function is executed after consumer updates the last processed index, but notifications may be missed in the event of a crash.

func (*ComponentConsumer) SetPreNotifier added in v0.26.2

func (c *ComponentConsumer) SetPreNotifier(fn NotifyDone)

SetPreNotifier sets a notification function that is invoked before marking a job as done in the consumer.

Note: This guarantees that the function is called at least once for each job, but may be executed before consumer updates the last processed index.

func (*ComponentConsumer) Size added in v0.26.2

func (c *ComponentConsumer) Size() uint

Size returns number of in-memory block jobs that block consumer is processing.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(
	log zerolog.Logger,
	jobs module.Jobs,
	progressInitializer storage.ConsumerProgressInitializer,
	worker Worker,
	maxProcessing uint64,
	maxSearchAhead uint64,
	defaultIndex uint64,
) (*Consumer, error)

func (*Consumer) Check

func (c *Consumer) Check()

Check allows the job publisher to notify the consumer that a new job has been added, so that the consumer can check whether the job is processable. Since multiple checks at the same time are unnecessary, only one check is kept running, which is enforced by checking an atomic isChecking value.

func (*Consumer) LastProcessedIndex added in v0.26.2

func (c *Consumer) LastProcessedIndex() uint64

LastProcessedIndex returns the last processed job index

func (*Consumer) NotifyJobIsDone

func (c *Consumer) NotifyJobIsDone(jobID module.JobID) uint64

NotifyJobIsDone lets the consumer know that a job has been finished, so that the consumer will take the next job from the job queue if there are workers available. It returns the last processed job index.

func (*Consumer) Size added in v0.17.6

func (c *Consumer) Size() uint

Size returns number of in-memory jobs that consumer is processing.

func (*Consumer) Start

func (c *Consumer) Start() error

Start starts consuming the jobs from the job queue.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop stops consuming jobs from the job queue. It blocks until the existing workers finish processing their jobs. Note: it won't stop the existing workers from finishing their jobs.

type FinalizedBlockReader added in v0.26.2

type FinalizedBlockReader struct {
	// contains filtered or unexported fields
}

FinalizedBlockReader provides an abstraction for consumers to read blocks as jobs.

func NewFinalizedBlockReader added in v0.26.2

func NewFinalizedBlockReader(state protocol.State, blocks storage.Blocks) *FinalizedBlockReader

NewFinalizedBlockReader creates and returns a FinalizedBlockReader.

func (FinalizedBlockReader) AtIndex added in v0.26.2

func (r FinalizedBlockReader) AtIndex(index uint64) (module.Job, error)

AtIndex returns the block job at the given index. The block job at an index is just the finalized block at that index (i.e., height).

func (FinalizedBlockReader) Head added in v0.26.2

func (r FinalizedBlockReader) Head() (uint64, error)

Head returns the last finalized height as job index.

type JobProcessor added in v0.26.2

type JobProcessor func(irrecoverable.SignalerContext, module.Job, func())

JobProcessor is called by the worker to execute each job. It should only return when the job has completed, either successfully or after performing any failure handling. It takes 3 arguments:

  • irrecoverable.SignalerContext: this is used to signal shutdown to the worker and throw any irrecoverable errors back to the parent. The signaler context is passed in from the consumer's Start method.
  • module.Job: the job to be processed. The processor is responsible for decoding it into the expected format.
  • func(): Call this closure after the job is considered complete. This is a convenience function that avoids needing a separate ProcessingNotifier for simple use cases. If a different method is used to signal to the consumer that jobs are done, this function can be ignored.

type NotifyDone added in v0.26.2

type NotifyDone func(module.JobID)

NotifyDone should be the consumer's NotifyJobIsDone method, or a wrapper for that method. It is wrapped in a closure and added as an argument to the JobProcessor to notify the consumer that the job is done.

type SealedBlockHeaderReader added in v0.26.2

type SealedBlockHeaderReader struct {
	// contains filtered or unexported fields
}

SealedBlockHeaderReader provides an abstraction for consumers to read blocks as jobs.

func NewSealedBlockHeaderReader added in v0.26.2

func NewSealedBlockHeaderReader(state protocol.State, headers storage.Headers) *SealedBlockHeaderReader

NewSealedBlockHeaderReader creates and returns a SealedBlockHeaderReader.

func (SealedBlockHeaderReader) AtIndex added in v0.26.2

func (r SealedBlockHeaderReader) AtIndex(index uint64) (module.Job, error)

AtIndex returns the block header job at the given index. The block header job at an index is just the finalized block header at that index (i.e., height). Error returns:

  • storage.ErrNotFound if the provided index is not sealed

func (SealedBlockHeaderReader) Head added in v0.26.2

func (r SealedBlockHeaderReader) Head() (uint64, error)

Head returns the last sealed height as job index.

type Worker

type Worker interface {
	// returned error must be unexpected fatal error
	Run(job module.Job) error
}

type WorkerPool added in v0.26.2

type WorkerPool struct {
	component.Component
	// contains filtered or unexported fields
}

WorkerPool implements the jobqueue.Worker interface, and wraps the processing to make it compatible with the Component interface.

func NewWorkerPool added in v0.26.2

func NewWorkerPool(processor JobProcessor, notify NotifyDone, workers uint64) *WorkerPool

NewWorkerPool returns a new WorkerPool

func (*WorkerPool) Run added in v0.26.2

func (w *WorkerPool) Run(job module.Job) error

Run executes the worker's JobProcessor for the provided job. Run is non-blocking.
