retry

package
v0.2.2 Latest
Published: Apr 15, 2026 License: GPL-3.0 Imports: 16 Imported by: 0

Documentation

Overview

Package retry provides persistent retry queue functionality for failed API operations.

The retry package implements a store-and-forward pattern to ensure that scan data is never lost due to temporary network failures or server unavailability.

Key components:

  • RetryQueue: Interface for queue implementations
  • FileRetryQueue: File-based persistent queue (no external dependencies)
  • RetryWorker: Background processor that retries queued items

Example usage:

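// pusher is any ReportPusher implementation; ctx is the caller's context.
queue, err := retry.NewFileRetryQueue(&retry.FileQueueConfig{
    Dir: "/var/lib/openctem/retry-queue",
})
if err != nil {
    return err
}
defer queue.Close()

worker := retry.NewRetryWorker(&retry.RetryWorkerConfig{
    Interval: 5 * time.Minute,
}, queue, pusher)

if err := worker.Start(ctx); err != nil {
    return err
}
defer worker.Stop(ctx)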

Constants

const DefaultBatchSize = 10

DefaultBatchSize is the default number of items to process per batch.

const DefaultMaxAttempts = 10

DefaultMaxAttempts is the default maximum number of retry attempts.

const DefaultMaxQueueSize = 1000

DefaultMaxQueueSize is the default maximum number of items in the queue.

const DefaultRetryInterval = 5 * time.Minute

DefaultRetryInterval is the default interval between retry checks.

const DefaultTTL = 7 * 24 * time.Hour

DefaultTTL is the default time-to-live for queue items (7 days).

Variables

var (
	// ErrQueueFull is returned when the queue has reached its maximum capacity.
	ErrQueueFull = errors.New("retry queue is full")

	// ErrQueueClosed is returned when operations are attempted on a closed queue.
	ErrQueueClosed = errors.New("retry queue is closed")

	// ErrItemNotFound is returned when the requested item doesn't exist.
	ErrItemNotFound = errors.New("queue item not found")

	// ErrDuplicateItem is returned when attempting to enqueue a duplicate item.
	ErrDuplicateItem = errors.New("duplicate item already in queue")

	// ErrInvalidItem is returned when the queue item is invalid.
	ErrInvalidItem = errors.New("invalid queue item")
)

Common errors for retry queue operations.

Functions

func CalculateNextRetry

func CalculateNextRetry(attempts int, baseInterval time.Duration) time.Time

CalculateNextRetry calculates the next retry time based on the number of attempts. This is a convenience function that uses the default exponential backoff.

Backoff schedule with default 5-minute base:

attempt 1: 5 minutes
attempt 2: 10 minutes
attempt 3: 20 minutes
attempt 4: 40 minutes
attempt 5: 80 minutes (~1.3 hours)
attempt 6: 160 minutes (~2.6 hours)
attempt 7: 320 minutes (~5.3 hours)
attempt 8: 640 minutes (~10.6 hours)
attempt 9: 1280 minutes (~21 hours)
attempt 10: 2560 minutes (~42 hours); intervals are capped at MaxInterval (48 hours by default)

func IsReadyForRetry

func IsReadyForRetry(lastAttempt time.Time, attempts int, cfg *BackoffConfig) bool

IsReadyForRetry checks if enough time has passed since the last attempt.

Types

type BackoffConfig

type BackoffConfig struct {
	// Strategy is the backoff strategy to use.
	// Default is BackoffExponential.
	Strategy BackoffStrategy

	// BaseInterval is the base interval for backoff calculation.
	// Default is DefaultRetryInterval (5 minutes).
	BaseInterval time.Duration

	// MaxInterval is the maximum interval between retries.
	// Default is 48 hours.
	MaxInterval time.Duration

	// Jitter adds randomness to prevent thundering herd.
	// Value between 0.0 (no jitter) and 1.0 (full jitter).
	// Default is 0.1 (10% jitter).
	Jitter float64
}

BackoffConfig configures the backoff behavior.

func DefaultBackoffConfig

func DefaultBackoffConfig() *BackoffConfig

DefaultBackoffConfig returns a BackoffConfig with default values.

func (*BackoffConfig) NextRetry

func (c *BackoffConfig) NextRetry(attempts int) time.Time

NextRetry calculates the next retry time based on the configuration.

func (*BackoffConfig) NextRetryFrom

func (c *BackoffConfig) NextRetryFrom(from time.Time, attempts int) time.Time

NextRetryFrom calculates the next retry time from a specific base time.

func (*BackoffConfig) RetrySchedule

func (c *BackoffConfig) RetrySchedule(maxAttempts int) []time.Duration

RetrySchedule returns a slice of retry intervals, one time.Duration per attempt, for a given number of attempts. Useful for displaying or logging the expected retry schedule.

func (*BackoffConfig) TotalBackoffTime

func (c *BackoffConfig) TotalBackoffTime(maxAttempts int) time.Duration

TotalBackoffTime calculates the total time for all retry attempts. Useful for estimating how long before an item is marked as permanently failed.

type BackoffStrategy

type BackoffStrategy int

BackoffStrategy defines how to calculate the next retry time.

const (
	// BackoffExponential uses exponential backoff: base * 2^attempt
	BackoffExponential BackoffStrategy = iota

	// BackoffLinear uses linear backoff: base * attempt
	BackoffLinear

	// BackoffConstant uses constant backoff: base (no increase)
	BackoffConstant
)

type FileQueueConfig

type FileQueueConfig struct {
	// Dir is the directory to store queue files.
	// Default: ~/.openctem/retry-queue
	Dir string

	// MaxSize is the maximum number of items in the queue.
	// Default: 1000
	MaxSize int

	// Deduplication enables fingerprint-based deduplication.
	// Default: true
	Deduplication bool

	// Backoff configures the retry backoff behavior.
	// Default: exponential backoff with 5-minute base
	Backoff *BackoffConfig

	// Verbose enables verbose logging.
	Verbose bool
}

FileQueueConfig configures the file-based retry queue.

type FileRetryQueue

type FileRetryQueue struct {
	// contains filtered or unexported fields
}

FileRetryQueue implements RetryQueue using JSON files. Each queue item is stored as a separate JSON file in a directory. This provides simplicity, durability, and no external dependencies.

File naming convention: {timestamp}_{id}.json This allows natural sorting by creation time when listing files.

func NewFileRetryQueue

func NewFileRetryQueue(cfg *FileQueueConfig) (*FileRetryQueue, error)

NewFileRetryQueue creates a new file-based retry queue.

func (*FileRetryQueue) Cleanup

func (fq *FileRetryQueue) Cleanup(ctx context.Context, ttl time.Duration) (int, error)

Cleanup removes expired items and permanently failed items.

func (*FileRetryQueue) Close

func (fq *FileRetryQueue) Close() error

Close closes the queue and releases resources.

func (*FileRetryQueue) Delete

func (fq *FileRetryQueue) Delete(ctx context.Context, id string) error

Delete removes an item from the queue.

func (*FileRetryQueue) Dequeue

func (fq *FileRetryQueue) Dequeue(ctx context.Context) (*QueueItem, error)

Dequeue removes and returns the next item ready for retry.

func (*FileRetryQueue) Enqueue

func (fq *FileRetryQueue) Enqueue(ctx context.Context, item *QueueItem) (string, error)

Enqueue adds an item to the queue.

func (*FileRetryQueue) Get

func (fq *FileRetryQueue) Get(ctx context.Context, id string) (*QueueItem, error)

Get retrieves an item by ID without removing it.

func (*FileRetryQueue) List

func (fq *FileRetryQueue) List(ctx context.Context, filter ListFilter) ([]*QueueItem, error)

List returns items matching the given filter.

func (*FileRetryQueue) MarkFailed

func (fq *FileRetryQueue) MarkFailed(ctx context.Context, id string, lastError string) error

MarkFailed marks an item as permanently failed.

func (*FileRetryQueue) Peek

func (fq *FileRetryQueue) Peek(ctx context.Context, limit int) ([]*QueueItem, error)

Peek returns items ready for retry without removing them.

func (*FileRetryQueue) Requeue

func (fq *FileRetryQueue) Requeue(ctx context.Context, id string, nextRetry time.Time) error

Requeue moves an item back to pending status for retry.

func (*FileRetryQueue) Size

func (fq *FileRetryQueue) Size(ctx context.Context) (int, error)

Size returns the total number of items in the queue.

func (*FileRetryQueue) Stats

func (fq *FileRetryQueue) Stats(ctx context.Context) (*QueueStats, error)

Stats returns detailed statistics about the queue.

func (*FileRetryQueue) Update

func (fq *FileRetryQueue) Update(ctx context.Context, item *QueueItem) error

Update updates an existing item in the queue.

type FingerprintCheckResult

type FingerprintCheckResult struct {
	Existing []string // Fingerprints that already exist on the server
	Missing  []string // Fingerprints that don't exist on the server
}

FingerprintCheckResult contains the result of a fingerprint check.

type FingerprintChecker

type FingerprintChecker interface {
	// CheckFingerprints checks which fingerprints already exist on the server.
	// Returns a FingerprintCheckResult that separates existing from missing fingerprints.
	CheckFingerprints(ctx context.Context, fingerprints []string) (*FingerprintCheckResult, error)
}

FingerprintChecker is the interface for checking if fingerprints already exist on the server. This is used by the retry worker to avoid re-uploading data that already exists.

type ItemStatus

type ItemStatus string

ItemStatus represents the status of a queue item.

const (
	// ItemStatusPending indicates the item is waiting for retry.
	ItemStatusPending ItemStatus = "pending"

	// ItemStatusProcessing indicates the item is currently being processed.
	ItemStatusProcessing ItemStatus = "processing"

	// ItemStatusFailed indicates the item has exhausted all retry attempts.
	ItemStatusFailed ItemStatus = "failed"
)

type ItemType

type ItemType string

ItemType defines the type of queued item.

const (
	// ItemTypeFindings represents scan findings to be pushed.
	ItemTypeFindings ItemType = "findings"

	// ItemTypeAssets represents assets to be pushed.
	ItemTypeAssets ItemType = "assets"

	// ItemTypeHeartbeat represents a heartbeat message.
	ItemTypeHeartbeat ItemType = "heartbeat"
)

type ListFilter

type ListFilter struct {
	// Status filters by item status. Empty means all statuses.
	Status ItemStatus

	// Type filters by item type. Empty means all types.
	Type ItemType

	// Limit is the maximum number of items to return. 0 means no limit.
	Limit int

	// Offset is the number of items to skip. Used for pagination.
	Offset int

	// ReadyOnly if true, only returns items ready for retry (NextRetry <= now).
	ReadyOnly bool

	// OrderBy specifies the field to order by. Default is "created_at".
	OrderBy string

	// OrderDesc if true, orders in descending order.
	OrderDesc bool
}

ListFilter defines options for filtering queue items.

type Pusher

type Pusher interface {
	// PushReport pushes a report to the server.
	// Returns nil on success.
	PushReport(ctx context.Context, report any) error
}

Pusher is the interface that wraps the basic PushReport method. This is used by RetryWorker to push items to the server.

type QueueConfig

type QueueConfig struct {
	// MaxSize is the maximum number of items allowed in the queue.
	// Default is DefaultMaxQueueSize (1000).
	MaxSize int

	// Deduplication enables fingerprint-based deduplication.
	// Default is true.
	Deduplication bool

	// Verbose enables verbose logging.
	Verbose bool
}

QueueConfig contains common configuration for queue implementations.

func DefaultQueueConfig

func DefaultQueueConfig() *QueueConfig

DefaultQueueConfig returns a QueueConfig with default values.

func (*QueueConfig) Validate

func (c *QueueConfig) Validate() error

Validate validates the queue configuration.

type QueueItem

type QueueItem struct {
	// Identification
	ID          string     `json:"id"`          // Unique identifier (UUID)
	Type        ItemType   `json:"type"`        // Type of item (findings, assets, heartbeat)
	Fingerprint string     `json:"fingerprint"` // Content fingerprint for deduplication
	Status      ItemStatus `json:"status"`      // Current status

	// Payload
	Report *ctis.Report `json:"report"` // The CTIS report to push

	// Retry tracking
	Attempts    int       `json:"attempts"`     // Number of retry attempts made
	MaxAttempts int       `json:"max_attempts"` // Maximum retry attempts allowed
	LastError   string    `json:"last_error"`   // Last error message
	LastAttempt time.Time `json:"last_attempt"` // Timestamp of last attempt
	NextRetry   time.Time `json:"next_retry"`   // Scheduled next retry time

	// Metadata
	CreatedAt   time.Time `json:"created_at"`   // When item was first queued
	UpdatedAt   time.Time `json:"updated_at"`   // Last update timestamp
	AgentID     string    `json:"agent_id"`     // Source agent ID
	ScannerName string    `json:"scanner_name"` // Source scanner name
	TargetPath  string    `json:"target_path"`  // Scan target path
}

QueueItem represents an item in the retry queue.

func (*QueueItem) HasExhaustedRetries

func (item *QueueItem) HasExhaustedRetries() bool

HasExhaustedRetries checks if the item has used all retry attempts.

func (*QueueItem) IsExpired

func (item *QueueItem) IsExpired(ttl time.Duration) bool

IsExpired checks if the item has exceeded its TTL.

func (*QueueItem) IsReadyForRetry

func (item *QueueItem) IsReadyForRetry() bool

IsReadyForRetry checks if the item is ready to be retried.

type QueueStats

type QueueStats struct {
	TotalItems      int       `json:"total_items"`       // Total items in queue
	PendingItems    int       `json:"pending_items"`     // Items waiting for retry
	ProcessingItems int       `json:"processing_items"`  // Items currently being processed
	FailedItems     int       `json:"failed_items"`      // Items that exhausted retries
	OldestItem      time.Time `json:"oldest_item"`       // Creation time of oldest item
	NewestItem      time.Time `json:"newest_item"`       // Creation time of newest item
	LastRetry       time.Time `json:"last_retry"`        // Last retry attempt time
	TotalRetries    int64     `json:"total_retries"`     // Total retry attempts made
	SuccessfulPush  int64     `json:"successful_pushes"` // Successful pushes from retry
}

QueueStats provides statistics about the retry queue.

type ReportPusher

type ReportPusher interface {
	// PushReport pushes a CTIS report to the server.
	PushReport(ctx context.Context, report *ctis.Report) error
}

ReportPusher is the interface for pushing reports to the server. This is typically implemented by the Client.

type RetryCallback

type RetryCallback func(result *RetryResult)

RetryCallback is a function called after each retry attempt.

type RetryQueue

type RetryQueue interface {
	// Enqueue adds an item to the queue.
	// Returns the item ID on success.
	// Returns ErrQueueFull if the queue has reached its maximum capacity.
	// Returns ErrDuplicateItem if an item with the same fingerprint exists.
	Enqueue(ctx context.Context, item *QueueItem) (string, error)

	// Dequeue removes and returns the next item ready for retry.
	// Returns nil, nil if no items are ready for retry.
	// The item's status should be set to ItemStatusProcessing.
	Dequeue(ctx context.Context) (*QueueItem, error)

	// Peek returns items ready for retry without removing them.
	// Items are returned in order of priority (oldest NextRetry first).
	Peek(ctx context.Context, limit int) ([]*QueueItem, error)

	// Get retrieves an item by ID without removing it.
	// Returns ErrItemNotFound if the item doesn't exist.
	Get(ctx context.Context, id string) (*QueueItem, error)

	// Update updates an existing item in the queue.
	// Typically called after a retry attempt to update attempt count, error, etc.
	// Returns ErrItemNotFound if the item doesn't exist.
	Update(ctx context.Context, item *QueueItem) error

	// Delete removes an item from the queue.
	// Typically called after successful push or when item should be discarded.
	// Returns ErrItemNotFound if the item doesn't exist.
	Delete(ctx context.Context, id string) error

	// MarkFailed marks an item as permanently failed.
	// Called when an item has exhausted all retry attempts.
	MarkFailed(ctx context.Context, id string, lastError string) error

	// Requeue moves an item back to pending status for retry.
	// Called when a processing item needs to be retried.
	Requeue(ctx context.Context, id string, nextRetry time.Time) error

	// Size returns the total number of items in the queue.
	Size(ctx context.Context) (int, error)

	// Stats returns detailed statistics about the queue.
	Stats(ctx context.Context) (*QueueStats, error)

	// Cleanup removes expired items (older than TTL) and permanently failed items.
	// Returns the number of items removed.
	Cleanup(ctx context.Context, ttl time.Duration) (int, error)

	// List returns items matching the given filter.
	List(ctx context.Context, filter ListFilter) ([]*QueueItem, error)

	// Close closes the queue and releases any resources.
	// After Close is called, all other methods should return ErrQueueClosed.
	Close() error
}

RetryQueue defines the interface for retry queue implementations. Implementations must be safe for concurrent use.

type RetryResult

type RetryResult struct {
	ItemID    string        `json:"item_id"`
	Success   bool          `json:"success"`
	Error     string        `json:"error,omitempty"`
	Duration  time.Duration `json:"duration"`
	Attempt   int           `json:"attempt"`
	Timestamp time.Time     `json:"timestamp"`
}

RetryResult represents the result of a retry operation.

type RetryWorker

type RetryWorker struct {
	// contains filtered or unexported fields
}

RetryWorker processes the retry queue in the background. It periodically checks the queue for items ready to retry and attempts to push them to the server.

func NewRetryWorker

func NewRetryWorker(cfg *RetryWorkerConfig, queue RetryQueue, pusher ReportPusher) *RetryWorker

NewRetryWorker creates a new retry worker.

func (*RetryWorker) IsRunning

func (w *RetryWorker) IsRunning() bool

IsRunning returns true if the worker is currently running.

func (*RetryWorker) OnExhaust

func (w *RetryWorker) OnExhaust(fn func(item *QueueItem))

OnExhaust sets a callback to be called when an item exhausts all retries.

func (*RetryWorker) OnFail

func (w *RetryWorker) OnFail(fn func(item *QueueItem, result *RetryResult))

OnFail sets a callback to be called after each failed retry.

func (*RetryWorker) OnSuccess

func (w *RetryWorker) OnSuccess(fn func(item *QueueItem, result *RetryResult))

OnSuccess sets a callback to be called after each successful retry.

func (*RetryWorker) ProcessNow

func (w *RetryWorker) ProcessNow(ctx context.Context) error

ProcessNow immediately processes the queue (for testing or manual triggers). This is a synchronous operation.

func (*RetryWorker) QueueStats

func (w *RetryWorker) QueueStats(ctx context.Context) (*QueueStats, error)

QueueStats returns the current queue statistics.

func (*RetryWorker) Start

func (w *RetryWorker) Start(ctx context.Context) error

Start starts the background retry worker. It returns immediately and processes the queue in a goroutine.

func (*RetryWorker) Stats

func (w *RetryWorker) Stats() WorkerStats

Stats returns the current worker statistics.

func (*RetryWorker) Stop

func (w *RetryWorker) Stop(ctx context.Context) error

Stop stops the background retry worker gracefully. It waits for the current batch to complete.

func (*RetryWorker) TriggerCleanup

func (w *RetryWorker) TriggerCleanup(ctx context.Context) (int, error)

TriggerCleanup manually triggers a cleanup of expired items.

type RetryWorkerConfig

type RetryWorkerConfig struct {
	// Interval is how often to check the queue for items to retry.
	// Default: 5 minutes
	Interval time.Duration `yaml:"interval" json:"interval"`

	// BatchSize is the maximum number of items to process per check.
	// Default: 10
	BatchSize int `yaml:"batch_size" json:"batch_size"`

	// MaxAttempts is the maximum number of retry attempts per item.
	// Default: 10
	MaxAttempts int `yaml:"max_attempts" json:"max_attempts"`

	// TTL is how long to keep items in the queue before expiring.
	// Default: 7 days
	TTL time.Duration `yaml:"ttl" json:"ttl"`

	// Backoff configures the retry backoff behavior.
	// Default: exponential backoff with 5-minute base
	Backoff *BackoffConfig `yaml:"backoff" json:"backoff"`

	// Verbose enables verbose logging.
	Verbose bool `yaml:"verbose" json:"verbose"`
}

RetryWorkerConfig configures the retry worker.

func DefaultRetryWorkerConfig

func DefaultRetryWorkerConfig() *RetryWorkerConfig

DefaultRetryWorkerConfig returns a configuration with default values.

type SmartPusher

type SmartPusher interface {
	ReportPusher
	FingerprintChecker
}

SmartPusher combines ReportPusher with FingerprintChecker for intelligent retry. If the pusher also implements FingerprintChecker, the retry worker will check fingerprints before uploading to avoid duplicate uploads.

type WorkerStats

type WorkerStats struct {
	// Lifetime statistics
	TotalAttempts   int64         `json:"total_attempts"`
	SuccessfulPush  int64         `json:"successful_pushes"`
	FailedAttempts  int64         `json:"failed_attempts"`
	ExhaustedItems  int64         `json:"exhausted_items"`
	TotalDuration   time.Duration `json:"total_duration"`
	LastProcessedAt time.Time     `json:"last_processed_at"`

	// Current state
	IsRunning   bool      `json:"is_running"`
	StartedAt   time.Time `json:"started_at"`
	LastCheckAt time.Time `json:"last_check_at"`
}

WorkerStats contains statistics about the retry worker.
