backfill

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2026 License: MIT Imports: 4 Imported by: 0

README

internal/backfill

Logic overview

The backfill package provides queue primitives and enqueue policy for missed scrape windows.

  • Dispatcher coalesces time windows, deduplicates jobs, and enforces per-org enqueue caps.
  • InMemoryQueue is a local/test queue implementation with max-age dropping during consumption.
  • Message age checks prevent stale retries from running indefinitely.

API reference

Types
  • QueuePublisher: queue publish interface used by Dispatcher.
  • Deduper: lock acquisition interface used for dedup suppression.
  • Config: dispatcher settings (CoalesceWindow, DedupTTL, MaxEnqueuesPerOrgPerMinute).
  • Message: queue payload schema for one backfill job.
  • MessageInput: enqueue request payload for missed data windows.
  • EnqueueResult: enqueue decision outcome (Published, DedupSuppressed, DroppedByRateLimit).
  • Dispatcher: policy engine for backfill enqueue operations.
  • InMemoryQueue: buffered channel queue for local development and tests.
Functions
  • NewDispatcher(config Config, queue QueuePublisher, deduper Deduper) *Dispatcher: constructs dispatcher.
  • ShouldDropMessageByAge(msg Message, now time.Time, maxAge time.Duration) bool: returns true when a message has exceeded max age.
  • NewInMemoryQueue(buffer int) *InMemoryQueue: constructs in-memory queue.
Methods
  • (*Dispatcher) EnqueueMissing(input MessageInput) EnqueueResult: coalesces/dedups/rate-limits and publishes backfill jobs.
  • (*InMemoryQueue) Publish(msg Message) error: enqueues one message.
  • (*InMemoryQueue) Consume(ctx context.Context, handler func(Message) error, maxMessageAge time.Duration, nowFn func() time.Time): consumes until context cancellation and drops expired messages.
  • (*InMemoryQueue) Depth() int: returns queued message count.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ShouldDropMessageByAge

func ShouldDropMessageByAge(msg Message, now time.Time, maxAge time.Duration) bool

ShouldDropMessageByAge returns true when a message exceeds max age.

Types

type Config

type Config struct {
	CoalesceWindow             time.Duration
	DedupTTL                   time.Duration
	MaxEnqueuesPerOrgPerMinute int
}

Config controls dispatcher behavior.

type Deduper

type Deduper interface {
	Acquire(key string, ttl time.Duration, now time.Time) bool
}

Deduper acquires dedup locks for messages.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher coalesces, deduplicates, and rate-limits backfill enqueueing.

func NewDispatcher

func NewDispatcher(config Config, queue QueuePublisher, deduper Deduper) *Dispatcher

NewDispatcher creates a dispatcher.

func (*Dispatcher) EnqueueMissing

func (d *Dispatcher) EnqueueMissing(input MessageInput) EnqueueResult

EnqueueMissing enqueues a missed-window message if not deduped and not rate-limited.

type EnqueueResult

type EnqueueResult struct {
	Published          bool
	DedupSuppressed    bool
	DroppedByRateLimit bool
}

EnqueueResult contains enqueue outcomes for observability.

type InMemoryQueue

type InMemoryQueue struct {
	// contains filtered or unexported fields
}

InMemoryQueue is an in-process queue for local development and tests.

func NewInMemoryQueue

func NewInMemoryQueue(buffer int) *InMemoryQueue

NewInMemoryQueue creates an in-memory queue.

func (*InMemoryQueue) Consume

func (q *InMemoryQueue) Consume(
	ctx context.Context,
	handler func(Message) error,
	maxMessageAge time.Duration,
	nowFn func() time.Time,
)

Consume consumes messages until context cancellation.

func (*InMemoryQueue) Depth

func (q *InMemoryQueue) Depth() int

Depth returns the number of queued messages.

func (*InMemoryQueue) Publish

func (q *InMemoryQueue) Publish(msg Message) error

Publish enqueues a message.

type Message

type Message struct {
	JobID       string    `json:"job_id"`
	DedupKey    string    `json:"dedup_key"`
	Org         string    `json:"org"`
	Repo        string    `json:"repo"`
	WindowStart time.Time `json:"window_start"`
	WindowEnd   time.Time `json:"window_end"`
	Reason      string    `json:"reason"`
	Attempt     int       `json:"attempt"`
	MaxAttempts int       `json:"max_attempts"`
	CreatedAt   time.Time `json:"created_at"`
}

Message is a backfill queue payload.

type MessageInput

type MessageInput struct {
	Org         string
	Repo        string
	WindowStart time.Time
	WindowEnd   time.Time
	Reason      string
	Now         time.Time
}

MessageInput is the enqueue input for a missed window.

type QueuePublisher

type QueuePublisher interface {
	Publish(msg Message) error
}

QueuePublisher publishes backfill jobs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL