batch

package
v0.20.0-beta.rc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BoltBatcher

type BoltBatcher[Q any] struct {
	// contains filtered or unexported fields
}

BoltBatcher is a bbolt implementation of the sqldb.BatchedTx interface.

func NewBoltBackend

func NewBoltBackend[Q any](db kvdb.Backend) *BoltBatcher[Q]

NewBoltBackend creates a new BoltBackend instance.

func (*BoltBatcher[Q]) ExecTx

func (t *BoltBatcher[Q]) ExecTx(_ context.Context, opts sqldb.TxOptions,
	txBody func(Q) error, reset func()) error

ExecTx will execute the passed txBody, operating upon generic parameter Q (usually a storage interface) in a single transaction.

NOTE: This is part of the sqldb.BatchedTx interface.

type Request

type Request[Q any] struct {
	// Opts holds various configuration options for a scheduled request.
	Opts *SchedulerOptions

	// Reset is called before each invocation of Update and is used to clear
	// any possible modifications to local state as a result of previous
	// calls to Update that were not committed due to a concurrent batch
	// failure.
	//
	// NOTE: This field is optional.
	Reset func()

	// Do is applied alongside other operations in the batch.
	//
	// NOTE: This method MUST NOT acquire any mutexes.
	Do func(tx Q) error

	// OnCommit is called if the batch or a subset of the batch including
	// this request all succeeded without failure. The passed error should
	// contain the result of the transaction commit, as that can still fail
	// even if none of the closures returned an error.
	//
	// NOTE: This field is optional.
	OnCommit func(commitErr error) error
}

Request defines an operation that can be batched into a single bbolt transaction.

type Scheduler

type Scheduler[Q any] interface {
	// Execute schedules a Request for execution with the next available
	// batch. This method blocks until the underlying closure has been
	// run against the database. The resulting error is returned to the
	// caller.
	Execute(ctx context.Context, req *Request[Q]) error
}

Scheduler abstracts a generic batching engine that accumulates an incoming set of Requests, executes them, and returns the error from the operation.

type SchedulerOption

type SchedulerOption func(*SchedulerOptions)

SchedulerOption is a type that can be used to supply options to a scheduled request.

func LazyAdd

func LazyAdd() SchedulerOption

LazyAdd will make the request be executed lazily, added to the next batch to reduce db contention.

func ReadOnly

func ReadOnly() SchedulerOption

ReadOnly will mark the request as read-only. This means that the transaction will be executed in read-only mode, and no changes will be made to the database. If any requests in the same batch are not read-only, then the entire batch will be executed in read-write mode.

type SchedulerOptions

type SchedulerOptions struct {
	// Lazy should be true if we don't have to immediately execute this
	// request when it comes in. This means that it can be scheduled later,
	// allowing larger batches.
	Lazy bool

	// ReadOnly should be true if the request is read-only. By default,
	// this is false.
	ReadOnly bool
}

SchedulerOptions holds various configuration options for a scheduled request.

func NewDefaultSchedulerOpts

func NewDefaultSchedulerOpts() *SchedulerOptions

NewDefaultSchedulerOpts returns a new SchedulerOptions with default values.

func NewSchedulerOptions

func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions

NewSchedulerOptions returns a new SchedulerOptions with the given options applied on top of the default options.

type TimeScheduler

type TimeScheduler[Q any] struct {
	// contains filtered or unexported fields
}

TimeScheduler is a batching engine that executes requests within a fixed horizon. When the first request is received, a TimeScheduler waits a configurable duration for other concurrent requests to join the batch. Once this time has elapsed, the batch is closed and executed. Subsequent requests are then added to a new batch which undergoes the same process.

func NewTimeScheduler

func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker,
	duration time.Duration) *TimeScheduler[Q]

NewTimeScheduler initializes a new TimeScheduler with a fixed duration at which to schedule batches. If the operation needs to modify a higher-level cache, the cache's lock should be provided to so that external consistency can be maintained, as successful db operations will cause a request's OnCommit method to be executed while holding this lock.

func (*TimeScheduler[Q]) Execute

func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error

Execute schedules the provided request for batch execution along with other concurrent requests. The request will be executed within a fixed horizon, parameterizeed by the duration of the scheduler. The error from the underlying operation is returned to the caller.

NOTE: Part of the Scheduler interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL