writebehind

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package writebehind is a typed non-blocking persistence queue: the producer (simulation loop, tick goroutine, request handler) Pushes operations into a bounded channel and never blocks on the slow downstream; a dedicated draining goroutine batches them and calls the consumer-supplied Flusher.

Backpressure policies: DropOldest / DropNewest / Block / CoalesceByKey. Defaults to DropOldest, the right call for game shard write-behind (the freshest position wins).

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCapacityRequired = errors.New("writebehind: Capacity must be > 0")
	ErrFlusherRequired  = errors.New("writebehind: Flusher is required")
	ErrCoalesceNeedsKey = errors.New("writebehind: Policy=CoalesceByKey requires KeyFunc")
	ErrQueueClosed      = errors.New("writebehind: queue closed")
	ErrPushBlockTimeout = errors.New("writebehind: Push timed out waiting for capacity")
)

Errors surfaced by the queue.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	Capacity      int                             // required; > 0
	BatchSize     int                             // max items per Flush; default 100
	FlushInterval time.Duration                   // periodic flush even when batch < BatchSize; default 1s
	Policy        Policy                          // default DropOldest
	KeyFunc       func(T) string                  // required when Policy == CoalesceByKey
	Flusher       Flusher[T]                      // required
	RetryBackoff  func(attempt int) time.Duration // default 100ms * 2^(attempt-1), capped 5s
	MaxRetries    int                             // default 3; -1 = infinite
	OnDrop        func(item T, reason string)
}

Config configures a Queue.

type Flusher

type Flusher[T any] interface {
	Flush(ctx context.Context, batch []T) error
}

Flusher writes a batch of operations to the downstream store.

type FlusherFunc

type FlusherFunc[T any] func(ctx context.Context, batch []T) error

FlusherFunc adapts a function to Flusher.

func (FlusherFunc[T]) Flush

func (f FlusherFunc[T]) Flush(ctx context.Context, batch []T) error

Flush implements Flusher.

type Policy

type Policy uint8

Policy controls behaviour when the queue is full.

const (
	// DropOldest replaces the oldest queued item with the new one.
	DropOldest Policy = iota
	// DropNewest drops the incoming item without queueing it.
	DropNewest
	// Block makes Push wait until queue space is available or ctx
	// expires.
	Block
	// CoalesceByKey overwrites an in-queue item that shares the same
	// key. Requires KeyFunc.
	CoalesceByKey
)

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue is a typed write-behind queue. Goroutine-safe.

func New

func New[T any](cfg Config[T]) (*Queue[T], error)

New constructs a Queue.

func (*Queue[T]) Depth

func (q *Queue[T]) Depth() int

Depth returns the current queue length (atomic snapshot).

func (*Queue[T]) Push

func (q *Queue[T]) Push(ctx context.Context, item T) error

Push enqueues item according to the configured Policy. Returns ErrQueueClosed after Run has returned, ErrPushBlockTimeout when Block-policy Push waits beyond ctx's deadline.

func (*Queue[T]) Run

func (q *Queue[T]) Run(ctx context.Context) error

Run drains the queue until ctx is cancelled, flushing in batches of BatchSize or every FlushInterval (whichever fires first). On return, any remaining items are flushed one last time so shutdown is clean.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL