Documentation
¶
Index ¶
- Constants
- Variables
- func RandomInt(minVal, maxVal int) int
- func RandomPostfix(name string) string
- func RandomString(n int) string
- func ToBatches[T any](items []T, batchSize int) [][]T
- func ToReceiptHandles(msgs []events.SQSMessage) []string
- type FifoQueue
- type FifoQueueImpl
- func (bq FifoQueueImpl) Client() *dynamodb.Client
- func (bq FifoQueueImpl) ClientID() string
- func (fq *FifoQueueImpl) Count(ctx context.Context) (int32, error)
- func (fq *FifoQueueImpl) CreateQueueTable(ctx context.Context) (bool, error)
- func (bq FifoQueueImpl) DefaultTTL() time.Duration
- func (fq *FifoQueueImpl) DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
- func (fq *FifoQueueImpl) DropQueueTable(ctx context.Context) error
- func (fq *FifoQueueImpl) List(ctx context.Context) ([]QueueAndClientID, error)
- func (bq FifoQueueImpl) Logging() bool
- func (bq FifoQueueImpl) PartitionKey() string
- func (fq *FifoQueueImpl) PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, ...) ([]events.SQSMessage, error)
- func (fq *FifoQueueImpl) Purge(ctx context.Context) error
- func (fq *FifoQueueImpl) PurgeAll(ctx context.Context) error
- func (fq *FifoQueueImpl) PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
- func (fq *FifoQueueImpl) PushMessagesWithGroup(ctx context.Context, ttl time.Duration, messageGroup string, ...) ([]events.SQSMessage, error)
- func (bq FifoQueueImpl) QueueName() string
- func (bq FifoQueueImpl) RandomDigits() int
- func (bq FifoQueueImpl) SetLogging(enabled bool)
- func (bq FifoQueueImpl) Table() string
- func (fq *FifoQueueImpl) TableExists(ctx context.Context) bool
- func (bq FifoQueueImpl) Type() QueueType
- func (fq *FifoQueueImpl) UseClientID(clientID string) Queue
- func (fq *FifoQueueImpl) UseQueueName(queueName string) Queue
- func (fq *FifoQueueImpl) UseSigningKey(key []byte) Queue
- func (fq *FifoQueueImpl) UseTable(table string) Queue
- type Queue
- type QueueAndClientID
- type QueueType
- type StandardQueue
- func (bq StandardQueue) Client() *dynamodb.Client
- func (bq StandardQueue) ClientID() string
- func (sq *StandardQueue) Count(ctx context.Context) (int32, error)
- func (sq *StandardQueue) CreateQueueTable(ctx context.Context) (bool, error)
- func (bq StandardQueue) DefaultTTL() time.Duration
- func (sq *StandardQueue) DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
- func (sq *StandardQueue) DropQueueTable(ctx context.Context) error
- func (sq *StandardQueue) List(ctx context.Context) ([]QueueAndClientID, error)
- func (bq StandardQueue) Logging() bool
- func (bq StandardQueue) PartitionKey() string
- func (sq *StandardQueue) PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, ...) ([]events.SQSMessage, error)
- func (sq *StandardQueue) Purge(ctx context.Context) error
- func (sq *StandardQueue) PurgeAll(ctx context.Context) error
- func (sq *StandardQueue) PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
- func (bq StandardQueue) QueueName() string
- func (bq StandardQueue) RandomDigits() int
- func (bq StandardQueue) SetLogging(enabled bool)
- func (bq StandardQueue) Table() string
- func (sq *StandardQueue) TableExists(ctx context.Context) bool
- func (bq StandardQueue) Type() QueueType
- func (sq *StandardQueue) UseClientID(clientID string) Queue
- func (sq *StandardQueue) UseQueueName(queueName string) Queue
- func (sq *StandardQueue) UseSigningKey(key []byte) Queue
- func (sq *StandardQueue) UseTable(table string) Queue
Constants ¶
const (
	// ColumnPK is the name of the partition key.
	ColumnPK = "PK"
	// ColumnSK is the name of the sort key.
	ColumnSK = "SK"
	// ColumnHiddenUntil is the name of the hidden_until attribute where the visibility timeout
	// while polling messages is stored and compared against.
	ColumnHiddenUntil = "hidden_until"
	// ColumnOwner is the name of the owner attribute where the clientID is stored.
	ColumnOwner = "owner"
	// ColumnTTL is the name of the TTL attribute where the time to live is stored.
	// This is used when the message is pushed to the queue and DynamoDB is configured to
	// automatically delete the message when the TTL is reached.
	ColumnTTL = "TTL"
	// ColumnEvent is the name of the event attribute where the `events.SQSMessage` is stored.
	ColumnEvent = "event"
	// ColumnMessageGroup is the name of the message_group attribute for FIFO queues.
	ColumnMessageGroup = "message_group"
)
DynamoDB column names.
const (
AttrSentTimestamp = "SentTimestamp"
)
SQS message attribute names.
const DefaultMessageGroup = "default"
DefaultMessageGroup is used when no message group is specified for FIFO queues.
const MaxMessageBodySize = 256 * 1024
MaxMessageBodySize is the maximum allowed size for a message body (256KB). This is conservative compared to DynamoDB's 400KB item limit to leave room for message attributes and metadata.
const MinRandomDigits = 8
MinRandomDigits is the minimum number of random characters required for SK uniqueness. With 8 characters drawn from a 36-character alphabet (a-z, 0-9), there are 36^8 ≈ 2.8 trillion combinations, providing extremely low collision probability even at high write rates.
const MinSigningKeyLength = 32
MinSigningKeyLength is the minimum number of bytes required for the HMAC-SHA256 signing key. Keys shorter than the hash output (32 bytes / 256 bits) reduce HMAC security per RFC 2104.
const PartitionKeySeparator = "|"
Separator constants.
const RecipientHandleSeparator = "&"
Variables ¶
var (
	ErrTableNameNotSet  = errors.New("table name not set")
	ErrQueueNameNotSet  = errors.New("queue name not set")
	ErrClientIDNotSet   = errors.New("client ID not set")
	ErrInvalidQueueName = errors.New("invalid queue name: must be 1-64 chars without | or & separators")
	ErrInvalidClientID  = errors.New("invalid client ID: must be 1-64 chars without | or & separators")
	ErrExpressionBuild  = errors.New("failed to build DynamoDB expression")
)
Sentinel errors for validation failures.
var (
	ErrTooManyMessages    = errors.New("maximum of 25 messages allowed")
	ErrMessageTooLarge    = errors.New("message body exceeds maximum size")
	ErrAttributeNotFound  = errors.New("attribute not found")
	ErrAttributeNotNumber = errors.New("attribute is not a number")
	ErrAttributeNotString = errors.New("attribute is not a string")
	ErrBatchWriteRetries  = errors.New("batch write failed after max retries")
	ErrSigningKeyTooShort = errors.New("signing key too short: minimum 32 bytes required for HMAC-SHA256")
)
Sentinel errors for message operations.
Functions ¶
func RandomInt ¶
RandomInt generates a cryptographically secure random integer in the range [minVal, maxVal). If random number generation fails (extremely rare), returns minVal as a safe fallback rather than panicking. Library code should never panic in normal operation.
func RandomPostfix ¶
RandomPostfix postfixes the name with a random string.
func RandomString ¶
RandomString generates a random alphanumeric string of length n.
func ToReceiptHandles ¶
func ToReceiptHandles(msgs []events.SQSMessage) []string
ToReceiptHandles extracts the receipt handles from the SQS messages.
Types ¶
type FifoQueue ¶ added in v0.10.0
type FifoQueue interface {
Queue
// PushMessagesWithGroup pushes messages to a specific message group.
// Messages in the same group are delivered in strict FIFO order.
// Only one message per group can be in-flight at a time.
PushMessagesWithGroup(ctx context.Context, ttl time.Duration, messageGroup string, messages ...events.SQSMessage) ([]events.SQSMessage, error)
}
FifoQueue extends Queue with message group support for FIFO queues.
type FifoQueueImpl ¶ added in v0.10.0
type FifoQueueImpl struct {
// contains filtered or unexported fields
}
FifoQueueImpl implements the FifoQueue interface with strict FIFO ordering per message group and at most one message in-flight per group at a time.
func NewFifoQueue ¶ added in v0.10.0
func NewFifoQueue(client *dynamodb.Client, ttl time.Duration) *FifoQueueImpl
NewFifoQueue creates a new FifoQueueImpl instance.
func (FifoQueueImpl) ClientID ¶ added in v0.10.0
func (bq FifoQueueImpl) ClientID() string
ClientID returns the client ID.
func (*FifoQueueImpl) Count ¶ added in v0.10.0
func (fq *FifoQueueImpl) Count(ctx context.Context) (int32, error)
Count returns the number of messages in the queue.
func (*FifoQueueImpl) CreateQueueTable ¶ added in v0.10.0
func (fq *FifoQueueImpl) CreateQueueTable(ctx context.Context) (bool, error)
CreateQueueTable creates the DynamoDB table for the queue.
func (FifoQueueImpl) DefaultTTL ¶ added in v0.10.0
DefaultTTL returns the default TTL for the queue.
func (*FifoQueueImpl) DeleteMessages ¶ added in v0.10.0
func (fq *FifoQueueImpl) DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
DeleteMessages deletes messages using their receipt handles.
func (*FifoQueueImpl) DropQueueTable ¶ added in v0.10.0
func (fq *FifoQueueImpl) DropQueueTable(ctx context.Context) error
DropQueueTable drops the DynamoDB table.
func (*FifoQueueImpl) List ¶ added in v0.10.0
func (fq *FifoQueueImpl) List(ctx context.Context) ([]QueueAndClientID, error)
List returns all queue/clientID combinations in the table.
func (FifoQueueImpl) Logging ¶ added in v0.10.0
func (bq FifoQueueImpl) Logging() bool
Logging returns true if logging is enabled.
func (FifoQueueImpl) PartitionKey ¶ added in v0.10.0
func (bq FifoQueueImpl) PartitionKey() string
PartitionKey returns the combined partition key (queueName|clientID).
func (*FifoQueueImpl) PollMessages ¶ added in v0.10.0
func (fq *FifoQueueImpl) PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
PollMessages polls messages from the queue using FIFO semantics. Only one message per message group can be in-flight at a time. To guarantee strict FIFO ordering under concurrent access, this implementation processes messages one at a time with verification.
func (*FifoQueueImpl) Purge ¶ added in v0.10.0
func (fq *FifoQueueImpl) Purge(ctx context.Context) error
Purge deletes all messages from the queue.
func (*FifoQueueImpl) PurgeAll ¶ added in v0.10.0
func (fq *FifoQueueImpl) PurgeAll(ctx context.Context) error
PurgeAll deletes all items from the DynamoDB table.
func (*FifoQueueImpl) PushMessages ¶ added in v0.10.0
func (fq *FifoQueueImpl) PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
PushMessages pushes messages to the default message group.
func (*FifoQueueImpl) PushMessagesWithGroup ¶ added in v0.10.0
func (fq *FifoQueueImpl) PushMessagesWithGroup(ctx context.Context, ttl time.Duration, messageGroup string, messages ...events.SQSMessage) ([]events.SQSMessage, error)
PushMessagesWithGroup pushes messages to a specific message group.
func (FifoQueueImpl) QueueName ¶ added in v0.10.0
func (bq FifoQueueImpl) QueueName() string
QueueName returns the queue name.
func (FifoQueueImpl) RandomDigits ¶ added in v0.10.0
func (bq FifoQueueImpl) RandomDigits() int
RandomDigits returns the number of random digits appended to SK.
func (FifoQueueImpl) SetLogging ¶ added in v0.10.0
func (bq FifoQueueImpl) SetLogging(enabled bool)
SetLogging enables or disables logging.
func (FifoQueueImpl) Table ¶ added in v0.10.0
func (bq FifoQueueImpl) Table() string
Table returns the DynamoDB table name.
func (*FifoQueueImpl) TableExists ¶ added in v0.10.0
func (fq *FifoQueueImpl) TableExists(ctx context.Context) bool
TableExists returns true if the table exists.
func (FifoQueueImpl) Type ¶ added in v0.10.0
func (bq FifoQueueImpl) Type() QueueType
Type returns the queue type.
func (*FifoQueueImpl) UseClientID ¶ added in v0.10.0
func (fq *FifoQueueImpl) UseClientID(clientID string) Queue
UseClientID sets the client ID. Validation occurs when queue operations are called.
func (*FifoQueueImpl) UseQueueName ¶ added in v0.10.0
func (fq *FifoQueueImpl) UseQueueName(queueName string) Queue
UseQueueName sets the queue name. Validation occurs when queue operations are called.
func (*FifoQueueImpl) UseSigningKey ¶ added in v0.11.0
func (fq *FifoQueueImpl) UseSigningKey(key []byte) Queue
UseSigningKey sets a persistent HMAC signing key for receipt handle verification. The key must be at least MinSigningKeyLength (32) bytes; validation occurs at operation time.
func (*FifoQueueImpl) UseTable ¶ added in v0.10.0
func (fq *FifoQueueImpl) UseTable(table string) Queue
UseTable sets the DynamoDB table name.
type Queue ¶ added in v0.10.0
type Queue interface {
// UseTable sets the DynamoDB table name for the queue.
UseTable(table string) Queue
// UseQueueName sets the queue name (part of the partition key).
UseQueueName(queueName string) Queue
// UseClientID sets the client ID (part of the partition key).
UseClientID(clientID string) Queue
// UseSigningKey sets a persistent HMAC signing key for receipt handle verification.
// This allows receipt handles to survive instance restarts and be verified across
// different queue instances sharing the same key.
// The key must be at least MinSigningKeyLength (32) bytes; validation occurs at operation time.
// If not called, a random key is generated per instance (default behavior).
UseSigningKey(key []byte) Queue
// SetLogging enables or disables logging for queue operations.
SetLogging(enabled bool)
// Logging returns true if logging is enabled.
Logging() bool
// Table returns the DynamoDB table name.
Table() string
// QueueName returns the queue name.
QueueName() string
// ClientID returns the client ID.
ClientID() string
// PartitionKey returns the combined partition key (queueName|clientID).
PartitionKey() string
// RandomDigits returns the number of random digits appended to SK.
RandomDigits() int
// PushMessages pushes one or more messages onto the queue.
// Maximum of 25 messages allowed per call.
// Returns any messages that failed to send along with an error.
PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
// PollMessages polls messages from the queue using SQS-compatible semantics.
// timeout: Maximum time to wait for minMessages.
// visibilityTimeout: How long messages are hidden after being returned.
// minMessages: Minimum messages to collect before returning.
// maxMessages: Maximum messages to return.
PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
// DeleteMessages deletes messages using their receipt handles.
// Returns receipt handles that failed to delete.
DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
// Count returns the number of messages in the queue.
Count(ctx context.Context) (int32, error)
// Purge deletes all messages from the queue.
Purge(ctx context.Context) error
// PurgeAll deletes all items from the DynamoDB table.
PurgeAll(ctx context.Context) error
// List returns all queue/clientID combinations in the table.
List(ctx context.Context) ([]QueueAndClientID, error)
// CreateQueueTable creates the DynamoDB table for the queue.
// Returns true if created, false if it already exists.
CreateQueueTable(ctx context.Context) (bool, error)
// DropQueueTable drops the DynamoDB table.
DropQueueTable(ctx context.Context) error
// TableExists returns true if the table exists.
TableExists(ctx context.Context) bool
// Type returns the queue type (Standard or FIFO).
Type() QueueType
}
Queue is the common interface for all queue types. It provides methods for managing messages in a DynamoDB-backed queue.
func New ¶ added in v0.4.0
New creates a new Queue instance.
By default, it creates a StandardQueue. Pass QueueFIFO as the third argument to create a FifoQueue instead.
If ttl is zero, the default of 14 days is used.
Examples:
// Create a standard queue (default)
queue := dynamodbqueue.New(cfg, 0)

// Explicitly create a standard queue
queue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueStandard)

// Create a FIFO queue
queue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueFIFO)
fifoQueue := queue.(dynamodbqueue.FifoQueue) // Access FIFO-specific methods
type QueueAndClientID ¶ added in v0.10.2
QueueAndClientID is the element type returned by the `List` operation; each value represents one queue/clientID combination.
type StandardQueue ¶ added in v0.10.0
type StandardQueue struct {
// contains filtered or unexported fields
}
StandardQueue implements the Queue interface with best-effort FIFO ordering, high throughput, and at-least-once delivery semantics.
func NewStandardQueue ¶ added in v0.10.0
func NewStandardQueue(client *dynamodb.Client, ttl time.Duration) *StandardQueue
NewStandardQueue creates a new StandardQueue instance.
func (StandardQueue) ClientID ¶ added in v0.10.0
func (bq StandardQueue) ClientID() string
ClientID returns the client ID.
func (*StandardQueue) Count ¶ added in v0.10.0
func (sq *StandardQueue) Count(ctx context.Context) (int32, error)
Count returns the number of messages in the queue.
func (*StandardQueue) CreateQueueTable ¶ added in v0.10.0
func (sq *StandardQueue) CreateQueueTable(ctx context.Context) (bool, error)
CreateQueueTable creates the DynamoDB table for the queue.
func (StandardQueue) DefaultTTL ¶ added in v0.10.0
DefaultTTL returns the default TTL for the queue.
func (*StandardQueue) DeleteMessages ¶ added in v0.10.0
func (sq *StandardQueue) DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
DeleteMessages deletes messages using their receipt handles.
func (*StandardQueue) DropQueueTable ¶ added in v0.10.0
func (sq *StandardQueue) DropQueueTable(ctx context.Context) error
DropQueueTable drops the DynamoDB table.
func (*StandardQueue) List ¶ added in v0.10.0
func (sq *StandardQueue) List(ctx context.Context) ([]QueueAndClientID, error)
List returns all queue/clientID combinations in the table.
func (StandardQueue) Logging ¶ added in v0.10.0
func (bq StandardQueue) Logging() bool
Logging returns true if logging is enabled.
func (StandardQueue) PartitionKey ¶ added in v0.10.0
func (bq StandardQueue) PartitionKey() string
PartitionKey returns the combined partition key (queueName|clientID).
func (*StandardQueue) PollMessages ¶ added in v0.10.0
func (sq *StandardQueue) PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
PollMessages polls messages from the queue using SQS-compatible semantics.
func (*StandardQueue) Purge ¶ added in v0.10.0
func (sq *StandardQueue) Purge(ctx context.Context) error
Purge deletes all messages from the queue.
func (*StandardQueue) PurgeAll ¶ added in v0.10.0
func (sq *StandardQueue) PurgeAll(ctx context.Context) error
PurgeAll deletes all items from the DynamoDB table.
func (*StandardQueue) PushMessages ¶ added in v0.10.0
func (sq *StandardQueue) PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
PushMessages pushes messages to the queue.
func (StandardQueue) QueueName ¶ added in v0.10.0
func (bq StandardQueue) QueueName() string
QueueName returns the queue name.
func (StandardQueue) RandomDigits ¶ added in v0.10.0
func (bq StandardQueue) RandomDigits() int
RandomDigits returns the number of random digits appended to SK.
func (StandardQueue) SetLogging ¶ added in v0.10.0
func (bq StandardQueue) SetLogging(enabled bool)
SetLogging enables or disables logging.
func (StandardQueue) Table ¶ added in v0.10.0
func (bq StandardQueue) Table() string
Table returns the DynamoDB table name.
func (*StandardQueue) TableExists ¶ added in v0.10.0
func (sq *StandardQueue) TableExists(ctx context.Context) bool
TableExists returns true if the table exists.
func (StandardQueue) Type ¶ added in v0.10.0
func (bq StandardQueue) Type() QueueType
Type returns the queue type.
func (*StandardQueue) UseClientID ¶ added in v0.10.0
func (sq *StandardQueue) UseClientID(clientID string) Queue
UseClientID sets the client ID. Validation occurs when queue operations are called.
func (*StandardQueue) UseQueueName ¶ added in v0.10.0
func (sq *StandardQueue) UseQueueName(queueName string) Queue
UseQueueName sets the queue name. Validation occurs when queue operations are called.
func (*StandardQueue) UseSigningKey ¶ added in v0.11.0
func (sq *StandardQueue) UseSigningKey(key []byte) Queue
UseSigningKey sets a persistent HMAC signing key for receipt handle verification. The key must be at least MinSigningKeyLength (32) bytes; validation occurs at operation time.
func (*StandardQueue) UseTable ¶ added in v0.10.0
func (sq *StandardQueue) UseTable(table string) Queue
UseTable sets the DynamoDB table name.