helpers

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AssertDLQMessageCount

func AssertDLQMessageCount(t *testing.T, dlqMessages []*message_pb.Message, expectedCount int)

AssertDLQMessageCount verifies the number of messages in the dead letter queue (DLQ).

func AssertErrorContains

func AssertErrorContains(t *testing.T, err error, expectedSubstring string)

AssertErrorContains verifies that an error contains a specific substring

func AssertLeaseActive

func AssertLeaseActive(t *testing.T, msg *message_pb.Message)

AssertLeaseActive verifies that a message lease is active

func AssertLeaseExpired

func AssertLeaseExpired(t *testing.T, msg *message_pb.Message)

AssertLeaseExpired verifies that a message lease has expired

func AssertMessageContent

func AssertMessageContent(t *testing.T, msg *message_pb.Message, expectedContent string)

AssertMessageContent verifies message payload content

func AssertMessageContentType

func AssertMessageContentType(t *testing.T, msg *message_pb.Message, expectedContentType string)

AssertMessageContentType verifies the message's content type.

func AssertMessagePriority

func AssertMessagePriority(t *testing.T, messages []*message_pb.Message, expectedOrder []int64)

AssertMessagePriority verifies that messages are ordered by priority

func AssertMessageState

func AssertMessageState(t *testing.T, msg *message_pb.Message, expectedState message_pb.Message_Metadata_State)

AssertMessageState verifies that a message is in the expected state

func AssertNoError

func AssertNoError(t *testing.T, err error, operation string)

AssertNoError is a convenience wrapper around require.NoError with a custom message.

func AssertQueueExists

func AssertQueueExists(t *testing.T, queues []*queue_pb.Queue, queueName string)

AssertQueueExists verifies that a queue with the given name exists

func AssertQueueNotExists

func AssertQueueNotExists(t *testing.T, queues []*queue_pb.Queue, queueName string)

AssertQueueNotExists verifies that a queue with the given name does not exist

func AssertQueueStateApproximate

func AssertQueueStateApproximate(t *testing.T, pending, running, completed int64,
	expectedPending, expectedRunning, expectedCompleted int64, tolerance int64,
)

AssertQueueStateApproximate verifies queue statistics (allowing for timing variations)

func AssertRetryAttemptsLeft

func AssertRetryAttemptsLeft(t *testing.T, msg *message_pb.Message, expectedAttempts int32)

AssertRetryAttemptsLeft verifies the number of retry attempts a message has left.

func AssertScheduleExecutionCount

func AssertScheduleExecutionCount(t *testing.T, executionCount int, expectedCount int)

AssertScheduleExecutionCount verifies the number of schedule executions

func AssertSuccess

func AssertSuccess(t *testing.T, success bool, operation string)

AssertSuccess verifies that a response indicates success

func AssertTimeWithinRange

func AssertTimeWithinRange(t *testing.T, actual, expected time.Time, tolerance time.Duration)

AssertTimeWithinRange verifies that a timestamp is within an acceptable range

func DLQStreamKey

func DLQStreamKey(queueName string) string

DLQStreamKey returns the key for the dead letter queue stream.

func GenerateRandomID

func GenerateRandomID(length int) string

GenerateRandomID generates a random alphanumeric ID of specified length

func GenerateUniqueMessageID

func GenerateUniqueMessageID(t *testing.T) string

GenerateUniqueMessageID generates a unique message ID for testing

func GenerateUniqueQueueName

func GenerateUniqueQueueName(t *testing.T, prefix string) string

GenerateUniqueQueueName generates a unique queue name for testing

func GroupKey

func GroupKey(queueName string) string

GroupKey returns the key for consumer group identifier.

func LoadFixtureItem

func LoadFixtureItem(t *testing.T, filename, itemName string) json.RawMessage

LoadFixtureItem loads a specific item from a fixture file

func LoadJSONSchema

func LoadJSONSchema(t *testing.T, schemaFile string) string

LoadJSONSchema loads a JSON schema file from fixtures/schemas/

func MessageMetaKey

func MessageMetaKey(queueName, messageID string) string

MessageMetaKey returns the key for message metadata hash.

func QueueKey

func QueueKey(queueName string) string

QueueKey returns the key for a queue sorted set.

func QueueMetaKey

func QueueMetaKey(queueName string) string

QueueMetaKey returns the key for queue metadata hash.

func RunWithSharedEnvironment

func RunWithSharedEnvironment(m *testing.M) int

RunWithSharedEnvironment wraps m.Run() with shared environment setup and teardown. Use this in TestMain to manage the lifecycle of shared containers.

Example:

func TestMain(m *testing.M) {
    os.Exit(helpers.RunWithSharedEnvironment(m))
}

func ScheduleKey

func ScheduleKey(queueName string) string

ScheduleKey returns the key for scheduled messages sorted set.

func ScheduleMetaKey

func ScheduleMetaKey(scheduleID string) string

ScheduleMetaKey returns the key for schedule metadata hash.

func ScheduleSetKey

func ScheduleSetKey(scheduleID string) string

ScheduleSetKey returns the key for schedule tracking sorted set.

func StatsKey

func StatsKey(queueName string) string

StatsKey returns the key for queue statistics hash.

func StreamKey

func StreamKey(queueName, priority string) string

StreamKey returns the key for a priority-based message stream.

func UnmarshalFixtureItem

func UnmarshalFixtureItem(t *testing.T, filename, itemName string, v interface{})

UnmarshalFixtureItem loads and unmarshals a specific fixture item into the provided struct

func WaitForCondition

func WaitForCondition(t *testing.T, timeout time.Duration, condition func() bool) bool

WaitForCondition polls a condition function until it returns true or the timeout occurs. This is useful for waiting on asynchronous operations like scheduler processing.

func WaitForMessageTransition

func WaitForMessageTransition(t *testing.T)

WaitForMessageTransition waits for the scheduler to process message state transitions. This is needed after posting messages because they go to the schedule index and must be promoted to streams by the scheduler before they can be retrieved with GetNextMessage. The test scheduler runs every 300ms (configured in shared_env.go), so we wait 400ms to ensure at least one scheduler cycle completes.

Types

type FixtureData

type FixtureData map[string]json.RawMessage

FixtureData represents the structure of fixture files

func LoadFixture

func LoadFixture(t *testing.T, filename string) FixtureData

LoadFixture loads a JSON fixture file and returns the parsed data

type MessageFixture

type MessageFixture struct {
	Content     interface{} `json:"content"`
	ContentType string      `json:"content_type"`
	Priority    int32       `json:"priority"`
	Note        string      `json:"note,omitempty"`
}

MessageFixture represents a message from fixtures/messages.json

func LoadMessageFixture

func LoadMessageFixture(t *testing.T, messageName string) *MessageFixture

LoadMessageFixture loads a specific message fixture

func (*MessageFixture) GetContentAsBytes

func (m *MessageFixture) GetContentAsBytes(t *testing.T) []byte

GetContentAsBytes converts the message content to bytes

func (*MessageFixture) GetContentAsJSON

func (m *MessageFixture) GetContentAsJSON(t *testing.T) string

GetContentAsJSON converts the message content to a JSON string

type QueueFixture

type QueueFixture struct {
	Name     string                 `json:"name"`
	Metadata map[string]interface{} `json:"metadata"`
}

QueueFixture represents a queue configuration from fixtures/queues.json

func LoadQueueFixture

func LoadQueueFixture(t *testing.T, queueName string) *QueueFixture

LoadQueueFixture loads a specific queue fixture

type ScheduleFixture

type ScheduleFixture struct {
	Type           string                 `json:"type"`
	CronExpression string                 `json:"cron_expression,omitempty"`
	QueueName      string                 `json:"queue_name"`
	Message        MessageFixture         `json:"message"`
	CalendarRules  map[string]interface{} `json:"calendar_rules,omitempty"`
	Description    string                 `json:"description"`
}

ScheduleFixture represents a schedule from fixtures/schedules.json

func LoadScheduleFixture

func LoadScheduleFixture(t *testing.T, scheduleName string) *ScheduleFixture

LoadScheduleFixture loads a specific schedule fixture

type TestEnvironment

type TestEnvironment struct {
	RedisContainer  *redismodule.RedisContainer
	ServerContainer testcontainers.Container
	Network         *testcontainers.DockerNetwork
	RedisClient     *redis.Client
	RedisAddr       string
	GRPCAddr        string
	HTTPAddr        string
	// contains filtered or unexported fields
}

TestEnvironment holds all test infrastructure components. It manages the lifecycle of Redis and ChronoQueue containers, providing convenient access to clients and addresses.

func SetupTestEnvironment

func SetupTestEnvironment(t *testing.T) *TestEnvironment

SetupTestEnvironment creates and starts Redis and ChronoQueue containers. It automatically registers cleanup with t.Cleanup() to ensure proper teardown.

This function:

1. Starts a Redis container (redis:7-alpine)
2. Starts a ChronoQueue server container
3. Creates a Redis client
4. Returns a TestEnvironment with all necessary addresses

Example:

func TestMyFeature(t *testing.T) {
    env := SetupTestEnvironment(t)
    // env.Cleanup() is called automatically via t.Cleanup()

    client := env.NewGRPCClient(t)
    // ... perform tests
}

func SharedTestEnvironment

func SharedTestEnvironment(t *testing.T) *TestEnvironment

SharedTestEnvironment returns a shared test environment for all tests in a package. Containers are created once and reused across all tests. Each test should clean up its own data (e.g., by calling env.FlushRedis(t)).

This approach significantly speeds up test execution by avoiding container creation overhead for each test.

Example usage in test file:

func TestMain(m *testing.M) {
    os.Exit(helpers.RunWithSharedEnvironment(m))
}

func TestSomething(t *testing.T) {
    env := helpers.SharedTestEnvironment(t)
    defer env.FlushRedis(t) // Clean up after test

    // ... perform tests
}

func (*TestEnvironment) Cleanup

func (e *TestEnvironment) Cleanup()

Cleanup terminates all containers and closes connections. This is automatically called via t.Cleanup() when using SetupTestEnvironment.

func (*TestEnvironment) FlushRedis

func (e *TestEnvironment) FlushRedis(t *testing.T)

FlushRedis clears all data from the Redis database. Useful for ensuring clean state between test runs.

func (*TestEnvironment) NewGRPCClient

func (e *TestEnvironment) NewGRPCClient(t *testing.T) *grpc.ClientConn

NewGRPCClient creates a new gRPC client connection to the ChronoQueue server. The connection is automatically closed via t.Cleanup().

Example:

func TestCreateQueue(t *testing.T) {
    env := SetupTestEnvironment(t)
    conn := env.NewGRPCClient(t)

    client := queueservice_pb.NewQueueServiceClient(conn)
    // ... use client
}

func (*TestEnvironment) NewGRPCClientShared

func (e *TestEnvironment) NewGRPCClientShared(t *testing.T) *grpc.ClientConn

NewGRPCClientShared creates a new gRPC client for the shared environment. Unlike NewGRPCClient, this does NOT auto-close the connection via t.Cleanup. The caller is responsible for closing the connection.

Example:

func TestWithSharedEnv(t *testing.T) {
    env := helpers.SharedTestEnvironment(t)
    defer env.FlushRedis(t)

    conn := env.NewGRPCClientShared(t)
    defer conn.Close()

    client := queueservice_pb.NewQueueServiceClient(conn)
    // ... use client
}

func (*TestEnvironment) WaitForHealthy

func (e *TestEnvironment) WaitForHealthy(t *testing.T, timeout time.Duration)

WaitForHealthy waits for the ChronoQueue server to be healthy. This is useful if you need to ensure the server is fully ready before starting tests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL