queue

package
Version: v0.2.0-beta.4
Warning

This package is not in the latest version of its module.
Published: May 3, 2026 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DeadLetterQueue

type DeadLetterQueue struct {
	// contains filtered or unexported fields
}

DeadLetterQueue provides disk-based resilience for failed database writes. When a batch insert fails, the data is serialized to JSON and written to disk. A background replay worker periodically attempts to re-insert failed batches with exponential backoff. The queue's on-disk footprint is bounded by configurable file count and disk size limits.
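
The exponential backoff mentioned above can be pictured with a small helper. This is only a sketch: the name backoffDelay, the base delay, and the cap are assumptions made for illustration, and the package's actual schedule may differ.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns base doubled once per attempt, clamped to max.
// Hypothetical helper: the package's real backoff schedule is not documented here.
func backoffDelay(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		// Clamp before doubling so large attempt counts cannot overflow.
		if d >= max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(attempt, backoffDelay(time.Second, 10*time.Second, attempt))
	}
}
```

Clamping to a maximum keeps a long outage from pushing the next replay attempt arbitrarily far into the future.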

func NewDLQ

func NewDLQ(dir string, interval time.Duration, replayFn func(data []byte) error) (*DeadLetterQueue, error)

NewDLQ creates a new Dead Letter Queue without explicit limits. To set bounds, use NewDLQWithLimits, where maxFiles, maxDiskMB, or maxRetries = 0 means unlimited.
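
A replayFn has the shape func(data []byte) error and receives the JSON bytes of one stored batch. The sketch below shows one way such a function might be built. The row type and the insert callback are hypothetical stand-ins for the caller's own batch type and database write.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// row is a hypothetical record type; the real batch type depends on the caller.
type row struct {
	ID   int    `json:"id"`
	Body string `json:"body"`
}

// makeReplayFn builds a function with the replayFn shape, func(data []byte) error.
// insert stands in for the caller's database write and is an assumption.
func makeReplayFn(insert func([]row) error) func(data []byte) error {
	return func(data []byte) error {
		var batch []row
		if err := json.Unmarshal(data, &batch); err != nil {
			return fmt.Errorf("decode batch: %w", err)
		}
		// A non-nil error reports that this replay attempt failed.
		return insert(batch)
	}
}

func main() {
	replay := makeReplayFn(func(rows []row) error {
		if len(rows) == 0 {
			return errors.New("empty batch")
		}
		fmt.Println("inserted", len(rows), "rows")
		return nil
	})
	fmt.Println(replay([]byte(`[{"id":1,"body":"a"}]`)))
}
```

With the real package, the returned function would be passed as the replayFn argument of NewDLQ or NewDLQWithLimits.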

func NewDLQWithLimits

func NewDLQWithLimits(dir string, interval time.Duration, replayFn func(data []byte) error,
	maxFiles int, maxDiskMB int64, maxRetries int) (*DeadLetterQueue, error)

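NewDLQWithLimits creates a DLQ with explicit bounds on file count (maxFiles), disk usage in megabytes (maxDiskMB), and retries (maxRetries). A value of 0 for any of them means unlimited.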

func (*DeadLetterQueue) DiskBytes

func (d *DeadLetterQueue) DiskBytes() int64

DiskBytes returns the current total bytes of files in the DLQ directory.

func (*DeadLetterQueue) Enqueue

func (d *DeadLetterQueue) Enqueue(batch any) error

Enqueue serializes the given batch to JSON and writes it to disk. It enforces the file count and disk size limits, evicting the oldest files first (FIFO) when a limit is exceeded.

Uses os.CreateTemp under the hood so concurrent enqueues never collide on a filename, even when the OS clock's resolution is coarser than goroutine scheduling (Windows, virtualised hosts) or thousands of failures hit the same nanosecond. A nanosecond-prefixed pattern is still passed to CreateTemp so the files sort chronologically for FIFO eviction.
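
The naming scheme described above can be sketched as follows. The helper name writeBatch and the exact "<unix-nanos>-*.json" pattern are assumptions; only the use of os.CreateTemp with a nanosecond prefix comes from the description.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"
)

// writeBatch serializes batch to JSON and stores it in a uniquely named file.
// os.CreateTemp replaces the "*" with a random string, so two calls that see
// the same nanosecond timestamp still get distinct names.
func writeBatch(dir string, batch any) (string, error) {
	data, err := json.Marshal(batch)
	if err != nil {
		return "", err
	}
	// Hypothetical pattern: nanosecond prefix for ordering, random middle part.
	f, err := os.CreateTemp(dir, fmt.Sprintf("%d-*.json", time.Now().UnixNano()))
	if err != nil {
		return "", err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		os.Remove(f.Name())
		return "", err
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		return "", err
	}
	return f.Name(), nil
}

// runDemo writes two batches into a scratch directory and checks the results.
func runDemo() error {
	dir, err := os.MkdirTemp("", "dlq-sketch-")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)
	a, err := writeBatch(dir, []int{1, 2})
	if err != nil {
		return err
	}
	b, err := writeBatch(dir, []int{3})
	if err != nil {
		return err
	}
	if a == b {
		return fmt.Errorf("name collision: %s", a)
	}
	data, err := os.ReadFile(a)
	if err != nil {
		return err
	}
	var got []int
	if err := json.Unmarshal(data, &got); err != nil {
		return err
	}
	if len(got) != 2 {
		return fmt.Errorf("unexpected payload: %v", got)
	}
	return nil
}

func main() {
	if err := runDemo(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("two batches written with distinct names")
}
```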

func (*DeadLetterQueue) EvictedBytesCount

func (d *DeadLetterQueue) EvictedBytesCount() int64

EvictedBytesCount reports the cumulative number of bytes dropped by the evictions counted in EvictedCount.

func (*DeadLetterQueue) EvictedCount

func (d *DeadLetterQueue) EvictedCount() int64

EvictedCount reports the cumulative number of DLQ files dropped due to MaxFiles/MaxDiskMB caps. Exposed for tests; see otelcontext_dlq_evicted_total.
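
How FIFO eviction could feed the two counters is sketched below under stated assumptions: the entry type and evictOldest are hypothetical, and whether the package evicts before or after writing a new file is not documented here. The sketch relies on the nanosecond file name prefixes having equal width, so that plain string ordering matches chronological ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// entry describes one queued file; both fields are hypothetical.
type entry struct {
	Name string
	Size int64
}

// evictOldest drops the oldest entries until both caps hold.
// A cap of 0 means unlimited, following the convention in the constructor docs.
// It returns the survivors plus the number of files and bytes dropped.
func evictOldest(files []entry, maxFiles int, maxBytes int64) (kept []entry, dropped, droppedBytes int64) {
	kept = append(kept, files...) // copy so the caller's slice is left untouched
	sort.Slice(kept, func(i, j int) bool { return kept[i].Name < kept[j].Name })
	var total int64
	for _, e := range kept {
		total += e.Size
	}
	for len(kept) > 0 {
		overCount := maxFiles > 0 && len(kept) > maxFiles
		overBytes := maxBytes > 0 && total > maxBytes
		if !overCount && !overBytes {
			break
		}
		total -= kept[0].Size
		droppedBytes += kept[0].Size
		dropped++
		kept = kept[1:]
	}
	return kept, dropped, droppedBytes
}

func main() {
	files := []entry{{"300-c.json", 10}, {"100-a.json", 10}, {"200-b.json", 10}}
	kept, n, b := evictOldest(files, 2, 0)
	fmt.Println(len(kept), n, b)
}
```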

func (*DeadLetterQueue) SetMaxReplayPerTick

func (d *DeadLetterQueue) SetMaxReplayPerTick(n int)

SetMaxReplayPerTick caps how many files the replay worker will attempt in one tick. n <= 0 disables the cap (unlimited). Safe to call after construction; the next tick observes the new value.
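
The per-tick cap can be illustrated with a small sketch. The replayCap type and its take method are hypothetical. Storing the limit in an atomic value is one way to let a setter run while the worker is active, and is an assumption about the implementation rather than a description of it.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// replayCap holds the per-tick limit. The atomic value allows Set to be
// called concurrently with take (assumed design, not confirmed by the docs).
type replayCap struct {
	n atomic.Int64
}

// Set stores a new cap; values <= 0 disable it.
func (c *replayCap) Set(n int) { c.n.Store(int64(n)) }

// take returns the files to attempt this tick: all of them when the cap is
// <= 0, otherwise at most n from the front of the slice (oldest first).
func (c *replayCap) take(files []string) []string {
	n := c.n.Load()
	if n <= 0 || int64(len(files)) <= n {
		return files
	}
	return files[:n]
}

func main() {
	var c replayCap
	files := []string{"a.json", "b.json", "c.json"}
	fmt.Println(len(c.take(files))) // cap disabled by default
	c.Set(2)
	fmt.Println(c.take(files))
}
```

Reading the cap once at the start of each tick matches the documented behavior that a new value takes effect on the next tick.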

func (*DeadLetterQueue) SetMetrics

func (d *DeadLetterQueue) SetMetrics(onEnqueue, onSuccess, onFailure func(), onDiskBytes func(int64))

SetMetrics wires Prometheus metric callbacks into the DLQ.
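
The four callbacks have the shapes func() and func(int64). The sketch below builds a matching set backed by in-process atomic counters instead of real Prometheus collectors. The dlqStats type is hypothetical, and treating onDiskBytes as a gauge-style "latest value" is an assumption based on its name.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dlqStats is a hypothetical in-process stand-in for real metric collectors.
type dlqStats struct {
	enqueued, succeeded, failed atomic.Int64
	diskBytes                   atomic.Int64
}

// callbacks returns four functions in the order SetMetrics takes them:
// onEnqueue, onSuccess, onFailure, onDiskBytes.
func (s *dlqStats) callbacks() (onEnqueue, onSuccess, onFailure func(), onDiskBytes func(int64)) {
	return func() { s.enqueued.Add(1) },
		func() { s.succeeded.Add(1) },
		func() { s.failed.Add(1) },
		func(b int64) { s.diskBytes.Store(b) } // assumed gauge semantics
}

func main() {
	var s dlqStats
	onEnqueue, onSuccess, onFailure, onDiskBytes := s.callbacks()
	// With a real queue d, these would be passed as
	// d.SetMetrics(onEnqueue, onSuccess, onFailure, onDiskBytes).
	onEnqueue()
	onFailure()
	onSuccess()
	onDiskBytes(2048)
	fmt.Println(s.enqueued.Load(), s.succeeded.Load(), s.failed.Load(), s.diskBytes.Load())
}
```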

func (*DeadLetterQueue) SetTelemetryMetrics

func (d *DeadLetterQueue) SetTelemetryMetrics(m *telemetry.Metrics)

SetTelemetryMetrics wires the Prometheus registry so eviction counts surface in telemetry. Safe to call with a nil *telemetry.Metrics (disables the hook).

func (*DeadLetterQueue) Size

func (d *DeadLetterQueue) Size() int

Size returns the number of files currently in the DLQ directory.
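
Size and DiskBytes both report on the contents of the DLQ directory. A directory scan that yields both numbers might look like the sketch below. The helper dirStats is hypothetical, and the package may track these values differently (for example, incrementally).

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// dirStats counts the regular entries in dir and sums their sizes.
func dirStats(dir string) (count int, bytes int64, err error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return 0, 0, err
	}
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		info, err := e.Info()
		if err != nil {
			continue // the file may have been removed between ReadDir and Info
		}
		count++
		bytes += info.Size()
	}
	return count, bytes, nil
}

// runDemo creates two small files in a scratch directory and checks the totals.
func runDemo() error {
	dir, err := os.MkdirTemp("", "dlq-stats-")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)
	if err := os.WriteFile(filepath.Join(dir, "1.json"), []byte("[1,2]"), 0o600); err != nil {
		return err
	}
	if err := os.WriteFile(filepath.Join(dir, "2.json"), []byte("[3]"), 0o600); err != nil {
		return err
	}
	count, bytes, err := dirStats(dir)
	if err != nil {
		return err
	}
	if count != 2 || bytes != 8 {
		return fmt.Errorf("got %d files, %d bytes", count, bytes)
	}
	return nil
}

func main() {
	if err := runDemo(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("directory stats match")
}
```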

func (*DeadLetterQueue) Stop

func (d *DeadLetterQueue) Stop()

Stop gracefully shuts down the replay worker.
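
A common Go pattern for this kind of graceful shutdown is a ticker loop with a stop channel and a done channel. The sketch below illustrates that pattern only. The worker type and startWorker are hypothetical and are not taken from the package source.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker runs fn on every tick until Stop is called. All names are hypothetical.
type worker struct {
	stop chan struct{}
	done chan struct{}
	once sync.Once
}

// startWorker launches the background loop and returns a handle to stop it.
func startWorker(interval time.Duration, fn func()) *worker {
	w := &worker{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-w.stop:
				return
			case <-t.C:
				fn()
			}
		}
	}()
	return w
}

// Stop signals the goroutine and blocks until it has exited.
// sync.Once makes repeated calls safe.
func (w *worker) Stop() {
	w.once.Do(func() { close(w.stop) })
	<-w.done
}

func main() {
	ticks := make(chan struct{}, 1)
	w := startWorker(10*time.Millisecond, func() {
		select {
		case ticks <- struct{}{}:
		default: // drop the signal if nobody is waiting
		}
	})
	<-ticks // wait for at least one tick
	w.Stop()
	fmt.Println("worker stopped")
}
```

Because Stop waits on the done channel, fn is guaranteed not to be running once Stop returns.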
