testutil

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CloseAllInstances

func CloseAllInstances(ctx context.Context) error

CloseAllInstances stops and removes all tagged local instances.

func ListAllInstances

func ListAllInstances(ctx context.Context) ([]string, error)

ListAllInstances returns the names of all containers tagged as godynamodb-queue DynamoDB Local instances.

func ParseGroupFromBody added in v0.10.2

func ParseGroupFromBody(body string) string

ParseGroupFromBody extracts the group name from a message body. Supports two formats:

  • "group-name-msg-N" -> returns "group-name"
  • "group-name|N" -> returns "group-name"

Types

type DynamoDBLocal

type DynamoDBLocal struct {
	Name  string
	Image string

	Port     int
	InMemory bool
	SharedDB bool
	// contains filtered or unexported fields
}

DynamoDBLocal controls a dockerized DynamoDB Local instance. It is intended for use in integration tests.

func NewLocalDynamoDB

func NewLocalDynamoDB(name string, opts ...LocalDynamoOption) *DynamoDBLocal

NewLocalDynamoDB creates a new, not-yet-started DynamoDBLocal runner.

func (*DynamoDBLocal) AWSConfig

func (d *DynamoDBLocal) AWSConfig() aws.Config

AWSConfig returns an AWS SDK config for the local DynamoDB instance. Note: Use DynamoDBClient() instead, which properly configures the endpoint.

func (*DynamoDBLocal) Close

func (d *DynamoDBLocal) Close() error

Close forcibly removes the container.

func (*DynamoDBLocal) CreateTable

func (d *DynamoDBLocal) CreateTable(
	ctx context.Context,
	tableName string,
) error

CreateTable creates a DynamoDB table for the queue.

func (*DynamoDBLocal) DeleteTable

func (d *DynamoDBLocal) DeleteTable(ctx context.Context, tableName string) error

DeleteTable deletes a DynamoDB table.

func (*DynamoDBLocal) DynamoDBClient

func (d *DynamoDBLocal) DynamoDBClient(opts ...func(*dynamodb.Options)) *dynamodb.Client

DynamoDBClient returns a DynamoDB client configured to connect to the local instance.

func (*DynamoDBLocal) EndpointURL

func (d *DynamoDBLocal) EndpointURL() string

EndpointURL returns the HTTP endpoint URL for this local instance.

func (*DynamoDBLocal) Start

func (d *DynamoDBLocal) Start(ctx context.Context) error

Start launches the dockerized DynamoDB Local and waits for it to accept connections.

func (*DynamoDBLocal) Stop

func (d *DynamoDBLocal) Stop() error

Stop stops the container without explicitly removing it; if the container was started with --rm, Docker removes it automatically once it stops.

func (*DynamoDBLocal) WaitForReady

func (d *DynamoDBLocal) WaitForReady(ctx context.Context, timeout time.Duration) error

WaitForReady waits for DynamoDB Local to be ready to accept requests. It uses the ListTables API to verify the database is operational.

type FIFOTestHelper added in v0.10.2

type FIFOTestHelper struct {
	Queue FIFOTestQueue

	VisibilityTimeout time.Duration
	PollTimeout       time.Duration

	Format MessageBodyFormat
	// contains filtered or unexported fields
}

FIFOTestHelper provides utilities for testing FIFO queues.

func NewFIFOTestHelper added in v0.10.2

func NewFIFOTestHelper(queue FIFOTestQueue) *FIFOTestHelper

NewFIFOTestHelper creates a new FIFO test helper.

func (*FIFOTestHelper) AssertQueueEmpty added in v0.10.2

func (h *FIFOTestHelper) AssertQueueEmpty(ctx context.Context) error

AssertQueueEmpty verifies the queue has no messages.

func (*FIFOTestHelper) ConsumeAll added in v0.10.2

func (h *FIFOTestHelper) ConsumeAll(ctx context.Context, maxMessages int) (int, error)

ConsumeAll consumes all messages from the queue and tracks ordering. Returns the total number of messages consumed.

func (*FIFOTestHelper) ConsumeAllConcurrently added in v0.10.2

func (h *FIFOTestHelper) ConsumeAllConcurrently(ctx context.Context, numConsumers, totalMessages int, timeout time.Duration) (int, error)

ConsumeAllConcurrently consumes messages using multiple concurrent consumers.

func (*FIFOTestHelper) ConsumeWithCallback added in v0.10.2

func (h *FIFOTestHelper) ConsumeWithCallback(ctx context.Context, maxMessages int, callback func(msg events.SQSMessage) bool) (int, error)

ConsumeWithCallback consumes messages and calls the callback for each. If callback returns false, the message is NOT deleted (simulating processing failure).

func (*FIFOTestHelper) Duplicates added in v0.10.2

func (h *FIFOTestHelper) Duplicates() map[string]int

Duplicates returns message IDs that were received more than once.

func (*FIFOTestHelper) GroupMessages added in v0.10.2

func (h *FIFOTestHelper) GroupMessages() map[string][]int

GroupMessages returns the messages received per group (in order).

func (*FIFOTestHelper) PushMessages added in v0.10.2

func (h *FIFOTestHelper) PushMessages(ctx context.Context, group string, count int) error

PushMessages pushes messages to a group with the configured format.

func (*FIFOTestHelper) PushMessagesWithDelay added in v0.10.2

func (h *FIFOTestHelper) PushMessagesWithDelay(ctx context.Context, group string, count int, delay time.Duration) error

PushMessagesWithDelay pushes messages to a group with a delay between each.

func (*FIFOTestHelper) PushToGroups added in v0.10.2

func (h *FIFOTestHelper) PushToGroups(ctx context.Context, groups []string, messagesPerGroup int, delay time.Duration) error

PushToGroups pushes messages to multiple groups.

func (*FIFOTestHelper) Reset added in v0.10.2

func (h *FIFOTestHelper) Reset()

Reset clears all tracking state.

func (*FIFOTestHelper) TotalReceived added in v0.10.2

func (h *FIFOTestHelper) TotalReceived() int

TotalReceived returns the total number of messages received.

func (*FIFOTestHelper) VerifyComplete added in v0.10.2

func (h *FIFOTestHelper) VerifyComplete(groups []string, messagesPerGroup int) error

VerifyComplete checks that all expected messages were received in order.

func (*FIFOTestHelper) VerifyNoDuplicates added in v0.10.2

func (h *FIFOTestHelper) VerifyNoDuplicates() error

VerifyNoDuplicates checks that no messages were received more than once.

func (*FIFOTestHelper) VerifyOrdering added in v0.10.2

func (h *FIFOTestHelper) VerifyOrdering() error

VerifyOrdering checks that messages within each group are in order.

func (*FIFOTestHelper) Violations added in v0.10.2

func (h *FIFOTestHelper) Violations() int

Violations returns the number of ordering violations detected.

func (*FIFOTestHelper) WaitForVisibilityExpiry added in v0.10.2

func (h *FIFOTestHelper) WaitForVisibilityExpiry(ctx context.Context, expectedCount int, maxWait time.Duration) ([]events.SQSMessage, error)

WaitForVisibilityExpiry waits for messages to become visible again using retry polling.

func (*FIFOTestHelper) WithFormat added in v0.10.2

func (h *FIFOTestHelper) WithFormat(format MessageBodyFormat) *FIFOTestHelper

WithFormat sets the message body format.

func (*FIFOTestHelper) WithPollTimeout added in v0.10.2

func (h *FIFOTestHelper) WithPollTimeout(d time.Duration) *FIFOTestHelper

WithPollTimeout sets the poll timeout.

func (*FIFOTestHelper) WithVisibilityTimeout added in v0.10.2

func (h *FIFOTestHelper) WithVisibilityTimeout(d time.Duration) *FIFOTestHelper

WithVisibilityTimeout sets the visibility timeout for polling.

type FIFOTestQueue added in v0.10.2

type FIFOTestQueue interface {
	PushMessagesWithGroup(ctx context.Context, ttl time.Duration, messageGroup string, messages ...events.SQSMessage) ([]events.SQSMessage, error)
	PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
	DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
	Count(ctx context.Context) (int32, error)
	Purge(ctx context.Context) error
}

FIFOTestQueue is an interface that matches the FIFO queue operations needed for testing.

type LocalDynamoOption

type LocalDynamoOption func(*DynamoDBLocal)

LocalDynamoOption allows customizing the local DynamoDB runner.

func WithImage

func WithImage(img string) LocalDynamoOption

WithImage overrides the docker image (defaults to amazon/dynamodb-local:latest).

func WithInMemory

func WithInMemory(enabled bool) LocalDynamoOption

WithInMemory toggles the -inMemory flag.

func WithPort

func WithPort(port int) LocalDynamoOption

WithPort sets the host port that is mapped to the container's port.

func WithSharedDB

func WithSharedDB(enabled bool) LocalDynamoOption

WithSharedDB toggles the -sharedDb flag.

type MessageBodyFormat added in v0.10.2

type MessageBodyFormat int

MessageBodyFormat defines how message bodies are formatted for testing.

const (
	// FormatGroupMsgNum formats as "group-name|msgNum" (e.g., "group-A|0")
	FormatGroupMsgNum MessageBodyFormat = iota
	// FormatGroupDashMsgNum formats as "group-name-msg-msgNum" (e.g., "group-A-msg-0")
	FormatGroupDashMsgNum
)

type StandardTestHelper added in v0.10.2

type StandardTestHelper struct {
	Queue             StandardTestQueue
	VisibilityTimeout time.Duration
	PollTimeout       time.Duration
}

StandardTestHelper provides utilities for testing standard (non-FIFO) queues.

func NewStandardTestHelper added in v0.10.2

func NewStandardTestHelper(queue StandardTestQueue) *StandardTestHelper

NewStandardTestHelper creates a new standard queue test helper.

func (*StandardTestHelper) AssertQueueCount added in v0.10.2

func (h *StandardTestHelper) AssertQueueCount(ctx context.Context, expected int32) error

AssertQueueCount verifies the queue has the expected number of messages.

func (*StandardTestHelper) AssertQueueEmpty added in v0.10.2

func (h *StandardTestHelper) AssertQueueEmpty(ctx context.Context) error

AssertQueueEmpty verifies the queue has no messages.

func (*StandardTestHelper) ConsumeAll added in v0.10.2

func (h *StandardTestHelper) ConsumeAll(ctx context.Context, maxMessages int) ([]events.SQSMessage, error)

ConsumeAll consumes all available messages from the queue. Returns the messages consumed and any error encountered.

func (*StandardTestHelper) ProcessWithRetry added in v0.10.2

func (h *StandardTestHelper) ProcessWithRetry(
	ctx context.Context,
	maxMessages int,
	visibilityTimeout time.Duration,
	failFunc func(msg events.SQSMessage) bool,
) (processed, failed []events.SQSMessage, err error)

ProcessWithRetry processes messages, simulating failures that trigger redelivery. failFunc determines whether a message should "fail" (not be deleted). Returns successfully processed messages, failed messages that need redelivery, and any error.

func (*StandardTestHelper) WaitForMessages added in v0.10.2

func (h *StandardTestHelper) WaitForMessages(
	ctx context.Context,
	minCount int,
	maxWait time.Duration,
	visibilityTimeout time.Duration,
) ([]events.SQSMessage, error)

WaitForMessages waits for at least minCount messages to be available in the queue. This is useful for waiting for visibility timeouts to expire without specifying an exact message count.

func (*StandardTestHelper) WaitForVisibilityExpiry added in v0.10.2

func (h *StandardTestHelper) WaitForVisibilityExpiry(
	ctx context.Context,
	expectedCount int,
	maxWait time.Duration,
) ([]events.SQSMessage, error)

WaitForVisibilityExpiry waits for messages to become visible again using retry polling. This replaces hardcoded time.Sleep calls with a more reliable retry-based approach. Returns the messages that became visible, or an error if timeout is exceeded.

func (*StandardTestHelper) WithPollTimeout added in v0.10.2

func (h *StandardTestHelper) WithPollTimeout(d time.Duration) *StandardTestHelper

WithPollTimeout sets the default poll timeout.

func (*StandardTestHelper) WithVisibilityTimeout added in v0.10.2

func (h *StandardTestHelper) WithVisibilityTimeout(d time.Duration) *StandardTestHelper

WithVisibilityTimeout sets the default visibility timeout for polling.

type StandardTestQueue added in v0.10.2

type StandardTestQueue interface {
	PushMessages(ctx context.Context, ttl time.Duration, messages ...events.SQSMessage) ([]events.SQSMessage, error)
	PollMessages(ctx context.Context, timeout, visibilityTimeout time.Duration, minMessages, maxMessages int) ([]events.SQSMessage, error)
	DeleteMessages(ctx context.Context, receiptHandles ...string) ([]string, error)
	Count(ctx context.Context) (int32, error)
	Purge(ctx context.Context) error
}

StandardTestQueue is an interface that matches the standard queue operations needed for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL