dlq

package
v1.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Overview

Package dlq provides dead letter queue error classification and handling for failed message processing.

Index

Constants

This section is empty.

Variables

View Source
// Legacy error variables kept for backwards compatibility. Every value is
// built once, when the package is initialized, by a constructor from the
// errors package.
// NOTE(review): errors here is not the standard library package (stdErrors
// appears to be the alias used for that one) — confirm against the imports.
var (
	// ErrBatchProcessingFailed indicates that batch processing failed with some messages failing.
	// It is constructed with a nil cause.
	ErrBatchProcessingFailed = errors.BatchOperationFailed("DLQ processing", nil)

	// ErrNoDLQMessagesProcessed indicates that all DLQ messages failed to process.
	ErrNoDLQMessagesProcessed = errors.ProcessingFailed("DLQ messages", stdErrors.New("no DLQ messages processed"))

	// Validation errors.
	// ErrMissingRequiredField is built with the literal placeholder name
	// "field" rather than the name of a specific field.
	ErrMissingRequiredField         = errors.RequiredFieldMissing("field")
	ErrChannelsMustBeArray          = errors.NewValidationError("channels", "Channels must be an array")
	ErrMissingActivityPubType       = errors.RequiredFieldMissing("ActivityPub type")
	ErrActivityPubTypeMustBeString  = errors.NewValidationError("type", "ActivityPub type must be a string")
	ErrMissingActivityPubActor      = errors.RequiredFieldMissing("ActivityPub actor")
	ErrActivityPubActorMustBeString = errors.NewValidationError("actor", "ActivityPub actor must be a string")
	ErrInvalidAction                = errors.NewValidationError("action", "Invalid action")

	// URL validation errors.
	// Each ...Format variable is constructed with exactly the same arguments
	// as its shorter-named counterpart.
	// NOTE(review): whether the two members of a pair compare equal depends on
	// what InvalidFormat returns — verify before treating them as
	// interchangeable in comparisons.
	ErrInvalidMediaURL       = errors.InvalidFormat("media_url", "valid URL format")
	ErrInvalidMediaURLFormat = errors.InvalidFormat("media_url", "valid URL format")
	ErrInvalidInboxURL       = errors.InvalidFormat("inbox_url", "valid URL format")
	ErrInvalidInboxURLFormat = errors.InvalidFormat("inbox_url", "valid URL format")

	// Media accessibility errors.
	// NOTE(review): ErrMediaAccessDenied passes an empty string as the second
	// argument; the meaning of that parameter is not visible here — confirm
	// the empty value is intended.
	ErrMediaPermanentlyUnavailable = errors.ResourceUnavailable("media")
	ErrMediaAccessDenied           = errors.AccessDeniedForResource("media", "")
	ErrMediaValidationFailed       = errors.MediaAttachmentValidationFailed("non-retryable validation error")
)

Legacy error variables for backwards compatibility. These are now wrappers around the centralized error system.

Functions

func SendIndividualFailures

func SendIndividualFailures(ctx context.Context, service string, failures []ProcessingFailure, logger *zap.Logger)

SendIndividualFailures sends individual message failures to the DLQ. This is useful when processing messages individually within a batch.

func WrapSQSHandler

func WrapSQSHandler(service string, handler func(context.Context, events.SQSEvent) error, logger *zap.Logger) func(context.Context, events.SQSEvent) error

WrapSQSHandler wraps an existing SQS handler to automatically send failures to DLQ

Types

type BatchReprocessResult

// BatchReprocessResult represents the result of batch reprocessing.
// It is the result type returned by ReprocessorClient.BatchReprocess.
type BatchReprocessResult struct {
	// Counters for the batch.
	// NOTE(review): presumably TotalMessages equals SuccessfulReprocesses plus
	// FailedReprocesses — confirm against the implementation.
	TotalMessages         int      `json:"total_messages"`
	SuccessfulReprocesses int      `json:"successful_reprocesses"`
	FailedReprocesses     int      `json:"failed_reprocesses"`
	// Errors holds error text as plain strings; the key is omitted from the
	// JSON output when the slice is empty.
	Errors                []string `json:"errors,omitempty"`
}

BatchReprocessResult represents the result of batch reprocessing

type DLQFailureMessage

// DLQFailureMessage represents a message being sent to DLQ.
// All fields are always present in the JSON encoding (none uses omitempty).
type DLQFailureMessage struct {
	OriginalMessageID string            `json:"original_message_id"`
	Service           string            `json:"service"`
	QueueName         string            `json:"queue_name"`
	MessageBody       string            `json:"message_body"`
	MessageAttributes map[string]string `json:"message_attributes"`
	// ErrorInfo is a pointer without omitempty, so a nil value is encoded as
	// JSON null rather than being dropped.
	ErrorInfo         *ErrorInfo        `json:"error_info"`
	// ProcessingContext carries its own Timestamp in addition to the one below.
	// NOTE(review): the intended difference between the two timestamps is not
	// visible here — confirm.
	ProcessingContext ProcessingContext `json:"processing_context"`
	RetryCount        int               `json:"retry_count"`
	Timestamp         time.Time         `json:"timestamp"`
}

DLQFailureMessage represents a message being sent to DLQ

type DLQSender

// DLQSender handles sending failed messages to dead letter queues.
// Create one with NewDLQSender; its state is not exported.
type DLQSender struct {
	// contains filtered or unexported fields
}

DLQSender handles sending failed messages to dead letter queues

func NewDLQSender

func NewDLQSender(logger *zap.Logger) *DLQSender

NewDLQSender creates a new DLQ sender

func (*DLQSender) InitializeAWSClients

func (s *DLQSender) InitializeAWSClients(ctx context.Context) error

InitializeAWSClients initializes AWS clients

func (*DLQSender) SendBatchFailedMessages

func (s *DLQSender) SendBatchFailedMessages(ctx context.Context, service string, failures []ProcessingFailure) error

SendBatchFailedMessages sends multiple failed messages to DLQ

func (*DLQSender) SendFailedMessage

func (s *DLQSender) SendFailedMessage(ctx context.Context, service string, originalMessage events.SQSMessage, processingError error) error

SendFailedMessage sends a failed message to the appropriate DLQ

func (*DLQSender) SetSQSClient

func (s *DLQSender) SetSQSClient(client SQSClient)

SetSQSClient sets the SQS client (useful for testing)

type ErrorClassifier

// ErrorClassifier categorizes and analyzes errors from failed messages.
// Create one with NewErrorClassifier; its state is not exported, and the
// registered patterns are read through GetPatterns.
type ErrorClassifier struct {
	// contains filtered or unexported fields
}

ErrorClassifier categorizes and analyzes errors from failed messages

func NewErrorClassifier

func NewErrorClassifier() *ErrorClassifier

NewErrorClassifier creates a new error classifier with predefined patterns

func (*ErrorClassifier) AddCustomPattern

func (ec *ErrorClassifier) AddCustomPattern(errorType string, patterns []string, isPermanent bool, priority, failureReason string)

AddCustomPattern adds a custom error pattern

func (*ErrorClassifier) AnalyzeErrorTrends

func (ec *ErrorClassifier) AnalyzeErrorTrends(messages []string) *ErrorTrendAnalysis

AnalyzeErrorTrends analyzes error trends from a collection of messages

func (*ErrorClassifier) ClassifyError

func (ec *ErrorClassifier) ClassifyError(messageBody, service string) *ErrorInfo

ClassifyError analyzes an error and returns classification information

func (*ErrorClassifier) CreateAppError

func (ec *ErrorClassifier) CreateAppError(messageBody, service string) *errors.AppError

CreateAppError creates an AppError from classified DLQ error information

func (*ErrorClassifier) GetPatterns

func (ec *ErrorClassifier) GetPatterns() map[string]*ErrorPattern

GetPatterns returns all registered patterns

type ErrorInfo

// ErrorInfo contains classified error information.
// It is the result type of ErrorClassifier.ClassifyError and is embedded by
// pointer in DLQFailureMessage.
type ErrorInfo struct {
	ErrorType     string `json:"error_type"`
	ErrorMessage  string `json:"error_message"`
	// StackTrace is the only optional key: it is omitted from JSON when empty.
	StackTrace    string `json:"stack_trace,omitempty"`
	FailureReason string `json:"failure_reason"`
	IsPermanent   bool   `json:"is_permanent"`
	// Priority and Category are free-form strings at the type level.
	// NOTE(review): the set of valid values is not visible here — confirm.
	Priority      string `json:"priority"`
	Category      string `json:"category"`
}

ErrorInfo contains classified error information

type ErrorPattern

// ErrorPattern represents a pattern for classifying errors.
// Its fields correspond one-to-one to the parameters of
// ErrorClassifier.AddCustomPattern (errorType, patterns, isPermanent,
// priority, failureReason).
type ErrorPattern struct {
	ErrorType     string   `json:"error_type"`
	// NOTE(review): how Patterns entries are matched (substring, regular
	// expression, ...) is not visible here — confirm before adding patterns.
	Patterns      []string `json:"patterns"`
	IsPermanent   bool     `json:"is_permanent"`
	Priority      string   `json:"priority"`
	FailureReason string   `json:"failure_reason"`
}

ErrorPattern represents a pattern for classifying errors

type ErrorTrendAnalysis

// ErrorTrendAnalysis represents analysis of error patterns.
// It is the result type of ErrorClassifier.AnalyzeErrorTrends.
type ErrorTrendAnalysis struct {
	TotalMessages      int            `json:"total_messages"`
	// ErrorTypeCounts and PriorityBreakdown map a string key to a count.
	// NOTE(review): presumably keyed by ErrorInfo.ErrorType and
	// ErrorInfo.Priority respectively — confirm.
	ErrorTypeCounts    map[string]int `json:"error_type_counts"`
	PermanentErrors    int            `json:"permanent_errors"`
	TransientErrors    int            `json:"transient_errors"`
	// NOTE(review): the scale of the two rates (fraction 0-1 vs. percentage)
	// is not visible here — confirm before displaying them.
	PermanentErrorRate float64        `json:"permanent_error_rate"`
	TransientErrorRate float64        `json:"transient_error_rate"`
	PriorityBreakdown  map[string]int `json:"priority_breakdown"`
}

ErrorTrendAnalysis represents analysis of error patterns

type OriginalMessage

// OriginalMessage represents the original failed message.
// It is the input type of the ReprocessorClient.Reprocess* methods and of
// ReprocessorClient.BatchReprocess.
type OriginalMessage struct {
	// NOTE(review): the struct carries both MessageID and OriginalMessageID;
	// which one identifies the DLQ copy and which the source message is not
	// visible here — confirm.
	MessageID         string            `json:"message_id"`
	OriginalMessageID string            `json:"original_message_id"`
	Body              string            `json:"body"`
	Attributes        map[string]string `json:"attributes"`
	SourceQueue       string            `json:"source_queue"`
	ReceiptHandle     string            `json:"receipt_handle"`
}

OriginalMessage represents the original failed message

type ProcessingContext

// ProcessingContext contains context about the failed processing.
// It is embedded by value in DLQFailureMessage.
type ProcessingContext struct {
	FunctionName string    `json:"function_name"`
	// NOTE(review): LogGroup and LogStream presumably refer to CloudWatch
	// Logs — confirm where they are populated.
	LogGroup     string    `json:"log_group"`
	LogStream    string    `json:"log_stream"`
	// RequestID is the only optional key: it is omitted from JSON when empty.
	RequestID    string    `json:"request_id,omitempty"`
	Timestamp    time.Time `json:"timestamp"`
}

ProcessingContext contains context about the failed processing

type ProcessingFailure

// ProcessingFailure represents a failed message processing attempt.
// Slices of it are accepted by SendIndividualFailures and
// DLQSender.SendBatchFailedMessages.
type ProcessingFailure struct {
	OriginalMessage events.SQSMessage `json:"original_message"`
	// NOTE(review): encoding/json encodes an interface value by encoding the
	// concrete value it holds; common error implementations expose no
	// exported fields and therefore marshal as {}. If this struct is ever
	// serialized, the error text is probably lost — confirm, and convert via
	// Error() first if the text is needed.
	Error           error             `json:"error"`
	Timestamp       time.Time         `json:"timestamp"`
}

ProcessingFailure represents a failed message processing attempt

type ProcessingResult

// ProcessingResult represents the result of processing a DLQ message.
type ProcessingResult struct {
	MessageID         string    `json:"message_id"`
	Success           bool      `json:"success"`
	// Error is plain text and is omitted from JSON when empty.
	Error             string    `json:"error,omitempty"`
	ReprocessingCount int       `json:"reprocessing_count"`
	// Units are carried by the field names: milliseconds and micro-cents.
	// NOTE(review): how CostMicroCents is computed is not visible here.
	ProcessingTimeMs  int64     `json:"processing_time_ms"`
	CostMicroCents    int64     `json:"cost_micro_cents"`
	Timestamp         time.Time `json:"timestamp"`
}

ProcessingResult represents the result of processing a DLQ message

type ProcessingStats

// ProcessingStats represents statistics for a processing batch.
type ProcessingStats struct {
	// Message counters.
	// NOTE(review): how the counters relate to each other (for example whether
	// reprocessed, resolved and abandoned are subsets of processed) is not
	// visible here — confirm before deriving ratios from them.
	TotalMessages       int     `json:"total_messages"`
	ProcessedMessages   int     `json:"processed_messages"`
	FailedMessages      int     `json:"failed_messages"`
	ReprocessedMessages int     `json:"reprocessed_messages"`
	ResolvedMessages    int     `json:"resolved_messages"`
	AbandonedMessages   int     `json:"abandoned_messages"`
	// The total cost is stored twice, as integer micro-cents and as float
	// dollars.
	// NOTE(review): presumably the dollar value is derived from the
	// micro-cent value — confirm which one is authoritative.
	TotalCostMicroCents int64   `json:"total_cost_micro_cents"`
	TotalCostDollars    float64 `json:"total_cost_dollars"`
	ProcessingTimeMs    int64   `json:"processing_time_ms"`
}

ProcessingStats represents statistics for a processing batch

type Processor

// Processor handles dead letter queue message processing.
// Create one with NewProcessor, which takes a core.DB, a table name and a
// logger; its state is not exported.
type Processor struct {
	// contains filtered or unexported fields
}

Processor handles dead letter queue message processing

func NewProcessor

func NewProcessor(db core.DB, tableName string, logger *zap.Logger) *Processor

NewProcessor creates a new DLQ processor

func (*Processor) CleanupExpiredMessages

func (p *Processor) CleanupExpiredMessages(ctx context.Context) error

CleanupExpiredMessages removes expired DLQ messages

func (*Processor) GetAnalytics

func (p *Processor) GetAnalytics(ctx context.Context, service string, timeRange repositories.DLQTimeRange) (*repositories.DLQAnalytics, error)

GetAnalytics returns DLQ analytics for monitoring

func (*Processor) GetTrends

func (p *Processor) GetTrends(ctx context.Context, service string, days int) (*repositories.DLQTrends, error)

GetTrends returns DLQ trends for monitoring

func (*Processor) InitializeAWSClients

func (p *Processor) InitializeAWSClients(ctx context.Context) error

InitializeAWSClients initializes AWS clients

func (*Processor) ProcessDLQMessages

func (p *Processor) ProcessDLQMessages(ctx context.Context, event events.SQSEvent) error

ProcessDLQMessages processes messages from dead letter queues

func (*Processor) ScheduledReprocessing

func (p *Processor) ScheduledReprocessing(ctx context.Context) error

ScheduledReprocessing handles scheduled reprocessing of failed messages

func (*Processor) SearchMessages

func (p *Processor) SearchMessages(ctx context.Context, filter *repositories.DLQSearchFilter) ([]*models.DLQMessage, string, error)

SearchMessages searches DLQ messages with filters

type ReprocessConfig

// ReprocessConfig defines configuration for message reprocessing.
// It has no JSON tags and holds function values, so it is an in-process
// configuration value rather than a serializable one.
type ReprocessConfig struct {
	// ValidateMessage receives a decoded message as a generic map and returns
	// an error.
	// NOTE(review): whether a nil callback is allowed (meaning "skip this
	// step") is not visible here — confirm. The same applies to
	// CheckAccessibility.
	ValidateMessage    func(map[string]interface{}) error
	// CheckAccessibility has the same shape as ValidateMessage but also
	// receives a context.
	CheckAccessibility func(context.Context, map[string]interface{}) error
	ReprocessType      string
}

ReprocessConfig defines configuration for message reprocessing

type ReprocessingStrategy

// ReprocessingStrategy represents different strategies for reprocessing.
// GetDefaultStrategy returns the default strategy for a given service.
type ReprocessingStrategy struct {
	MaxRetries         int    `json:"max_retries"`
	// DelaySeconds is an int32, unlike the other numeric field.
	// NOTE(review): presumably chosen to match an SQS API field type — confirm.
	DelaySeconds       int32  `json:"delay_seconds"`
	// BackoffStrategy is a plain string; the trailing comment lists the
	// expected values, which the type itself does not enforce.
	BackoffStrategy    string `json:"backoff_strategy"` // "linear", "exponential", "fixed"
	// NOTE(review): ValidateFirst and CheckAccessibility presumably gate the
	// callbacks of the same names in ReprocessConfig — confirm.
	ValidateFirst      bool   `json:"validate_first"`
	CheckAccessibility bool   `json:"check_accessibility"`
}

ReprocessingStrategy represents different strategies for reprocessing

func GetDefaultStrategy

func GetDefaultStrategy(service string) *ReprocessingStrategy

GetDefaultStrategy returns the default reprocessing strategy for a service

type ReprocessorClient

// ReprocessorClient handles reprocessing of failed messages.
// Create one with NewReprocessorClient; its state is not exported, and the
// SQS client it uses can be replaced through SetSQSClient.
type ReprocessorClient struct {
	// contains filtered or unexported fields
}

ReprocessorClient handles reprocessing of failed messages

func NewReprocessorClient

func NewReprocessorClient(logger *zap.Logger) *ReprocessorClient

NewReprocessorClient creates a new reprocessor client

func (*ReprocessorClient) BatchReprocess

func (r *ReprocessorClient) BatchReprocess(ctx context.Context, messages []*OriginalMessage, service string) (*BatchReprocessResult, error)

BatchReprocess reprocesses multiple messages in batch

func (*ReprocessorClient) ReprocessActivity

func (r *ReprocessorClient) ReprocessActivity(ctx context.Context, originalMessage *OriginalMessage) error

ReprocessActivity reprocesses a failed activity message

func (*ReprocessorClient) ReprocessFederation

func (r *ReprocessorClient) ReprocessFederation(ctx context.Context, originalMessage *OriginalMessage) error

ReprocessFederation reprocesses a failed federation delivery message

func (*ReprocessorClient) ReprocessGeneric

func (r *ReprocessorClient) ReprocessGeneric(ctx context.Context, sourceQueue string, originalMessage *OriginalMessage) error

ReprocessGeneric reprocesses a message for an unknown service

func (*ReprocessorClient) ReprocessMedia

func (r *ReprocessorClient) ReprocessMedia(ctx context.Context, originalMessage *OriginalMessage) error

ReprocessMedia reprocesses a failed media processing message

func (*ReprocessorClient) ReprocessNotification

func (r *ReprocessorClient) ReprocessNotification(ctx context.Context, originalMessage *OriginalMessage) error

ReprocessNotification reprocesses a failed notification message

func (*ReprocessorClient) ReprocessSearch

func (r *ReprocessorClient) ReprocessSearch(ctx context.Context, originalMessage *OriginalMessage) error

ReprocessSearch reprocesses a failed search indexing message

func (*ReprocessorClient) SetSQSClient

func (r *ReprocessorClient) SetSQSClient(client SQSClient)

SetSQSClient sets the SQS client

type SQSClient

// SQSClient defines the interface for SQS operations used by the DLQ package.
// This interface allows for easy mocking in unit tests: both
// DLQSender.SetSQSClient and ReprocessorClient.SetSQSClient accept it.
//
// NOTE(review): the three method signatures appear to mirror the methods of
// the same names on the AWS SDK for Go v2 SQS client, which is presumably why
// GetQueueUrl keeps the "Url" spelling — confirm that the SDK client satisfies
// this interface.
type SQSClient interface {
	// GetQueueUrl takes a context, an input struct and optional per-call
	// option functions, and returns an output struct or an error.
	GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
	// SendMessage has the same calling shape, using the SendMessage
	// input/output types.
	SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	// SendMessageBatch has the same calling shape, using the batch
	// input/output types.
	SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
}

SQSClient defines the interface for SQS operations used by the DLQ package. This interface allows for easy mocking in unit tests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL