Documentation
¶
Index ¶
- type DeadLetterQueue
- func (d *DeadLetterQueue) DiskBytes() int64
- func (d *DeadLetterQueue) Enqueue(batch any) error
- func (d *DeadLetterQueue) EvictedBytesCount() int64
- func (d *DeadLetterQueue) EvictedCount() int64
- func (d *DeadLetterQueue) SetMaxReplayPerTick(n int)
- func (d *DeadLetterQueue) SetMetrics(onEnqueue, onSuccess, onFailure func(), onDiskBytes func(int64))
- func (d *DeadLetterQueue) SetTelemetryMetrics(m *telemetry.Metrics)
- func (d *DeadLetterQueue) Size() int
- func (d *DeadLetterQueue) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DeadLetterQueue ¶
type DeadLetterQueue struct {
// contains filtered or unexported fields
}
DeadLetterQueue provides disk-based resilience for failed database writes. When a batch insert fails, the data is serialized to JSON and written to disk. A background replay worker periodically attempts to re-insert failed batches with exponential backoff, bounded by configurable file count and disk limits.
func NewDLQ ¶
func NewDLQ(dir string, interval time.Duration, replayFn func(data []byte) error) (*DeadLetterQueue, error)
NewDLQ creates a new Dead Letter Queue. maxFiles/maxDiskMB/maxRetries = 0 means unlimited.
func NewDLQWithLimits ¶
func NewDLQWithLimits(dir string, interval time.Duration, replayFn func(data []byte) error, maxFiles int, maxDiskMB int64, maxRetries int) (*DeadLetterQueue, error)
NewDLQWithLimits creates a DLQ with explicit bounds.
func (*DeadLetterQueue) DiskBytes ¶
func (d *DeadLetterQueue) DiskBytes() int64
DiskBytes returns the current total bytes of files in the DLQ directory.
func (*DeadLetterQueue) Enqueue ¶
func (d *DeadLetterQueue) Enqueue(batch any) error
Enqueue serializes the given batch to JSON and writes it to disk. Enforces file count and disk size limits (FIFO eviction when exceeded).
Uses os.CreateTemp under the hood so concurrent enqueues never collide on a filename, even when the OS clock's resolution is coarser than goroutine scheduling (Windows, virtualised hosts) or thousands of failures hit the same nanosecond. A nanosecond-prefixed pattern is still passed to CreateTemp so the files sort chronologically for FIFO eviction.
func (*DeadLetterQueue) EvictedBytesCount ¶
func (d *DeadLetterQueue) EvictedBytesCount() int64
EvictedBytesCount reports the byte volume dropped alongside EvictedCount.
func (*DeadLetterQueue) EvictedCount ¶
func (d *DeadLetterQueue) EvictedCount() int64
EvictedCount reports the cumulative number of DLQ files dropped due to MaxFiles/MaxDiskMB caps. Exposed for tests; see otelcontext_dlq_evicted_total.
func (*DeadLetterQueue) SetMaxReplayPerTick ¶
func (d *DeadLetterQueue) SetMaxReplayPerTick(n int)
SetMaxReplayPerTick caps how many files the replay worker will attempt in one tick. n <= 0 disables the cap (unlimited). Safe to call after construction; the next tick observes the new value.
func (*DeadLetterQueue) SetMetrics ¶
func (d *DeadLetterQueue) SetMetrics(onEnqueue, onSuccess, onFailure func(), onDiskBytes func(int64))
SetMetrics wires Prometheus metric callbacks into the DLQ.
func (*DeadLetterQueue) SetTelemetryMetrics ¶
func (d *DeadLetterQueue) SetTelemetryMetrics(m *telemetry.Metrics)
SetTelemetryMetrics wires the Prometheus registry so eviction counts surface in telemetry. Safe to call with a nil *telemetry.Metrics (disables the hook).
func (*DeadLetterQueue) Size ¶
func (d *DeadLetterQueue) Size() int
Size returns the number of files currently in the DLQ directory.
func (*DeadLetterQueue) Stop ¶
func (d *DeadLetterQueue) Stop()
Stop gracefully shuts down the replay worker.