writebatcher

package
v0.1.149 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 23, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package writebatcher provides a generic, transaction-batching write serializer for database operations. It collects items of any type T from multiple concurrent goroutines and flushes them in batched transactions through a single background worker, eliminating write contention on single-writer databases like SQLite.

This package simplifies batched database writes by:

  • Serializing all writes through one goroutine (no lock contention)
  • Batching items into single transactions (fewer round-trips)
  • Triggering flushes by count or timeout (configurable latency vs throughput)
  • Providing a generic API over any item type T

Usage

Create a batcher with a BeginTx function and a FlushFunc:

wb, err := writebatcher.New[MyItem](ctx, writebatcher.Config[MyItem]{
    BeginTx: func(ctx context.Context) (*sql.Tx, error) {
        return db.BeginTx(ctx, nil)
    },
    Flush: func(ctx context.Context, tx *sql.Tx, batch []MyItem) error {
        for _, item := range batch {
            if _, err := tx.ExecContext(ctx, "INSERT ...", item.Val); err != nil {
                return err
            }
        }
        return nil
    },
    OnError:      func(err error, batch []MyItem) { log.Println(err) },
    MaxBatchSize: 50,
})

Submit items from any goroutine:

if err := wb.Submit(item); err != nil {
    // ErrFull (channel at capacity) or ErrClosed (batcher shut down)
}
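
A caller that prefers retrying over dropping might wrap Submit in a small backoff loop. The following is a minimal sketch, not part of the package: submitter, submitWithRetry, flaky, and the local errFull (standing in for writebatcher.ErrFull) are hypothetical names introduced so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errFull stands in for writebatcher.ErrFull so this sketch is self-contained.
var errFull = errors.New("writebatcher: channel full")

// submitter is the single method the helper needs. A *WriteBatcher[T] has a
// Submit method with this signature.
type submitter[T any] interface {
	Submit(item T) error
}

// submitWithRetry calls Submit up to attempts times (at least once), doubling
// the wait after each full-queue rejection. Any other result, including nil
// and non-retryable errors such as ErrClosed, is returned immediately.
func submitWithRetry[T any](s submitter[T], item T, attempts int, wait time.Duration) error {
	for i := 0; ; i++ {
		err := s.Submit(item)
		if !errors.Is(err, errFull) || i >= attempts-1 {
			return err
		}
		time.Sleep(wait)
		wait *= 2
	}
}

// flaky is a test double that rejects the first `rejects` submissions.
type flaky struct{ rejects, calls int }

func (f *flaky) Submit(int) error {
	f.calls++
	if f.calls <= f.rejects {
		return errFull
	}
	return nil
}

func main() {
	f := &flaky{rejects: 2}
	err := submitWithRetry[int](f, 42, 5, time.Millisecond)
	fmt.Println(err, f.calls)
}
```

Whether to retry, drop, or apply backpressure upstream depends on how much latency the producer can tolerate; the sketch only shows the retry branch.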

Close flushes remaining items and releases resources:

wb.Close()

Flush Triggers

A flush occurs when any of these conditions is met:

  • The batch reaches MaxBatchSize items (default 50)
  • FlushInterval elapses since the first item entered the current batch (default 200ms)
  • The batch's cumulative size (the sum of SizeFunc over its items) reaches MaxBatchBytes (only when SizeFunc is non-nil and MaxBatchBytes > 0)
  • Close() is called
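
The count, interval, and byte triggers above can be pictured as a single predicate. This is an illustrative sketch under stated assumptions, not the package's implementation: limits and shouldFlush are hypothetical names, and treating an empty batch as never due is an assumption drawn from the interval being measured from the first item.

```go
package main

import (
	"fmt"
	"time"
)

// limits mirrors the documented Config knobs; the type itself is hypothetical.
type limits struct {
	MaxBatchSize  int
	FlushInterval time.Duration
	MaxBatchBytes int64 // 0 disables the byte trigger
}

// shouldFlush reports whether a batch with the given item count, cumulative
// size, and age (time since its first item arrived) is due for a flush.
func shouldFlush(l limits, count int, size int64, age time.Duration) bool {
	if count == 0 {
		return false // assumption: an empty batch is never flushed
	}
	if count >= l.MaxBatchSize {
		return true // count trigger
	}
	if l.MaxBatchBytes > 0 && size >= l.MaxBatchBytes {
		return true // byte trigger
	}
	return age >= l.FlushInterval // interval trigger
}

func main() {
	l := limits{MaxBatchSize: 50, FlushInterval: 200 * time.Millisecond, MaxBatchBytes: 1 << 20}
	fmt.Println(shouldFlush(l, 50, 0, 0))                   // count reached
	fmt.Println(shouldFlush(l, 3, 2<<20, 0))                // bytes reached
	fmt.Println(shouldFlush(l, 3, 0, 250*time.Millisecond)) // interval elapsed
	fmt.Println(shouldFlush(l, 3, 0, 10*time.Millisecond))  // none reached
}
```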

Transaction Lifecycle

The batcher calls BeginTx, passes the *sql.Tx to FlushFunc, then calls Commit on success or Rollback on failure. FlushFunc should only execute SQL statements -- it must not call Commit or Rollback itself.
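
The lifecycle described above might look roughly like the following. It is a sketch, not the package's source: txn, flushOnce, fakeTx, and errFlush are hypothetical names, and the small txn interface replaces *sql.Tx (which has both methods) so the example runs without a database driver.

```go
package main

import (
	"errors"
	"fmt"
)

// txn is the subset of *sql.Tx this sketch needs; *sql.Tx satisfies it.
type txn interface {
	Commit() error
	Rollback() error
}

var errFlush = errors.New("flush failed")

// flushOnce runs one begin, flush, commit-or-rollback cycle. The flush
// callback only does work inside the transaction; it never ends it.
func flushOnce[T any](begin func() (txn, error), flush func(txn, []T) error, batch []T) error {
	tx, err := begin()
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	if err := flush(tx, batch); err != nil {
		// Roll back on failure; report the rollback error too if there is one.
		return errors.Join(err, tx.Rollback())
	}
	return tx.Commit()
}

// fakeTx records which terminal call was made, standing in for a real transaction.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	ok := &fakeTx{}
	err := flushOnce(func() (txn, error) { return ok, nil },
		func(txn, []int) error { return nil }, []int{1, 2, 3})
	fmt.Println(err, ok.committed, ok.rolledBack)

	bad := &fakeTx{}
	err = flushOnce(func() (txn, error) { return bad, nil },
		func(txn, []int) error { return errFlush }, []int{4})
	fmt.Println(err, bad.committed, bad.rolledBack)
}
```

Keeping Commit and Rollback in one place is what makes it an error for a FlushFunc to call either: a second call on an already finished transaction would fail.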

Thread Safety

Submit is safe for concurrent use by multiple goroutines. Close is safe to call multiple times. All other methods are internal to the worker goroutine.

Constants

This section is empty.

Variables

var (
	ErrClosed = errors.New("writebatcher: closed")
	ErrFull   = errors.New("writebatcher: channel full")
)

Sentinel errors returned by Submit.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	BeginTx       func(ctx context.Context) (*sql.Tx, error) // how to start a tx
	Flush         FlushFunc[T]                               // business logic
	OnError       OnErrorFunc[T]                             // called on flush failure (nil = log only)
	OnSuccess     OnSuccessFunc[T]                           // called after successful commit
	MaxBatchSize  int                                        // flush at this count (default 50)
	FlushInterval time.Duration                              // flush after this duration (default 200ms)
	ChannelSize   int                                        // buffered channel capacity (default 1024)
	SizeFunc      func(T) int64                              // returns byte cost of an item (nil = size tracking disabled)
	MaxBatchBytes int64                                      // flush when cumulative batch bytes >= this (0 = no byte limit)
}

Config holds all parameters for a WriteBatcher. BeginTx and Flush are required; other fields have defaults (MaxBatchSize 50, FlushInterval 200ms, ChannelSize 1024).

type FlushFunc

type FlushFunc[T any] func(ctx context.Context, tx *sql.Tx, batch []T) error

FlushFunc executes the batch within the provided transaction. The batcher calls BeginTx before Flush and Commit or Rollback after; FlushFunc must only run SQL statements and must not call Commit or Rollback. The batch slice is valid only for the duration of the call; do not retain it.

type OnErrorFunc

type OnErrorFunc[T any] func(err error, batch []T)

OnErrorFunc is called when a flush fails (after Rollback). The batch is a copy and is safe to retain or use for retry logic. If OnError is nil, the batcher logs the error with slog.Error instead.
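
Because the batch handed to OnError is a copy, a callback can park failed batches for a later retry. The sketch below shows one possible shape; retryQueue, failedBatch, and Drain are hypothetical helpers and not part of writebatcher.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// failedBatch pairs a rolled-back batch with the error that caused the rollback.
type failedBatch[T any] struct {
	Err   error
	Items []T
}

// retryQueue collects failed batches. The mutex lets other goroutines call
// Drain while the batcher's worker is still reporting failures.
type retryQueue[T any] struct {
	mu     sync.Mutex
	failed []failedBatch[T]
}

// OnError has the same parameter list as OnErrorFunc[T], so the method value
// rq.OnError could be assigned to Config.OnError.
func (r *retryQueue[T]) OnError(err error, batch []T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.failed = append(r.failed, failedBatch[T]{Err: err, Items: batch})
}

// Drain returns the parked batches and empties the queue.
func (r *retryQueue[T]) Drain() []failedBatch[T] {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := r.failed
	r.failed = nil
	return out
}

func main() {
	rq := &retryQueue[string]{}
	rq.OnError(errors.New("disk full"), []string{"a", "b"})
	for _, fb := range rq.Drain() {
		fmt.Println(fb.Err, fb.Items)
	}
}
```

The same approach would not be safe in an OnSuccess or Flush callback, where the documentation says the slice must not be retained.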

type OnSuccessFunc

type OnSuccessFunc[T any] func(batch []T)

OnSuccessFunc is called after a successful flush and commit. The batch slice may be reused by the batcher, so the callback must not retain it; copy any items that are needed after the call returns.

type Stats

type Stats struct {
	ChannelSize   int
	MaxBatchSize  int
	FlushInterval time.Duration
	IsClosed      bool
	TotalFlushed  int64
	TotalErrors   int64
}

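Stats is a snapshot of a WriteBatcher: its configured ChannelSize, MaxBatchSize, and FlushInterval, whether it has been closed, and the cumulative TotalFlushed and TotalErrors counters.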

type WriteBatcher

type WriteBatcher[T any] struct {
	// contains filtered or unexported fields
}

WriteBatcher collects items of type T and flushes them in batched transactions through a single background worker. A WriteBatcher must be created using New and should not be copied after first use. The zero value is not usable.

func New

func New[T any](ctx context.Context, cfg Config[T]) (*WriteBatcher[T], error)

New creates a WriteBatcher for type T and starts its background worker.

BeginTx must start a new transaction; it is called by the worker for each flush. Flush is called with that transaction and the current batch; it must execute the SQL (e.g. INSERT/UPSERT) and return. OnError is optional; if nil, errors are logged with slog. MaxBatchSize, FlushInterval, and ChannelSize use defaults when zero or negative (50, 200ms, 1024).

SizeFunc and MaxBatchBytes are optional. If both are set (MaxBatchBytes > 0 and SizeFunc non-nil), the batcher tracks cumulative batch size and flushes when the total reaches MaxBatchBytes. If MaxBatchBytes is 0, size tracking runs but never triggers a flush.
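
As an illustration of byte-based batching, the sketch below defines a size function for a made-up item type and shows where batch boundaries would fall when a batch is closed as soon as its running total reaches the limit. event, eventSize, and splitByBytes are hypothetical; the real batcher applies this rule incrementally as items arrive rather than over a finished slice.

```go
package main

import "fmt"

// event is a made-up item type for the example.
type event struct {
	Key     string
	Payload []byte
}

// eventSize has the shape of a SizeFunc: func(T) int64. Here the byte cost is
// approximated as the length of the key plus the length of the payload.
func eventSize(e event) int64 {
	return int64(len(e.Key) + len(e.Payload))
}

// splitByBytes groups items into batches, closing a batch as soon as its
// cumulative size reaches maxBytes. A maxBytes of 0 means no byte limit.
func splitByBytes[T any](items []T, size func(T) int64, maxBytes int64) [][]T {
	var batches [][]T
	var cur []T
	var total int64
	for _, it := range items {
		cur = append(cur, it)
		total += size(it)
		if maxBytes > 0 && total >= maxBytes {
			batches = append(batches, cur)
			cur, total = nil, 0
		}
	}
	if len(cur) > 0 {
		batches = append(batches, cur) // remainder, as flushed on Close
	}
	return batches
}

func main() {
	e := event{Key: "ab", Payload: []byte("cd")} // 4 bytes by eventSize
	for i, b := range splitByBytes([]event{e, e, e}, eventSize, 8) {
		fmt.Printf("batch %d: %d items\n", i, len(b))
	}
}
```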

The worker runs until the context is cancelled or the input channel is closed. The caller must call Close to shut down the batcher and release resources; cancelling the context without calling Close leaves the channel open.

New returns an error if BeginTx or Flush is nil.
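
The validation and defaulting rules described for New can be summarized in a few lines. This is a sketch of the documented behavior only: settings, normalize, and errMissingCallback are hypothetical names, and the callback fields are simplified to func() error so the example stays self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settings is a hypothetical, trimmed-down stand-in for Config[T].
type settings struct {
	BeginTx       func() error
	Flush         func() error
	MaxBatchSize  int
	FlushInterval time.Duration
	ChannelSize   int
}

var errMissingCallback = errors.New("BeginTx and Flush are required")

// normalize rejects a missing callback and replaces zero or negative limits
// with the documented defaults (50 items, 200ms, 1024 slots).
func normalize(s settings) (settings, error) {
	if s.BeginTx == nil || s.Flush == nil {
		return settings{}, errMissingCallback
	}
	if s.MaxBatchSize <= 0 {
		s.MaxBatchSize = 50
	}
	if s.FlushInterval <= 0 {
		s.FlushInterval = 200 * time.Millisecond
	}
	if s.ChannelSize <= 0 {
		s.ChannelSize = 1024
	}
	return s, nil
}

func main() {
	noop := func() error { return nil }
	s, err := normalize(settings{BeginTx: noop, Flush: noop, MaxBatchSize: -1})
	fmt.Println(s.MaxBatchSize, s.FlushInterval, s.ChannelSize, err)

	_, err = normalize(settings{})
	fmt.Println(err)
}
```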

func (*WriteBatcher[T]) Close

func (wb *WriteBatcher[T]) Close() error

Close signals shutdown: it closes the input channel, waits for the worker to drain and flush any remaining items, then cancels the context and returns. After Close returns, all subsequent Submit calls return ErrClosed.

Close is safe to call multiple times; after the first call it returns nil immediately without blocking.
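
One common way to get a drain-then-stop Close that tolerates repeated calls is sync.Once around the channel close. The sketch below assumes that approach; it is not taken from the package's source, and drainer and newDrainer are hypothetical names. Note that with sync.Once a second call made concurrently with the first waits for the first to finish, while a call made afterwards returns immediately.

```go
package main

import (
	"fmt"
	"sync"
)

// drainer is a toy stand-in for a batcher: one worker goroutine consumes
// items until the input channel is closed.
type drainer struct {
	once    sync.Once
	in      chan int
	done    chan struct{} // closed by the worker after its final flush
	flushed []int         // written by the worker, read only after done
}

func newDrainer() *drainer {
	d := &drainer{in: make(chan int, 8), done: make(chan struct{})}
	go func() {
		defer close(d.done)
		for v := range d.in { // exits once in is closed and drained
			d.flushed = append(d.flushed, v)
		}
	}()
	return d
}

// Close closes the input channel exactly once and waits for the worker to
// drain it. Later calls skip the body and return nil.
func (d *drainer) Close() error {
	d.once.Do(func() {
		close(d.in)
		<-d.done
	})
	return nil
}

func main() {
	d := newDrainer()
	d.in <- 1
	d.in <- 2
	fmt.Println(d.Close(), d.Close(), len(d.flushed))
}
```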

func (*WriteBatcher[T]) GetStats

func (wb *WriteBatcher[T]) GetStats() Stats

GetStats returns the current statistics of the WriteBatcher.

func (*WriteBatcher[T]) PendingCount

func (wb *WriteBatcher[T]) PendingCount() int64

PendingCount returns the number of items that are enqueued or in the current batch and not yet flushed. It is intended for completion checks: for example, treat processing as done only when PendingCount is zero and the caller's own workers have no items in flight.
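
A completion check built on PendingCount could poll until the count reaches zero or a deadline passes. The helper below is a sketch: pendingCounter, waitDrained, fakeQueue, and the two demo functions are hypothetical names, and the fake queue replaces a real batcher so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// pendingCounter is the single method the helper needs. A *WriteBatcher[T]
// has a PendingCount method with this signature.
type pendingCounter interface {
	PendingCount() int64
}

// waitDrained polls until PendingCount reports zero or ctx is done.
func waitDrained(ctx context.Context, p pendingCounter, poll time.Duration) error {
	if poll <= 0 {
		poll = 10 * time.Millisecond // time.NewTicker panics on non-positive intervals
	}
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		if p.PendingCount() == 0 {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// fakeQueue is a test double whose pending count is set by the demos.
type fakeQueue struct{ n atomic.Int64 }

func (f *fakeQueue) PendingCount() int64 { return f.n.Load() }

// drainDemo simulates a worker that flushes three pending items.
func drainDemo() error {
	q := &fakeQueue{}
	q.n.Store(3)
	go func() {
		for q.n.Load() > 0 {
			time.Sleep(time.Millisecond)
			q.n.Add(-1)
		}
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return waitDrained(ctx, q, 2*time.Millisecond)
}

// stuckDemo simulates a queue that never drains, so the deadline wins.
func stuckDemo() error {
	q := &fakeQueue{}
	q.n.Store(1)
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return waitDrained(ctx, q, 2*time.Millisecond)
}

func main() {
	fmt.Println(drainDemo())
	fmt.Println(stuckDemo())
}
```

A zero PendingCount only says the batcher holds nothing; producers that have not yet called Submit still need to be tracked separately.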

func (*WriteBatcher[T]) Submit

func (wb *WriteBatcher[T]) Submit(item T) error

Submit enqueues an item for inclusion in a future flush. It does not block: the item is flushed later, when the batch reaches MaxBatchSize or MaxBatchBytes (if size tracking is configured), when FlushInterval elapses, or when Close is called.

Submit returns nil on success. It returns ErrFull if the internal channel is at capacity (caller may retry or drop). It returns ErrClosed if the batcher has been closed or the context passed to New was cancelled.

Submit is safe to call concurrently from multiple goroutines.
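
A non-blocking enqueue with these return values is typically written as a select with a default case. The following sketch shows that general technique, not the package's source; queue, newQueue, trySubmit, and the local sentinel errors are hypothetical, and shutdown is signalled through a separate closed channel so that a send never races with closing the item channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors.
var (
	errClosed = errors.New("closed")
	errFull   = errors.New("channel full")
)

// queue holds a buffered item channel plus a channel that signals shutdown.
type queue[T any] struct {
	items  chan T
	closed chan struct{}
}

func newQueue[T any](capacity int) *queue[T] {
	return &queue[T]{items: make(chan T, capacity), closed: make(chan struct{})}
}

// trySubmit never blocks: it reports errClosed after shutdown, errFull when
// the buffer has no free slot, and nil once the item is enqueued.
func (q *queue[T]) trySubmit(item T) error {
	select {
	case <-q.closed:
		return errClosed
	default:
	}
	select {
	case q.items <- item:
		return nil
	default:
		return errFull
	}
}

func main() {
	q := newQueue[int](1)
	fmt.Println(q.trySubmit(1)) // enqueued
	fmt.Println(q.trySubmit(2)) // buffer of one is already full
	close(q.closed)
	fmt.Println(q.trySubmit(3)) // rejected after shutdown
}
```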
