testutil

package
v1.0.0-alpha.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package testutil provides testing utilities for the reactive workflow engine.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoMessages = errors.New("no messages found")
	ErrTimeout    = errors.New("timeout waiting for message")
)

Common errors for testutil package.

Functions

This section is empty.

Types

type InMemoryBus

type InMemoryBus struct {
	// contains filtered or unexported fields
}

InMemoryBus captures published messages for testing.

func NewInMemoryBus

func NewInMemoryBus() *InMemoryBus

NewInMemoryBus creates a new in-memory message bus.

func (*InMemoryBus) Clear

func (b *InMemoryBus) Clear()

Clear removes all messages.

func (*InMemoryBus) Count

func (b *InMemoryBus) Count() int

Count returns the total number of messages.

func (*InMemoryBus) CountForSubject

func (b *InMemoryBus) CountForSubject(pattern string) int

CountForSubject returns the number of messages matching the pattern.

func (*InMemoryBus) GetPayload

func (b *InMemoryBus) GetPayload(pattern string, v any) error

GetPayload unmarshals and returns the payload from the last matching message.

func (*InMemoryBus) HasMessage

func (b *InMemoryBus) HasMessage(pattern string) bool

HasMessage checks if any message matches the subject pattern.

func (*InMemoryBus) HasMessageWithType

func (b *InMemoryBus) HasMessageWithType(pattern string, msgType message.Type) bool

HasMessageWithType checks if any message matches the subject and type.

func (*InMemoryBus) LastMessage

func (b *InMemoryBus) LastMessage() *PublishedMessage

LastMessage returns the most recent message, or nil if none.

func (*InMemoryBus) LastMessageForSubject

func (b *InMemoryBus) LastMessageForSubject(pattern string) *PublishedMessage

LastMessageForSubject returns the most recent message matching the pattern.

func (*InMemoryBus) Messages

func (b *InMemoryBus) Messages() []PublishedMessage

Messages returns all published messages.

func (*InMemoryBus) MessagesForSubject

func (b *InMemoryBus) MessagesForSubject(pattern string) []PublishedMessage

MessagesForSubject returns messages matching the subject pattern.

func (*InMemoryBus) Publish

func (b *InMemoryBus) Publish(_ context.Context, subject string, data []byte) error

Publish stores a message and notifies handlers.

func (*InMemoryBus) Subscribe

func (b *InMemoryBus) Subscribe(pattern string, handler func(context.Context, []byte))

Subscribe registers a handler for messages matching the pattern.

func (*InMemoryBus) WaitForCount

func (b *InMemoryBus) WaitForCount(pattern string, n int, timeout time.Duration) error

WaitForCount waits for at least n messages matching the pattern.

func (*InMemoryBus) WaitForMessage

func (b *InMemoryBus) WaitForMessage(pattern string, timeout time.Duration) (*PublishedMessage, error)

WaitForMessage waits for a message matching the pattern within the timeout.

type InMemoryKV

type InMemoryKV struct {
	// contains filtered or unexported fields
}

InMemoryKV implements a mock KV store for testing. It supports basic CRUD operations and watch functionality.

func NewInMemoryKV

func NewInMemoryKV(name string) *InMemoryKV

NewInMemoryKV creates a new in-memory KV store.

func (*InMemoryKV) Bucket

func (kv *InMemoryKV) Bucket() string

Bucket returns the bucket name.

func (*InMemoryKV) Clear

func (kv *InMemoryKV) Clear()

Clear removes all data.

func (*InMemoryKV) Create

func (kv *InMemoryKV) Create(_ context.Context, key string, value []byte) (uint64, error)

Create stores a value only if the key doesn't exist.

func (*InMemoryKV) Delete

func (kv *InMemoryKV) Delete(_ context.Context, key string, _ ...jetstream.KVDeleteOpt) error

Delete removes a key.

func (*InMemoryKV) Get

Get retrieves a value by key.

func (*InMemoryKV) GetData

func (kv *InMemoryKV) GetData() map[string][]byte

GetData returns a copy of all data for inspection.

func (*InMemoryKV) GetEntry

func (kv *InMemoryKV) GetEntry(key string) ([]byte, uint64, bool)

GetEntry returns a specific entry for inspection.

func (*InMemoryKV) GetJSON

func (kv *InMemoryKV) GetJSON(ctx context.Context, key string, v any) error

GetJSON is a helper that retrieves and unmarshals a value.

func (*InMemoryKV) History

History returns the history for a key (simplified - returns current only).

func (*InMemoryKV) Keys

func (kv *InMemoryKV) Keys(_ context.Context, _ ...jetstream.WatchOpt) ([]string, error)

Keys returns all keys.

func (*InMemoryKV) ListKeys

ListKeys returns a channel of keys (for iteration).

func (*InMemoryKV) Purge

func (kv *InMemoryKV) Purge(_ context.Context, key string, _ ...jetstream.KVDeleteOpt) error

Purge removes all values.

func (*InMemoryKV) Put

func (kv *InMemoryKV) Put(_ context.Context, key string, value []byte) (uint64, error)

Put stores a value.

func (*InMemoryKV) PutJSON

func (kv *InMemoryKV) PutJSON(ctx context.Context, key string, v any) (uint64, error)

PutJSON is a helper that marshals and stores a value.

func (*InMemoryKV) Status

Status returns the bucket status.

func (*InMemoryKV) Update

func (kv *InMemoryKV) Update(_ context.Context, key string, value []byte, revision uint64) (uint64, error)

Update stores a value only if the revision matches.

func (*InMemoryKV) Watch

Watch starts watching for changes to keys matching the pattern.

func (*InMemoryKV) WatchAll

WatchAll watches all keys.

type PublishedMessage

type PublishedMessage struct {
	Subject   string
	Data      []byte
	Timestamp time.Time
}

PublishedMessage represents a message that was published to the bus.

type TestEngine

type TestEngine struct {
	// contains filtered or unexported fields
}

TestEngine wraps a reactive workflow engine with test helpers.

func NewTestEngine

func NewTestEngine(t *testing.T) *TestEngine

NewTestEngine creates a new test engine with in-memory KV and bus.

func (*TestEngine) AssertIteration

func (e *TestEngine) AssertIteration(key string, expectedIteration int)

AssertIteration verifies the iteration count matches the expected value.

func (*TestEngine) AssertNoPublished

func (e *TestEngine) AssertNoPublished(subjectPattern string)

AssertNoPublished verifies that no messages were published to a subject pattern.

func (*TestEngine) AssertPayload

func (e *TestEngine) AssertPayload(subjectPattern string, assertFn func(t *testing.T, payload any))

AssertPayload verifies the payload of the last message to a subject pattern.

func (*TestEngine) AssertPhase

func (e *TestEngine) AssertPhase(key, expectedPhase string)

AssertPhase verifies the execution phase matches the expected value.

func (*TestEngine) AssertPublished

func (e *TestEngine) AssertPublished(subjectPattern string)

AssertPublished verifies that a message was published to a subject pattern.

func (*TestEngine) AssertPublishedCount

func (e *TestEngine) AssertPublishedCount(subjectPattern string, expectedCount int)

AssertPublishedCount verifies the number of messages published to a subject pattern.

func (*TestEngine) AssertPublishedWithType

func (e *TestEngine) AssertPublishedWithType(subjectPattern string, msgType message.Type)

AssertPublishedWithType verifies that a message with specific type was published.

func (*TestEngine) AssertState

func (e *TestEngine) AssertState(key string, assertFn func(t *testing.T, data []byte))

AssertState verifies the state using a custom assertion function.

func (*TestEngine) AssertStateAs

func (e *TestEngine) AssertStateAs(key string, state any, assertFn func(t *testing.T, state any))

AssertStateAs unmarshals state into the given type and runs assertions.

func (*TestEngine) AssertStatus

func (e *TestEngine) AssertStatus(key string, expectedStatus reactive.ExecutionStatus)

AssertStatus verifies the execution status matches the expected value.

func (*TestEngine) Bus

func (e *TestEngine) Bus() *InMemoryBus

Bus returns the in-memory message bus.

func (*TestEngine) Clear

func (e *TestEngine) Clear()

Clear resets the test engine state.

func (*TestEngine) GetState

func (e *TestEngine) GetState(key string) ([]byte, error)

GetState retrieves the current state for a key.

func (*TestEngine) GetStateAs

func (e *TestEngine) GetStateAs(key string, state any) error

GetStateAs retrieves and unmarshals the state into the given type.

func (*TestEngine) HandleCallback

func (e *TestEngine) HandleCallback(ctx context.Context, key string, result message.Payload, taskID string) error

HandleCallback simulates receiving an async callback result. The result must implement the message.Payload interface (have Schema() method).

func (*TestEngine) KV

func (e *TestEngine) KV() *InMemoryKV

KV returns the in-memory KV store.

func (*TestEngine) PrintMessages

func (e *TestEngine) PrintMessages()

PrintMessages prints all published messages for debugging.

func (*TestEngine) PrintState

func (e *TestEngine) PrintState(key string)

PrintState prints the current state for debugging.

func (*TestEngine) RegisterWorkflow

func (e *TestEngine) RegisterWorkflow(def *reactive.Definition) error

RegisterWorkflow registers a workflow definition.

func (*TestEngine) Registry

func (e *TestEngine) Registry() *reactive.WorkflowRegistry

Registry returns the workflow registry.

func (*TestEngine) TriggerKV

func (e *TestEngine) TriggerKV(ctx context.Context, key string, state any) error

TriggerKV simulates a KV state change and triggers any matching rules.

func (*TestEngine) TriggerKVUpdate

func (e *TestEngine) TriggerKVUpdate(ctx context.Context, key string, state any, revision uint64) error

TriggerKVUpdate simulates a KV update with revision check.

func (*TestEngine) TriggerMessage

func (e *TestEngine) TriggerMessage(ctx context.Context, subject string, payload message.Payload, source string) error

TriggerMessage publishes a message to trigger subject-based rules.

func (*TestEngine) WaitForPhase

func (e *TestEngine) WaitForPhase(key, expectedPhase string, timeout time.Duration) error

WaitForPhase waits for the execution to reach a specific phase.

func (*TestEngine) WaitForPublished

func (e *TestEngine) WaitForPublished(subjectPattern string, timeout time.Duration) error

WaitForPublished waits for a message to be published to a subject pattern.

func (*TestEngine) WaitForStatus

func (e *TestEngine) WaitForStatus(key string, expectedStatus reactive.ExecutionStatus, timeout time.Duration) error

WaitForStatus waits for the execution to reach a specific status.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL