processor

package
v0.0.1
Published: Feb 13, 2023 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

const (
	DefaultMaxQueueSize       = 51200
	DefaultScheduleDelay      = 5000
	DefaultExportTimeout      = 30000
	DefaultMaxExportBatchSize = 512
)

Defaults for BatchItemProcessorOptions. DefaultScheduleDelay and DefaultExportTimeout are expressed in milliseconds.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchItemProcessor

type BatchItemProcessor[T any] struct {
	// contains filtered or unexported fields
}

BatchItemProcessor is a buffer that batches asynchronously received items and sends each batch to an exporter once the batch is complete.

func NewBatchItemProcessor

func NewBatchItemProcessor[T any](exporter ItemExporter[T], log logrus.FieldLogger, options ...BatchItemProcessorOption) *BatchItemProcessor[T]

NewBatchItemProcessor creates a new BatchItemProcessor that will send completed item batches to the exporter with the supplied options.

If the exporter is nil, the item processor will perform no action.

func (*BatchItemProcessor[T]) ForceFlush

func (bvp *BatchItemProcessor[T]) ForceFlush(ctx context.Context) error

ForceFlush exports all queued items that have not yet been exported.

func (*BatchItemProcessor[T]) Shutdown

func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error

Shutdown flushes the queue and waits until all items are processed. It only executes once. Subsequent calls do nothing.

func (*BatchItemProcessor[T]) Write

func (bvp *BatchItemProcessor[T]) Write(s *T)

Write enqueues an item for later processing.
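One plausible way to enqueue without blocking the caller is a non-blocking send on a buffered channel, dropping the item when the buffer is full (the MaxQueueSize comment further down mentions that a full queue drops items). The sketch below is an assumption about the general technique, not the package's internals: boundedQueue, newBoundedQueue and the boolean return value are hypothetical, and the documented Write returns nothing.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// boundedQueue is a hypothetical stand-in for the processor's internal queue.
type boundedQueue[T any] struct {
	items   chan *T
	dropped atomic.Int64
}

func newBoundedQueue[T any](maxQueueSize int) *boundedQueue[T] {
	return &boundedQueue[T]{items: make(chan *T, maxQueueSize)}
}

// Write enqueues item without blocking. It reports whether the item was
// accepted; when the buffer is full the item is counted as dropped.
func (q *boundedQueue[T]) Write(item *T) bool {
	select {
	case q.items <- item:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

func main() {
	q := newBoundedQueue[string](2)
	for _, s := range []string{"a", "b", "c"} {
		s := s
		fmt.Println(s, q.Write(&s))
	}
	fmt.Println("dropped:", q.dropped.Load())
}
```

With a capacity of 2, the third write is rejected and the drop counter ends at 1.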

type BatchItemProcessorOption

type BatchItemProcessorOption func(o *BatchItemProcessorOptions)

BatchItemProcessorOption configures a BatchItemProcessor.

func WithBatchTimeout

func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption

WithBatchTimeout returns a BatchItemProcessorOption that configures the maximum delay allowed for a BatchItemProcessor before it will export any held items (whether the queue is full or not).

func WithExportTimeout

func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption

WithExportTimeout returns a BatchItemProcessorOption that configures the amount of time a BatchItemProcessor waits for an exporter to export before abandoning the export.

func WithMaxExportBatchSize

func WithMaxExportBatchSize(size int) BatchItemProcessorOption

WithMaxExportBatchSize returns a BatchItemProcessorOption that configures the maximum export batch size allowed for a BatchItemProcessor.

func WithMaxQueueSize

func WithMaxQueueSize(size int) BatchItemProcessorOption

WithMaxQueueSize returns a BatchItemProcessorOption that configures the maximum queue size allowed for a BatchItemProcessor.

type BatchItemProcessorOptions

type BatchItemProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the
	// queue is full, items are dropped.
	// The default value of MaxQueueSize is 51200.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available items when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting items. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of items to process in a single batch.
	// If there is more than one batch's worth of items, it processes multiple batches
	// of items one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int
}

BatchItemProcessorOptions holds the configuration settings for a BatchItemProcessor.
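To illustrate the MaxExportBatchSize comment, the sketch below splits a backlog of items into consecutive batches no larger than the limit. The splitBatches helper is hypothetical and only demonstrates the arithmetic; the processor itself may assemble batches incrementally rather than slicing a backlog.

```go
package main

import "fmt"

// splitBatches is a hypothetical helper that cuts items into consecutive
// batches of at most maxBatchSize elements. It returns nil for a
// non-positive limit.
func splitBatches[T any](items []*T, maxBatchSize int) [][]*T {
	if maxBatchSize <= 0 {
		return nil
	}
	var batches [][]*T
	for start := 0; start < len(items); start += maxBatchSize {
		end := start + maxBatchSize
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	items := make([]*int, 0, 1200)
	for i := 0; i < 1200; i++ {
		v := i
		items = append(items, &v)
	}
	for i, b := range splitBatches(items, 512) {
		fmt.Printf("batch %d: %d items\n", i, len(b))
	}
}
```

With the default limit of 512, a backlog of 1200 items yields batches of 512, 512 and 176 items.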

type ItemExporter

type ItemExporter[T any] interface {
	// ExportItems exports a batch of items.
	//
	// This function is called synchronously, so there is no concurrency
	// safety requirement. However, due to the synchronous calling pattern,
	// it is critical that all timeouts and cancellations contained in the
	// passed context are honored.
	//
	// Any retry logic must be contained in this function. The SDK that
	// calls this function will not implement any retry logic. All errors
	// returned by this function are considered unrecoverable and will be
	// reported to a configured error Handler.
	ExportItems(ctx context.Context, items []*T) error

	// Shutdown notifies the exporter of a pending halt to operations. The
	// exporter is expected to perform any cleanup or synchronization it
	// requires while honoring all timeouts and cancellations contained in
	// the passed context.
	Shutdown(ctx context.Context) error
}
