Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BoltBatcher ¶
type BoltBatcher[Q any] struct {
	// contains filtered or unexported fields
}
BoltBatcher is a bbolt implementation of the sqldb.BatchedTx interface.
func NewBoltBackend ¶
func NewBoltBackend[Q any](db kvdb.Backend) *BoltBatcher[Q]
NewBoltBackend creates a new BoltBatcher instance.
func (*BoltBatcher[Q]) ExecTx ¶
func (t *BoltBatcher[Q]) ExecTx(_ context.Context, opts sqldb.TxOptions, txBody func(Q) error, reset func()) error
ExecTx will execute the passed txBody, operating upon generic parameter Q (usually a storage interface) in a single transaction.
NOTE: This is part of the sqldb.BatchedTx interface.
type Request ¶
type Request[Q any] struct {
	// Opts holds various configuration options for a scheduled request.
	Opts *SchedulerOptions
	// Reset is called before each invocation of Do and is used to clear
	// any possible modifications to local state as a result of previous
	// calls to Do that were not committed due to a concurrent batch
	// failure.
	//
	// NOTE: This field is optional.
	Reset func()
	// Do is applied alongside other operations in the batch.
	//
	// NOTE: This method MUST NOT acquire any mutexes.
	Do func(tx Q) error
	// OnCommit is called if the batch or a subset of the batch including
	// this request all succeeded without failure. The passed error should
	// contain the result of the transaction commit, as that can still fail
	// even if none of the closures returned an error.
	//
	// NOTE: This field is optional.
	OnCommit func(commitErr error) error
}
Request defines an operation that can be batched into a single bbolt transaction.
type Scheduler ¶
type Scheduler[Q any] interface {
	// Execute schedules a Request for execution with the next available
	// batch. This method blocks until the underlying closure has been
	// run against the database. The resulting error is returned to the
	// caller.
	Execute(ctx context.Context, req *Request[Q]) error
}
Scheduler abstracts a generic batching engine that accumulates an incoming set of Requests, executes them, and returns the error from the operation.
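The order in which a Scheduler might invoke a Request's callbacks can be sketched with local stand-in types. This is only an illustration: the Request and Scheduler declarations below mirror the signatures shown above (without Opts), while inlineScheduler, kvStore, putKey and errEmptyKey are hypothetical names invented for the example. The sketch does no batching and uses no real database.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Request mirrors the documented Request type, minus the Opts field.
type Request[Q any] struct {
	Reset    func()
	Do       func(tx Q) error
	OnCommit func(commitErr error) error
}

// Scheduler mirrors the documented Scheduler interface.
type Scheduler[Q any] interface {
	Execute(ctx context.Context, req *Request[Q]) error
}

// inlineScheduler is a hypothetical Scheduler that runs each request on its
// own. The tx field stands in for a database transaction.
type inlineScheduler[Q any] struct {
	tx Q
}

// Execute calls Reset (if set), then Do, then passes the commit result to
// OnCommit (if set). Nothing can fail at commit time in this sketch, so the
// commit error is always nil.
func (s *inlineScheduler[Q]) Execute(ctx context.Context, req *Request[Q]) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if req.Reset != nil {
		req.Reset()
	}
	if err := req.Do(s.tx); err != nil {
		return err
	}
	if req.OnCommit != nil {
		return req.OnCommit(nil)
	}
	return nil
}

// kvStore is a hypothetical storage type used as the generic parameter Q.
type kvStore map[string]string

var errEmptyKey = errors.New("empty key")

// putKey writes key=value through the scheduler and returns the order in
// which the request's callbacks were invoked.
func putKey(s Scheduler[kvStore], key, value string) ([]string, error) {
	var trace []string
	req := &Request[kvStore]{
		// Reset discards any trace left over from an earlier attempt.
		Reset: func() { trace = []string{"reset"} },
		Do: func(tx kvStore) error {
			if key == "" {
				return errEmptyKey
			}
			tx[key] = value
			trace = append(trace, "do")
			return nil
		},
		OnCommit: func(commitErr error) error {
			trace = append(trace, "commit")
			return commitErr
		},
	}
	err := s.Execute(context.Background(), req)
	return trace, err
}

func main() {
	store := kvStore{}
	s := &inlineScheduler[kvStore]{tx: store}
	trace, err := putKey(s, "alias", "carol")
	fmt.Println(trace, err, store["alias"])
}
```

A real batching engine would run many Do closures inside one transaction and could call Reset and Do more than once per request, which is why Reset has to undo any local side effects of Do.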
type SchedulerOption ¶
type SchedulerOption func(*SchedulerOptions)
SchedulerOption is a type that can be used to supply options to a scheduled request.
func LazyAdd ¶
func LazyAdd() SchedulerOption
LazyAdd marks the request for lazy execution: it does not have to run immediately and is instead added to the next batch, which reduces db contention.
func ReadOnly ¶
func ReadOnly() SchedulerOption
ReadOnly will mark the request as read-only. This means that the transaction will be executed in read-only mode, and no changes will be made to the database. If any requests in the same batch are not read-only, then the entire batch will be executed in read-write mode.
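The read-only rule above amounts to a logical AND over the requests in a batch. A minimal sketch follows, assuming a hypothetical helper named batchReadOnly that is not part of the package; the SchedulerOptions struct is a local copy of the one documented here. Treating nil options and an empty batch as read-write is an assumption of the sketch.

```go
package main

import "fmt"

// SchedulerOptions is a local copy of the documented options struct.
type SchedulerOptions struct {
	Lazy     bool
	ReadOnly bool
}

// batchReadOnly is a hypothetical helper. It reports whether every request
// in the batch is marked read-only. A single read-write request, a nil
// options pointer, or an empty batch yields false (read-write mode).
func batchReadOnly(opts []*SchedulerOptions) bool {
	if len(opts) == 0 {
		return false
	}
	for _, o := range opts {
		if o == nil || !o.ReadOnly {
			return false
		}
	}
	return true
}

func main() {
	allReads := []*SchedulerOptions{{ReadOnly: true}, {ReadOnly: true, Lazy: true}}
	mixed := []*SchedulerOptions{{ReadOnly: true}, {}}

	fmt.Println("all read-only:", batchReadOnly(allReads))
	fmt.Println("mixed batch:  ", batchReadOnly(mixed))
}
```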
type SchedulerOptions ¶
type SchedulerOptions struct {
	// Lazy should be true if we don't have to immediately execute this
	// request when it comes in. This means that it can be scheduled later,
	// allowing larger batches.
	Lazy bool
	// ReadOnly should be true if the request is read-only. By default,
	// this is false.
	ReadOnly bool
}
SchedulerOptions holds various configuration options for a scheduled request.
func NewDefaultSchedulerOpts ¶
func NewDefaultSchedulerOpts() *SchedulerOptions
NewDefaultSchedulerOpts returns a new SchedulerOptions with default values.
func NewSchedulerOptions ¶
func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions
NewSchedulerOptions returns a new SchedulerOptions with the given options applied on top of the default options.
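The option functions follow the common Go functional-options pattern. The sketch below re-declares the documented signatures locally to show how options might be applied on top of defaults. The function bodies are assumptions, as is the choice of false as the default for Lazy (the documentation only states that ReadOnly defaults to false).

```go
package main

import "fmt"

// SchedulerOptions is a local copy of the documented options struct.
type SchedulerOptions struct {
	Lazy     bool
	ReadOnly bool
}

// SchedulerOption mutates a SchedulerOptions value in place.
type SchedulerOption func(*SchedulerOptions)

// LazyAdd returns an option that sets Lazy (assumed body).
func LazyAdd() SchedulerOption {
	return func(o *SchedulerOptions) { o.Lazy = true }
}

// ReadOnly returns an option that sets ReadOnly (assumed body).
func ReadOnly() SchedulerOption {
	return func(o *SchedulerOptions) { o.ReadOnly = true }
}

// NewDefaultSchedulerOpts returns the assumed defaults: both flags false.
func NewDefaultSchedulerOpts() *SchedulerOptions {
	return &SchedulerOptions{}
}

// NewSchedulerOptions starts from the defaults and applies each option in
// the order given.
func NewSchedulerOptions(options ...SchedulerOption) *SchedulerOptions {
	opts := NewDefaultSchedulerOpts()
	for _, apply := range options {
		apply(opts)
	}
	return opts
}

func main() {
	fmt.Printf("%+v\n", *NewSchedulerOptions())
	fmt.Printf("%+v\n", *NewSchedulerOptions(LazyAdd(), ReadOnly()))
}
```

With this pattern a caller names only the behaviours it wants to change, and new options can be added later without altering the constructor's signature.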
type TimeScheduler ¶
type TimeScheduler[Q any] struct {
	// contains filtered or unexported fields
}
TimeScheduler is a batching engine that executes requests within a fixed horizon. When the first request is received, a TimeScheduler waits a configurable duration for other concurrent requests to join the batch. Once this time has elapsed, the batch is closed and executed. Subsequent requests are then added to a new batch which undergoes the same process.
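The fixed-horizon behaviour described above can be illustrated with a toy batcher. This is a simplified sketch, not the package's TimeScheduler: windowBatcher, batch, demoWindow and their methods are hypothetical names, the "execution" step only records which items ran together, and no database is involved.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demoWindow is an arbitrary batching horizon chosen for this sketch.
const demoWindow = 100 * time.Millisecond

// batch collects the items that arrive during one window.
type batch struct {
	items []string
	done  chan struct{} // closed once the batch has been executed
}

// windowBatcher is a hypothetical, simplified time-based batcher.
type windowBatcher struct {
	mu       sync.Mutex
	window   time.Duration
	current  *batch     // open batch, or nil if none is open
	executed [][]string // items of every closed batch, in order
}

func newWindowBatcher(window time.Duration) *windowBatcher {
	return &windowBatcher{window: window}
}

// Submit adds item to the open batch. The first item of a batch opens it
// and starts the timer. The returned channel is closed when the batch has
// run, so a caller can block on it.
func (w *windowBatcher) Submit(item string) <-chan struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.current == nil {
		b := &batch{done: make(chan struct{})}
		w.current = b
		time.AfterFunc(w.window, func() { w.flush(b) })
	}
	w.current.items = append(w.current.items, item)
	return w.current.done
}

// flush detaches b so that later submissions start a new batch, records
// b as executed, and wakes every waiter.
func (w *windowBatcher) flush(b *batch) {
	w.mu.Lock()
	if w.current == b {
		w.current = nil
	}
	w.executed = append(w.executed, b.items)
	w.mu.Unlock()
	close(b.done)
}

// Executed returns a copy of the list of batches run so far.
func (w *windowBatcher) Executed() [][]string {
	w.mu.Lock()
	defer w.mu.Unlock()
	return append([][]string(nil), w.executed...)
}

func main() {
	w := newWindowBatcher(demoWindow)
	a, b := w.Submit("a"), w.Submit("b") // both join the first batch
	<-a
	<-b
	<-w.Submit("c") // arrives after the first batch closed
	fmt.Println(w.Executed())
}
```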
func NewTimeScheduler ¶
func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker, duration time.Duration) *TimeScheduler[Q]
NewTimeScheduler initializes a new TimeScheduler with a fixed duration at which to schedule batches. If the operation needs to modify a higher-level cache, the cache's lock should be provided so that external consistency can be maintained, as successful db operations will cause a request's OnCommit method to be executed while holding this lock.
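One practical consequence of the locker argument is that an OnCommit callback can update the cache directly, but must not try to take the cache's lock itself. The sketch below illustrates this under stated assumptions: aliasCache and finishBatch are hypothetical, and finishBatch only imitates the post-commit step by holding the supplied sync.Locker while the callbacks run.

```go
package main

import (
	"fmt"
	"sync"
)

// aliasCache is a hypothetical higher-level cache guarded by its own mutex.
type aliasCache struct {
	mu      sync.Mutex
	entries map[string]string
}

// finishBatch is a hypothetical stand-in for a scheduler's post-commit
// step. It holds locker (if non-nil) while invoking every callback with
// the commit result, and returns each callback's error.
func finishBatch(locker sync.Locker, commitErr error,
	callbacks ...func(error) error) []error {

	if locker != nil {
		locker.Lock()
		defer locker.Unlock()
	}

	results := make([]error, 0, len(callbacks))
	for _, cb := range callbacks {
		results = append(results, cb(commitErr))
	}
	return results
}

func main() {
	c := &aliasCache{entries: map[string]string{}}

	// The callback writes to the cache without locking c.mu. The lock is
	// already held by finishBatch, and sync.Mutex is not reentrant, so
	// locking it again here would deadlock.
	onCommit := func(commitErr error) error {
		if commitErr != nil {
			return commitErr
		}
		c.entries["alias"] = "carol"
		return nil
	}

	errs := finishBatch(&c.mu, nil, onCommit)

	c.mu.Lock()
	fmt.Println(errs[0], c.entries["alias"])
	c.mu.Unlock()
}
```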
func (*TimeScheduler[Q]) Execute ¶
func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error
Execute schedules the provided request for batch execution along with other concurrent requests. The request will be executed within a fixed horizon, parameterized by the duration of the scheduler. The error from the underlying operation is returned to the caller.
NOTE: Part of the Scheduler interface.