Documentation
¶
Index ¶
- func CloseAllInstances(ctx context.Context) error
- func ListAllInstances(ctx context.Context) ([]string, error)
- func ParseGroupFromBody(body string) string
- type DynamoDBLocal
- func (d *DynamoDBLocal) AWSConfig() aws.Config
- func (d *DynamoDBLocal) Close() error
- func (d *DynamoDBLocal) CreateTable(ctx context.Context, tableName string) error
- func (d *DynamoDBLocal) DeleteTable(ctx context.Context, tableName string) error
- func (d *DynamoDBLocal) DynamoDBClient(opts ...func(*dynamodb.Options)) *dynamodb.Client
- func (d *DynamoDBLocal) EndpointURL() string
- func (d *DynamoDBLocal) Start(ctx context.Context) error
- func (d *DynamoDBLocal) Stop() error
- func (d *DynamoDBLocal) WaitForReady(ctx context.Context, timeout time.Duration) error
- type FIFOTestHelper
- func (h *FIFOTestHelper) AssertQueueEmpty(ctx context.Context) error
- func (h *FIFOTestHelper) ConsumeAll(ctx context.Context, maxMessages int) (int, error)
- func (h *FIFOTestHelper) ConsumeAllConcurrently(ctx context.Context, numConsumers, totalMessages int, timeout time.Duration) (int, error)
- func (h *FIFOTestHelper) ConsumeWithCallback(ctx context.Context, maxMessages int, ...) (int, error)
- func (h *FIFOTestHelper) Duplicates() map[string]int
- func (h *FIFOTestHelper) GroupMessages() map[string][]int
- func (h *FIFOTestHelper) PushMessages(ctx context.Context, group string, count int) error
- func (h *FIFOTestHelper) PushMessagesWithDelay(ctx context.Context, group string, count int, delay time.Duration) error
- func (h *FIFOTestHelper) PushToGroups(ctx context.Context, groups []string, messagesPerGroup int, ...) error
- func (h *FIFOTestHelper) Reset()
- func (h *FIFOTestHelper) TotalReceived() int
- func (h *FIFOTestHelper) VerifyComplete(groups []string, messagesPerGroup int) error
- func (h *FIFOTestHelper) VerifyNoDuplicates() error
- func (h *FIFOTestHelper) VerifyOrdering() error
- func (h *FIFOTestHelper) Violations() int
- func (h *FIFOTestHelper) WaitForVisibilityExpiry(ctx context.Context, expectedCount int, maxWait time.Duration) ([]events.SQSMessage, error)
- func (h *FIFOTestHelper) WithFormat(format MessageBodyFormat) *FIFOTestHelper
- func (h *FIFOTestHelper) WithPollTimeout(d time.Duration) *FIFOTestHelper
- func (h *FIFOTestHelper) WithVisibilityTimeout(d time.Duration) *FIFOTestHelper
- type FIFOTestQueue
- type LocalDynamoOption
- type MessageBodyFormat
- type StandardTestHelper
- func (h *StandardTestHelper) AssertQueueCount(ctx context.Context, expected int32) error
- func (h *StandardTestHelper) AssertQueueEmpty(ctx context.Context) error
- func (h *StandardTestHelper) ConsumeAll(ctx context.Context, maxMessages int) ([]events.SQSMessage, error)
- func (h *StandardTestHelper) ProcessWithRetry(ctx context.Context, maxMessages int, visibilityTimeout time.Duration, ...) (processed, failed []events.SQSMessage, err error)
- func (h *StandardTestHelper) WaitForMessages(ctx context.Context, minCount int, maxWait time.Duration, ...) ([]events.SQSMessage, error)
- func (h *StandardTestHelper) WaitForVisibilityExpiry(ctx context.Context, expectedCount int, maxWait time.Duration) ([]events.SQSMessage, error)
- func (h *StandardTestHelper) WithPollTimeout(d time.Duration) *StandardTestHelper
- func (h *StandardTestHelper) WithVisibilityTimeout(d time.Duration) *StandardTestHelper
- type StandardTestQueue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CloseAllInstances ¶
CloseAllInstances stops and removes all tagged local instances.
func ListAllInstances ¶
ListAllInstances returns the names of all containers tagged as godynamodb-queue DynamoDB local.
func ParseGroupFromBody ¶ added in v0.10.2
ParseGroupFromBody extracts the group name from a message body. Supports two formats:
- "group-name-msg-N" -> returns "group-name"
- "group-name|N" -> returns "group-name"
Types ¶
type DynamoDBLocal ¶
type DynamoDBLocal struct {
Name string
Image string
Port int
InMemory bool
// contains filtered or unexported fields
}
DynamoDBLocal controls a dockerized DynamoDB Local instance. It is intended for use in integration tests.
func NewLocalDynamoDB ¶
func NewLocalDynamoDB(name string, opts ...LocalDynamoOption) *DynamoDBLocal
NewLocalDynamoDB creates a new, not-yet-started DynamoDBLocal runner.
func (*DynamoDBLocal) AWSConfig ¶
func (d *DynamoDBLocal) AWSConfig() aws.Config
AWSConfig returns an AWS SDK config for the local DynamoDB instance. Note: Use DynamoDBClient() instead, which properly configures the endpoint.
func (*DynamoDBLocal) Close ¶
func (d *DynamoDBLocal) Close() error
Close removes the container (force).
func (*DynamoDBLocal) CreateTable ¶
func (d *DynamoDBLocal) CreateTable( ctx context.Context, tableName string, ) error
CreateTable creates a DynamoDB table for the queue.
func (*DynamoDBLocal) DeleteTable ¶
func (d *DynamoDBLocal) DeleteTable(ctx context.Context, tableName string) error
DeleteTable deletes a DynamoDB table.
func (*DynamoDBLocal) DynamoDBClient ¶
func (d *DynamoDBLocal) DynamoDBClient(opts ...func(*dynamodb.Options)) *dynamodb.Client
DynamoDBClient returns a DynamoDB client configured to connect to the local instance.
func (*DynamoDBLocal) EndpointURL ¶
func (d *DynamoDBLocal) EndpointURL() string
EndpointURL returns the HTTP endpoint URL for this local instance.
func (*DynamoDBLocal) Start ¶
func (d *DynamoDBLocal) Start(ctx context.Context) error
Start launches the dockerized DynamoDB Local and waits for it to accept connections.
func (*DynamoDBLocal) Stop ¶
func (d *DynamoDBLocal) Stop() error
Stop stops the container (but leaves it around because of --rm it will remove).
func (*DynamoDBLocal) WaitForReady ¶
WaitForReady waits for DynamoDB Local to be ready to accept requests. It uses the ListTables API to verify the database is operational.
type FIFOTestHelper ¶ added in v0.10.2
type FIFOTestHelper struct {
Queue FIFOTestQueue
VisibilityTimeout time.Duration
PollTimeout time.Duration
Format MessageBodyFormat
// contains filtered or unexported fields
}
FIFOTestHelper provides utilities for testing FIFO queues.
func NewFIFOTestHelper ¶ added in v0.10.2
func NewFIFOTestHelper(queue FIFOTestQueue) *FIFOTestHelper
NewFIFOTestHelper creates a new FIFO test helper.
func (*FIFOTestHelper) AssertQueueEmpty ¶ added in v0.10.2
func (h *FIFOTestHelper) AssertQueueEmpty(ctx context.Context) error
AssertQueueEmpty verifies the queue has no messages.
func (*FIFOTestHelper) ConsumeAll ¶ added in v0.10.2
ConsumeAll consumes all messages from the queue and tracks ordering. Returns the total number of messages consumed.
func (*FIFOTestHelper) ConsumeAllConcurrently ¶ added in v0.10.2
func (h *FIFOTestHelper) ConsumeAllConcurrently(ctx context.Context, numConsumers, totalMessages int, timeout time.Duration) (int, error)
ConsumeAllConcurrently consumes messages using multiple concurrent consumers.
func (*FIFOTestHelper) ConsumeWithCallback ¶ added in v0.10.2
func (h *FIFOTestHelper) ConsumeWithCallback(ctx context.Context, maxMessages int, callback func(msg events.SQSMessage) bool) (int, error)
ConsumeWithCallback consumes messages and calls the callback for each. If callback returns false, the message is NOT deleted (simulating processing failure).
func (*FIFOTestHelper) Duplicates ¶ added in v0.10.2
func (h *FIFOTestHelper) Duplicates() map[string]int
Duplicates returns message IDs that were received more than once.
func (*FIFOTestHelper) GroupMessages ¶ added in v0.10.2
func (h *FIFOTestHelper) GroupMessages() map[string][]int
GroupMessages returns the messages received per group (in order).
func (*FIFOTestHelper) PushMessages ¶ added in v0.10.2
PushMessages pushes messages to a group with the configured format.
func (*FIFOTestHelper) PushMessagesWithDelay ¶ added in v0.10.2
func (h *FIFOTestHelper) PushMessagesWithDelay(ctx context.Context, group string, count int, delay time.Duration) error
PushMessagesWithDelay pushes messages to a group with a delay between each.
func (*FIFOTestHelper) PushToGroups ¶ added in v0.10.2
func (h *FIFOTestHelper) PushToGroups(ctx context.Context, groups []string, messagesPerGroup int, delay time.Duration) error
PushToGroups pushes messages to multiple groups.
func (*FIFOTestHelper) Reset ¶ added in v0.10.2
func (h *FIFOTestHelper) Reset()
Reset clears all tracking state.
func (*FIFOTestHelper) TotalReceived ¶ added in v0.10.2
func (h *FIFOTestHelper) TotalReceived() int
TotalReceived returns the total number of messages received.
func (*FIFOTestHelper) VerifyComplete ¶ added in v0.10.2
func (h *FIFOTestHelper) VerifyComplete(groups []string, messagesPerGroup int) error
VerifyComplete checks that all expected messages were received in order.
func (*FIFOTestHelper) VerifyNoDuplicates ¶ added in v0.10.2
func (h *FIFOTestHelper) VerifyNoDuplicates() error
VerifyNoDuplicates checks that no messages were received more than once.
func (*FIFOTestHelper) VerifyOrdering ¶ added in v0.10.2
func (h *FIFOTestHelper) VerifyOrdering() error
VerifyOrdering checks that messages within each group are in order.
func (*FIFOTestHelper) Violations ¶ added in v0.10.2
func (h *FIFOTestHelper) Violations() int
Violations returns the number of ordering violations detected.
func (*FIFOTestHelper) WaitForVisibilityExpiry ¶ added in v0.10.2
func (h *FIFOTestHelper) WaitForVisibilityExpiry(ctx context.Context, expectedCount int, maxWait time.Duration) ([]events.SQSMessage, error)
WaitForVisibilityExpiry waits for messages to become visible again using retry polling.
func (*FIFOTestHelper) WithFormat ¶ added in v0.10.2
func (h *FIFOTestHelper) WithFormat(format MessageBodyFormat) *FIFOTestHelper
WithFormat sets the message body format.
func (*FIFOTestHelper) WithPollTimeout ¶ added in v0.10.2
func (h *FIFOTestHelper) WithPollTimeout(d time.Duration) *FIFOTestHelper
WithPollTimeout sets the poll timeout.
func (*FIFOTestHelper) WithVisibilityTimeout ¶ added in v0.10.2
func (h *FIFOTestHelper) WithVisibilityTimeout(d time.Duration) *FIFOTestHelper
WithVisibilityTimeout sets the visibility timeout for polling.
type FIFOTestQueue ¶ added in v0.10.2
type FIFOTestQueue interface {
PushMessagesWithGroup(ctx context.Context, ttl time.Duration, messageGroup string, messages ...events.SQSMessage) ([]events.SQSMessage, error)
PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
Count(ctx context.Context) (int32, error)
Purge(ctx context.Context) error
}
FIFOTestQueue is an interface that matches the FIFO queue operations needed for testing.
type LocalDynamoOption ¶
type LocalDynamoOption func(*DynamoDBLocal)
LocalDynamoOption allows customizing the local DynamoDB runner.
func WithImage ¶
func WithImage(img string) LocalDynamoOption
WithImage overrides the docker image (defaults to amazon/dynamodb-local:latest).
func WithInMemory ¶
func WithInMemory(enabled bool) LocalDynamoOption
WithInMemory toggles the -inMemory flag.
func WithPort ¶
func WithPort(port int) LocalDynamoOption
WithPort sets the host port to map to container.
func WithSharedDB ¶
func WithSharedDB(enabled bool) LocalDynamoOption
WithSharedDB toggles the -sharedDb flag.
type MessageBodyFormat ¶ added in v0.10.2
type MessageBodyFormat int
MessageBodyFormat defines how message bodies are formatted for testing.
const ( // FormatGroupMsgNum formats as "group-name|msgNum" (e.g., "group-A|0") FormatGroupMsgNum MessageBodyFormat = iota // FormatGroupDashMsgNum formats as "group-name-msg-msgNum" (e.g., "group-A-msg-0") FormatGroupDashMsgNum )
type StandardTestHelper ¶ added in v0.10.2
type StandardTestHelper struct {
Queue StandardTestQueue
VisibilityTimeout time.Duration
PollTimeout time.Duration
}
StandardTestHelper provides utilities for testing standard (non-FIFO) queues.
func NewStandardTestHelper ¶ added in v0.10.2
func NewStandardTestHelper(queue StandardTestQueue) *StandardTestHelper
NewStandardTestHelper creates a new standard queue test helper.
func (*StandardTestHelper) AssertQueueCount ¶ added in v0.10.2
func (h *StandardTestHelper) AssertQueueCount(ctx context.Context, expected int32) error
AssertQueueCount verifies the queue has the expected number of messages.
func (*StandardTestHelper) AssertQueueEmpty ¶ added in v0.10.2
func (h *StandardTestHelper) AssertQueueEmpty(ctx context.Context) error
AssertQueueEmpty verifies the queue has no messages.
func (*StandardTestHelper) ConsumeAll ¶ added in v0.10.2
func (h *StandardTestHelper) ConsumeAll(ctx context.Context, maxMessages int) ([]events.SQSMessage, error)
ConsumeAll consumes all available messages from the queue. Returns the messages consumed and any error encountered.
func (*StandardTestHelper) ProcessWithRetry ¶ added in v0.10.2
func (h *StandardTestHelper) ProcessWithRetry( ctx context.Context, maxMessages int, visibilityTimeout time.Duration, failFunc func(msg events.SQSMessage) bool, ) (processed, failed []events.SQSMessage, err error)
ProcessWithRetry processes messages, simulating failures that trigger redelivery. failFunc determines whether a message should "fail" (not be deleted). Returns successfully processed messages, failed messages that need redelivery, and any error.
func (*StandardTestHelper) WaitForMessages ¶ added in v0.10.2
func (h *StandardTestHelper) WaitForMessages( ctx context.Context, minCount int, maxWait time.Duration, visibilityTimeout time.Duration, ) ([]events.SQSMessage, error)
WaitForMessages waits for at least minCount messages to be available in the queue. This is useful for waiting for visibility timeouts without specifying exact counts.
func (*StandardTestHelper) WaitForVisibilityExpiry ¶ added in v0.10.2
func (h *StandardTestHelper) WaitForVisibilityExpiry( ctx context.Context, expectedCount int, maxWait time.Duration, ) ([]events.SQSMessage, error)
WaitForVisibilityExpiry waits for messages to become visible again using retry polling. This replaces hardcoded time.Sleep calls with a more reliable retry-based approach. Returns the messages that became visible, or an error if timeout is exceeded.
func (*StandardTestHelper) WithPollTimeout ¶ added in v0.10.2
func (h *StandardTestHelper) WithPollTimeout(d time.Duration) *StandardTestHelper
WithPollTimeout sets the default poll timeout.
func (*StandardTestHelper) WithVisibilityTimeout ¶ added in v0.10.2
func (h *StandardTestHelper) WithVisibilityTimeout(d time.Duration) *StandardTestHelper
WithVisibilityTimeout sets the default visibility timeout for polling.
type StandardTestQueue ¶ added in v0.10.2
type StandardTestQueue interface {
PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
Count(ctx context.Context) (int32, error)
Purge(ctx context.Context) error
}
StandardTestQueue is an interface that matches the standard queue operations needed for testing.