batch

package
v0.9.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AutoTuneConfig added in v0.9.3

type AutoTuneConfig struct {
	// AutoTune indicates whether the batch sender should auto-tune the batch
	// size based on observed throughput. Defaults to false.
	Enabled bool
	// AutoTuneMinBatchBytes is the minimum batch size in bytes when auto-tuning
	// is enabled. Defaults to 1MiB.
	MinBatchBytes int64
	// AutoTuneMaxBatchBytes is the maximum batch size in bytes when auto-tuning
	// is enabled. Defaults to 50MiB.
	MaxBatchBytes int64
	// AutoTuneConvergenceThreshold is the threshold for convergence when
	// auto-tuning is enabled. Defaults to 0.01 (1%).
	ConvergenceThreshold float64
}

func (*AutoTuneConfig) GetConvergenceThreshold added in v0.9.3

func (c *AutoTuneConfig) GetConvergenceThreshold() float64

func (*AutoTuneConfig) GetMaxBatchBytes added in v0.9.3

func (c *AutoTuneConfig) GetMaxBatchBytes() int64

func (*AutoTuneConfig) GetMinBatchBytes added in v0.9.3

func (c *AutoTuneConfig) GetMinBatchBytes() int64

func (*AutoTuneConfig) IsValid added in v0.9.3

func (c *AutoTuneConfig) IsValid() error

type Batch

type Batch[T Message] struct {
	// contains filtered or unexported fields
}

func NewBatch

func NewBatch[T Message](messages []T, positions []wal.CommitPosition) *Batch[T]

func (*Batch[T]) GetCommitPositions

func (b *Batch[T]) GetCommitPositions() []wal.CommitPosition

func (*Batch[T]) GetMessages

func (b *Batch[T]) GetMessages() []T

type Config

type Config struct {
	// BatchTime is the max time interval at which the batch sending is
	// triggered. Defaults to 1s
	BatchTimeout time.Duration
	// MaxBatchBytes is the max size in bytes for a given batch. When this size is
	// reached, the batch is sent. Defaults to 1572864 bytes.
	MaxBatchBytes int64
	// MaxBatchSize is the max number of messages to be sent per batch. When this
	// size is reached, the batch is sent. Defaults to 100.
	MaxBatchSize int64
	// MaxQueueBytes is the max memory used by the batch writer for inflight
	// batches. Defaults to 100MiB
	MaxQueueBytes int64
	// IgnoreSendErrors indicates whether sending errors should be ignored or
	// not. If set to true, errors will be logged but the batch will continue
	// processing. Defaults to false.
	IgnoreSendErrors bool

	AutoTune AutoTuneConfig
}

func (*Config) GetAutoTuneConfig added in v0.9.3

func (c *Config) GetAutoTuneConfig() AutoTuneConfig

func (*Config) GetBatchTimeout

func (c *Config) GetBatchTimeout() time.Duration

func (*Config) GetMaxBatchBytes

func (c *Config) GetMaxBatchBytes() int64

func (*Config) GetMaxBatchSize

func (c *Config) GetMaxBatchSize() int64

func (*Config) GetMaxQueueBytes

func (c *Config) GetMaxQueueBytes() (int64, error)

type Message

type Message interface {
	Size() int
	IsEmpty() bool
}

type Sender

type Sender[T Message] struct {
	// contains filtered or unexported fields
}

func NewSender

func NewSender[T Message](ctx context.Context, config *Config, sendfn sendBatchFn[T], logger loglib.Logger) (*Sender[T], error)

func (*Sender[T]) Close

func (s *Sender[T]) Close()

Close will stop the sending, and wait for all ongoing batches to finish. It is safe to call multiple times.

func (*Sender[T]) SendMessage added in v0.5.0

func (s *Sender[T]) SendMessage(ctx context.Context, msg *WALMessage[T]) error

SendMessage adds the message to the batch, which will be sent when the interval or the max number of messages is reached by a background process.

type WALMessage

type WALMessage[T Message] struct {
	// contains filtered or unexported fields
}

WALMessage is a wrapper around any kind of message implementing the Message interface which contains a wal commit position.

func NewWALMessage

func NewWALMessage[T Message](msg T, pos wal.CommitPosition) *WALMessage[T]

func (*WALMessage[T]) GetMessage

func (m *WALMessage[T]) GetMessage() T

func (*WALMessage[T]) GetPosition

func (m *WALMessage[T]) GetPosition() wal.CommitPosition

func (*WALMessage[T]) Size

func (m *WALMessage[T]) Size() int

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL