helpers

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 25, 2026 | License: MIT | Imports: 27 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func AssertDLQMessageCount

func AssertDLQMessageCount(t *testing.T, dlqMessages []*message_pb.Message, expectedCount int)

AssertDLQMessageCount verifies the number of messages in the dead letter queue (DLQ).

func AssertErrorContains

func AssertErrorContains(t *testing.T, err error, expectedSubstring string)

AssertErrorContains verifies that an error contains a specific substring

func AssertLeaseActive

func AssertLeaseActive(t *testing.T, msg *message_pb.Message)

AssertLeaseActive verifies that a message lease is active

func AssertLeaseExpired

func AssertLeaseExpired(t *testing.T, msg *message_pb.Message)

AssertLeaseExpired verifies that a message lease has expired

func AssertMessageContent

func AssertMessageContent(t *testing.T, msg *message_pb.Message, expectedContent string)

AssertMessageContent verifies message payload content

func AssertMessageContentType

func AssertMessageContentType(t *testing.T, msg *message_pb.Message, expectedContentType string)

AssertMessageContentType verifies the message content type

func AssertMessagePriority

func AssertMessagePriority(t *testing.T, messages []*message_pb.Message, expectedOrder []int64)

AssertMessagePriority verifies that messages are ordered by priority

func AssertMessageState

func AssertMessageState(t *testing.T, msg *message_pb.Message, expectedState message_pb.Message_Metadata_State)

AssertMessageState verifies that a message is in the expected state

func AssertNoError

func AssertNoError(t *testing.T, err error, operation string)

AssertNoError is a convenience wrapper around require.NoError with a custom message

func AssertQueueExists

func AssertQueueExists(t *testing.T, queues []*queue_pb.Queue, queueName string)

AssertQueueExists verifies that a queue with the given name exists

func AssertQueueNotExists

func AssertQueueNotExists(t *testing.T, queues []*queue_pb.Queue, queueName string)

AssertQueueNotExists verifies that a queue with the given name does not exist

func AssertQueueStateApproximate

func AssertQueueStateApproximate(t *testing.T, pending, running, completed int64,
	expectedPending, expectedRunning, expectedCompleted int64, tolerance int64,
)

AssertQueueStateApproximate verifies queue statistics (allowing for timing variations)
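The tolerance-based comparison described above can be sketched as follows. This is a minimal illustration, assuming the same tolerance is applied to each counter independently; the names withinTolerance and queueStateApproximatelyEqual are hypothetical and are not part of the helpers package.

```go
package main

import "fmt"

// withinTolerance reports whether actual differs from expected by at most
// tolerance. Hypothetical helper, not part of the helpers package.
func withinTolerance(actual, expected, tolerance int64) bool {
	diff := actual - expected
	if diff < 0 {
		diff = -diff
	}
	return diff <= tolerance
}

// queueStateApproximatelyEqual applies the same tolerance to the pending,
// running, and completed counters (an assumption made for this sketch).
func queueStateApproximatelyEqual(pending, running, completed,
	expectedPending, expectedRunning, expectedCompleted, tolerance int64,
) bool {
	return withinTolerance(pending, expectedPending, tolerance) &&
		withinTolerance(running, expectedRunning, tolerance) &&
		withinTolerance(completed, expectedCompleted, tolerance)
}

func main() {
	// One message moved from pending to running between two reads.
	fmt.Println(queueStateApproximatelyEqual(9, 1, 0, 10, 0, 0, 1))
	// Pending is off by three, which exceeds the tolerance of one.
	fmt.Println(queueStateApproximatelyEqual(7, 1, 0, 10, 0, 0, 1))
}
```

A tolerance like this is useful when counters are read while the scheduler is still moving messages between states.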

func AssertRetryAttemptsLeft

func AssertRetryAttemptsLeft(t *testing.T, msg *message_pb.Message, expectedAttempts int32)

AssertRetryAttemptsLeft verifies the number of retry attempts a message has left

func AssertScheduleExecutionCount

func AssertScheduleExecutionCount(t *testing.T, executionCount int, expectedCount int)

AssertScheduleExecutionCount verifies the number of schedule executions

func AssertSuccess

func AssertSuccess(t *testing.T, success bool, operation string)

AssertSuccess verifies that a response indicates success

func AssertTimeWithinRange

func AssertTimeWithinRange(t *testing.T, actual, expected time.Time, tolerance time.Duration)

AssertTimeWithinRange verifies that a timestamp is within an acceptable range
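A range check of this kind might look like the sketch below, assuming the tolerance is symmetric (the timestamp may be earlier or later than expected). The names timeWithinRange and checkOffset are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// timeWithinRange reports whether actual lies within tolerance of expected,
// in either direction. Hypothetical helper, not part of the helpers package.
func timeWithinRange(actual, expected time.Time, tolerance time.Duration) bool {
	diff := actual.Sub(expected)
	if diff < 0 {
		diff = -diff
	}
	return diff <= tolerance
}

// checkOffset is a demo wrapper: it compares a fixed base time with
// base+offset using the given tolerance.
func checkOffset(offset, tolerance time.Duration) bool {
	base := time.Date(2026, time.January, 25, 12, 0, 0, 0, time.UTC)
	return timeWithinRange(base.Add(offset), base, tolerance)
}

func main() {
	fmt.Println(checkOffset(300*time.Millisecond, time.Second)) // 300ms late, 1s allowed
	fmt.Println(checkOffset(-2*time.Second, time.Second))       // 2s early, 1s allowed
}
```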

func DLQStreamKey

func DLQStreamKey(queueName string) string

DLQStreamKey returns the key for the dead letter queue stream.

func GenerateRandomID

func GenerateRandomID(length int) string

GenerateRandomID generates a random alphanumeric ID of specified length
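One way to produce such an ID is sketched below. The documentation does not say which random source or alphabet the package uses, so crypto/rand and the 62-character alphabet here are assumptions, and randomID is a hypothetical name.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// alphabet is an assumed character set: lower case, upper case, digits.
const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// randomID returns a random alphanumeric string of the requested length.
// Hypothetical helper, not part of the helpers package.
func randomID(length int) (string, error) {
	if length <= 0 {
		return "", nil
	}
	out := make([]byte, length)
	limit := big.NewInt(int64(len(alphabet)))
	for i := range out {
		// rand.Int returns a uniform value in [0, limit).
		n, err := rand.Int(rand.Reader, limit)
		if err != nil {
			return "", err
		}
		out[i] = alphabet[n.Int64()]
	}
	return string(out), nil
}

func main() {
	id, err := randomID(12)
	if err != nil {
		panic(err)
	}
	fmt.Println(id, len(id))
}
```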

func GenerateUniqueMessageID

func GenerateUniqueMessageID(t *testing.T) string

GenerateUniqueMessageID generates a unique message ID for testing

func GenerateUniqueQueueName

func GenerateUniqueQueueName(t *testing.T, prefix string) string

GenerateUniqueQueueName generates a unique queue name for testing

func GroupKey

func GroupKey(queueName string) string

GroupKey returns the key for consumer group identifier.

func LoadFixtureItem

func LoadFixtureItem(t *testing.T, filename, itemName string) json.RawMessage

LoadFixtureItem loads a specific item from a fixture file

func LoadJSONSchema

func LoadJSONSchema(t *testing.T, schemaFile string) string

LoadJSONSchema loads a JSON schema file from fixtures/schemas/

func MessageMetaKey

func MessageMetaKey(queueName, messageID string) string

MessageMetaKey returns the key for message metadata hash.

func QueueKey

func QueueKey(queueName string) string

QueueKey returns the key for a queue sorted set.

func QueueMetaKey

func QueueMetaKey(queueName string) string

QueueMetaKey returns the key for queue metadata hash.

func RunWithSharedEnvironment

func RunWithSharedEnvironment(m *testing.M) int

RunWithSharedEnvironment wraps m.Run() with shared environment setup and teardown. Use this in TestMain to manage the lifecycle of shared containers.

Example:

func TestMain(m *testing.M) {
    os.Exit(helpers.RunWithSharedEnvironment(m))
}

func ScheduleKey

func ScheduleKey(queueName string) string

ScheduleKey returns the key for scheduled messages sorted set.

func ScheduleMetaKey

func ScheduleMetaKey(scheduleID string) string

ScheduleMetaKey returns the key for schedule metadata hash.

func ScheduleSetKey

func ScheduleSetKey(scheduleID string) string

ScheduleSetKey returns the key for schedule tracking sorted set.

func StatsKey

func StatsKey(queueName string) string

StatsKey returns the key for queue statistics hash.

func StreamKey

func StreamKey(queueName, priority string) string

StreamKey returns the key for a priority-based message stream.

func UnmarshalFixtureItem

func UnmarshalFixtureItem(t *testing.T, filename, itemName string, v interface{})

UnmarshalFixtureItem loads and unmarshals a specific fixture item into the provided struct

func WaitForCondition

func WaitForCondition(t *testing.T, timeout time.Duration, condition func() bool) bool

WaitForCondition polls a condition function until it returns true or timeout occurs. This is useful for waiting on asynchronous operations like scheduler processing.
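The polling behavior can be sketched as below. The real function's poll interval is not documented, so the explicit interval parameter is an assumption, and pollUntil is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// pollUntil calls condition every interval until it returns true or the
// timeout elapses. It reports whether the condition was met.
// Hypothetical helper, not part of the helpers package.
func pollUntil(timeout, interval time.Duration, condition func() bool) bool {
	deadline := time.Now().Add(timeout)
	for {
		if condition() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(interval)
	}
}

func main() {
	calls := 0
	// The condition becomes true on the third poll.
	ok := pollUntil(time.Second, 10*time.Millisecond, func() bool {
		calls++
		return calls >= 3
	})
	fmt.Println(ok, calls)
}
```

Checking the condition before the deadline means a condition that is already true returns immediately, even with a zero timeout.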

func WaitForMessageTransition

func WaitForMessageTransition(t *testing.T)

WaitForMessageTransition waits for the scheduler to process message state transitions. This is needed after posting messages because they go to the schedule index and must be promoted to streams by the scheduler before they can be retrieved with GetNextMessage. The test scheduler runs every 300ms (configured in shared_env.go), so we wait 400ms to ensure at least one scheduler cycle completes.

Types

type FixtureData

type FixtureData map[string]json.RawMessage

FixtureData represents the structure of fixture files

func LoadFixture

func LoadFixture(t *testing.T, filename string) FixtureData

LoadFixture loads a JSON fixture file and returns the parsed data

type MessageFixture

type MessageFixture struct {
	Content     interface{} `json:"content"`
	ContentType string      `json:"content_type"`
	Priority    int32       `json:"priority"`
	Note        string      `json:"note,omitempty"`
}

MessageFixture represents a message from fixtures/messages.json

func LoadMessageFixture

func LoadMessageFixture(t *testing.T, messageName string) *MessageFixture

LoadMessageFixture loads a specific message fixture
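The fixture types suggest a two-step decode: parse the file into a map of raw JSON items, then unmarshal one item into a typed struct. The sketch below illustrates that idea under stated assumptions: the sample JSON is invented for illustration, file reading is omitted, and fixtureData, messageFixture, and loadItem are hypothetical local copies rather than the package's own code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fixtureData mirrors the documented FixtureData shape: item name to raw JSON.
type fixtureData map[string]json.RawMessage

// messageFixture copies the documented MessageFixture fields.
type messageFixture struct {
	Content     interface{} `json:"content"`
	ContentType string      `json:"content_type"`
	Priority    int32       `json:"priority"`
	Note        string      `json:"note,omitempty"`
}

// sampleFixtures is made-up data; the real fixtures/messages.json may differ.
const sampleFixtures = `{
  "simple_text": {"content": "hello", "content_type": "text/plain", "priority": 5},
  "json_payload": {"content": {"id": 1}, "content_type": "application/json", "priority": 1}
}`

// loadItem decodes the named item from raw fixture bytes into v.
func loadItem(raw []byte, name string, v interface{}) error {
	var data fixtureData
	if err := json.Unmarshal(raw, &data); err != nil {
		return fmt.Errorf("parse fixture data: %w", err)
	}
	item, ok := data[name]
	if !ok {
		return fmt.Errorf("fixture item %q not found", name)
	}
	return json.Unmarshal(item, v)
}

func main() {
	var msg messageFixture
	if err := loadItem([]byte(sampleFixtures), "simple_text", &msg); err != nil {
		panic(err)
	}
	fmt.Println(msg.ContentType, msg.Priority, msg.Content)
}
```

Keeping items as json.RawMessage defers decoding until the caller knows which struct an item should be unmarshaled into.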

func (*MessageFixture) GetContentAsBytes

func (m *MessageFixture) GetContentAsBytes(t *testing.T) []byte

GetContentAsBytes converts the message content to bytes

func (*MessageFixture) GetContentAsJSON

func (m *MessageFixture) GetContentAsJSON(t *testing.T) string

GetContentAsJSON converts the message content to a JSON string

type QueueFixture

type QueueFixture struct {
	Name     string                 `json:"name"`
	Metadata map[string]interface{} `json:"metadata"`
}

QueueFixture represents a queue configuration from fixtures/queues.json

func LoadQueueFixture

func LoadQueueFixture(t *testing.T, queueName string) *QueueFixture

LoadQueueFixture loads a specific queue fixture

type ScheduleFixture

type ScheduleFixture struct {
	Type           string                 `json:"type"`
	CronExpression string                 `json:"cron_expression,omitempty"`
	QueueName      string                 `json:"queue_name"`
	Message        MessageFixture         `json:"message"`
	CalendarRules  map[string]interface{} `json:"calendar_rules,omitempty"`
	Description    string                 `json:"description"`
}

ScheduleFixture represents a schedule from fixtures/schedules.json

func LoadScheduleFixture

func LoadScheduleFixture(t *testing.T, scheduleName string) *ScheduleFixture

LoadScheduleFixture loads a specific schedule fixture

type TestCertificates added in v1.2.0

type TestCertificates struct {
	CACert     string // Path to CA certificate
	ServerCert string // Path to server certificate
	ServerKey  string // Path to server key
	ClientCert string // Path to client certificate
	ClientKey  string // Path to client key
	CAPool     *x509.CertPool
	TempDir    string // Temporary directory for certificates
}

TestCertificates holds paths to test certificates for TLS/mTLS testing.

func GenerateTestCertificates added in v1.2.0

func GenerateTestCertificates(t *testing.T) *TestCertificates

GenerateTestCertificates creates a complete set of test certificates for TLS/mTLS testing. It generates:

  - A root CA certificate
  - A server certificate signed by the CA
  - A client certificate signed by the CA

All certificates are written to a temporary directory that is automatically cleaned up when the test completes.

Example:

func TestWithTLS(t *testing.T) {
    certs := helpers.GenerateTestCertificates(t)
    // Use certs.ServerCert, certs.ServerKey, etc.
}
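For readers unfamiliar with how a CA-signed test chain is built, the sketch below shows the general technique with the standard library only. It is not the package's implementation: the key type (ECDSA P-256), validity period, and subject names are assumptions, the client certificate and the writing of files to a temporary directory are omitted, and newCert and verifyServerChain are hypothetical names.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// newCert creates a certificate from tmpl with a fresh key. When parent and
// parentKey are nil the certificate is self-signed, as a root CA would be.
func newCert(tmpl, parent *x509.Certificate, parentKey *ecdsa.PrivateKey) (*x509.Certificate, *ecdsa.PrivateKey, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	if parent == nil {
		parent, parentKey = tmpl, key
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, parent, &key.PublicKey, parentKey)
	if err != nil {
		return nil, nil, err
	}
	cert, err := x509.ParseCertificate(der)
	return cert, key, err
}

// verifyServerChain builds a throwaway CA, signs a server certificate for
// "localhost" with it, and verifies that certificate against the CA pool.
func verifyServerChain() error {
	now := time.Now()
	caTmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "test-ca"},
		NotBefore:             now.Add(-time.Minute),
		NotAfter:              now.Add(time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	ca, caKey, err := newCert(caTmpl, nil, nil)
	if err != nil {
		return err
	}
	serverTmpl := &x509.Certificate{
		SerialNumber: big.NewInt(2),
		Subject:      pkix.Name{CommonName: "localhost"},
		DNSNames:     []string{"localhost"},
		NotBefore:    now.Add(-time.Minute),
		NotAfter:     now.Add(time.Hour),
		KeyUsage:     x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
	}
	server, _, err := newCert(serverTmpl, ca, caKey)
	if err != nil {
		return err
	}
	pool := x509.NewCertPool()
	pool.AddCert(ca)
	_, err = server.Verify(x509.VerifyOptions{Roots: pool, DNSName: "localhost"})
	return err
}

func main() {
	if err := verifyServerChain(); err != nil {
		panic(err)
	}
	fmt.Println("server certificate verifies against the test CA")
}
```

A client certificate for mTLS would follow the same pattern with x509.ExtKeyUsageClientAuth in place of the server usage.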

func (*TestCertificates) LoadClientTLSConfig added in v1.2.0

func (c *TestCertificates) LoadClientTLSConfig(t *testing.T) *tls.Config

LoadClientTLSConfig creates a TLS configuration for a client using the test certificates. This is useful for creating gRPC clients with mTLS.

type TestEnvironment

type TestEnvironment struct {
	PostgresContainer *postgres.PostgresContainer
	ServerContainer   testcontainers.Container
	Network           *testcontainers.DockerNetwork
	PostgresConnStr   string
	GRPCAddr          string
	HTTPAddr          string
	// contains filtered or unexported fields
}

TestEnvironment holds all test infrastructure components. It manages the lifecycle of Postgres and ChronoQueue containers, providing convenient access to clients and addresses.

func SetupTestEnvironment

func SetupTestEnvironment(t *testing.T) *TestEnvironment

SetupTestEnvironment creates and starts Postgres and ChronoQueue containers. It automatically registers cleanup with t.Cleanup() to ensure proper teardown.

This function:

 1. Starts a Postgres container (postgres:17-alpine)
 2. Starts a ChronoQueue server container
 3. Returns a TestEnvironment with all necessary addresses

Example:

func TestMyFeature(t *testing.T) {
    env := helpers.SetupTestEnvironment(t)
    // env.Cleanup() is called automatically via t.Cleanup()

    client := env.NewGRPCClient(t)
    // ... perform tests
}

func SetupTestEnvironmentWithTLS added in v1.2.0

func SetupTestEnvironmentWithTLS(t *testing.T, certs *TestCertificates) *TestEnvironment

SetupTestEnvironmentWithTLS creates and starts a ChronoQueue server with TLS/mTLS enabled. This is similar to SetupTestEnvironment but configures the server with TLS certificates.

The certs parameter should be obtained from GenerateTestCertificates().

Example:

func TestWithTLS(t *testing.T) {
    certs := helpers.GenerateTestCertificates(t)
    env := helpers.SetupTestEnvironmentWithTLS(t, certs)

    // Create TLS client
    tlsConfig := certs.LoadClientTLSConfig(t)
    // ... use tlsConfig with gRPC client
}

func SharedTestEnvironment

func SharedTestEnvironment(t *testing.T) *TestEnvironment

SharedTestEnvironment returns a shared test environment for all tests in a package. Containers are created once and reused across all tests. Each test should clean up its own data (e.g., by deleting test queues/messages).

This approach significantly speeds up test execution by avoiding container creation overhead for each test.

Example usage in test file:

func TestMain(m *testing.M) {
    os.Exit(helpers.RunWithSharedEnvironment(m))
}

func TestSomething(t *testing.T) {
    env := helpers.SharedTestEnvironment(t)

    // ... perform tests
}

func (*TestEnvironment) Cleanup

func (e *TestEnvironment) Cleanup()

Cleanup terminates all containers and closes connections. This is automatically called via t.Cleanup() when using SetupTestEnvironment.

func (*TestEnvironment) NewGRPCClient

func (e *TestEnvironment) NewGRPCClient(t *testing.T) *grpc.ClientConn

NewGRPCClient creates a new gRPC client connection to the ChronoQueue server. The connection is automatically closed via t.Cleanup().

Example:

func TestCreateQueue(t *testing.T) {
    env := helpers.SetupTestEnvironment(t)
    conn := env.NewGRPCClient(t)

    client := queueservice_pb.NewQueueServiceClient(conn)
    // ... use client
}

func (*TestEnvironment) NewGRPCClientShared

func (e *TestEnvironment) NewGRPCClientShared(t *testing.T) *grpc.ClientConn

NewGRPCClientShared creates a new gRPC client for the shared environment. Unlike NewGRPCClient, this does NOT auto-close the connection via t.Cleanup. The caller is responsible for closing the connection.

Example:

func TestWithSharedEnv(t *testing.T) {
    env := helpers.SharedTestEnvironment(t)

    conn := env.NewGRPCClientShared(t)
    defer conn.Close()

    client := queueservice_pb.NewQueueServiceClient(conn)
    // ... use client
}

func (*TestEnvironment) WaitForHealthy

func (e *TestEnvironment) WaitForHealthy(t *testing.T, timeout time.Duration)

WaitForHealthy waits for the ChronoQueue server to be healthy. This is useful if you need to ensure the server is fully ready before starting tests.
