Documentation
¶
Overview ¶
Package testutil provides testing utilities for StreamKit integration tests.
Overview ¶
The testutil package contains mock implementations, test data generators, and helper functions designed to simplify writing integration tests for StreamKit components. All utilities are framework-agnostic with ZERO semantic domain concepts (no EntityID, MAVLink, robotics, or SOSA/SSN knowledge).
Core Components ¶
Mock Implementations:
MockNATSClient - In-memory NATS client for testing pub/sub patterns:
- Thread-safe for concurrent use
- Stores all published messages for verification
- Supports subscription handlers
- No external NATS server required
MockKVStore - In-memory key-value store for testing storage:
- Thread-safe for concurrent use
- Simple Put/Get/Delete/Keys/Clear interface
- No external database required
MockComponent - Generic lifecycle component for testing:
- Tracks Start/Stop/Process call counts
- Thread-safe counters
- Configurable error injection
- Lifecycle state tracking
MockPort - Simple port abstraction for testing:
- Stores messages sent to port
- Thread-safe message list
Test Data Generators:
Provides generic test data for common formats:
- Generic JSON objects (no semantic meaning)
- CSV data
- HTTP request/response pairs
- Binary buffer data (small, medium, large)
Flow Configuration Builder:
FlowBuilder - Programmatic flow configuration:
- Fluent API for building flow configurations
- Method chaining for readability
- Sensible defaults
- Minimal boilerplate in tests
Test Helpers:
- WaitForMessage: Polls for message with timeout
- WaitForMessageCount: Waits for N messages
- AssertMessageReceived: Verifies message delivery
- AssertNoMessages: Verifies no messages sent
Design Principles ¶
Framework-Agnostic:
All test utilities avoid semantic domain knowledge. Instead of:
- ❌ CreateRoboticsEvent() - semantic concept
- ✅ CreateGenericJSON() - domain-agnostic
This ensures testutil can be used across different applications without coupling to specific domains.
Thread Safety:
All mock types are safe for concurrent use from multiple goroutines. This enables testing concurrent message flows without data races:
// Safe to use from multiple goroutines
go client.Publish(ctx, "subject1", data1)
go client.Publish(ctx, "subject2", data2)
go handler1(client.GetMessages("subject1"))
go handler2(client.GetMessages("subject2"))
Real Dependencies Preferred:
Use mocks ONLY when real dependencies are impractical:
- ✅ Use testcontainers for NATS (real behavior)
- ⚠️ Use MockNATSClient when testcontainers unavailable
- ❌ Don't mock when real dependencies are fast/easy
Usage Examples ¶
Basic MockNATSClient:
func TestPublishSubscribe(t *testing.T) {
client := testutil.NewMockNATSClient()
// Subscribe to subject (handler receives full *nats.Msg)
var received []byte
err := client.Subscribe(ctx, "test.subject", func(_ context.Context, msg *nats.Msg) {
received = msg.Data
})
require.NoError(t, err)
// Publish message
err = client.Publish(ctx, "test.subject", []byte("hello"))
require.NoError(t, err)
// Verify message received
assert.Equal(t, []byte("hello"), received)
}
Wait Helpers:
func TestMessageFlow(t *testing.T) {
client := testutil.NewMockNATSClient()
// Start async publisher
go func() {
time.Sleep(100 * time.Millisecond)
client.Publish(ctx, "events", []byte("data"))
}()
// Wait for message with timeout
msg := testutil.WaitForMessage(t, client, "events", time.Second)
assert.Equal(t, []byte("data"), msg)
}
FlowBuilder:
func TestFlowConfiguration(t *testing.T) {
// Build flow programmatically
flow := testutil.NewFlowBuilder("test-flow").
AddInput("udp-input", "udp", map[string]any{
"port": 14550,
}).
AddProcessor("filter", "json_filter", map[string]any{
"path": "$.type",
}).
AddOutput("ws-output", "websocket", map[string]any{
"port": 8080,
}).
Build()
// Use in test
flowJSON, _ := json.Marshal(flow)
// ... test flow configuration
}
MockKVStore:
func TestStorage(t *testing.T) {
kv := testutil.NewMockKVStore()
// Store values
err := kv.Put("key1", []byte("value1"))
require.NoError(t, err)
// Retrieve values
val, err := kv.Get("key1")
require.NoError(t, err)
assert.Equal(t, []byte("value1"), val)
// List keys
keys := kv.Keys()
assert.Contains(t, keys, "key1")
}
MockComponent:
func TestComponentLifecycle(t *testing.T) {
mock := testutil.NewMockComponent()
// Start component
err := mock.Start(ctx)
require.NoError(t, err)
assert.Equal(t, 1, mock.StartCalls)
assert.True(t, mock.Started)
// Process data
err = mock.Process("test-data")
require.NoError(t, err)
assert.Equal(t, 1, mock.ProcessCalls)
// Stop component
err = mock.Stop(ctx)
require.NoError(t, err)
assert.Equal(t, 1, mock.StopCalls)
assert.False(t, mock.Started)
}
Thread Safety Guarantees ¶
All mock types use sync.RWMutex for thread safety:
MockNATSClient:
- Publish() - Write lock for map updates, releases before calling handlers
- Subscribe() - Write lock
- GetMessages() - Read lock, returns copy of slice
- Clear()/ClearAll() - Write lock
MockKVStore:
- Put()/Delete() - Write lock
- Get() - Read lock, returns copy of value
- Keys() - Read lock
- Clear() - Write lock
MockComponent:
- Start()/Stop()/Process() - Write lock for counter updates
- Thread-safe call counters
Performance Considerations ¶
WaitForMessage Polling:
The WaitForMessage helper uses polling (10ms intervals) which adds latency:
- Each wait adds minimum 10ms to test execution
- With 100 tests using this → +1 second total
For unit tests, prefer direct assertions. For integration tests where async behavior is expected, the polling overhead is acceptable.
Mock vs Real Dependencies:
Decision matrix:
| Scenario | Use Mock | Use Real (testcontainers) | |-----------------------------|-------------------|---------------------------| | Unit test (component logic) | ✅ MockNATSClient | ❌ Overkill | | Integration test (E2E flow) | ❌ Incomplete | ✅ Real NATS | | CI/CD pipeline | ⚠️ Fast but fake | ✅ Slow but real | | Local development | ✅ Fast iteration | ⚠️ Docker overhead |
Test Data Guidelines ¶
Generic vs Semantic Data:
testutil provides GENERIC data - avoid creating semantic test data:
// ❌ BAD - semantic robotics data
robotEvent := map[string]any{
"entity_id": "robot-123",
"pose": {"x": 1.0, "y": 2.0},
}
// ✅ GOOD - generic JSON
genericEvent := testutil.TestJSONData["simple"]
If you need semantic data, create it in your test package, not in testutil.
Integration with Framework ¶
The testutil package integrates with StreamKit testing patterns:
Shared NATS Client Pattern:
For integration tests using real NATS:
func getSharedNATSClient(t *testing.T) *natsclient.Client {
// Use testcontainers - one NATS server per test run
// NOT testutil.MockNATSClient
}
Use MockNATSClient only for unit tests or when testcontainers unavailable.
Known Limitations ¶
- WaitForMessage uses polling (10ms) - adds latency to tests
- No support for NATS features like headers, request/reply patterns
- MockKVStore doesn't support transactions or atomic operations
- No metrics or observability for mock operations
- FlowBuilder doesn't validate flow correctness
These are design trade-offs - mocks prioritize simplicity over completeness.
See Also ¶
- component: Component interface and lifecycle
- natsclient: Real NATS client wrapper
- types: Shared domain types
Package testutil provides core test utilities for StreamKit with ZERO semantic concepts. NO EntityID, NO MAVLink, NO SOSA/SSN, NO semantic domain knowledge.
Index ¶
- Constants
- Variables
- func AssertMessageReceived(t *testing.T, client *MockNATSClient, subject string)
- func AssertNoMessages(t *testing.T, client *MockNATSClient, subject string)
- func EmptyFlow() map[string]any
- func FlowWithStorage() map[string]any
- func InvalidFlow() map[string]any
- func MultiComponentFlow() map[string]any
- func NewMockError(message, code string) error
- func SimpleFlow() map[string]any
- func TestFlow() map[string]any
- func TestFlowJSON() ([]byte, error)
- func TestFlowRawMessage() (json.RawMessage, error)
- func WaitForMessage(t *testing.T, client *MockNATSClient, subject string, timeout time.Duration) []byte
- func WaitForMessageCount(t *testing.T, client *MockNATSClient, subject string, count int, ...)
- type FlowBuilder
- func (fb *FlowBuilder) AddComponent(comp *FlowComponentConfig) *FlowBuilder
- func (fb *FlowBuilder) AddInput(name, protocol string, config map[string]any) *FlowBuilder
- func (fb *FlowBuilder) AddOutput(name, protocol string, config map[string]any) *FlowBuilder
- func (fb *FlowBuilder) AddProcessor(name, protocol string, config map[string]any) *FlowBuilder
- func (fb *FlowBuilder) AddStorage(name, protocol string, config map[string]any) *FlowBuilder
- func (fb *FlowBuilder) Build() map[string]any
- func (fb *FlowBuilder) BuildJSON() ([]byte, error)
- type FlowComponentConfig
- type GenericEvent
- type GenericMessage
- type MockComponent
- type MockConfig
- type MockError
- type MockKVStore
- type MockNATSClient
- func (c *MockNATSClient) Clear(subject string)
- func (c *MockNATSClient) ClearAll()
- func (c *MockNATSClient) Close() error
- func (c *MockNATSClient) GetMessageCount(subject string) int
- func (c *MockNATSClient) GetMessages(subject string) [][]byte
- func (c *MockNATSClient) GetMessagesAsInterface(subject string) []any
- func (c *MockNATSClient) IsClosed() bool
- func (c *MockNATSClient) Publish(ctx context.Context, subject string, data []byte) error
- func (c *MockNATSClient) Subscribe(ctx context.Context, subject string, handler func(context.Context, *nats.Msg)) error
- type MockPort
Constants ¶
const TestCSV = `` /* 151-byte string literal not displayed */
TestCSV is a generic CSV dataset for testing CSV parsing.
const TestCSVWithHeaders = `id,value,count,enabled
1,foo,42,true
2,bar,43,false
3,baz,44,true
4,qux,45,false
5,quux,46,true`
TestCSVWithHeaders is CSV data with headers for testing.
Variables ¶
var ( ErrMockFailed = errors.New("mock operation failed") ErrMockTimeout = errors.New("mock operation timed out") ErrMockNotFound = errors.New("mock resource not found") ErrMockInvalid = errors.New("mock invalid input") ErrMockConnection = errors.New("mock connection error") )
Common test errors
var TestBinaryData = [][]byte{
{0x01, 0x02, 0x03, 0x04, 0x05},
{0x0A, 0x0B, 0x0C, 0x0D, 0x0E},
{0xFF, 0xFE, 0xFD, 0xFC, 0xFB},
}
TestBinaryData contains test binary data patterns.
var TestBufferData = map[string][]byte{ "small": []byte("small data"), "medium": []byte(string(make([]byte, 1024))), "large": []byte(string(make([]byte, 10240))), }
TestBufferData contains data of various sizes for buffer testing.
var TestErrors = []string{
"connection timeout",
"invalid input format",
"resource not found",
"permission denied",
"internal server error",
}
TestErrors contains generic error messages for testing error handling.
var TestHTTPRequests = []map[string]any{
{
"method": "GET",
"path": "/api/v1/users",
"status": 200,
"timestamp": 1234567890,
},
{
"method": "POST",
"path": "/api/v1/users",
"status": 201,
"timestamp": 1234567891,
},
{
"method": "PUT",
"path": "/api/v1/users/123",
"status": 200,
"timestamp": 1234567892,
},
{
"method": "DELETE",
"path": "/api/v1/users/123",
"status": 204,
"timestamp": 1234567893,
},
}
TestHTTPRequests contains generic HTTP request data for testing.
var TestJSONObjects = []map[string]any{
{
"id": 1,
"name": "Alice",
"age": 30,
"city": "NYC",
"timestamp": 1234567890,
},
{
"id": 2,
"name": "Bob",
"age": 25,
"city": "LA",
"timestamp": 1234567891,
},
{
"id": 3,
"name": "Charlie",
"age": 35,
"city": "Chicago",
"timestamp": 1234567892,
},
}
TestJSONObjects contains generic JSON objects for testing.
var TestMessages = []string{
`{"id": 1, "value": "foo", "timestamp": 1234567890, "count": 42}`,
`{"id": 2, "value": "bar", "timestamp": 1234567891, "count": 43}`,
`{"id": 3, "value": "baz", "timestamp": 1234567892, "count": 44}`,
`{"id": 4, "value": "qux", "timestamp": 1234567893, "count": 45}`,
`{"id": 5, "value": "quux", "timestamp": 1234567894, "count": 46}`,
}
TestMessages contains generic JSON messages for testing (core data only).
var TestMetrics = []map[string]any{
{
"metric": "cpu_usage",
"value": 45.2,
"unit": "percent",
"timestamp": 1234567890,
},
{
"metric": "memory_usage",
"value": 2048,
"unit": "MB",
"timestamp": 1234567891,
},
{
"metric": "disk_usage",
"value": 75.5,
"unit": "percent",
"timestamp": 1234567892,
},
}
TestMetrics contains generic metric data for testing.
var TestPlainText = []string{
"This is a test message",
"Another test message",
"Yet another test message",
"One more test message",
"Final test message",
}
TestPlainText contains plain text data for testing.
var TestTimestamps = []any{ 1234567890, "2024-01-15T10:30:00Z", "2024-01-15 10:30:00", "1234567890000", float64(1234567890.123), }
TestTimestamps contains various timestamp formats for testing.
var TestUDPPackets = [][]byte{ []byte("UDP packet 1: Hello World"), []byte("UDP packet 2: Test Data"), []byte("UDP packet 3: Sample Message"), []byte("UDP packet 4: More Test Data"), []byte("UDP packet 5: Final Packet"), }
TestUDPPackets contains generic UDP packet data for testing.
var TestWebSocketMessages = []string{
`{"type":"ping","timestamp":1234567890}`,
`{"type":"data","value":"test"}`,
`{"type":"status","code":200}`,
`{"type":"error","message":"test error"}`,
`{"type":"close","reason":"done"}`,
}
TestWebSocketMessages contains generic WebSocket message data.
Functions ¶
func AssertMessageReceived ¶
func AssertMessageReceived(t *testing.T, client *MockNATSClient, subject string)
AssertMessageReceived checks that a message was received on a subject.
func AssertNoMessages ¶
func AssertNoMessages(t *testing.T, client *MockNATSClient, subject string)
AssertNoMessages checks that no messages were received on a subject.
func FlowWithStorage ¶
FlowWithStorage returns a flow that includes a storage component.
func InvalidFlow ¶
InvalidFlow returns an intentionally invalid flow for error testing.
func MultiComponentFlow ¶
MultiComponentFlow returns a flow with multiple components.
func NewMockError ¶
NewMockError creates a new mock error.
func SimpleFlow ¶
SimpleFlow returns a minimal flow for basic testing.
func TestFlow ¶
TestFlow returns a generic core flow definition for testing (no semantic concepts). This is a vanilla input -> processor -> output flow with no domain knowledge.
func TestFlowJSON ¶
TestFlowJSON returns a test flow as JSON bytes.
func TestFlowRawMessage ¶
func TestFlowRawMessage() (json.RawMessage, error)
TestFlowRawMessage returns a test flow as json.RawMessage.
func WaitForMessage ¶
func WaitForMessage(t *testing.T, client *MockNATSClient, subject string, timeout time.Duration) []byte
WaitForMessage is a test helper that waits for a message on a subject (with timeout).
func WaitForMessageCount ¶
func WaitForMessageCount(t *testing.T, client *MockNATSClient, subject string, count int, timeout time.Duration)
WaitForMessageCount waits for a specific number of messages (with timeout).
Types ¶
type FlowBuilder ¶
type FlowBuilder struct {
// contains filtered or unexported fields
}
FlowBuilder is a helper for building test flows programmatically.
func NewFlowBuilder ¶
func NewFlowBuilder(name string) *FlowBuilder
NewFlowBuilder creates a new flow builder.
func (*FlowBuilder) AddComponent ¶
func (fb *FlowBuilder) AddComponent(comp *FlowComponentConfig) *FlowBuilder
AddComponent adds a component to the flow.
func (*FlowBuilder) AddInput ¶
func (fb *FlowBuilder) AddInput(name, protocol string, config map[string]any) *FlowBuilder
AddInput adds an input component.
func (*FlowBuilder) AddOutput ¶
func (fb *FlowBuilder) AddOutput(name, protocol string, config map[string]any) *FlowBuilder
AddOutput adds an output component.
func (*FlowBuilder) AddProcessor ¶
func (fb *FlowBuilder) AddProcessor(name, protocol string, config map[string]any) *FlowBuilder
AddProcessor adds a processor component.
func (*FlowBuilder) AddStorage ¶
func (fb *FlowBuilder) AddStorage(name, protocol string, config map[string]any) *FlowBuilder
AddStorage adds a storage component.
func (*FlowBuilder) Build ¶
func (fb *FlowBuilder) Build() map[string]any
Build builds the flow and returns it as a map.
func (*FlowBuilder) BuildJSON ¶
func (fb *FlowBuilder) BuildJSON() ([]byte, error)
BuildJSON builds the flow and returns it as JSON.
type FlowComponentConfig ¶
type FlowComponentConfig struct {
Name string `json:"name"`
Type string `json:"type"`
Protocol string `json:"protocol"`
Config map[string]any `json:"config,omitempty"`
Enabled bool `json:"enabled"`
}
FlowComponentConfig represents a generic component configuration.
func NewFlowComponentConfig ¶
func NewFlowComponentConfig(name, compType, protocol string) *FlowComponentConfig
NewFlowComponentConfig creates a new flow component config.
func (*FlowComponentConfig) ToJSON ¶
func (c *FlowComponentConfig) ToJSON() ([]byte, error)
ToJSON converts the component config to JSON.
type GenericEvent ¶
type GenericEvent struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
Data map[string]any `json:"data"`
Timestamp int64 `json:"timestamp"`
}
GenericEvent is a core generic event structure for testing.
func NewGenericEvent ¶
func NewGenericEvent(id, eventType string, timestamp int64) *GenericEvent
NewGenericEvent creates a new generic test event.
type GenericMessage ¶
type GenericMessage struct {
ID int `json:"id"`
Type string `json:"type"`
Value string `json:"value"`
Timestamp int64 `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
GenericMessage is a core generic message structure (no domain concepts).
func NewGenericMessage ¶
func NewGenericMessage(id int, msgType, value string, timestamp int64) *GenericMessage
NewGenericMessage creates a new generic test message.
type MockComponent ¶
type MockComponent struct {
// Lifecycle control
StartFunc func(ctx context.Context) error
StopFunc func(ctx context.Context) error
// Processing
ProcessFunc func(data any) (any, error)
// State tracking
Started bool
Stopped bool
Enabled bool
// Call counts for verification
StartCalls int
StopCalls int
ProcessCalls int
// contains filtered or unexported fields
}
MockComponent is a generic component for testing that implements basic lifecycle. This is core infrastructure - no domain concepts.
func NewMockComponent ¶
func NewMockComponent() *MockComponent
NewMockComponent creates a new mock component with default no-op implementations.
func (*MockComponent) GetInfo ¶
func (m *MockComponent) GetInfo() map[string]any
GetInfo returns mock component info (core metadata only).
func (*MockComponent) Process ¶
func (m *MockComponent) Process(data any) (any, error)
Process processes data through the mock component.
type MockConfig ¶
type MockConfig struct {
Name string `json:"name"`
Enabled bool `json:"enabled"`
Port int `json:"port"`
Host string `json:"host"`
Timeout int `json:"timeout"`
BufferSize int `json:"buffer_size"`
Metadata map[string]any `json:"metadata"`
}
MockConfig represents a generic configuration for testing.
func NewMockConfig ¶
func NewMockConfig() *MockConfig
NewMockConfig creates a mock config with sensible defaults.
func (*MockConfig) ToJSON ¶
func (c *MockConfig) ToJSON() ([]byte, error)
ToJSON converts the mock config to JSON.
func (*MockConfig) ToRawMessage ¶
func (c *MockConfig) ToRawMessage() (json.RawMessage, error)
ToRawMessage converts the mock config to json.RawMessage.
type MockKVStore ¶
type MockKVStore struct {
// contains filtered or unexported fields
}
MockKVStore is a simple in-memory key-value store for testing. Thread-safe for concurrent use from multiple goroutines.
func NewMockKVStore ¶
func NewMockKVStore() *MockKVStore
NewMockKVStore creates a new mock KV store.
type MockNATSClient ¶
type MockNATSClient struct {
// contains filtered or unexported fields
}
MockNATSClient is a simple in-memory NATS client for testing (core message passing). Matches the natsclient.Client interface for Subscribe/Publish methods. Thread-safe for concurrent use from multiple goroutines.
func NewMockNATSClient ¶
func NewMockNATSClient() *MockNATSClient
NewMockNATSClient creates a new mock NATS client.
func (*MockNATSClient) Clear ¶
func (c *MockNATSClient) Clear(subject string)
Clear clears all messages from a subject.
func (*MockNATSClient) ClearAll ¶
func (c *MockNATSClient) ClearAll()
ClearAll clears all messages from all subjects.
func (*MockNATSClient) GetMessageCount ¶
func (c *MockNATSClient) GetMessageCount(subject string) int
GetMessageCount returns the number of messages on a subject.
func (*MockNATSClient) GetMessages ¶
func (c *MockNATSClient) GetMessages(subject string) [][]byte
GetMessages returns all messages for a subject as [][]byte.
func (*MockNATSClient) GetMessagesAsInterface ¶
func (c *MockNATSClient) GetMessagesAsInterface(subject string) []any
GetMessagesAsInterface returns all messages for a subject (for backward compatibility).
func (*MockNATSClient) IsClosed ¶
func (c *MockNATSClient) IsClosed() bool
IsClosed returns whether the client is closed.
func (*MockNATSClient) Publish ¶
Publish publishes a message to a subject (matches natsclient.Client signature).
func (*MockNATSClient) Subscribe ¶
func (c *MockNATSClient) Subscribe(ctx context.Context, subject string, handler func(context.Context, *nats.Msg)) error
Subscribe creates a subscription to a subject (matches natsclient.Client signature). Handler receives full *nats.Msg to access Subject, Data, Headers, etc.
type MockPort ¶
MockPort represents a generic NATS port for testing (core message passing).
func (*MockPort) GetMessages ¶
GetMessages returns all published messages.