Documentation
¶
Index ¶
Constants ¶
const ( DefaultMaxQueueSize = 51200 DefaultScheduleDelay = 5000 DefaultExportTimeout = 30000 DefaultMaxExportBatchSize = 512 )
Defaults for BatchItemProcessorOptions.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchItemProcessor ¶
type BatchItemProcessor[T any] struct { // contains filtered or unexported fields }
BatchItemProcessor is a buffer that batches asynchronously-received items and sends them to a exporter when complete.
func NewBatchItemProcessor ¶
func NewBatchItemProcessor[T any](exporter ItemExporter[T], log logrus.FieldLogger, options ...BatchItemProcessorOption) *BatchItemProcessor[T]
NewBatchItemProcessor creates a new ItemProcessor that will send completed item batches to the exporter with the supplied options.
If the exporter is nil, the item processor will preform no action.
func (*BatchItemProcessor[T]) ForceFlush ¶
func (bvp *BatchItemProcessor[T]) ForceFlush(ctx context.Context) error
ForceFlush exports all ended items that have not yet been exported.
func (*BatchItemProcessor[T]) Shutdown ¶
func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error
Shutdown flushes the queue and waits until all items are processed. It only executes once. Subsequent call does nothing.
func (*BatchItemProcessor[T]) Write ¶
func (bvp *BatchItemProcessor[T]) Write(s *T)
OnEnd method enqueues a item for later processing.
type BatchItemProcessorOption ¶
type BatchItemProcessorOption func(o *BatchItemProcessorOptions)
BatchItemProcessorOption configures a BatchItemProcessor.
func WithBatchTimeout ¶
func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption
WithBatchTimeout returns a BatchItemProcessorOption that configures the maximum delay allowed for a BatchItemProcessor before it will export any held item (whether the queue is full or not).
func WithExportTimeout ¶
func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption
WithExportTimeout returns a BatchItemProcessorOption that configures the amount of time a BatchItemProcessor waits for an exporter to export before abandoning the export.
func WithMaxExportBatchSize ¶
func WithMaxExportBatchSize(size int) BatchItemProcessorOption
WithMaxExportBatchSize returns a BatchItemProcessorOption that configures the maximum export batch size allowed for a BatchItemProcessor.
func WithMaxQueueSize ¶
func WithMaxQueueSize(size int) BatchItemProcessorOption
WithMaxQueueSize returns a BatchItemProcessorOption that configures the maximum queue size allowed for a BatchItemProcessor.
type BatchItemProcessorOptions ¶
type BatchItemProcessorOptions struct {
// MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the
// queue gets full it drops the items.
// The default value of MaxQueueSize is 51200.
MaxQueueSize int
// BatchTimeout is the maximum duration for constructing a batch. Processor
// forcefully sends available items when timeout is reached.
// The default value of BatchTimeout is 5000 msec.
BatchTimeout time.Duration
// ExportTimeout specifies the maximum duration for exporting items. If the timeout
// is reached, the export will be canceled.
// The default value of ExportTimeout is 30000 msec.
ExportTimeout time.Duration
// MaxExportBatchSize is the maximum number of items to process in a single batch.
// If there are more than one batch worth of items then it processes multiple batches
// of items one batch after the other without any delay.
// The default value of MaxExportBatchSize is 512.
MaxExportBatchSize int
}
BatchItemProcessorOptions is configuration settings for a BatchItemProcessor.
type ItemExporter ¶
type ItemExporter[T any] interface { // ExportItems exports a batch of items. // // This function is called synchronously, so there is no concurrency // safety requirement. However, due to the synchronous calling pattern, // it is critical that all timeouts and cancellations contained in the // passed context must be honored. // // Any retry logic must be contained in this function. The SDK that // calls this function will not implement any retry logic. All errors // returned by this function are considered unrecoverable and will be // reported to a configured error Handler. ExportItems(ctx context.Context, items []*T) error // Shutdown notifies the exporter of a pending halt to operations. The // exporter is expected to preform any cleanup or synchronization it // requires while honoring all timeouts and cancellations contained in // the passed context. Shutdown(ctx context.Context) error }
ItemExporter exports batches of items.