dynamodbqueue

package
v0.11.0
Published: Feb 18, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Constants

const (
	// ColumnPK is the name of the partition key.
	ColumnPK = "PK"
	// ColumnSK is the name of the sort key.
	ColumnSK = "SK"
	// ColumnHiddenUntil is the name of the hidden_until attribute where the visibility timeout
	// while polling messages is stored and compared against.
	ColumnHiddenUntil = "hidden_until"
	// ColumnOwner is the name of the owner attribute where the clientID is stored.
	ColumnOwner = "owner"
	// ColumnTTL is the name of the TTL attribute where the time to live is stored.
	// This is used when the message is pushed to the queue and DynamoDB is configured to
	// automatically delete the message when the TTL is reached.
	ColumnTTL = "TTL"
	// ColumnEvent is the name of the event attribute where the `events.SQSMessage` is stored.
	ColumnEvent = "event"
	// ColumnMessageGroup is the name of the message_group attribute for FIFO queues.
	ColumnMessageGroup = "message_group"
)

DynamoDB column names.

const (
	AttrSentTimestamp = "SentTimestamp"
)

SQS message attribute names.

const DefaultMessageGroup = "default"

DefaultMessageGroup is used when no message group is specified for FIFO queues.

const MaxMessageBodySize = 256 * 1024

MaxMessageBodySize is the maximum allowed size for a message body (256 KiB, i.e. 262,144 bytes). The limit is deliberately below DynamoDB's 400 KB item limit to leave room for message attributes and metadata.

const MinRandomDigits = 8

MinRandomDigits is the minimum number of random characters required for SK uniqueness. With 8 characters drawn from a 36-character alphabet (a-z, 0-9), there are 36^8 ≈ 2.8 trillion combinations, which keeps the collision probability extremely low even at high write rates.
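The figure quoted above can be checked directly. The following is a minimal sketch, not code from the package: keyspace and collisionEstimate are illustrative helper names, and the estimate assumes that n suffixes are drawn uniformly for sort keys that would otherwise be identical.

```go
package main

import "fmt"

// keyspace returns alphabet^digits, the number of distinct random suffixes.
func keyspace(alphabet, digits int) uint64 {
	n := uint64(1)
	for i := 0; i < digits; i++ {
		n *= uint64(alphabet)
	}
	return n
}

// collisionEstimate approximates the probability that at least two of n
// uniformly drawn suffixes collide, using the birthday bound n(n-1)/(2N).
// The approximation is only meaningful while the result is far below 1.
func collisionEstimate(n, space uint64) float64 {
	return float64(n) * float64(n-1) / (2 * float64(space))
}

func main() {
	space := keyspace(36, 8)
	fmt.Println(space) // 2821109907456

	// Hypothetical load: 1000 messages that share the same SK prefix.
	fmt.Printf("%.2e\n", collisionEstimate(1000, space))
}
```

Under these assumptions, even a thousand writes competing for the same key prefix collide with a probability well below one in a million.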

const MinSigningKeyLength = 32

MinSigningKeyLength is the minimum number of bytes required for the HMAC-SHA256 signing key. Keys shorter than the hash output (32 bytes / 256 bits) reduce HMAC security per RFC 2104.
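As an illustration of the key-length requirement, the sketch below generates a 32-byte key with crypto/rand and uses it for HMAC-SHA256 signing and verification. It uses only the standard library. The helper names and the payload string are hypothetical, and the way the package actually encodes receipt handles is not documented here.

```go
package main

import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// minKeyLen mirrors MinSigningKeyLength from the documentation above.
const minKeyLen = 32

// newSigningKey returns n cryptographically random bytes.
func newSigningKey(n int) ([]byte, error) {
	key := make([]byte, n)
	if _, err := rand.Read(key); err != nil {
		return nil, err
	}
	return key, nil
}

// sign returns the hex-encoded HMAC-SHA256 tag of payload.
func sign(key []byte, payload string) string {
	mac := hmac.New(sha256.New, key)
	mac.Write([]byte(payload))
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the tag and compares it in constant time.
func verify(key []byte, payload, tag string) bool {
	want, err := hex.DecodeString(tag)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, key)
	mac.Write([]byte(payload))
	return hmac.Equal(mac.Sum(nil), want)
}

func main() {
	key, err := newSigningKey(minKeyLen)
	if err != nil {
		panic(err)
	}
	// The payload is a placeholder, not the package's receipt handle format.
	tag := sign(key, "hypothetical-handle-payload")
	fmt.Println(len(key) >= minKeyLen, verify(key, "hypothetical-handle-payload", tag))
}
```

A key produced this way would typically be stored in a secret manager and passed to UseSigningKey on every instance that needs to verify the same receipt handles.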

const PartitionKeySeparator = "|"

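Separator constants. PartitionKeySeparator joins the queue name and the client ID in the partition key (queueName|clientID).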

const RecipientHandleSeparator = "&"

Variables

var (
	ErrTableNameNotSet  = errors.New("table name not set")
	ErrQueueNameNotSet  = errors.New("queue name not set")
	ErrClientIDNotSet   = errors.New("client ID not set")
	ErrInvalidQueueName = errors.New("invalid queue name: must be 1-64 chars without | or & separators")
	ErrInvalidClientID  = errors.New("invalid client ID: must be 1-64 chars without | or & separators")
	ErrExpressionBuild  = errors.New("failed to build DynamoDB expression")
)

Sentinel errors for validation failures.

var (
	ErrTooManyMessages    = errors.New("maximum of 25 messages allowed")
	ErrMessageTooLarge    = errors.New("message body exceeds maximum size")
	ErrAttributeNotFound  = errors.New("attribute not found")
	ErrAttributeNotNumber = errors.New("attribute is not a number")
	ErrAttributeNotString = errors.New("attribute is not a string")
	ErrBatchWriteRetries  = errors.New("batch write failed after max retries")
	ErrSigningKeyTooShort = errors.New("signing key too short: minimum 32 bytes required for HMAC-SHA256")
)

Sentinel errors for message operations.

Functions

func RandomInt

func RandomInt(minVal, maxVal int) int

RandomInt generates a cryptographically secure random integer in the half-open range [minVal, maxVal). If random number generation fails (extremely rare), it returns minVal as a safe fallback rather than panicking, because library code should never panic in normal operation.
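One way to obtain the behavior described above with the standard library is sketched below. It is not the package's source: randomInt is an illustrative name, and returning minVal for an empty range is an assumption made for this sketch.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// randomInt returns a uniformly distributed value in [minVal, maxVal).
// It returns minVal when the range is empty or the random source fails.
func randomInt(minVal, maxVal int) int {
	if maxVal <= minVal {
		return minVal // empty range: handling chosen for this sketch
	}
	span := big.NewInt(int64(maxVal) - int64(minVal))
	n, err := rand.Int(rand.Reader, span) // uniform in [0, span)
	if err != nil {
		return minVal // fall back instead of panicking
	}
	return minVal + int(n.Int64())
}

func main() {
	fmt.Println(randomInt(10, 20)) // some value from 10 to 19
}
```

Using crypto/rand.Int avoids the modulo bias that a plain remainder over random bytes would introduce.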

func RandomPostfix

func RandomPostfix(name string) string

RandomPostfix appends a random string to name and returns the result.

func RandomString

func RandomString(n int) string

RandomString generates a random alphanumeric string of length n.

func ToBatches

func ToBatches[T any](items []T, batchSize int) [][]T

ToBatches splits a slice into batches of the specified size.
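A generic batching helper with this signature might look like the sketch below. The lower-case name marks it as an illustration rather than the package's implementation, and returning nil for a non-positive batchSize is an assumption. Batches of 25 match the limit named by ErrTooManyMessages.

```go
package main

import "fmt"

// toBatches splits items into consecutive sub-slices of at most batchSize
// elements. Returning nil for a non-positive batchSize is an assumption.
func toBatches[T any](items []T, batchSize int) [][]T {
	if batchSize <= 0 {
		return nil
	}
	batches := make([][]T, 0, (len(items)+batchSize-1)/batchSize)
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}
		// Each batch shares the backing array of items; no elements are copied.
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	fmt.Println(toBatches([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```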

func ToReceiptHandles

func ToReceiptHandles(msgs []events.SQSMessage) []string

ToReceiptHandles extracts the receipt handles from the SQS messages.

Types

type FifoQueue added in v0.10.0

type FifoQueue interface {
	Queue

	// PushMessagesWithGroup pushes messages to a specific message group.
	// Messages in the same group are delivered in strict FIFO order.
	// Only one message per group can be in-flight at a time.
	PushMessagesWithGroup(ctx context.Context, ttl time.Duration, messageGroup string, messages ...events.SQSMessage) ([]events.SQSMessage, error)
}

FifoQueue extends Queue with message group support for FIFO queues.

type FifoQueueImpl added in v0.10.0

type FifoQueueImpl struct {
	// contains filtered or unexported fields
}

FifoQueueImpl implements the FifoQueue interface with strict FIFO ordering per message group and exactly one message in-flight per group.

func NewFifoQueue added in v0.10.0

func NewFifoQueue(client *dynamodb.Client, ttl time.Duration) *FifoQueueImpl

NewFifoQueue creates a new FifoQueueImpl instance.

func (FifoQueueImpl) Client added in v0.10.0

func (bq FifoQueueImpl) Client() *dynamodb.Client

Client returns the DynamoDB client.

func (FifoQueueImpl) ClientID added in v0.10.0

func (bq FifoQueueImpl) ClientID() string

ClientID returns the client ID.

func (*FifoQueueImpl) Count added in v0.10.0

func (fq *FifoQueueImpl) Count(ctx context.Context) (int32, error)

Count returns the number of messages in the queue.

func (*FifoQueueImpl) CreateQueueTable added in v0.10.0

func (fq *FifoQueueImpl) CreateQueueTable(ctx context.Context) (bool, error)

CreateQueueTable creates the DynamoDB table for the queue.

func (FifoQueueImpl) DefaultTTL added in v0.10.0

func (bq FifoQueueImpl) DefaultTTL() time.Duration

DefaultTTL returns the default TTL for the queue.

func (*FifoQueueImpl) DeleteMessages added in v0.10.0

func (fq *FifoQueueImpl) DeleteMessages(
	ctx context.Context,
	receiptHandles ...string,
) ([]string, error)

DeleteMessages deletes messages using their receipt handles.

func (*FifoQueueImpl) DropQueueTable added in v0.10.0

func (fq *FifoQueueImpl) DropQueueTable(ctx context.Context) error

DropQueueTable drops the DynamoDB table.

func (*FifoQueueImpl) List added in v0.10.0

func (fq *FifoQueueImpl) List(ctx context.Context) ([]QueueAndClientID, error)

List returns all queue/clientID combinations in the table.

func (FifoQueueImpl) Logging added in v0.10.0

func (bq FifoQueueImpl) Logging() bool

Logging returns true if logging is enabled.

func (FifoQueueImpl) PartitionKey added in v0.10.0

func (bq FifoQueueImpl) PartitionKey() string

PartitionKey returns the combined partition key (queueName|clientID).

func (*FifoQueueImpl) PollMessages added in v0.10.0

func (fq *FifoQueueImpl) PollMessages(
	ctx context.Context,
	timeout, visibilityTimeout time.Duration,
	minMessages, maxMessages int,
) ([]events.SQSMessage, error)

PollMessages polls messages from the queue using FIFO semantics. Only one message per message group can be in-flight at a time. To guarantee strict FIFO ordering under concurrent access, this implementation processes messages one at a time with verification.
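The rule "only one message per group in flight" can be illustrated with a small in-memory selection routine. This is a single-process sketch under simplifying assumptions: message and nextDeliverable are hypothetical names, and the sketch does not model the verification step the package performs under concurrent access.

```go
package main

import "fmt"

// message is a simplified stand-in for a stored queue item.
type message struct {
	ID    string
	Group string
}

// nextDeliverable walks pending messages in queue order and returns the head
// of every group that has no message in flight, up to limit messages.
func nextDeliverable(pending []message, inFlight map[string]bool, limit int) []message {
	taken := make(map[string]bool, len(inFlight))
	for group, busy := range inFlight {
		if busy {
			taken[group] = true
		}
	}
	var out []message
	for _, m := range pending {
		if len(out) >= limit {
			break
		}
		if taken[m.Group] {
			continue // this group already has a message in flight
		}
		taken[m.Group] = true // later messages of the group must wait
		out = append(out, m)
	}
	return out
}

func main() {
	pending := []message{{"a1", "A"}, {"a2", "A"}, {"b1", "B"}, {"c1", "C"}}
	inFlight := map[string]bool{"C": true}
	fmt.Println(nextDeliverable(pending, inFlight, 10)) // [{a1 A} {b1 B}]
}
```

Here a2 is held back until a1 is deleted or becomes visible again, and c1 is skipped because group C already has a message in flight.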

func (*FifoQueueImpl) Purge added in v0.10.0

func (fq *FifoQueueImpl) Purge(ctx context.Context) error

Purge deletes all messages from the queue.

func (*FifoQueueImpl) PurgeAll added in v0.10.0

func (fq *FifoQueueImpl) PurgeAll(ctx context.Context) error

PurgeAll deletes all items from the DynamoDB table.

func (*FifoQueueImpl) PushMessages added in v0.10.0

func (fq *FifoQueueImpl) PushMessages(
	ctx context.Context,
	ttl time.Duration,
	messages ...events.SQSMessage,
) ([]events.SQSMessage, error)

PushMessages pushes messages to the default message group.

func (*FifoQueueImpl) PushMessagesWithGroup added in v0.10.0

func (fq *FifoQueueImpl) PushMessagesWithGroup(
	ctx context.Context,
	ttl time.Duration,
	messageGroup string,
	messages ...events.SQSMessage,
) ([]events.SQSMessage, error)

PushMessagesWithGroup pushes messages to a specific message group.

func (FifoQueueImpl) QueueName added in v0.10.0

func (bq FifoQueueImpl) QueueName() string

QueueName returns the queue name.

func (FifoQueueImpl) RandomDigits added in v0.10.0

func (bq FifoQueueImpl) RandomDigits() int

RandomDigits returns the number of random digits appended to SK.

func (FifoQueueImpl) SetLogging added in v0.10.0

func (bq FifoQueueImpl) SetLogging(enabled bool)

SetLogging enables or disables logging.

func (FifoQueueImpl) Table added in v0.10.0

func (bq FifoQueueImpl) Table() string

Table returns the DynamoDB table name.

func (*FifoQueueImpl) TableExists added in v0.10.0

func (fq *FifoQueueImpl) TableExists(ctx context.Context) bool

TableExists returns true if the table exists.

func (FifoQueueImpl) Type added in v0.10.0

func (bq FifoQueueImpl) Type() QueueType

Type returns the queue type.

func (*FifoQueueImpl) UseClientID added in v0.10.0

func (fq *FifoQueueImpl) UseClientID(clientID string) Queue

UseClientID sets the client ID. Validation occurs when queue operations are called.

func (*FifoQueueImpl) UseQueueName added in v0.10.0

func (fq *FifoQueueImpl) UseQueueName(queueName string) Queue

UseQueueName sets the queue name. Validation occurs when queue operations are called.

func (*FifoQueueImpl) UseSigningKey added in v0.11.0

func (fq *FifoQueueImpl) UseSigningKey(key []byte) Queue

UseSigningKey sets a persistent HMAC signing key for receipt handle verification. The key must be at least MinSigningKeyLength (32) bytes; validation occurs at operation time.

func (*FifoQueueImpl) UseTable added in v0.10.0

func (fq *FifoQueueImpl) UseTable(table string) Queue

UseTable sets the DynamoDB table name.

type Queue added in v0.10.0

type Queue interface {
	// UseTable sets the DynamoDB table name for the queue.
	UseTable(table string) Queue

	// UseQueueName sets the queue name (part of the partition key).
	UseQueueName(queueName string) Queue

	// UseClientID sets the client ID (part of the partition key).
	UseClientID(clientID string) Queue

	// UseSigningKey sets a persistent HMAC signing key for receipt handle verification.
	// This allows receipt handles to survive instance restarts and be verified across
	// different queue instances sharing the same key.
	// The key must be at least MinSigningKeyLength (32) bytes; validation occurs at operation time.
	// If not called, a random key is generated per instance (default behavior).
	UseSigningKey(key []byte) Queue

	// SetLogging enables or disables logging for queue operations.
	SetLogging(enabled bool)

	// Logging returns true if logging is enabled.
	Logging() bool

	// Table returns the DynamoDB table name.
	Table() string

	// QueueName returns the queue name.
	QueueName() string

	// ClientID returns the client ID.
	ClientID() string

	// PartitionKey returns the combined partition key (queueName|clientID).
	PartitionKey() string

	// RandomDigits returns the number of random digits appended to SK.
	RandomDigits() int

	// PushMessages pushes one or more messages onto the queue.
	// Maximum of 25 messages allowed per call.
	// Returns any messages that failed to send along with an error.
	PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)

	// PollMessages polls messages from the queue using SQS-compatible semantics.
	// timeout: Maximum time to wait for minMessages.
	// visibilityTimeout: How long messages are hidden after being returned.
	// minMessages: Minimum messages to collect before returning.
	// maxMessages: Maximum messages to return.
	PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)

	// DeleteMessages deletes messages using their receipt handles.
	// Returns receipt handles that failed to delete.
	DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)

	// Count returns the number of messages in the queue.
	Count(ctx context.Context) (int32, error)

	// Purge deletes all messages from the queue.
	Purge(ctx context.Context) error

	// PurgeAll deletes all items from the DynamoDB table.
	PurgeAll(ctx context.Context) error

	// List returns all queue/clientID combinations in the table.
	List(ctx context.Context) ([]QueueAndClientID, error)

	// CreateQueueTable creates the DynamoDB table for the queue.
	// Returns true if created, false if it already exists.
	CreateQueueTable(ctx context.Context) (bool, error)

	// DropQueueTable drops the DynamoDB table.
	DropQueueTable(ctx context.Context) error

	// TableExists returns true if the table exists.
	TableExists(ctx context.Context) bool

	// Type returns the queue type (Standard or FIFO).
	Type() QueueType
}

Queue is the common interface for all queue types. It provides methods for managing messages in a DynamoDB-backed queue.

func New added in v0.4.0

func New(cfg aws.Config, ttl time.Duration, queueType ...QueueType) Queue

New creates a new Queue instance.

By default, it creates a StandardQueue. Pass QueueFIFO as the third argument to create a FifoQueue instead.

If ttl is zero, the default of 14 days is used.

Examples:

// Create a standard queue (default)
queue := dynamodbqueue.New(cfg, 0)

// Explicitly create a standard queue
standardQueue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueStandard)

// Create a FIFO queue and access the FIFO-specific methods
fifoQueue, ok := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueFIFO).(dynamodbqueue.FifoQueue)
if !ok {
	// handle the unexpected queue type
}

func NewWithClient added in v0.10.0

func NewWithClient(client *dynamodb.Client, ttl time.Duration, queueType ...QueueType) Queue

NewWithClient creates a new Queue instance with an existing DynamoDB client.

This is useful for testing or when you need custom client configuration.

type QueueAndClientID added in v0.10.2

type QueueAndClientID struct {
	QueueName string `json:"qn"`
	ClientID  string `json:"cid"`
}

QueueAndClientID identifies one queue/clientID combination. It is the element type of the slice returned by the `List` operation.

type QueueType

type QueueType int

QueueType represents the type of queue to create.

const (
	// QueueStandard is the default queue type with best-effort FIFO ordering,
	// high throughput, and at-least-once delivery.
	QueueStandard QueueType = iota

	// QueueFIFO provides strict FIFO ordering per message group with exactly
	// one message in-flight per group at a time.
	QueueFIFO
)

type StandardQueue added in v0.10.0

type StandardQueue struct {
	// contains filtered or unexported fields
}

StandardQueue implements the Queue interface with best-effort FIFO ordering, high throughput, and at-least-once delivery semantics.

func NewStandardQueue added in v0.10.0

func NewStandardQueue(client *dynamodb.Client, ttl time.Duration) *StandardQueue

NewStandardQueue creates a new StandardQueue instance.

func (StandardQueue) Client added in v0.10.0

func (bq StandardQueue) Client() *dynamodb.Client

Client returns the DynamoDB client.

func (StandardQueue) ClientID added in v0.10.0

func (bq StandardQueue) ClientID() string

ClientID returns the client ID.

func (*StandardQueue) Count added in v0.10.0

func (sq *StandardQueue) Count(ctx context.Context) (int32, error)

Count returns the number of messages in the queue.

func (*StandardQueue) CreateQueueTable added in v0.10.0

func (sq *StandardQueue) CreateQueueTable(ctx context.Context) (bool, error)

CreateQueueTable creates the DynamoDB table for the queue.

func (StandardQueue) DefaultTTL added in v0.10.0

func (bq StandardQueue) DefaultTTL() time.Duration

DefaultTTL returns the default TTL for the queue.

func (*StandardQueue) DeleteMessages added in v0.10.0

func (sq *StandardQueue) DeleteMessages(
	ctx context.Context,
	receiptHandles ...string,
) ([]string, error)

DeleteMessages deletes messages using their receipt handles.

func (*StandardQueue) DropQueueTable added in v0.10.0

func (sq *StandardQueue) DropQueueTable(ctx context.Context) error

DropQueueTable drops the DynamoDB table.

func (*StandardQueue) List added in v0.10.0

func (sq *StandardQueue) List(ctx context.Context) ([]QueueAndClientID, error)

List returns all queue/clientID combinations in the table.

func (StandardQueue) Logging added in v0.10.0

func (bq StandardQueue) Logging() bool

Logging returns true if logging is enabled.

func (StandardQueue) PartitionKey added in v0.10.0

func (bq StandardQueue) PartitionKey() string

PartitionKey returns the combined partition key (queueName|clientID).

func (*StandardQueue) PollMessages added in v0.10.0

func (sq *StandardQueue) PollMessages(
	ctx context.Context,
	timeout, visibilityTimeout time.Duration,
	minMessages, maxMessages int,
) ([]events.SQSMessage, error)

PollMessages polls messages from the queue using SQS-compatible semantics.

func (*StandardQueue) Purge added in v0.10.0

func (sq *StandardQueue) Purge(ctx context.Context) error

Purge deletes all messages from the queue.

func (*StandardQueue) PurgeAll added in v0.10.0

func (sq *StandardQueue) PurgeAll(ctx context.Context) error

PurgeAll deletes all items from the DynamoDB table.

func (*StandardQueue) PushMessages added in v0.10.0

func (sq *StandardQueue) PushMessages(
	ctx context.Context,
	ttl time.Duration,
	messages ...events.SQSMessage,
) ([]events.SQSMessage, error)

PushMessages pushes messages to the queue.

func (StandardQueue) QueueName added in v0.10.0

func (bq StandardQueue) QueueName() string

QueueName returns the queue name.

func (StandardQueue) RandomDigits added in v0.10.0

func (bq StandardQueue) RandomDigits() int

RandomDigits returns the number of random digits appended to SK.

func (StandardQueue) SetLogging added in v0.10.0

func (bq StandardQueue) SetLogging(enabled bool)

SetLogging enables or disables logging.

func (StandardQueue) Table added in v0.10.0

func (bq StandardQueue) Table() string

Table returns the DynamoDB table name.

func (*StandardQueue) TableExists added in v0.10.0

func (sq *StandardQueue) TableExists(ctx context.Context) bool

TableExists returns true if the table exists.

func (StandardQueue) Type added in v0.10.0

func (bq StandardQueue) Type() QueueType

Type returns the queue type.

func (*StandardQueue) UseClientID added in v0.10.0

func (sq *StandardQueue) UseClientID(clientID string) Queue

UseClientID sets the client ID. Validation occurs when queue operations are called.

func (*StandardQueue) UseQueueName added in v0.10.0

func (sq *StandardQueue) UseQueueName(queueName string) Queue

UseQueueName sets the queue name. Validation occurs when queue operations are called.

func (*StandardQueue) UseSigningKey added in v0.11.0

func (sq *StandardQueue) UseSigningKey(key []byte) Queue

UseSigningKey sets a persistent HMAC signing key for receipt handle verification. The key must be at least MinSigningKeyLength (32) bytes; validation occurs at operation time.

func (*StandardQueue) UseTable added in v0.10.0

func (sq *StandardQueue) UseTable(table string) Queue

UseTable sets the DynamoDB table name.
