Documentation
¶
Index ¶
- type SQSClient
- type SQSQueuedJob
- type SQSStore
- func (s *SQSStore) Ack(queueName string, jobID string) error
- func (s *SQSStore) DequeueMetrics(queueName string) (jobConfig.JobMetrics, error)
- func (s *SQSStore) EnqueueMetrics(metrics jobConfig.JobMetrics) error
- func (s *SQSStore) GetDbConnection() interface{}
- func (s *SQSStore) IsHealthy() bool
- func (s *SQSStore) Pop(queueName string) (job.JobContext, error)
- func (s *SQSStore) Push(queueName string, jb job.Job, delay ...time.Duration) error
- func (s *SQSStore) PushBatch(queueName string, jobs []job.Job, delay ...time.Duration) error
- func (s *SQSStore) Retry(job job.Job, delay time.Duration) error
- func (s *SQSStore) RetryJobWithMetadata(queueName string, queuedJob job.JobContext, delay ...time.Duration) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type SQSClient ¶
// SQSClient is the subset of SQS client operations this package relies on.
// Declaring it as an interface lets tests substitute a mock for the real
// client.
type SQSClient interface {
	// Sending messages, one at a time or as a batch.
	SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)

	// Receiving messages, changing their visibility, and deleting them.
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)

	// Reading queue attributes.
	GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error)
}
SQSClient defines the interface for the SQS client operations we use. This allows us to easily mock the SQS client for testing.
type SQSQueuedJob ¶
// SQSQueuedJob is the envelope for a job queued in SQS. Each field maps to
// the JSON key given in its struct tag.
type SQSQueuedJob struct {
	// Job holds the job's serialized data as raw JSON.
	Job json.RawMessage `json:"job"`

	// ID uniquely identifies this queued job.
	ID string `json:"id"`

	// JobName is the registered name of the job type; it is used for
	// deserialization.
	JobName string `json:"job_name"`

	// EnqueuedAt records when the job was added to the queue.
	EnqueuedAt time.Time `json:"enqueued_at"`

	// RetryCount is the number of times this job has been retried.
	RetryCount int `json:"retry_count"`

	// ReceiptHandle is the SQS receipt handle used for acknowledgment.
	// It is left out of the JSON encoding when empty.
	ReceiptHandle string `json:"receipt_handle,omitempty"`
}
SQSQueuedJob represents a job queued in SQS
type SQSStore ¶
// SQSStore is an implementation of the Store interface using AWS SQS.
type SQSStore struct {
// contains filtered or unexported fields
}
SQSStore is an implementation of the Store interface using AWS SQS
func NewSQSStore ¶
NewSQSStore creates a new SQS-backed store implementation
func NewSQSStoreWithClient ¶
func NewSQSStoreWithClient(client SQSClient, sqsCfg jobConfig.SQSConfig, cfg jobConfig.Config, logger logger.Logger) *SQSStore
NewSQSStoreWithClient creates a new SQS store with a custom SQS client. This is primarily used for testing with mock clients.
func (*SQSStore) DequeueMetrics ¶
func (s *SQSStore) DequeueMetrics(queueName string) (jobConfig.JobMetrics, error)
DequeueMetrics retrieves job metrics from a metrics queue
func (*SQSStore) EnqueueMetrics ¶
func (s *SQSStore) EnqueueMetrics(metrics jobConfig.JobMetrics) error
EnqueueMetrics adds job metrics to a metrics queue
func (*SQSStore) GetDbConnection ¶
func (s *SQSStore) GetDbConnection() interface{}
func (*SQSStore) IsHealthy ¶
IsHealthy returns the last known health status of the SQS connection. It does not perform a potentially blocking network call; operational methods (e.g., Push/Pop) update s.healthStatus when errors occur.
func (*SQSStore) Pop ¶
func (s *SQSStore) Pop(queueName string) (job.JobContext, error)
Pop retrieves a job from the queue
func (*SQSStore) PushBatch ¶
PushBatch adds multiple jobs to the queue in a single call, with optional delay
func (*SQSStore) RetryJobWithMetadata ¶
func (s *SQSStore) RetryJobWithMetadata(queueName string, queuedJob job.JobContext, delay ...time.Duration) error
RetryJobWithMetadata handles job retries using SQS ChangeMessageVisibility. This provides the same retry behavior as the Redis driver by using SQS's native visibility timeout.