Documentation
¶
Overview ¶
Package retry provides persistent retry queue functionality for failed API operations.
The retry package implements a store-and-forward pattern to ensure that scan data is never lost due to temporary network failures or server unavailability.
Key components:
- RetryQueue: Interface for queue implementations
- FileRetryQueue: File-based persistent queue (no external dependencies)
- RetryWorker: Background processor that retries queued items
Example usage:
	queue, _ := retry.NewFileRetryQueue(&retry.FileQueueConfig{
		Dir: "/var/lib/openctem/retry-queue",
	})
	worker := retry.NewRetryWorker(&retry.RetryWorkerConfig{
		Interval: 5 * time.Minute,
	}, queue, pusher)
	worker.Start(ctx)
	defer worker.Stop(ctx)
Package retry provides persistent retry queue functionality for failed API operations.
Index ¶
- Constants
- Variables
- func CalculateNextRetry(attempts int, baseInterval time.Duration) time.Time
- func IsReadyForRetry(lastAttempt time.Time, attempts int, cfg *BackoffConfig) bool
- type BackoffConfig
- type BackoffStrategy
- type FileQueueConfig
- type FileRetryQueue
- func (fq *FileRetryQueue) Cleanup(ctx context.Context, ttl time.Duration) (int, error)
- func (fq *FileRetryQueue) Close() error
- func (fq *FileRetryQueue) Delete(ctx context.Context, id string) error
- func (fq *FileRetryQueue) Dequeue(ctx context.Context) (*QueueItem, error)
- func (fq *FileRetryQueue) Enqueue(ctx context.Context, item *QueueItem) (string, error)
- func (fq *FileRetryQueue) Get(ctx context.Context, id string) (*QueueItem, error)
- func (fq *FileRetryQueue) List(ctx context.Context, filter ListFilter) ([]*QueueItem, error)
- func (fq *FileRetryQueue) MarkFailed(ctx context.Context, id string, lastError string) error
- func (fq *FileRetryQueue) Peek(ctx context.Context, limit int) ([]*QueueItem, error)
- func (fq *FileRetryQueue) Requeue(ctx context.Context, id string, nextRetry time.Time) error
- func (fq *FileRetryQueue) Size(ctx context.Context) (int, error)
- func (fq *FileRetryQueue) Stats(ctx context.Context) (*QueueStats, error)
- func (fq *FileRetryQueue) Update(ctx context.Context, item *QueueItem) error
- type FingerprintCheckResult
- type FingerprintChecker
- type ItemStatus
- type ItemType
- type ListFilter
- type Pusher
- type QueueConfig
- type QueueItem
- type QueueStats
- type ReportPusher
- type RetryCallback
- type RetryQueue
- type RetryResult
- type RetryWorker
- func (w *RetryWorker) IsRunning() bool
- func (w *RetryWorker) OnExhaust(fn func(item *QueueItem))
- func (w *RetryWorker) OnFail(fn func(item *QueueItem, result *RetryResult))
- func (w *RetryWorker) OnSuccess(fn func(item *QueueItem, result *RetryResult))
- func (w *RetryWorker) ProcessNow(ctx context.Context) error
- func (w *RetryWorker) QueueStats(ctx context.Context) (*QueueStats, error)
- func (w *RetryWorker) Start(ctx context.Context) error
- func (w *RetryWorker) Stats() WorkerStats
- func (w *RetryWorker) Stop(ctx context.Context) error
- func (w *RetryWorker) TriggerCleanup(ctx context.Context) (int, error)
- type RetryWorkerConfig
- type SmartPusher
- type WorkerStats
Constants ¶
const DefaultBatchSize = 10
DefaultBatchSize is the default number of items to process per batch.
const DefaultMaxAttempts = 10
DefaultMaxAttempts is the default maximum number of retry attempts.
const DefaultMaxQueueSize = 1000
DefaultMaxQueueSize is the default maximum number of items in the queue.
const DefaultRetryInterval = 5 * time.Minute
DefaultRetryInterval is the default interval between retry checks.
const DefaultTTL = 7 * 24 * time.Hour
DefaultTTL is the default time-to-live for queue items (7 days).
Variables ¶
var (
	// ErrQueueFull is returned when the queue has reached its maximum capacity.
	ErrQueueFull = errors.New("retry queue is full")
	// ErrQueueClosed is returned when operations are attempted on a closed queue.
	ErrQueueClosed = errors.New("retry queue is closed")
	// ErrItemNotFound is returned when the requested item doesn't exist.
	ErrItemNotFound = errors.New("queue item not found")
	// ErrDuplicateItem is returned when attempting to enqueue a duplicate item.
	ErrDuplicateItem = errors.New("duplicate item already in queue")
	// ErrInvalidItem is returned when the queue item is invalid.
	ErrInvalidItem = errors.New("invalid queue item")
)
Common errors for retry queue operations.
Functions ¶
func CalculateNextRetry ¶
func CalculateNextRetry(attempts int, baseInterval time.Duration) time.Time
CalculateNextRetry calculates the next retry time based on the number of attempts. This is a convenience function that uses the default exponential backoff.
Backoff schedule with default 5-minute base:
	attempt 1:  5 minutes
	attempt 2:  10 minutes
	attempt 3:  20 minutes
	attempt 4:  40 minutes
	attempt 5:  80 minutes (~1.3 hours)
	attempt 6:  160 minutes (~2.6 hours)
	attempt 7:  320 minutes (~5.3 hours)
	attempt 8:  640 minutes (~10.6 hours)
	attempt 9:  1280 minutes (~21 hours)
	attempt 10: 2560 minutes (~42 hours)
Intervals are capped at maxInterval.
func IsReadyForRetry ¶
func IsReadyForRetry(lastAttempt time.Time, attempts int, cfg *BackoffConfig) bool
IsReadyForRetry checks if enough time has passed since the last attempt.
Types ¶
type BackoffConfig ¶
type BackoffConfig struct {
// Strategy is the backoff strategy to use.
// Default is BackoffExponential.
Strategy BackoffStrategy
// BaseInterval is the base interval for backoff calculation.
// Default is DefaultRetryInterval (5 minutes).
BaseInterval time.Duration
// MaxInterval is the maximum interval between retries.
// Default is 48 hours.
MaxInterval time.Duration
// Jitter adds randomness to prevent thundering herd.
// Value between 0.0 (no jitter) and 1.0 (full jitter).
// Default is 0.1 (10% jitter).
Jitter float64
}
BackoffConfig configures the backoff behavior.
func DefaultBackoffConfig ¶
func DefaultBackoffConfig() *BackoffConfig
DefaultBackoffConfig returns a BackoffConfig with default values.
func (*BackoffConfig) NextRetry ¶
func (c *BackoffConfig) NextRetry(attempts int) time.Time
NextRetry calculates the next retry time based on the configuration.
func (*BackoffConfig) NextRetryFrom ¶
NextRetryFrom calculates the next retry time from a specific base time.
func (*BackoffConfig) RetrySchedule ¶
func (c *BackoffConfig) RetrySchedule(maxAttempts int) []time.Duration
RetrySchedule returns a slice of retry intervals (one time.Duration per attempt) for a given number of attempts. Useful for displaying or logging the expected retry schedule.
func (*BackoffConfig) TotalBackoffTime ¶
func (c *BackoffConfig) TotalBackoffTime(maxAttempts int) time.Duration
TotalBackoffTime calculates the total time for all retry attempts. Useful for estimating how long before an item is marked as permanently failed.
type BackoffStrategy ¶
type BackoffStrategy int
BackoffStrategy defines how to calculate the next retry time.
const (
	// BackoffExponential uses exponential backoff: base * 2^attempt
	BackoffExponential BackoffStrategy = iota
	// BackoffLinear uses linear backoff: base * attempt
	BackoffLinear
	// BackoffConstant uses constant backoff: base (no increase)
	BackoffConstant
)
type FileQueueConfig ¶
type FileQueueConfig struct {
// Dir is the directory to store queue files.
// Default: ~/.openctem/retry-queue
Dir string
// MaxSize is the maximum number of items in the queue.
// Default: 1000
MaxSize int
// Deduplication enables fingerprint-based deduplication.
// Default: true
Deduplication bool
// Backoff configures the retry backoff behavior.
// Default: exponential backoff with 5-minute base
Backoff *BackoffConfig
// Verbose enables verbose logging.
Verbose bool
}
FileQueueConfig configures the file-based retry queue.
type FileRetryQueue ¶
type FileRetryQueue struct {
// contains filtered or unexported fields
}
FileRetryQueue implements RetryQueue using JSON files. Each queue item is stored as a separate JSON file in a directory. This provides simplicity, durability, and no external dependencies.
File naming convention: {timestamp}_{id}.json. This allows natural sorting by creation time when listing files.
func NewFileRetryQueue ¶
func NewFileRetryQueue(cfg *FileQueueConfig) (*FileRetryQueue, error)
NewFileRetryQueue creates a new file-based retry queue.
func (*FileRetryQueue) Close ¶
func (fq *FileRetryQueue) Close() error
Close closes the queue and releases resources.
func (*FileRetryQueue) Delete ¶
func (fq *FileRetryQueue) Delete(ctx context.Context, id string) error
Delete removes an item from the queue.
func (*FileRetryQueue) Dequeue ¶
func (fq *FileRetryQueue) Dequeue(ctx context.Context) (*QueueItem, error)
Dequeue removes and returns the next item ready for retry.
func (*FileRetryQueue) List ¶
func (fq *FileRetryQueue) List(ctx context.Context, filter ListFilter) ([]*QueueItem, error)
List returns items matching the given filter.
func (*FileRetryQueue) MarkFailed ¶
func (fq *FileRetryQueue) MarkFailed(ctx context.Context, id string, lastError string) error
MarkFailed marks an item as permanently failed.
func (*FileRetryQueue) Size ¶
func (fq *FileRetryQueue) Size(ctx context.Context) (int, error)
Size returns the total number of items in the queue.
func (*FileRetryQueue) Stats ¶
func (fq *FileRetryQueue) Stats(ctx context.Context) (*QueueStats, error)
Stats returns detailed statistics about the queue.
type FingerprintCheckResult ¶
type FingerprintCheckResult struct {
Existing []string // Fingerprints that already exist on the server
Missing []string // Fingerprints that don't exist on the server
}
FingerprintCheckResult contains the result of a fingerprint check.
type FingerprintChecker ¶
type FingerprintChecker interface {
// CheckFingerprints checks which fingerprints already exist on the server.
// Returns a FingerprintCheckResult holding the existing and the missing fingerprints.
CheckFingerprints(ctx context.Context, fingerprints []string) (*FingerprintCheckResult, error)
}
FingerprintChecker is the interface for checking if fingerprints already exist on the server. This is used by the retry worker to avoid re-uploading data that already exists.
type ItemStatus ¶
type ItemStatus string
ItemStatus represents the status of a queue item.
const (
	// ItemStatusPending indicates the item is waiting for retry.
	ItemStatusPending ItemStatus = "pending"
	// ItemStatusProcessing indicates the item is currently being processed.
	ItemStatusProcessing ItemStatus = "processing"
	// ItemStatusFailed indicates the item has exhausted all retry attempts.
	ItemStatusFailed ItemStatus = "failed"
)
type ListFilter ¶
type ListFilter struct {
// Status filters by item status. Empty means all statuses.
Status ItemStatus
// Type filters by item type. Empty means all types.
Type ItemType
// Limit is the maximum number of items to return. 0 means no limit.
Limit int
// Offset is the number of items to skip. Used for pagination.
Offset int
// ReadyOnly if true, only returns items ready for retry (NextRetry <= now).
ReadyOnly bool
// OrderBy specifies the field to order by. Default is "created_at".
OrderBy string
// OrderDesc if true, orders in descending order.
OrderDesc bool
}
ListFilter defines options for filtering queue items.
type Pusher ¶
type Pusher interface {
// PushReport pushes a report to the server.
// Returns nil on success.
PushReport(ctx context.Context, report any) error
}
Pusher is the interface that wraps the basic PushReport method. This is used by RetryWorker to push items to the server.
type QueueConfig ¶
type QueueConfig struct {
// MaxSize is the maximum number of items allowed in the queue.
// Default is DefaultMaxQueueSize (1000).
MaxSize int
// Deduplication enables fingerprint-based deduplication.
// Default is true.
Deduplication bool
// Verbose enables verbose logging.
Verbose bool
}
QueueConfig contains common configuration for queue implementations.
func DefaultQueueConfig ¶
func DefaultQueueConfig() *QueueConfig
DefaultQueueConfig returns a QueueConfig with default values.
func (*QueueConfig) Validate ¶
func (c *QueueConfig) Validate() error
Validate validates the queue configuration.
type QueueItem ¶
type QueueItem struct {
// Identification
ID string `json:"id"` // Unique identifier (UUID)
Type ItemType `json:"type"` // Type of item (findings, assets, heartbeat)
Fingerprint string `json:"fingerprint"` // Content fingerprint for deduplication
Status ItemStatus `json:"status"` // Current status
// Payload
Report *ctis.Report `json:"report"` // The CTIS report to push
// Retry tracking
Attempts int `json:"attempts"` // Number of retry attempts made
MaxAttempts int `json:"max_attempts"` // Maximum retry attempts allowed
LastError string `json:"last_error"` // Last error message
LastAttempt time.Time `json:"last_attempt"` // Timestamp of last attempt
NextRetry time.Time `json:"next_retry"` // Scheduled next retry time
// Metadata
CreatedAt time.Time `json:"created_at"` // When item was first queued
UpdatedAt time.Time `json:"updated_at"` // Last update timestamp
AgentID string `json:"agent_id"` // Source agent ID
ScannerName string `json:"scanner_name"` // Source scanner name
TargetPath string `json:"target_path"` // Scan target path
}
QueueItem represents an item in the retry queue.
func (*QueueItem) HasExhaustedRetries ¶
HasExhaustedRetries checks if the item has used all retry attempts.
func (*QueueItem) IsReadyForRetry ¶
IsReadyForRetry checks if the item is ready to be retried.
type QueueStats ¶
type QueueStats struct {
TotalItems int `json:"total_items"` // Total items in queue
PendingItems int `json:"pending_items"` // Items waiting for retry
ProcessingItems int `json:"processing_items"` // Items currently being processed
FailedItems int `json:"failed_items"` // Items that exhausted retries
OldestItem time.Time `json:"oldest_item"` // Creation time of oldest item
NewestItem time.Time `json:"newest_item"` // Creation time of newest item
LastRetry time.Time `json:"last_retry"` // Last retry attempt time
TotalRetries int64 `json:"total_retries"` // Total retry attempts made
SuccessfulPush int64 `json:"successful_pushes"` // Successful pushes from retry
}
QueueStats provides statistics about the retry queue.
type ReportPusher ¶
type ReportPusher interface {
// PushReport pushes a CTIS report to the server.
PushReport(ctx context.Context, report *ctis.Report) error
}
ReportPusher is the interface for pushing reports to the server. This is typically implemented by the Client.
type RetryCallback ¶
type RetryCallback func(result *RetryResult)
RetryCallback is a function called after each retry attempt.
type RetryQueue ¶
type RetryQueue interface {
// Enqueue adds an item to the queue.
// Returns the item ID on success.
// Returns ErrQueueFull if the queue has reached its maximum capacity.
// Returns ErrDuplicateItem if an item with the same fingerprint exists.
Enqueue(ctx context.Context, item *QueueItem) (string, error)
// Dequeue removes and returns the next item ready for retry.
// Returns nil, nil if no items are ready for retry.
// The item's status should be set to ItemStatusProcessing.
Dequeue(ctx context.Context) (*QueueItem, error)
// Peek returns items ready for retry without removing them.
// Items are returned in order of priority (oldest NextRetry first).
Peek(ctx context.Context, limit int) ([]*QueueItem, error)
// Get retrieves an item by ID without removing it.
// Returns ErrItemNotFound if the item doesn't exist.
Get(ctx context.Context, id string) (*QueueItem, error)
// Update updates an existing item in the queue.
// Typically called after a retry attempt to update attempt count, error, etc.
// Returns ErrItemNotFound if the item doesn't exist.
Update(ctx context.Context, item *QueueItem) error
// Delete removes an item from the queue.
// Typically called after successful push or when item should be discarded.
// Returns ErrItemNotFound if the item doesn't exist.
Delete(ctx context.Context, id string) error
// MarkFailed marks an item as permanently failed.
// Called when an item has exhausted all retry attempts.
MarkFailed(ctx context.Context, id string, lastError string) error
// Requeue moves an item back to pending status for retry.
// Called when a processing item needs to be retried.
Requeue(ctx context.Context, id string, nextRetry time.Time) error
// Size returns the total number of items in the queue.
Size(ctx context.Context) (int, error)
// Stats returns detailed statistics about the queue.
Stats(ctx context.Context) (*QueueStats, error)
// Cleanup removes expired items (older than TTL) and permanently failed items.
// Returns the number of items removed.
Cleanup(ctx context.Context, ttl time.Duration) (int, error)
// List returns items matching the given filter.
List(ctx context.Context, filter ListFilter) ([]*QueueItem, error)
// Close closes the queue and releases any resources.
// After Close is called, all other methods should return ErrQueueClosed.
Close() error
}
RetryQueue defines the interface for retry queue implementations. Implementations must be safe for concurrent use.
type RetryResult ¶
type RetryResult struct {
ItemID string `json:"item_id"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Attempt int `json:"attempt"`
Timestamp time.Time `json:"timestamp"`
}
RetryResult represents the result of a retry operation.
type RetryWorker ¶
type RetryWorker struct {
// contains filtered or unexported fields
}
RetryWorker processes the retry queue in the background. It periodically checks the queue for items ready to retry and attempts to push them to the server.
func NewRetryWorker ¶
func NewRetryWorker(cfg *RetryWorkerConfig, queue RetryQueue, pusher ReportPusher) *RetryWorker
NewRetryWorker creates a new retry worker.
func (*RetryWorker) IsRunning ¶
func (w *RetryWorker) IsRunning() bool
IsRunning returns true if the worker is currently running.
func (*RetryWorker) OnExhaust ¶
func (w *RetryWorker) OnExhaust(fn func(item *QueueItem))
OnExhaust sets a callback to be called when an item exhausts all retries.
func (*RetryWorker) OnFail ¶
func (w *RetryWorker) OnFail(fn func(item *QueueItem, result *RetryResult))
OnFail sets a callback to be called after each failed retry.
func (*RetryWorker) OnSuccess ¶
func (w *RetryWorker) OnSuccess(fn func(item *QueueItem, result *RetryResult))
OnSuccess sets a callback to be called after each successful retry.
func (*RetryWorker) ProcessNow ¶
func (w *RetryWorker) ProcessNow(ctx context.Context) error
ProcessNow immediately processes the queue (for testing or manual triggers). This is a synchronous operation.
func (*RetryWorker) QueueStats ¶
func (w *RetryWorker) QueueStats(ctx context.Context) (*QueueStats, error)
QueueStats returns the current queue statistics.
func (*RetryWorker) Start ¶
func (w *RetryWorker) Start(ctx context.Context) error
Start starts the background retry worker. It returns immediately and processes the queue in a goroutine.
func (*RetryWorker) Stats ¶
func (w *RetryWorker) Stats() WorkerStats
Stats returns the current worker statistics.
func (*RetryWorker) Stop ¶
func (w *RetryWorker) Stop(ctx context.Context) error
Stop stops the background retry worker gracefully. It waits for the current batch to complete.
func (*RetryWorker) TriggerCleanup ¶
func (w *RetryWorker) TriggerCleanup(ctx context.Context) (int, error)
TriggerCleanup manually triggers a cleanup of expired items.
type RetryWorkerConfig ¶
type RetryWorkerConfig struct {
// Interval is how often to check the queue for items to retry.
// Default: 5 minutes
Interval time.Duration `yaml:"interval" json:"interval"`
// BatchSize is the maximum number of items to process per check.
// Default: 10
BatchSize int `yaml:"batch_size" json:"batch_size"`
// MaxAttempts is the maximum number of retry attempts per item.
// Default: 10
MaxAttempts int `yaml:"max_attempts" json:"max_attempts"`
// TTL is how long to keep items in the queue before expiring.
// Default: 7 days
TTL time.Duration `yaml:"ttl" json:"ttl"`
// Backoff configures the retry backoff behavior.
// Default: exponential backoff with 5-minute base
Backoff *BackoffConfig `yaml:"backoff" json:"backoff"`
// Verbose enables verbose logging.
Verbose bool `yaml:"verbose" json:"verbose"`
}
RetryWorkerConfig configures the retry worker.
func DefaultRetryWorkerConfig ¶
func DefaultRetryWorkerConfig() *RetryWorkerConfig
DefaultRetryWorkerConfig returns a configuration with default values.
type SmartPusher ¶
type SmartPusher interface {
ReportPusher
FingerprintChecker
}
SmartPusher combines ReportPusher with FingerprintChecker for intelligent retry. If the pusher also implements FingerprintChecker, the retry worker will check fingerprints before uploading to avoid duplicate uploads.
type WorkerStats ¶
type WorkerStats struct {
// Lifetime statistics
TotalAttempts int64 `json:"total_attempts"`
SuccessfulPush int64 `json:"successful_pushes"`
FailedAttempts int64 `json:"failed_attempts"`
ExhaustedItems int64 `json:"exhausted_items"`
TotalDuration time.Duration `json:"total_duration"`
LastProcessedAt time.Time `json:"last_processed_at"`
// Current state
IsRunning bool `json:"is_running"`
StartedAt time.Time `json:"started_at"`
LastCheckAt time.Time `json:"last_check_at"`
}
WorkerStats contains statistics about the retry worker.