Documentation
¶
Overview ¶
Package dlq provides dead letter queue error classification and handling for failed message processing.
Index ¶
- Variables
- func SendIndividualFailures(ctx context.Context, service string, failures []ProcessingFailure, ...)
- func WrapSQSHandler(service string, handler func(context.Context, events.SQSEvent) error, ...) func(context.Context, events.SQSEvent) error
- type BatchReprocessResult
- type DLQFailureMessage
- type DLQSender
- func (s *DLQSender) InitializeAWSClients(ctx context.Context) error
- func (s *DLQSender) SendBatchFailedMessages(ctx context.Context, service string, failures []ProcessingFailure) error
- func (s *DLQSender) SendFailedMessage(ctx context.Context, service string, originalMessage events.SQSMessage, ...) error
- func (s *DLQSender) SetSQSClient(client SQSClient)
- type ErrorClassifier
- func (ec *ErrorClassifier) AddCustomPattern(errorType string, patterns []string, isPermanent bool, ...)
- func (ec *ErrorClassifier) AnalyzeErrorTrends(messages []string) *ErrorTrendAnalysis
- func (ec *ErrorClassifier) ClassifyError(messageBody, service string) *ErrorInfo
- func (ec *ErrorClassifier) CreateAppError(messageBody, service string) *errors.AppError
- func (ec *ErrorClassifier) GetPatterns() map[string]*ErrorPattern
- type ErrorInfo
- type ErrorPattern
- type ErrorTrendAnalysis
- type OriginalMessage
- type ProcessingContext
- type ProcessingFailure
- type ProcessingResult
- type ProcessingStats
- type Processor
- func (p *Processor) CleanupExpiredMessages(ctx context.Context) error
- func (p *Processor) GetAnalytics(ctx context.Context, service string, timeRange repositories.DLQTimeRange) (*repositories.DLQAnalytics, error)
- func (p *Processor) GetTrends(ctx context.Context, service string, days int) (*repositories.DLQTrends, error)
- func (p *Processor) InitializeAWSClients(ctx context.Context) error
- func (p *Processor) ProcessDLQMessages(ctx context.Context, event events.SQSEvent) error
- func (p *Processor) ScheduledReprocessing(ctx context.Context) error
- func (p *Processor) SearchMessages(ctx context.Context, filter *repositories.DLQSearchFilter) ([]*models.DLQMessage, string, error)
- type ReprocessConfig
- type ReprocessingStrategy
- type ReprocessorClient
- func (r *ReprocessorClient) BatchReprocess(ctx context.Context, messages []*OriginalMessage, service string) (*BatchReprocessResult, error)
- func (r *ReprocessorClient) ReprocessActivity(ctx context.Context, originalMessage *OriginalMessage) error
- func (r *ReprocessorClient) ReprocessFederation(ctx context.Context, originalMessage *OriginalMessage) error
- func (r *ReprocessorClient) ReprocessGeneric(ctx context.Context, sourceQueue string, originalMessage *OriginalMessage) error
- func (r *ReprocessorClient) ReprocessMedia(ctx context.Context, originalMessage *OriginalMessage) error
- func (r *ReprocessorClient) ReprocessNotification(ctx context.Context, originalMessage *OriginalMessage) error
- func (r *ReprocessorClient) ReprocessSearch(ctx context.Context, originalMessage *OriginalMessage) error
- func (r *ReprocessorClient) SetSQSClient(client SQSClient)
- type SQSClient
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrBatchProcessingFailed indicates that batch processing failed with some messages failing
	ErrBatchProcessingFailed = errors.BatchOperationFailed("DLQ processing", nil)
	// ErrNoDLQMessagesProcessed indicates that all DLQ messages failed to process
	ErrNoDLQMessagesProcessed = errors.ProcessingFailed("DLQ messages", stdErrors.New("no DLQ messages processed"))

	// Validation errors
	ErrMissingRequiredField         = errors.RequiredFieldMissing("field")
	ErrChannelsMustBeArray          = errors.NewValidationError("channels", "Channels must be an array")
	ErrMissingActivityPubType       = errors.RequiredFieldMissing("ActivityPub type")
	ErrActivityPubTypeMustBeString  = errors.NewValidationError("type", "ActivityPub type must be a string")
	ErrMissingActivityPubActor      = errors.RequiredFieldMissing("ActivityPub actor")
	ErrActivityPubActorMustBeString = errors.NewValidationError("actor", "ActivityPub actor must be a string")
	ErrInvalidAction                = errors.NewValidationError("action", "Invalid action")

	// URL validation errors
	ErrInvalidMediaURL       = errors.InvalidFormat("media_url", "valid URL format")
	ErrInvalidMediaURLFormat = errors.InvalidFormat("media_url", "valid URL format")
	ErrInvalidInboxURL       = errors.InvalidFormat("inbox_url", "valid URL format")
	ErrInvalidInboxURLFormat = errors.InvalidFormat("inbox_url", "valid URL format")

	// Media accessibility errors
	ErrMediaAccessDenied     = errors.AccessDeniedForResource("media", "")
	ErrMediaValidationFailed = errors.MediaAttachmentValidationFailed("non-retryable validation error")
)
Legacy error variables for backwards compatibility. These are now wrappers around the centralized error system.
Functions ¶
func SendIndividualFailures ¶
func SendIndividualFailures(ctx context.Context, service string, failures []ProcessingFailure, logger *zap.Logger)
SendIndividualFailures sends individual message failures to the DLQ. This is useful when processing messages individually within a batch.
Types ¶
type BatchReprocessResult ¶
type BatchReprocessResult struct {
TotalMessages int `json:"total_messages"`
SuccessfulReprocesses int `json:"successful_reprocesses"`
FailedReprocesses int `json:"failed_reprocesses"`
Errors []string `json:"errors,omitempty"`
}
BatchReprocessResult represents the result of batch reprocessing
type DLQFailureMessage ¶
type DLQFailureMessage struct {
OriginalMessageID string `json:"original_message_id"`
Service string `json:"service"`
QueueName string `json:"queue_name"`
MessageBody string `json:"message_body"`
MessageAttributes map[string]string `json:"message_attributes"`
ErrorInfo *ErrorInfo `json:"error_info"`
ProcessingContext ProcessingContext `json:"processing_context"`
RetryCount int `json:"retry_count"`
Timestamp time.Time `json:"timestamp"`
}
DLQFailureMessage represents a message being sent to DLQ
type DLQSender ¶
type DLQSender struct {
// contains filtered or unexported fields
}
DLQSender handles sending failed messages to dead letter queues
func NewDLQSender ¶
NewDLQSender creates a new DLQ sender
func (*DLQSender) InitializeAWSClients ¶
InitializeAWSClients initializes AWS clients
func (*DLQSender) SendBatchFailedMessages ¶
func (s *DLQSender) SendBatchFailedMessages(ctx context.Context, service string, failures []ProcessingFailure) error
SendBatchFailedMessages sends multiple failed messages to DLQ
func (*DLQSender) SendFailedMessage ¶
func (s *DLQSender) SendFailedMessage(ctx context.Context, service string, originalMessage events.SQSMessage, processingError error) error
SendFailedMessage sends a failed message to the appropriate DLQ
func (*DLQSender) SetSQSClient ¶
SetSQSClient sets the SQS client (useful for testing)
type ErrorClassifier ¶
type ErrorClassifier struct {
// contains filtered or unexported fields
}
ErrorClassifier categorizes and analyzes errors from failed messages
func NewErrorClassifier ¶
func NewErrorClassifier() *ErrorClassifier
NewErrorClassifier creates a new error classifier with predefined patterns
func (*ErrorClassifier) AddCustomPattern ¶
func (ec *ErrorClassifier) AddCustomPattern(errorType string, patterns []string, isPermanent bool, priority, failureReason string)
AddCustomPattern adds a custom error pattern
func (*ErrorClassifier) AnalyzeErrorTrends ¶
func (ec *ErrorClassifier) AnalyzeErrorTrends(messages []string) *ErrorTrendAnalysis
AnalyzeErrorTrends analyzes error trends from a collection of messages
func (*ErrorClassifier) ClassifyError ¶
func (ec *ErrorClassifier) ClassifyError(messageBody, service string) *ErrorInfo
ClassifyError analyzes an error and returns classification information
func (*ErrorClassifier) CreateAppError ¶
func (ec *ErrorClassifier) CreateAppError(messageBody, service string) *errors.AppError
CreateAppError creates an AppError from classified DLQ error information
func (*ErrorClassifier) GetPatterns ¶
func (ec *ErrorClassifier) GetPatterns() map[string]*ErrorPattern
GetPatterns returns all registered patterns
type ErrorInfo ¶
type ErrorInfo struct {
ErrorType string `json:"error_type"`
ErrorMessage string `json:"error_message"`
StackTrace string `json:"stack_trace,omitempty"`
FailureReason string `json:"failure_reason"`
IsPermanent bool `json:"is_permanent"`
Priority string `json:"priority"`
Category string `json:"category"`
}
ErrorInfo contains classified error information
type ErrorPattern ¶
type ErrorPattern struct {
ErrorType string `json:"error_type"`
Patterns []string `json:"patterns"`
IsPermanent bool `json:"is_permanent"`
Priority string `json:"priority"`
FailureReason string `json:"failure_reason"`
}
ErrorPattern represents a pattern for classifying errors
type ErrorTrendAnalysis ¶
type ErrorTrendAnalysis struct {
TotalMessages int `json:"total_messages"`
ErrorTypeCounts map[string]int `json:"error_type_counts"`
PermanentErrors int `json:"permanent_errors"`
TransientErrors int `json:"transient_errors"`
PermanentErrorRate float64 `json:"permanent_error_rate"`
TransientErrorRate float64 `json:"transient_error_rate"`
PriorityBreakdown map[string]int `json:"priority_breakdown"`
}
ErrorTrendAnalysis represents analysis of error patterns
type OriginalMessage ¶
type OriginalMessage struct {
MessageID string `json:"message_id"`
OriginalMessageID string `json:"original_message_id"`
Body string `json:"body"`
Attributes map[string]string `json:"attributes"`
SourceQueue string `json:"source_queue"`
ReceiptHandle string `json:"receipt_handle"`
}
OriginalMessage represents the original failed message
type ProcessingContext ¶
type ProcessingContext struct {
FunctionName string `json:"function_name"`
LogGroup string `json:"log_group"`
LogStream string `json:"log_stream"`
RequestID string `json:"request_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ProcessingContext contains context about the failed processing
type ProcessingFailure ¶
type ProcessingFailure struct {
OriginalMessage events.SQSMessage `json:"original_message"`
Error error `json:"error"`
Timestamp time.Time `json:"timestamp"`
}
ProcessingFailure represents a failed message processing attempt
type ProcessingResult ¶
type ProcessingResult struct {
MessageID string `json:"message_id"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
ReprocessingCount int `json:"reprocessing_count"`
ProcessingTimeMs int64 `json:"processing_time_ms"`
CostMicroCents int64 `json:"cost_micro_cents"`
Timestamp time.Time `json:"timestamp"`
}
ProcessingResult represents the result of processing a DLQ message
type ProcessingStats ¶
type ProcessingStats struct {
TotalMessages int `json:"total_messages"`
ProcessedMessages int `json:"processed_messages"`
FailedMessages int `json:"failed_messages"`
ReprocessedMessages int `json:"reprocessed_messages"`
ResolvedMessages int `json:"resolved_messages"`
AbandonedMessages int `json:"abandoned_messages"`
TotalCostMicroCents int64 `json:"total_cost_micro_cents"`
TotalCostDollars float64 `json:"total_cost_dollars"`
ProcessingTimeMs int64 `json:"processing_time_ms"`
}
ProcessingStats represents statistics for a processing batch
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor handles dead letter queue message processing
func NewProcessor ¶
NewProcessor creates a new DLQ processor
func (*Processor) CleanupExpiredMessages ¶
CleanupExpiredMessages removes expired DLQ messages
func (*Processor) GetAnalytics ¶
func (p *Processor) GetAnalytics(ctx context.Context, service string, timeRange repositories.DLQTimeRange) (*repositories.DLQAnalytics, error)
GetAnalytics returns DLQ analytics for monitoring
func (*Processor) GetTrends ¶
func (p *Processor) GetTrends(ctx context.Context, service string, days int) (*repositories.DLQTrends, error)
GetTrends returns DLQ trends for monitoring
func (*Processor) InitializeAWSClients ¶
InitializeAWSClients initializes AWS clients
func (*Processor) ProcessDLQMessages ¶
ProcessDLQMessages processes messages from dead letter queues
func (*Processor) ScheduledReprocessing ¶
ScheduledReprocessing handles scheduled reprocessing of failed messages
func (*Processor) SearchMessages ¶
func (p *Processor) SearchMessages(ctx context.Context, filter *repositories.DLQSearchFilter) ([]*models.DLQMessage, string, error)
SearchMessages searches DLQ messages with filters
type ReprocessConfig ¶
type ReprocessConfig struct {
ValidateMessage func(map[string]interface{}) error
CheckAccessibility func(context.Context, map[string]interface{}) error
ReprocessType string
}
ReprocessConfig defines configuration for message reprocessing
type ReprocessingStrategy ¶
type ReprocessingStrategy struct {
MaxRetries int `json:"max_retries"`
DelaySeconds int32 `json:"delay_seconds"`
BackoffStrategy string `json:"backoff_strategy"` // "linear", "exponential", "fixed"
ValidateFirst bool `json:"validate_first"`
CheckAccessibility bool `json:"check_accessibility"`
}
ReprocessingStrategy represents different strategies for reprocessing
func GetDefaultStrategy ¶
func GetDefaultStrategy(service string) *ReprocessingStrategy
GetDefaultStrategy returns the default reprocessing strategy for a service
type ReprocessorClient ¶
type ReprocessorClient struct {
// contains filtered or unexported fields
}
ReprocessorClient handles reprocessing of failed messages
func NewReprocessorClient ¶
func NewReprocessorClient(logger *zap.Logger) *ReprocessorClient
NewReprocessorClient creates a new reprocessor client
func (*ReprocessorClient) BatchReprocess ¶
func (r *ReprocessorClient) BatchReprocess(ctx context.Context, messages []*OriginalMessage, service string) (*BatchReprocessResult, error)
BatchReprocess reprocesses multiple messages in batch
func (*ReprocessorClient) ReprocessActivity ¶
func (r *ReprocessorClient) ReprocessActivity(ctx context.Context, originalMessage *OriginalMessage) error
ReprocessActivity reprocesses a failed activity message
func (*ReprocessorClient) ReprocessFederation ¶
func (r *ReprocessorClient) ReprocessFederation(ctx context.Context, originalMessage *OriginalMessage) error
ReprocessFederation reprocesses a failed federation delivery message
func (*ReprocessorClient) ReprocessGeneric ¶
func (r *ReprocessorClient) ReprocessGeneric(ctx context.Context, sourceQueue string, originalMessage *OriginalMessage) error
ReprocessGeneric reprocesses a message for an unknown service
func (*ReprocessorClient) ReprocessMedia ¶
func (r *ReprocessorClient) ReprocessMedia(ctx context.Context, originalMessage *OriginalMessage) error
ReprocessMedia reprocesses a failed media processing message
func (*ReprocessorClient) ReprocessNotification ¶
func (r *ReprocessorClient) ReprocessNotification(ctx context.Context, originalMessage *OriginalMessage) error
ReprocessNotification reprocesses a failed notification message
func (*ReprocessorClient) ReprocessSearch ¶
func (r *ReprocessorClient) ReprocessSearch(ctx context.Context, originalMessage *OriginalMessage) error
ReprocessSearch reprocesses a failed search indexing message
func (*ReprocessorClient) SetSQSClient ¶
func (r *ReprocessorClient) SetSQSClient(client SQSClient)
SetSQSClient sets the SQS client
type SQSClient ¶
type SQSClient interface {
GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
}
SQSClient defines the interface for SQS operations used by the DLQ package. This interface allows for easy mocking in unit tests.