testutil

package
v1.0.0-alpha.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package testutil provides testing utilities for StreamKit integration tests.

Overview

The testutil package contains mock implementations, test data generators, and helper functions designed to simplify writing integration tests for StreamKit components. All utilities are framework-agnostic with ZERO semantic domain concepts (no EntityID, MAVLink, robotics, or SOSA/SSN knowledge).

Core Components

Mock Implementations:

MockNATSClient - In-memory NATS client for testing pub/sub patterns:

  • Thread-safe for concurrent use
  • Stores all published messages for verification
  • Supports subscription handlers
  • No external NATS server required

MockKVStore - In-memory key-value store for testing storage:

  • Thread-safe for concurrent use
  • Simple Put/Get/Delete/Keys/Clear interface
  • No external database required

MockComponent - Generic lifecycle component for testing:

  • Tracks Start/Stop/Process call counts
  • Thread-safe counters
  • Configurable error injection
  • Lifecycle state tracking

MockPort - Simple port abstraction for testing:

  • Stores messages sent to port
  • Thread-safe message list

Test Data Generators:

Provides generic test data for common formats:

  • Generic JSON objects (no semantic meaning)
  • CSV data
  • HTTP request/response pairs
  • Binary buffer data (small, medium, large)

Flow Configuration Builder:

FlowBuilder - Programmatic flow configuration:

  • Fluent API for building flow configurations
  • Method chaining for readability
  • Sensible defaults
  • Minimal boilerplate in tests

Test Helpers:

  • WaitForMessage: Polls for message with timeout
  • WaitForMessageCount: Waits for N messages
  • AssertMessageReceived: Verifies message delivery
  • AssertNoMessages: Verifies no messages sent

Design Principles

Framework-Agnostic:

All test utilities avoid semantic domain knowledge. Instead of:

  • ❌ CreateRoboticsEvent() - semantic concept
  • ✅ CreateGenericJSON() - domain-agnostic

This ensures testutil can be used across different applications without coupling to specific domains.

Thread Safety:

All mock types are safe for concurrent use from multiple goroutines. This enables testing concurrent message flows without data races:

// Safe to use from multiple goroutines
go client.Publish(ctx, "subject1", data1)
go client.Publish(ctx, "subject2", data2)
go handler1(client.GetMessages("subject1"))
go handler2(client.GetMessages("subject2"))

Real Dependencies Preferred:

Use mocks ONLY when real dependencies are impractical:

  • ✅ Use testcontainers for NATS (real behavior)
  • ⚠️ Use MockNATSClient when testcontainers unavailable
  • ❌ Don't mock when real dependencies are fast/easy

Usage Examples

Basic MockNATSClient:

func TestPublishSubscribe(t *testing.T) {
    client := testutil.NewMockNATSClient()

    // Subscribe to subject (handler receives full *nats.Msg)
    var received []byte
    err := client.Subscribe(ctx, "test.subject", func(_ context.Context, msg *nats.Msg) {
        received = msg.Data
    })
    require.NoError(t, err)

    // Publish message
    err = client.Publish(ctx, "test.subject", []byte("hello"))
    require.NoError(t, err)

    // Verify message received
    assert.Equal(t, []byte("hello"), received)
}

Wait Helpers:

func TestMessageFlow(t *testing.T) {
    client := testutil.NewMockNATSClient()

    // Start async publisher
    go func() {
        time.Sleep(100 * time.Millisecond)
        client.Publish(ctx, "events", []byte("data"))
    }()

    // Wait for message with timeout
    msg := testutil.WaitForMessage(t, client, "events", time.Second)
    assert.Equal(t, []byte("data"), msg)
}

FlowBuilder:

func TestFlowConfiguration(t *testing.T) {
    // Build flow programmatically
    flow := testutil.NewFlowBuilder("test-flow").
        AddInput("udp-input", "udp", map[string]any{
            "port": 14550,
        }).
        AddProcessor("filter", "json_filter", map[string]any{
            "path": "$.type",
        }).
        AddOutput("ws-output", "websocket", map[string]any{
            "port": 8080,
        }).
        Build()

    // Use in test
    flowJSON, _ := json.Marshal(flow)
    // ... test flow configuration
}

MockKVStore:

func TestStorage(t *testing.T) {
    kv := testutil.NewMockKVStore()

    // Store values
    err := kv.Put("key1", []byte("value1"))
    require.NoError(t, err)

    // Retrieve values
    val, err := kv.Get("key1")
    require.NoError(t, err)
    assert.Equal(t, []byte("value1"), val)

    // List keys
    keys := kv.Keys()
    assert.Contains(t, keys, "key1")
}

MockComponent:

func TestComponentLifecycle(t *testing.T) {
    mock := testutil.NewMockComponent()

    // Start component
    err := mock.Start(ctx)
    require.NoError(t, err)
    assert.Equal(t, 1, mock.StartCalls)
    assert.True(t, mock.Started)

    // Process data
    err = mock.Process("test-data")
    require.NoError(t, err)
    assert.Equal(t, 1, mock.ProcessCalls)

    // Stop component
    err = mock.Stop(ctx)
    require.NoError(t, err)
    assert.Equal(t, 1, mock.StopCalls)
    assert.False(t, mock.Started)
}

Thread Safety Guarantees

All mock types use sync.RWMutex for thread safety:

MockNATSClient:

  • Publish() - Write lock for map updates, releases before calling handlers
  • Subscribe() - Write lock
  • GetMessages() - Read lock, returns copy of slice
  • Clear()/ClearAll() - Write lock

MockKVStore:

  • Put()/Delete() - Write lock
  • Get() - Read lock, returns copy of value
  • Keys() - Read lock
  • Clear() - Write lock

MockComponent:

  • Start()/Stop()/Process() - Write lock for counter updates
  • Thread-safe call counters

Performance Considerations

WaitForMessage Polling:

The WaitForMessage helper uses polling (10ms intervals) which adds latency:

  • Each wait adds minimum 10ms to test execution
  • With 100 tests using this → +1 second total

For unit tests, prefer direct assertions. For integration tests where async behavior is expected, the polling overhead is acceptable.

Mock vs Real Dependencies:

Decision matrix:

| Scenario                    | Use Mock          | Use Real (testcontainers) |
|-----------------------------|-------------------|---------------------------|
| Unit test (component logic) | ✅ MockNATSClient | ❌ Overkill               |
| Integration test (E2E flow) | ❌ Incomplete     | ✅ Real NATS              |
| CI/CD pipeline             | ⚠️ Fast but fake   | ✅ Slow but real          |
| Local development          | ✅ Fast iteration | ⚠️ Docker overhead         |

Test Data Guidelines

Generic vs Semantic Data:

testutil provides GENERIC data - avoid creating semantic test data:

// ❌ BAD - semantic robotics data
robotEvent := map[string]any{
    "entity_id": "robot-123",
    "pose": {"x": 1.0, "y": 2.0},
}

// ✅ GOOD - generic JSON
genericEvent := testutil.TestJSONData["simple"]

If you need semantic data, create it in your test package, not in testutil.

Integration with Framework

The testutil package integrates with StreamKit testing patterns:

Shared NATS Client Pattern:

For integration tests using real NATS:

func getSharedNATSClient(t *testing.T) *natsclient.Client {
    // Use testcontainers - one NATS server per test run
    // NOT testutil.MockNATSClient
}

Use MockNATSClient only for unit tests or when testcontainers unavailable.

Known Limitations

  1. WaitForMessage uses polling (10ms) - adds latency to tests
  2. No support for NATS features like headers, request/reply patterns
  3. MockKVStore doesn't support transactions or atomic operations
  4. No metrics or observability for mock operations
  5. FlowBuilder doesn't validate flow correctness

These are design trade-offs - mocks prioritize simplicity over completeness.

See Also

  • component: Component interface and lifecycle
  • natsclient: Real NATS client wrapper
  • types: Shared domain types

Package testutil provides core test utilities for StreamKit with ZERO semantic concepts. NO EntityID, NO MAVLink, NO SOSA/SSN, NO semantic domain knowledge.

Index

Constants

View Source
const TestCSV = `` /* 151-byte string literal not displayed */

TestCSV is a generic CSV dataset for testing CSV parsing.

View Source
const TestCSVWithHeaders = `id,value,count,enabled
1,foo,42,true
2,bar,43,false
3,baz,44,true
4,qux,45,false
5,quux,46,true`

TestCSVWithHeaders is CSV data with headers for testing.

Variables

View Source
var (
	ErrMockFailed     = errors.New("mock operation failed")
	ErrMockTimeout    = errors.New("mock operation timed out")
	ErrMockNotFound   = errors.New("mock resource not found")
	ErrMockInvalid    = errors.New("mock invalid input")
	ErrMockConnection = errors.New("mock connection error")
)

Common test errors

View Source
var TestBinaryData = [][]byte{
	{0x01, 0x02, 0x03, 0x04, 0x05},
	{0x0A, 0x0B, 0x0C, 0x0D, 0x0E},
	{0xFF, 0xFE, 0xFD, 0xFC, 0xFB},
}

TestBinaryData contains test binary data patterns.

View Source
var TestBufferData = map[string][]byte{
	"small":  []byte("small data"),
	"medium": []byte(string(make([]byte, 1024))),
	"large":  []byte(string(make([]byte, 10240))),
}

TestBufferData contains data of various sizes for buffer testing.

View Source
var TestErrors = []string{
	"connection timeout",
	"invalid input format",
	"resource not found",
	"permission denied",
	"internal server error",
}

TestErrors contains generic error messages for testing error handling.

View Source
var TestHTTPRequests = []map[string]any{
	{
		"method":    "GET",
		"path":      "/api/v1/users",
		"status":    200,
		"timestamp": 1234567890,
	},
	{
		"method":    "POST",
		"path":      "/api/v1/users",
		"status":    201,
		"timestamp": 1234567891,
	},
	{
		"method":    "PUT",
		"path":      "/api/v1/users/123",
		"status":    200,
		"timestamp": 1234567892,
	},
	{
		"method":    "DELETE",
		"path":      "/api/v1/users/123",
		"status":    204,
		"timestamp": 1234567893,
	},
}

TestHTTPRequests contains generic HTTP request data for testing.

View Source
var TestJSONObjects = []map[string]any{
	{
		"id":        1,
		"name":      "Alice",
		"age":       30,
		"city":      "NYC",
		"timestamp": 1234567890,
	},
	{
		"id":        2,
		"name":      "Bob",
		"age":       25,
		"city":      "LA",
		"timestamp": 1234567891,
	},
	{
		"id":        3,
		"name":      "Charlie",
		"age":       35,
		"city":      "Chicago",
		"timestamp": 1234567892,
	},
}

TestJSONObjects contains generic JSON objects for testing.

View Source
var TestMessages = []string{
	`{"id": 1, "value": "foo", "timestamp": 1234567890, "count": 42}`,
	`{"id": 2, "value": "bar", "timestamp": 1234567891, "count": 43}`,
	`{"id": 3, "value": "baz", "timestamp": 1234567892, "count": 44}`,
	`{"id": 4, "value": "qux", "timestamp": 1234567893, "count": 45}`,
	`{"id": 5, "value": "quux", "timestamp": 1234567894, "count": 46}`,
}

TestMessages contains generic JSON messages for testing (core data only).

View Source
var TestMetrics = []map[string]any{
	{
		"metric":    "cpu_usage",
		"value":     45.2,
		"unit":      "percent",
		"timestamp": 1234567890,
	},
	{
		"metric":    "memory_usage",
		"value":     2048,
		"unit":      "MB",
		"timestamp": 1234567891,
	},
	{
		"metric":    "disk_usage",
		"value":     75.5,
		"unit":      "percent",
		"timestamp": 1234567892,
	},
}

TestMetrics contains generic metric data for testing.

View Source
var TestPlainText = []string{
	"This is a test message",
	"Another test message",
	"Yet another test message",
	"One more test message",
	"Final test message",
}

TestPlainText contains plain text data for testing.

View Source
var TestTimestamps = []any{
	1234567890,
	"2024-01-15T10:30:00Z",
	"2024-01-15 10:30:00",
	"1234567890000",
	float64(1234567890.123),
}

TestTimestamps contains various timestamp formats for testing.

View Source
var TestUDPPackets = [][]byte{
	[]byte("UDP packet 1: Hello World"),
	[]byte("UDP packet 2: Test Data"),
	[]byte("UDP packet 3: Sample Message"),
	[]byte("UDP packet 4: More Test Data"),
	[]byte("UDP packet 5: Final Packet"),
}

TestUDPPackets contains generic UDP packet data for testing.

View Source
var TestWebSocketMessages = []string{
	`{"type":"ping","timestamp":1234567890}`,
	`{"type":"data","value":"test"}`,
	`{"type":"status","code":200}`,
	`{"type":"error","message":"test error"}`,
	`{"type":"close","reason":"done"}`,
}

TestWebSocketMessages contains generic WebSocket message data.

Functions

func AssertMessageReceived

func AssertMessageReceived(t *testing.T, client *MockNATSClient, subject string)

AssertMessageReceived checks that a message was received on a subject.

func AssertNoMessages

func AssertNoMessages(t *testing.T, client *MockNATSClient, subject string)

AssertNoMessages checks that no messages were received on a subject.

func EmptyFlow

func EmptyFlow() map[string]any

EmptyFlow returns a flow with no components.

func FlowWithStorage

func FlowWithStorage() map[string]any

FlowWithStorage returns a flow that includes a storage component.

func InvalidFlow

func InvalidFlow() map[string]any

InvalidFlow returns an intentionally invalid flow for error testing.

func MultiComponentFlow

func MultiComponentFlow() map[string]any

MultiComponentFlow returns a flow with multiple components.

func NewMockError

func NewMockError(message, code string) error

NewMockError creates a new mock error.

func SimpleFlow

func SimpleFlow() map[string]any

SimpleFlow returns a minimal flow for basic testing.

func TestFlow

func TestFlow() map[string]any

TestFlow returns a generic core flow definition for testing (no semantic concepts). This is a vanilla input -> processor -> output flow with no domain knowledge.

func TestFlowJSON

func TestFlowJSON() ([]byte, error)

TestFlowJSON returns a test flow as JSON bytes.

func TestFlowRawMessage

func TestFlowRawMessage() (json.RawMessage, error)

TestFlowRawMessage returns a test flow as json.RawMessage.

func WaitForMessage

func WaitForMessage(t *testing.T, client *MockNATSClient, subject string, timeout time.Duration) []byte

WaitForMessage is a test helper that waits for a message on a subject (with timeout).

func WaitForMessageCount

func WaitForMessageCount(t *testing.T, client *MockNATSClient, subject string, count int, timeout time.Duration)

WaitForMessageCount waits for a specific number of messages (with timeout).

Types

type FlowBuilder

type FlowBuilder struct {
	// contains filtered or unexported fields
}

FlowBuilder is a helper for building test flows programmatically.

func NewFlowBuilder

func NewFlowBuilder(name string) *FlowBuilder

NewFlowBuilder creates a new flow builder.

func (*FlowBuilder) AddComponent

func (fb *FlowBuilder) AddComponent(comp *FlowComponentConfig) *FlowBuilder

AddComponent adds a component to the flow.

func (*FlowBuilder) AddInput

func (fb *FlowBuilder) AddInput(name, protocol string, config map[string]any) *FlowBuilder

AddInput adds an input component.

func (*FlowBuilder) AddOutput

func (fb *FlowBuilder) AddOutput(name, protocol string, config map[string]any) *FlowBuilder

AddOutput adds an output component.

func (*FlowBuilder) AddProcessor

func (fb *FlowBuilder) AddProcessor(name, protocol string, config map[string]any) *FlowBuilder

AddProcessor adds a processor component.

func (*FlowBuilder) AddStorage

func (fb *FlowBuilder) AddStorage(name, protocol string, config map[string]any) *FlowBuilder

AddStorage adds a storage component.

func (*FlowBuilder) Build

func (fb *FlowBuilder) Build() map[string]any

Build builds the flow and returns it as a map.

func (*FlowBuilder) BuildJSON

func (fb *FlowBuilder) BuildJSON() ([]byte, error)

BuildJSON builds the flow and returns it as JSON.

type FlowComponentConfig

type FlowComponentConfig struct {
	Name     string         `json:"name"`
	Type     string         `json:"type"`
	Protocol string         `json:"protocol"`
	Config   map[string]any `json:"config,omitempty"`
	Enabled  bool           `json:"enabled"`
}

FlowComponentConfig represents a generic component configuration.

func NewFlowComponentConfig

func NewFlowComponentConfig(name, compType, protocol string) *FlowComponentConfig

NewFlowComponentConfig creates a new flow component config.

func (*FlowComponentConfig) ToJSON

func (c *FlowComponentConfig) ToJSON() ([]byte, error)

ToJSON converts the component config to JSON.

type GenericEvent

type GenericEvent struct {
	EventID   string         `json:"event_id"`
	EventType string         `json:"event_type"`
	Data      map[string]any `json:"data"`
	Timestamp int64          `json:"timestamp"`
}

GenericEvent is a core generic event structure for testing.

func NewGenericEvent

func NewGenericEvent(id, eventType string, timestamp int64) *GenericEvent

NewGenericEvent creates a new generic test event.

type GenericMessage

type GenericMessage struct {
	ID        int            `json:"id"`
	Type      string         `json:"type"`
	Value     string         `json:"value"`
	Timestamp int64          `json:"timestamp"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

GenericMessage is a core generic message structure (no domain concepts).

func NewGenericMessage

func NewGenericMessage(id int, msgType, value string, timestamp int64) *GenericMessage

NewGenericMessage creates a new generic test message.

type MockComponent

type MockComponent struct {

	// Lifecycle control
	StartFunc func(ctx context.Context) error
	StopFunc  func(ctx context.Context) error

	// Processing
	ProcessFunc func(data any) (any, error)

	// State tracking
	Started bool
	Stopped bool
	Enabled bool

	// Call counts for verification
	StartCalls   int
	StopCalls    int
	ProcessCalls int
	// contains filtered or unexported fields
}

MockComponent is a generic component for testing that implements basic lifecycle. This is core infrastructure - no domain concepts.

func NewMockComponent

func NewMockComponent() *MockComponent

NewMockComponent creates a new mock component with default no-op implementations.

func (*MockComponent) GetInfo

func (m *MockComponent) GetInfo() map[string]any

GetInfo returns mock component info (core metadata only).

func (*MockComponent) Process

func (m *MockComponent) Process(data any) (any, error)

Process processes data through the mock component.

func (*MockComponent) Start

func (m *MockComponent) Start(ctx context.Context) error

Start starts the mock component.

func (*MockComponent) Stop

func (m *MockComponent) Stop(ctx context.Context) error

Stop stops the mock component.

type MockConfig

type MockConfig struct {
	Name       string         `json:"name"`
	Enabled    bool           `json:"enabled"`
	Port       int            `json:"port"`
	Host       string         `json:"host"`
	Timeout    int            `json:"timeout"`
	BufferSize int            `json:"buffer_size"`
	Metadata   map[string]any `json:"metadata"`
}

MockConfig represents a generic configuration for testing.

func NewMockConfig

func NewMockConfig() *MockConfig

NewMockConfig creates a mock config with sensible defaults.

func (*MockConfig) ToJSON

func (c *MockConfig) ToJSON() ([]byte, error)

ToJSON converts the mock config to JSON.

func (*MockConfig) ToRawMessage

func (c *MockConfig) ToRawMessage() (json.RawMessage, error)

ToRawMessage converts the mock config to json.RawMessage.

type MockError

type MockError struct {
	Message string
	Code    string
}

MockError is a generic error for testing error paths.

func (*MockError) Error

func (e *MockError) Error() string

type MockKVStore

type MockKVStore struct {
	// contains filtered or unexported fields
}

MockKVStore is a simple in-memory key-value store for testing. Thread-safe for concurrent use from multiple goroutines.

func NewMockKVStore

func NewMockKVStore() *MockKVStore

NewMockKVStore creates a new mock KV store.

func (*MockKVStore) Clear

func (kv *MockKVStore) Clear()

Clear removes all keys.

func (*MockKVStore) Delete

func (kv *MockKVStore) Delete(key string) error

Delete removes a key.

func (*MockKVStore) Get

func (kv *MockKVStore) Get(key string) ([]byte, error)

Get retrieves a value.

func (*MockKVStore) Keys

func (kv *MockKVStore) Keys() []string

Keys returns all keys.

func (*MockKVStore) Put

func (kv *MockKVStore) Put(key string, value []byte) error

Put stores a value.

type MockNATSClient

type MockNATSClient struct {
	// contains filtered or unexported fields
}

MockNATSClient is a simple in-memory NATS client for testing (core message passing). Matches the natsclient.Client interface for Subscribe/Publish methods. Thread-safe for concurrent use from multiple goroutines.

func NewMockNATSClient

func NewMockNATSClient() *MockNATSClient

NewMockNATSClient creates a new mock NATS client.

func (*MockNATSClient) Clear

func (c *MockNATSClient) Clear(subject string)

Clear clears all messages from a subject.

func (*MockNATSClient) ClearAll

func (c *MockNATSClient) ClearAll()

ClearAll clears all messages from all subjects.

func (*MockNATSClient) Close

func (c *MockNATSClient) Close() error

Close closes the mock client.

func (*MockNATSClient) GetMessageCount

func (c *MockNATSClient) GetMessageCount(subject string) int

GetMessageCount returns the number of messages on a subject.

func (*MockNATSClient) GetMessages

func (c *MockNATSClient) GetMessages(subject string) [][]byte

GetMessages returns all messages for a subject as [][]byte.

func (*MockNATSClient) GetMessagesAsInterface

func (c *MockNATSClient) GetMessagesAsInterface(subject string) []any

GetMessagesAsInterface returns all messages for a subject (for backward compatibility).

func (*MockNATSClient) IsClosed

func (c *MockNATSClient) IsClosed() bool

IsClosed returns whether the client is closed.

func (*MockNATSClient) Publish

func (c *MockNATSClient) Publish(ctx context.Context, subject string, data []byte) error

Publish publishes a message to a subject (matches natsclient.Client signature).

func (*MockNATSClient) Subscribe

func (c *MockNATSClient) Subscribe(ctx context.Context, subject string, handler func(context.Context, *nats.Msg)) error

Subscribe creates a subscription to a subject (matches natsclient.Client signature). Handler receives full *nats.Msg to access Subject, Data, Headers, etc.

type MockPort

type MockPort struct {
	Subject  string
	Messages []any
	// contains filtered or unexported fields
}

MockPort represents a generic NATS port for testing (core message passing).

func NewMockPort

func NewMockPort(subject string) *MockPort

NewMockPort creates a new mock port.

func (*MockPort) Clear

func (p *MockPort) Clear()

Clear clears all messages.

func (*MockPort) GetMessages

func (p *MockPort) GetMessages() []any

GetMessages returns all published messages.

func (*MockPort) Publish

func (p *MockPort) Publish(msg any) error

Publish publishes a message to the mock port.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL