Documentation
¶
Index ¶
- func AssertDLQMessageCount(t *testing.T, dlqMessages []*message_pb.Message, expectedCount int)
- func AssertErrorContains(t *testing.T, err error, expectedSubstring string)
- func AssertLeaseActive(t *testing.T, msg *message_pb.Message)
- func AssertLeaseExpired(t *testing.T, msg *message_pb.Message)
- func AssertMessageContent(t *testing.T, msg *message_pb.Message, expectedContent string)
- func AssertMessageContentType(t *testing.T, msg *message_pb.Message, expectedContentType string)
- func AssertMessagePriority(t *testing.T, messages []*message_pb.Message, expectedOrder []int64)
- func AssertMessageState(t *testing.T, msg *message_pb.Message, ...)
- func AssertNoError(t *testing.T, err error, operation string)
- func AssertQueueExists(t *testing.T, queues []*queue_pb.Queue, queueName string)
- func AssertQueueNotExists(t *testing.T, queues []*queue_pb.Queue, queueName string)
- func AssertQueueStateApproximate(t *testing.T, pending, running, completed int64, ...)
- func AssertRetryAttemptsLeft(t *testing.T, msg *message_pb.Message, expectedAttempts int32)
- func AssertScheduleExecutionCount(t *testing.T, executionCount int, expectedCount int)
- func AssertSuccess(t *testing.T, success bool, operation string)
- func AssertTimeWithinRange(t *testing.T, actual, expected time.Time, tolerance time.Duration)
- func DLQStreamKey(queueName string) string
- func GenerateRandomID(length int) string
- func GenerateUniqueMessageID(t *testing.T) string
- func GenerateUniqueQueueName(t *testing.T, prefix string) string
- func GroupKey(queueName string) string
- func LoadFixtureItem(t *testing.T, filename, itemName string) json.RawMessage
- func LoadJSONSchema(t *testing.T, schemaFile string) string
- func MessageMetaKey(queueName, messageID string) string
- func QueueKey(queueName string) string
- func QueueMetaKey(queueName string) string
- func RunWithSharedEnvironment(m *testing.M) int
- func ScheduleKey(queueName string) string
- func ScheduleMetaKey(scheduleID string) string
- func ScheduleSetKey(scheduleID string) string
- func StatsKey(queueName string) string
- func StreamKey(queueName, priority string) string
- func UnmarshalFixtureItem(t *testing.T, filename, itemName string, v interface{})
- func WaitForCondition(t *testing.T, timeout time.Duration, condition func() bool) bool
- func WaitForMessageTransition(t *testing.T)
- type FixtureData
- type MessageFixture
- type QueueFixture
- type ScheduleFixture
- type TestEnvironment
- func (e *TestEnvironment) Cleanup()
- func (e *TestEnvironment) FlushRedis(t *testing.T)
- func (e *TestEnvironment) NewGRPCClient(t *testing.T) *grpc.ClientConn
- func (e *TestEnvironment) NewGRPCClientShared(t *testing.T) *grpc.ClientConn
- func (e *TestEnvironment) WaitForHealthy(t *testing.T, timeout time.Duration)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AssertDLQMessageCount ¶
func AssertDLQMessageCount(t *testing.T, dlqMessages []*message_pb.Message, expectedCount int)
AssertDLQMessageCount verifies the number of messages in the DLQ (dead letter queue)
func AssertErrorContains ¶
AssertErrorContains verifies that an error contains a specific substring
func AssertLeaseActive ¶
func AssertLeaseActive(t *testing.T, msg *message_pb.Message)
AssertLeaseActive verifies that a message lease is active
func AssertLeaseExpired ¶
func AssertLeaseExpired(t *testing.T, msg *message_pb.Message)
AssertLeaseExpired verifies that a message lease has expired
func AssertMessageContent ¶
func AssertMessageContent(t *testing.T, msg *message_pb.Message, expectedContent string)
AssertMessageContent verifies message payload content
func AssertMessageContentType ¶
func AssertMessageContentType(t *testing.T, msg *message_pb.Message, expectedContentType string)
AssertMessageContentType verifies the message's content type
func AssertMessagePriority ¶
func AssertMessagePriority(t *testing.T, messages []*message_pb.Message, expectedOrder []int64)
AssertMessagePriority verifies that messages are ordered by priority
func AssertMessageState ¶
func AssertMessageState(t *testing.T, msg *message_pb.Message, expectedState message_pb.Message_Metadata_State)
AssertMessageState verifies that a message is in the expected state
func AssertNoError ¶
AssertNoError is a convenience wrapper around require.NoError with a custom message
func AssertQueueExists ¶
AssertQueueExists verifies that a queue with the given name exists
func AssertQueueNotExists ¶
AssertQueueNotExists verifies that a queue with the given name does not exist
func AssertQueueStateApproximate ¶
func AssertQueueStateApproximate(t *testing.T, pending, running, completed int64, expectedPending, expectedRunning, expectedCompleted int64, tolerance int64)
AssertQueueStateApproximate verifies queue statistics (allowing for timing variations)
func AssertRetryAttemptsLeft ¶
func AssertRetryAttemptsLeft(t *testing.T, msg *message_pb.Message, expectedAttempts int32)
AssertRetryAttemptsLeft verifies the number of retry attempts a message has left
func AssertScheduleExecutionCount ¶
AssertScheduleExecutionCount verifies the number of schedule executions
func AssertSuccess ¶
AssertSuccess verifies that a response indicates success
func AssertTimeWithinRange ¶
AssertTimeWithinRange verifies that a timestamp is within an acceptable range
func DLQStreamKey ¶
DLQStreamKey returns the key for the dead letter queue stream.
func GenerateRandomID ¶
GenerateRandomID generates a random alphanumeric ID of the specified length
func GenerateUniqueMessageID ¶
GenerateUniqueMessageID generates a unique message ID for testing
func GenerateUniqueQueueName ¶
GenerateUniqueQueueName generates a unique queue name for testing
func LoadFixtureItem ¶
func LoadFixtureItem(t *testing.T, filename, itemName string) json.RawMessage
LoadFixtureItem loads a specific item from a fixture file
func LoadJSONSchema ¶
LoadJSONSchema loads a JSON schema file from fixtures/schemas/
func MessageMetaKey ¶
MessageMetaKey returns the key for the message metadata hash.
func QueueMetaKey ¶
QueueMetaKey returns the key for the queue metadata hash.
func RunWithSharedEnvironment ¶
RunWithSharedEnvironment wraps m.Run() with shared environment setup and teardown. Use this in TestMain to manage the lifecycle of shared containers.
Example:
func TestMain(m *testing.M) {
os.Exit(helpers.RunWithSharedEnvironment(m))
}
func ScheduleKey ¶
ScheduleKey returns the key for the scheduled-messages sorted set.
func ScheduleMetaKey ¶
ScheduleMetaKey returns the key for the schedule metadata hash.
func ScheduleSetKey ¶
ScheduleSetKey returns the key for the schedule-tracking sorted set.
func UnmarshalFixtureItem ¶
UnmarshalFixtureItem loads and unmarshals a specific fixture item into the provided struct
func WaitForCondition ¶
WaitForCondition polls a condition function until it returns true or timeout occurs. This is useful for waiting on asynchronous operations like scheduler processing.
func WaitForMessageTransition ¶
WaitForMessageTransition waits for the scheduler to process message state transitions. This is needed after posting messages because they go to the schedule index and must be promoted to streams by the scheduler before they can be retrieved with GetNextMessage. The test scheduler runs every 300ms (configured in shared_env.go), so we wait 400ms to ensure at least one scheduler cycle completes.
Types ¶
type FixtureData ¶
type FixtureData map[string]json.RawMessage
FixtureData represents the structure of fixture files
func LoadFixture ¶
func LoadFixture(t *testing.T, filename string) FixtureData
LoadFixture loads a JSON fixture file and returns the parsed data
type MessageFixture ¶
type MessageFixture struct {
Content interface{} `json:"content"`
ContentType string `json:"content_type"`
Priority int32 `json:"priority"`
Note string `json:"note,omitempty"`
}
MessageFixture represents a message from fixtures/messages.json
func LoadMessageFixture ¶
func LoadMessageFixture(t *testing.T, messageName string) *MessageFixture
LoadMessageFixture loads a specific message fixture
func (*MessageFixture) GetContentAsBytes ¶
func (m *MessageFixture) GetContentAsBytes(t *testing.T) []byte
GetContentAsBytes converts the message content to bytes
func (*MessageFixture) GetContentAsJSON ¶
func (m *MessageFixture) GetContentAsJSON(t *testing.T) string
GetContentAsJSON converts the message content to a JSON string
type QueueFixture ¶
type QueueFixture struct {
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
}
QueueFixture represents a queue configuration from fixtures/queues.json
func LoadQueueFixture ¶
func LoadQueueFixture(t *testing.T, queueName string) *QueueFixture
LoadQueueFixture loads a specific queue fixture
type ScheduleFixture ¶
type ScheduleFixture struct {
Type string `json:"type"`
CronExpression string `json:"cron_expression,omitempty"`
QueueName string `json:"queue_name"`
Message MessageFixture `json:"message"`
CalendarRules map[string]interface{} `json:"calendar_rules,omitempty"`
Description string `json:"description"`
}
ScheduleFixture represents a schedule from fixtures/schedules.json
func LoadScheduleFixture ¶
func LoadScheduleFixture(t *testing.T, scheduleName string) *ScheduleFixture
LoadScheduleFixture loads a specific schedule fixture
type TestEnvironment ¶
type TestEnvironment struct {
RedisContainer *redismodule.RedisContainer
ServerContainer testcontainers.Container
Network *testcontainers.DockerNetwork
RedisClient *redis.Client
RedisAddr string
GRPCAddr string
HTTPAddr string
// contains filtered or unexported fields
}
TestEnvironment holds all test infrastructure components. It manages the lifecycle of Redis and ChronoQueue containers, providing convenient access to clients and addresses.
func SetupTestEnvironment ¶
func SetupTestEnvironment(t *testing.T) *TestEnvironment
SetupTestEnvironment creates and starts Redis and ChronoQueue containers. It automatically registers cleanup with t.Cleanup() to ensure proper teardown.
This function:
1. Starts a Redis container (redis:7-alpine)
2. Starts a ChronoQueue server container
3. Creates a Redis client
4. Returns a TestEnvironment with all necessary addresses
Example:
func TestMyFeature(t *testing.T) {
env := SetupTestEnvironment(t)
// env.Cleanup() is called automatically via t.Cleanup()
client := env.NewGRPCClient(t)
// ... perform tests
}
func SharedTestEnvironment ¶
func SharedTestEnvironment(t *testing.T) *TestEnvironment
SharedTestEnvironment returns a shared test environment for all tests in a package. Containers are created once and reused across all tests. Each test should clean up its own data (e.g., by calling env.FlushRedis(t)).
This approach significantly speeds up test execution by avoiding container creation overhead for each test.
Example usage in test file:
func TestMain(m *testing.M) {
os.Exit(helpers.RunWithSharedEnvironment(m))
}
func TestSomething(t *testing.T) {
env := helpers.SharedTestEnvironment(t)
defer env.FlushRedis(t) // Clean up after test
// ... perform tests
}
func (*TestEnvironment) Cleanup ¶
func (e *TestEnvironment) Cleanup()
Cleanup terminates all containers and closes connections. This is automatically called via t.Cleanup() when using SetupTestEnvironment.
func (*TestEnvironment) FlushRedis ¶
func (e *TestEnvironment) FlushRedis(t *testing.T)
FlushRedis clears all data from the Redis database. Useful for ensuring clean state between test runs.
func (*TestEnvironment) NewGRPCClient ¶
func (e *TestEnvironment) NewGRPCClient(t *testing.T) *grpc.ClientConn
NewGRPCClient creates a new gRPC client connection to the ChronoQueue server. The connection is automatically closed via t.Cleanup().
Example:
func TestCreateQueue(t *testing.T) {
env := SetupTestEnvironment(t)
conn := env.NewGRPCClient(t)
client := queueservice_pb.NewQueueServiceClient(conn)
// ... use client
}
func (*TestEnvironment) NewGRPCClientShared ¶
func (e *TestEnvironment) NewGRPCClientShared(t *testing.T) *grpc.ClientConn
NewGRPCClientShared creates a new gRPC client for the shared environment. Unlike NewGRPCClient, this does NOT auto-close the connection via t.Cleanup. The caller is responsible for closing the connection.
Example:
func TestWithSharedEnv(t *testing.T) {
env := helpers.SharedTestEnvironment(t)
defer env.FlushRedis(t)
conn := env.NewGRPCClientShared(t)
defer conn.Close()
client := queueservice_pb.NewQueueServiceClient(conn)
// ... use client
}
func (*TestEnvironment) WaitForHealthy ¶
func (e *TestEnvironment) WaitForHealthy(t *testing.T, timeout time.Duration)
WaitForHealthy waits for the ChronoQueue server to be healthy. This is useful if you need to ensure the server is fully ready before starting tests.