Documentation
¶
Overview ¶
Package testutil provides testing utilities for the reactive workflow engine.
Index ¶
- Variables
- type InMemoryBus
- func (b *InMemoryBus) Clear()
- func (b *InMemoryBus) Count() int
- func (b *InMemoryBus) CountForSubject(pattern string) int
- func (b *InMemoryBus) GetPayload(pattern string, v any) error
- func (b *InMemoryBus) HasMessage(pattern string) bool
- func (b *InMemoryBus) HasMessageWithType(pattern string, msgType message.Type) bool
- func (b *InMemoryBus) LastMessage() *PublishedMessage
- func (b *InMemoryBus) LastMessageForSubject(pattern string) *PublishedMessage
- func (b *InMemoryBus) Messages() []PublishedMessage
- func (b *InMemoryBus) MessagesForSubject(pattern string) []PublishedMessage
- func (b *InMemoryBus) Publish(_ context.Context, subject string, data []byte) error
- func (b *InMemoryBus) Subscribe(pattern string, handler func(context.Context, []byte))
- func (b *InMemoryBus) WaitForCount(pattern string, n int, timeout time.Duration) error
- func (b *InMemoryBus) WaitForMessage(pattern string, timeout time.Duration) (*PublishedMessage, error)
- type InMemoryKV
- func (kv *InMemoryKV) Bucket() string
- func (kv *InMemoryKV) Clear()
- func (kv *InMemoryKV) Create(_ context.Context, key string, value []byte) (uint64, error)
- func (kv *InMemoryKV) Delete(_ context.Context, key string, _ ...jetstream.KVDeleteOpt) error
- func (kv *InMemoryKV) Get(_ context.Context, key string) (jetstream.KeyValueEntry, error)
- func (kv *InMemoryKV) GetData() map[string][]byte
- func (kv *InMemoryKV) GetEntry(key string) ([]byte, uint64, bool)
- func (kv *InMemoryKV) GetJSON(ctx context.Context, key string, v any) error
- func (kv *InMemoryKV) History(_ context.Context, key string, _ ...jetstream.WatchOpt) ([]jetstream.KeyValueEntry, error)
- func (kv *InMemoryKV) Keys(_ context.Context, _ ...jetstream.WatchOpt) ([]string, error)
- func (kv *InMemoryKV) ListKeys(_ context.Context, _ ...jetstream.WatchOpt) (jetstream.KeyLister, error)
- func (kv *InMemoryKV) Purge(_ context.Context, key string, _ ...jetstream.KVDeleteOpt) error
- func (kv *InMemoryKV) Put(_ context.Context, key string, value []byte) (uint64, error)
- func (kv *InMemoryKV) PutJSON(ctx context.Context, key string, v any) (uint64, error)
- func (kv *InMemoryKV) Status(_ context.Context) (jetstream.KeyValueStatus, error)
- func (kv *InMemoryKV) Update(_ context.Context, key string, value []byte, revision uint64) (uint64, error)
- func (kv *InMemoryKV) Watch(_ context.Context, keys string, _ ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
- func (kv *InMemoryKV) WatchAll(_ context.Context, _ ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
- type PublishedMessage
- type TestEngine
- func (e *TestEngine) AssertIteration(key string, expectedIteration int)
- func (e *TestEngine) AssertNoPublished(subjectPattern string)
- func (e *TestEngine) AssertPayload(subjectPattern string, assertFn func(t *testing.T, payload any))
- func (e *TestEngine) AssertPhase(key, expectedPhase string)
- func (e *TestEngine) AssertPublished(subjectPattern string)
- func (e *TestEngine) AssertPublishedCount(subjectPattern string, expectedCount int)
- func (e *TestEngine) AssertPublishedWithType(subjectPattern string, msgType message.Type)
- func (e *TestEngine) AssertState(key string, assertFn func(t *testing.T, data []byte))
- func (e *TestEngine) AssertStateAs(key string, state any, assertFn func(t *testing.T, state any))
- func (e *TestEngine) AssertStatus(key string, expectedStatus reactive.ExecutionStatus)
- func (e *TestEngine) Bus() *InMemoryBus
- func (e *TestEngine) Clear()
- func (e *TestEngine) GetState(key string) ([]byte, error)
- func (e *TestEngine) GetStateAs(key string, state any) error
- func (e *TestEngine) HandleCallback(ctx context.Context, key string, result message.Payload, taskID string) error
- func (e *TestEngine) KV() *InMemoryKV
- func (e *TestEngine) PrintMessages()
- func (e *TestEngine) PrintState(key string)
- func (e *TestEngine) RegisterWorkflow(def *reactive.Definition) error
- func (e *TestEngine) Registry() *reactive.WorkflowRegistry
- func (e *TestEngine) TriggerKV(ctx context.Context, key string, state any) error
- func (e *TestEngine) TriggerKVUpdate(ctx context.Context, key string, state any, revision uint64) error
- func (e *TestEngine) TriggerMessage(ctx context.Context, subject string, payload message.Payload, source string) error
- func (e *TestEngine) WaitForPhase(key, expectedPhase string, timeout time.Duration) error
- func (e *TestEngine) WaitForPublished(subjectPattern string, timeout time.Duration) error
- func (e *TestEngine) WaitForStatus(key string, expectedStatus reactive.ExecutionStatus, timeout time.Duration) error
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoMessages = errors.New("no messages found") ErrTimeout = errors.New("timeout waiting for message") )
Common errors for testutil package.
Functions ¶
This section is empty.
Types ¶
type InMemoryBus ¶
type InMemoryBus struct {
// contains filtered or unexported fields
}
InMemoryBus captures published messages for testing.
func NewInMemoryBus ¶
func NewInMemoryBus() *InMemoryBus
NewInMemoryBus creates a new in-memory message bus.
func (*InMemoryBus) Count ¶
func (b *InMemoryBus) Count() int
Count returns the total number of messages.
func (*InMemoryBus) CountForSubject ¶
func (b *InMemoryBus) CountForSubject(pattern string) int
CountForSubject returns the number of messages matching the pattern.
func (*InMemoryBus) GetPayload ¶
func (b *InMemoryBus) GetPayload(pattern string, v any) error
GetPayload unmarshals and returns the payload from the last matching message.
func (*InMemoryBus) HasMessage ¶
func (b *InMemoryBus) HasMessage(pattern string) bool
HasMessage checks if any message matches the subject pattern.
func (*InMemoryBus) HasMessageWithType ¶
func (b *InMemoryBus) HasMessageWithType(pattern string, msgType message.Type) bool
HasMessageWithType checks if any message matches the subject and type.
func (*InMemoryBus) LastMessage ¶
func (b *InMemoryBus) LastMessage() *PublishedMessage
LastMessage returns the most recent message, or nil if none.
func (*InMemoryBus) LastMessageForSubject ¶
func (b *InMemoryBus) LastMessageForSubject(pattern string) *PublishedMessage
LastMessageForSubject returns the most recent message matching the pattern.
func (*InMemoryBus) Messages ¶
func (b *InMemoryBus) Messages() []PublishedMessage
Messages returns all published messages.
func (*InMemoryBus) MessagesForSubject ¶
func (b *InMemoryBus) MessagesForSubject(pattern string) []PublishedMessage
MessagesForSubject returns messages matching the subject pattern.
func (*InMemoryBus) Subscribe ¶
func (b *InMemoryBus) Subscribe(pattern string, handler func(context.Context, []byte))
Subscribe registers a handler for messages matching the pattern.
func (*InMemoryBus) WaitForCount ¶
WaitForCount waits for at least n messages matching the pattern.
func (*InMemoryBus) WaitForMessage ¶
func (b *InMemoryBus) WaitForMessage(pattern string, timeout time.Duration) (*PublishedMessage, error)
WaitForMessage waits for a message matching the pattern within the timeout.
type InMemoryKV ¶
type InMemoryKV struct {
// contains filtered or unexported fields
}
InMemoryKV implements a mock KV store for testing. It supports basic CRUD operations and watch functionality.
func NewInMemoryKV ¶
func NewInMemoryKV(name string) *InMemoryKV
NewInMemoryKV creates a new in-memory KV store.
func (*InMemoryKV) Delete ¶
func (kv *InMemoryKV) Delete(_ context.Context, key string, _ ...jetstream.KVDeleteOpt) error
Delete removes a key.
func (*InMemoryKV) Get ¶
func (kv *InMemoryKV) Get(_ context.Context, key string) (jetstream.KeyValueEntry, error)
Get retrieves a value by key.
func (*InMemoryKV) GetData ¶
func (kv *InMemoryKV) GetData() map[string][]byte
GetData returns a copy of all data for inspection.
func (*InMemoryKV) GetEntry ¶
func (kv *InMemoryKV) GetEntry(key string) ([]byte, uint64, bool)
GetEntry returns a specific entry for inspection.
func (*InMemoryKV) History ¶
func (kv *InMemoryKV) History(_ context.Context, key string, _ ...jetstream.WatchOpt) ([]jetstream.KeyValueEntry, error)
History returns the history for a key (simplified - returns current only).
func (*InMemoryKV) ListKeys ¶
func (kv *InMemoryKV) ListKeys(_ context.Context, _ ...jetstream.WatchOpt) (jetstream.KeyLister, error)
ListKeys returns a channel of keys (for iteration).
func (*InMemoryKV) Purge ¶
func (kv *InMemoryKV) Purge(_ context.Context, key string, _ ...jetstream.KVDeleteOpt) error
Purge removes all values.
func (*InMemoryKV) Status ¶
func (kv *InMemoryKV) Status(_ context.Context) (jetstream.KeyValueStatus, error)
Status returns the bucket status.
func (*InMemoryKV) Update ¶
func (kv *InMemoryKV) Update(_ context.Context, key string, value []byte, revision uint64) (uint64, error)
Update stores a value only if the revision matches.
func (*InMemoryKV) Watch ¶
func (kv *InMemoryKV) Watch(_ context.Context, keys string, _ ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
Watch starts watching for changes to keys matching the pattern.
func (*InMemoryKV) WatchAll ¶
func (kv *InMemoryKV) WatchAll(_ context.Context, _ ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
WatchAll watches all keys.
type PublishedMessage ¶
PublishedMessage represents a message that was published to the bus.
type TestEngine ¶
type TestEngine struct {
// contains filtered or unexported fields
}
TestEngine wraps a reactive workflow engine with test helpers.
func NewTestEngine ¶
func NewTestEngine(t *testing.T) *TestEngine
NewTestEngine creates a new test engine with in-memory KV and bus.
func (*TestEngine) AssertIteration ¶
func (e *TestEngine) AssertIteration(key string, expectedIteration int)
AssertIteration verifies the iteration count matches the expected value.
func (*TestEngine) AssertNoPublished ¶
func (e *TestEngine) AssertNoPublished(subjectPattern string)
AssertNoPublished verifies that no messages were published to a subject pattern.
func (*TestEngine) AssertPayload ¶
func (e *TestEngine) AssertPayload(subjectPattern string, assertFn func(t *testing.T, payload any))
AssertPayload verifies the payload of the last message to a subject pattern.
func (*TestEngine) AssertPhase ¶
func (e *TestEngine) AssertPhase(key, expectedPhase string)
AssertPhase verifies the execution phase matches the expected value.
func (*TestEngine) AssertPublished ¶
func (e *TestEngine) AssertPublished(subjectPattern string)
AssertPublished verifies that a message was published to a subject pattern.
func (*TestEngine) AssertPublishedCount ¶
func (e *TestEngine) AssertPublishedCount(subjectPattern string, expectedCount int)
AssertPublishedCount verifies the number of messages published to a subject pattern.
func (*TestEngine) AssertPublishedWithType ¶
func (e *TestEngine) AssertPublishedWithType(subjectPattern string, msgType message.Type)
AssertPublishedWithType verifies that a message with specific type was published.
func (*TestEngine) AssertState ¶
func (e *TestEngine) AssertState(key string, assertFn func(t *testing.T, data []byte))
AssertState verifies the state using a custom assertion function.
func (*TestEngine) AssertStateAs ¶
AssertStateAs unmarshals state into the given type and runs assertions.
func (*TestEngine) AssertStatus ¶
func (e *TestEngine) AssertStatus(key string, expectedStatus reactive.ExecutionStatus)
AssertStatus verifies the execution status matches the expected value.
func (*TestEngine) Bus ¶
func (e *TestEngine) Bus() *InMemoryBus
Bus returns the in-memory message bus.
func (*TestEngine) GetState ¶
func (e *TestEngine) GetState(key string) ([]byte, error)
GetState retrieves the current state for a key.
func (*TestEngine) GetStateAs ¶
func (e *TestEngine) GetStateAs(key string, state any) error
GetStateAs retrieves and unmarshals the state into the given type.
func (*TestEngine) HandleCallback ¶
func (e *TestEngine) HandleCallback(ctx context.Context, key string, result message.Payload, taskID string) error
HandleCallback simulates receiving an async callback result. The result must implement the message.Payload interface (have Schema() method).
func (*TestEngine) PrintMessages ¶
func (e *TestEngine) PrintMessages()
PrintMessages prints all published messages for debugging.
func (*TestEngine) PrintState ¶
func (e *TestEngine) PrintState(key string)
PrintState prints the current state for debugging.
func (*TestEngine) RegisterWorkflow ¶
func (e *TestEngine) RegisterWorkflow(def *reactive.Definition) error
RegisterWorkflow registers a workflow definition.
func (*TestEngine) Registry ¶
func (e *TestEngine) Registry() *reactive.WorkflowRegistry
Registry returns the workflow registry.
func (*TestEngine) TriggerKV ¶
TriggerKV simulates a KV state change and triggers any matching rules.
func (*TestEngine) TriggerKVUpdate ¶
func (e *TestEngine) TriggerKVUpdate(ctx context.Context, key string, state any, revision uint64) error
TriggerKVUpdate simulates a KV update with revision check.
func (*TestEngine) TriggerMessage ¶
func (e *TestEngine) TriggerMessage(ctx context.Context, subject string, payload message.Payload, source string) error
TriggerMessage publishes a message to trigger subject-based rules.
func (*TestEngine) WaitForPhase ¶
func (e *TestEngine) WaitForPhase(key, expectedPhase string, timeout time.Duration) error
WaitForPhase waits for the execution to reach a specific phase.
func (*TestEngine) WaitForPublished ¶
func (e *TestEngine) WaitForPublished(subjectPattern string, timeout time.Duration) error
WaitForPublished waits for a message to be published to a subject pattern.
func (*TestEngine) WaitForStatus ¶
func (e *TestEngine) WaitForStatus(key string, expectedStatus reactive.ExecutionStatus, timeout time.Duration) error
WaitForStatus waits for the execution to reach a specific status.