Overview ¶
Package spec defines the core interfaces and abstractions for WombatWisdom components.
WombatWisdom implements a component-based data processing framework with three main patterns:
Standard Pattern ¶
Traditional data processing components:
- Input: Reads data from external sources
- Processor: Transforms data between inputs and outputs
- Output: Writes data to external destinations
Trigger-Retrieval Pattern ¶
Event-driven pattern that separates event detection from data retrieval:
- TriggerInput: Emits lightweight events referencing data locations
- RetrievalProcessor: Fetches actual data based on trigger events
This pattern enables:
- Efficient filtering before expensive operations
- Event deduplication and ordering
- Graceful handling of data availability delays
- Better resource utilization, since data is fetched only for events that pass filtering
Self-Contained Pattern ¶
For streaming systems where events contain the actual data:
- SelfContainedInput: Events include all necessary data payload
Examples: NATS messages, MQTT payloads, Kafka records.
Index ¶
- Constants
- Variables
- func NoopCallback(ctx context.Context, err error) error
- type Batch
- type Collector
- type Component
- type ComponentConstructor
- type ComponentContext
- type ComponentFactory
- type ComponentSpec
- type Config
- type ConfigSchema
- type Counter
- type DynamicField
- type DynamicFieldFactory
- type Environment
- type ExprLangExpressionFactory
- type Expression
- type ExpressionContext
- type ExpressionFactory
- type Gauge
- type Input
- type Logger
- type Message
- type MessageFactory
- type Metadata
- type MetadataFilter
- type MetadataFilterFactory
- type Metrics
- type Output
- type ProcessedCallback
- type Processor
- type ResourceManager
- type RetrievalProcessor
- type SchemaField
- type SelfContainedInput
- type System
- type SystemConstructor
- type SystemFactory
- type Timer
- type TriggerBatch
- type TriggerEvent
- type TriggerInput
Constants ¶
const (
TriggerSourceEventBridge = "eventbridge"
TriggerSourceS3Polling = "s3-polling"
TriggerSourceGenerate = "generate"
TriggerSourceSQS = "sqs"
TriggerSourceFile = "file"
)
Trigger Source Constants
const (
MetadataBucket = "bucket"
MetadataKey = "key"
MetadataEventName = "event_name"
MetadataRegion = "region"
MetadataTimestamp = "timestamp"
MetadataSize = "size"
MetadataETag = "etag"
)
Common Metadata Keys
Variables ¶
var ErrAlreadyConnected = errors.New("already connected")
var ErrNoData = errors.New("no data available")
var ErrNotConnected = errors.New("not connected")
Functions ¶
Types ¶
type Collector ¶
type Collector interface {
Collect(msg Message) error
Flush() (Batch, error)
// Legacy methods for backward compatibility
Write(msg Message) error
Disconnect() error
}
Collector collects messages for batch processing. This maintains compatibility with existing input patterns.
func NewSimpleCollector ¶
func NewSimpleCollector() Collector
NewSimpleCollector creates a basic collector implementation.
type Component ¶
type Component interface {
// Init initializes the component with the given context.
// This is called once before the component starts processing.
Init(ctx ComponentContext) error
// Close releases any resources held by the component.
// This is called when the component is being shut down.
Close(ctx ComponentContext) error
}
Component is the base interface for all pipeline components. All components must be able to initialize and clean up their resources.
type ComponentConstructor ¶
ComponentConstructor is a function type for creating components. This mirrors Benthos patterns while keeping System instances injected as dependencies.
type ComponentContext ¶
type ComponentContext interface {
Logger
ExpressionFactory
MessageFactory
MetadataFilterFactory
Context() context.Context
// Resource management
Resources() ResourceManager
// Component access (for cross-component communication)
Input(name string) (Input, error)
Output(name string) (Output, error)
// System access (for shared client usage)
System(name string) (System, error)
}
type ComponentFactory ¶
type ComponentFactory interface {
// NewInput creates a new Input component using the provided system and configuration
NewInput(sys System, cfg Config) (Input, error)
// NewOutput creates a new Output component using the provided system and configuration
NewOutput(sys System, cfg Config) (Output, error)
}
ComponentFactory provides methods to create component instances. This interface enables Benthos-compatible component construction patterns while maintaining the System-first architecture.
type ComponentSpec ¶
type ComponentSpec interface {
// Name returns the component name used for registration
Name() string
// Summary returns a brief description of the component
Summary() string
// Description returns detailed documentation for the component
Description() string
// InputConfigSchema returns the JSON schema for input component configuration
InputConfigSchema() string
// OutputConfigSchema returns the JSON schema for output component configuration
OutputConfigSchema() string
// SystemConfigSchema returns the JSON schema for system configuration
SystemConfigSchema() string
}
ComponentSpec defines the schema and metadata for a component type. This enables Benthos-compatible component registration and validation.
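A component spec is mostly static metadata, so an implementation can be a set of methods returning constants. In the sketch below, fileSpec, its schema contents, and the validSchemas helper are hypothetical; the interface is redeclared so the example compiles on its own.

```go
package main

import "encoding/json"

type ComponentSpec interface {
	Name() string
	Summary() string
	Description() string
	InputConfigSchema() string
	OutputConfigSchema() string
	SystemConfigSchema() string
}

// fileSpec is a hypothetical spec for a "file" component.
type fileSpec struct{}

var _ ComponentSpec = fileSpec{}

func (fileSpec) Name() string        { return "file" }
func (fileSpec) Summary() string     { return "Reads and writes local files." }
func (fileSpec) Description() string { return "Hypothetical component used to illustrate ComponentSpec." }

func (fileSpec) InputConfigSchema() string {
	return `{"type":"object","properties":{"path":{"type":"string"}},"required":["path"]}`
}

func (fileSpec) OutputConfigSchema() string {
	return `{"type":"object","properties":{"path":{"type":"string"},"append":{"type":"boolean"}}}`
}

func (fileSpec) SystemConfigSchema() string { return `{"type":"object"}` }

// validSchemas reports whether every schema string returned by spec is valid
// JSON, a cheap sanity check a registry might run before accepting a component.
func validSchemas(spec ComponentSpec) bool {
	schemas := []string{spec.InputConfigSchema(), spec.OutputConfigSchema(), spec.SystemConfigSchema()}
	for _, s := range schemas {
		if !json.Valid([]byte(s)) {
			return false
		}
	}
	return true
}
```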
type ConfigSchema ¶
type ConfigSchema interface {
// AddField adds a configuration field to the schema
AddField(field SchemaField) ConfigSchema
// ToJSON returns the JSON schema representation
ToJSON() (string, error)
}
ConfigSchema provides a programmatic way to build component schemas. This is useful for components that need dynamic schema generation.
func NewConfigSchema ¶
func NewConfigSchema() ConfigSchema
NewConfigSchema creates a new configuration schema builder.
type Counter ¶
type Counter interface {
Inc(delta int64)
}
Counter represents a monotonically increasing counter metric.
type DynamicField ¶
type DynamicField interface {
String() string
Int() int
Bool() bool
// Legacy methods for backward compatibility
AsString(msg Message) (string, error)
AsBool(msg Message) (bool, error)
}
DynamicField represents a field that can be evaluated dynamically. This maintains compatibility with existing component patterns.
type DynamicFieldFactory ¶
type DynamicFieldFactory interface {
NewDynamicField(expr string) DynamicField
}
DynamicFieldFactory creates dynamic fields from expressions.
type Environment ¶
type Environment interface {
Logger
DynamicFieldFactory
GetString(key string) string
GetInt(key string) int
GetBool(key string) bool
}
Environment provides environment variable and dynamic field access. This interface maintains compatibility with existing components.
func NewSimpleEnvironment ¶
func NewSimpleEnvironment() Environment
NewSimpleEnvironment creates a basic environment implementation.
type ExprLangExpressionFactory ¶
type ExprLangExpressionFactory struct{}
ExprLangExpressionFactory implements ExpressionFactory using expr-lang.
func (*ExprLangExpressionFactory) ParseExpression ¶
func (e *ExprLangExpressionFactory) ParseExpression(exprStr string) (Expression, error)
type Expression ¶
type Expression interface {
EvalString(ctx ExpressionContext) (string, error)
EvalInt(ctx ExpressionContext) (int, error)
EvalBool(ctx ExpressionContext) (bool, error)
}
type ExpressionContext ¶
func MessageExpressionContext ¶
func MessageExpressionContext(msg Message) ExpressionContext
type ExpressionFactory ¶
type ExpressionFactory interface {
ParseExpression(expr string) (Expression, error)
}
func NewExprLangExpressionFactory ¶
func NewExprLangExpressionFactory() ExpressionFactory
NewExprLangExpressionFactory creates a new expression factory using expr-lang.
type Gauge ¶
type Gauge interface {
Set(value float64)
}
Gauge represents a gauge metric that can go up and down.
type Input ¶
type Input interface {
Component
// Read retrieves a batch of data from the external source.
// Returns the data batch and a callback to acknowledge processing.
Read(ctx ComponentContext) (Batch, ProcessedCallback, error)
}
Input represents a component that reads data from external sources. Examples include database readers, file parsers, and message queue consumers.
type Message ¶
type Message interface {
SetMetadata(key string, value any)
SetRaw(b []byte)
Raw() ([]byte, error)
Metadata() iter.Seq2[string, any]
}
func NewBytesMessage ¶
NewBytesMessage creates a simple message from bytes for testing.
type MessageFactory ¶
type Metadata ¶
func NewMapMetadata ¶
type MetadataFilter ¶
type MetadataFilterFactory ¶
type MetadataFilterFactory interface {
BuildMetadataFilter(patterns []string, invert bool) (MetadataFilter, error)
}
type Metrics ¶
type Metrics interface {
Counter(name string) Counter
Gauge(name string) Gauge
Timer(name string) Timer
}
Metrics provides a telemetry collection interface. This is a placeholder for a future metrics implementation.
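Because Metrics is a placeholder, an in-memory test double is straightforward. In this sketch memMetrics and its instruments are hypothetical; it creates one instrument per name on first use. The unit of Timer.Record's float64 duration is not specified here, so callers would need to agree on one (for example, seconds).

```go
package main

import (
	"sync"
	"sync/atomic"
)

type Counter interface{ Inc(delta int64) }
type Gauge interface{ Set(value float64) }
type Timer interface{ Record(duration float64) }

type Metrics interface {
	Counter(name string) Counter
	Gauge(name string) Gauge
	Timer(name string) Timer
}

// memCounter accumulates increments atomically.
type memCounter struct{ n atomic.Int64 }

func (c *memCounter) Inc(delta int64) { c.n.Add(delta) }

// memGauge keeps only the most recent value.
type memGauge struct {
	mu sync.Mutex
	v  float64
}

func (g *memGauge) Set(value float64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.v = value
}

// memTimer stores every recorded duration.
type memTimer struct {
	mu      sync.Mutex
	samples []float64
}

func (t *memTimer) Record(duration float64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.samples = append(t.samples, duration)
}

// memMetrics hands out one instrument per name, creating it on first use.
type memMetrics struct {
	mu       sync.Mutex
	counters map[string]*memCounter
	gauges   map[string]*memGauge
	timers   map[string]*memTimer
}

var _ Metrics = (*memMetrics)(nil)

func newMemMetrics() *memMetrics {
	return &memMetrics{
		counters: map[string]*memCounter{},
		gauges:   map[string]*memGauge{},
		timers:   map[string]*memTimer{},
	}
}

// getOrCreate returns the instrument registered under name, allocating it if needed.
func getOrCreate[T any](mu *sync.Mutex, m map[string]*T, name string) *T {
	mu.Lock()
	defer mu.Unlock()
	if v, ok := m[name]; ok {
		return v
	}
	v := new(T)
	m[name] = v
	return v
}

func (m *memMetrics) Counter(name string) Counter { return getOrCreate(&m.mu, m.counters, name) }
func (m *memMetrics) Gauge(name string) Gauge     { return getOrCreate(&m.mu, m.gauges, name) }
func (m *memMetrics) Timer(name string) Timer     { return getOrCreate(&m.mu, m.timers, name) }
```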
type Output ¶
type Output interface {
Component
// Write sends a batch of data to the external destination.
// The batch should be written atomically where possible.
Write(ctx ComponentContext, batch Batch) error
}
Output represents a component that writes data to external destinations. Examples include database writers, file exporters, and message queue producers.
type ProcessedCallback ¶
ProcessedCallback is a function signature for a callback to be called after a message or message batch has been processed. The provided error indicates whether the processing was successful. TODO: add extra information on what happens if an error is returned
type Processor ¶
type Processor interface {
Component
// Process transforms the input batch and returns the processed result.
// Returns the transformed batch and a callback to acknowledge processing.
Process(ctx ComponentContext, batch Batch) (Batch, ProcessedCallback, error)
}
Processor represents a component that transforms data between inputs and outputs. Examples include data filters, enrichers, parsers, and format converters.
type ResourceManager ¶
type ResourceManager interface {
Logger() Logger
// System returns a shared system instance by name
System(name string) (System, error)
// RegisterSystem registers a system instance for sharing
RegisterSystem(name string, sys System) error
// Context returns the base context for operations
Context() context.Context
// Metrics returns a metrics interface (placeholder for future implementation)
Metrics() Metrics
}
ResourceManager provides access to shared resources and services. This is equivalent to Benthos's service.Resources but designed for the System-first architecture.
func NewResourceManager ¶
func NewResourceManager(ctx context.Context, logger Logger) ResourceManager
NewResourceManager creates a new resource manager instance.
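The System and RegisterSystem pair behaves like a named registry of shared clients. A minimal, concurrency-safe sketch follows; System is an empty placeholder interface because its real methods are not shown here, systemRegistry and fakeSystem are hypothetical, and rejecting duplicate names is an assumed policy rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// System is a placeholder: the package's System interface is not shown here.
type System interface{}

// systemRegistry sketches the RegisterSystem/System half of ResourceManager.
type systemRegistry struct {
	mu      sync.RWMutex
	systems map[string]System
}

func newSystemRegistry() *systemRegistry {
	return &systemRegistry{systems: map[string]System{}}
}

// RegisterSystem rejects duplicate names so one component cannot silently
// replace a client another component already shares (assumed policy).
func (r *systemRegistry) RegisterSystem(name string, sys System) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.systems[name]; exists {
		return fmt.Errorf("system %q already registered", name)
	}
	r.systems[name] = sys
	return nil
}

// System looks up a previously registered instance by name.
func (r *systemRegistry) System(name string) (System, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	sys, ok := r.systems[name]
	if !ok {
		return nil, fmt.Errorf("system %q not found", name)
	}
	return sys, nil
}

// fakeSystem is a hypothetical shared client used for illustration.
type fakeSystem struct{ name string }
```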
type RetrievalProcessor ¶
type RetrievalProcessor interface {
Component
// Retrieve fetches actual data based on the provided trigger events.
// Can filter triggers before retrieval to optimize performance.
Retrieve(ctx ComponentContext, triggers TriggerBatch) (Batch, ProcessedCallback, error)
}
RetrievalProcessor fetches actual data based on trigger events. Enables filtering and validation before expensive retrieval operations.
This component works with TriggerInput to implement the trigger-retrieval pattern:
1. Receives lightweight trigger events
2. Filters and validates triggers before retrieval
3. Fetches the actual data only when needed
4. Returns the retrieved data as a standard batch
Examples: S3 object retrievers, database record fetchers, API data pullers.
type SchemaField ¶
type SchemaField struct {
Name string
Type string
Description string
Required bool
Default any
Examples []any
}
SchemaField represents a configuration field with validation rules.
type SelfContainedInput ¶
type SelfContainedInput interface {
Input // Inherits standard Input behavior
}
SelfContainedInput represents inputs where the trigger IS the data. Used for streaming systems where events contain the actual data payload.
Unlike the trigger-retrieval pattern, these inputs don't need separate data fetching because the event notification contains all necessary data.
Examples: NATS messages, MQTT payloads, Kafka records, Redis streams.
type SystemConstructor ¶
SystemConstructor is a function type for creating systems.
type SystemFactory ¶
type SystemFactory interface {
// NewSystem creates a new System instance from raw configuration
NewSystem(cfg Config) (System, error)
}
SystemFactory creates System instances from configuration.
type Timer ¶
type Timer interface {
Record(duration float64)
}
Timer represents a timer metric for measuring durations.
type TriggerBatch ¶
type TriggerBatch interface {
// Triggers returns all trigger events in this batch.
Triggers() []TriggerEvent
// Append adds a new trigger event to this batch.
Append(trigger TriggerEvent)
}
TriggerBatch contains a collection of lightweight trigger events. Used to group related triggers for efficient processing.
func NewTriggerBatch ¶
func NewTriggerBatch() TriggerBatch
NewTriggerBatch creates a new empty trigger batch.
type TriggerEvent ¶
type TriggerEvent interface {
// Source identifies the trigger mechanism that generated this event.
// Examples: "eventbridge", "s3-polling", "webhook", "generate"
Source() string
// Reference provides the data location or identifier for retrieval.
// Examples: S3 object key, database record ID, API endpoint URL
Reference() string
// Metadata contains trigger-specific context and additional parameters.
// This can include filtering criteria, authentication tokens, or processing hints.
Metadata() map[string]any
// Timestamp returns when the trigger was generated (Unix timestamp).
// Used for ordering, deduplication, and expiration logic.
Timestamp() int64
}
TriggerEvent represents a lightweight reference to data that can be retrieved. This is the core abstraction of the trigger-retrieval pattern, providing just enough information to identify and locate data without fetching it.
func NewTriggerEvent ¶
func NewTriggerEvent(source, reference string, metadata map[string]any) TriggerEvent
NewTriggerEvent creates a new trigger event.
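NewTriggerEvent takes no timestamp argument, which suggests it stamps the event at creation time. The sketch below assumes Unix seconds via time.Now().Unix(); triggerEvent, triggerBatch, newTriggerEvent, and newTriggerBatch are hypothetical names, and the constants are copied from the Constants section so the example compiles alone.

```go
package main

import "time"

// Constants copied from the Constants section.
const (
	TriggerSourceS3Polling = "s3-polling"
	MetadataBucket         = "bucket"
	MetadataKey            = "key"
)

type TriggerEvent interface {
	Source() string
	Reference() string
	Metadata() map[string]any
	Timestamp() int64
}

type TriggerBatch interface {
	Triggers() []TriggerEvent
	Append(trigger TriggerEvent)
}

// triggerEvent is a hypothetical struct-backed TriggerEvent.
type triggerEvent struct {
	source, reference string
	metadata          map[string]any
	timestamp         int64
}

func (t *triggerEvent) Source() string           { return t.source }
func (t *triggerEvent) Reference() string        { return t.reference }
func (t *triggerEvent) Metadata() map[string]any { return t.metadata }
func (t *triggerEvent) Timestamp() int64         { return t.timestamp }

// newTriggerEvent stamps the event with the current time in Unix seconds
// (assumed unit) and never leaves Metadata nil.
func newTriggerEvent(source, reference string, metadata map[string]any) TriggerEvent {
	if metadata == nil {
		metadata = map[string]any{}
	}
	return &triggerEvent{source: source, reference: reference, metadata: metadata, timestamp: time.Now().Unix()}
}

// triggerBatch is a hypothetical slice-backed TriggerBatch.
type triggerBatch struct{ triggers []TriggerEvent }

func newTriggerBatch() TriggerBatch { return &triggerBatch{} }

func (b *triggerBatch) Triggers() []TriggerEvent    { return b.triggers }
func (b *triggerBatch) Append(trigger TriggerEvent) { b.triggers = append(b.triggers, trigger) }
```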
type TriggerInput ¶
type TriggerInput interface {
Component
// ReadTriggers returns a batch of trigger events referencing available data.
// These are lightweight references, not the actual data.
ReadTriggers(ctx ComponentContext) (TriggerBatch, ProcessedCallback, error)
}
TriggerInput emits lightweight trigger events that reference data locations without fetching the actual data. Designed for event-driven architectures.
This pattern separates event detection from data retrieval, enabling:
- Efficient filtering before expensive operations
- Event deduplication and ordering
- Graceful handling of data availability delays
Examples: EventBridge listeners, S3 notifications, webhook receivers.
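Because each trigger carries a Reference and a Timestamp, deduplication and ordering can happen before any data is fetched. The sketch keeps the earliest event per reference and sorts the survivors by timestamp; event is a simplified stand-in for TriggerEvent, dedupeAndOrder is hypothetical, and keeping the earliest (rather than the latest) event is only one possible policy.

```go
package main

import "sort"

// event is a simplified stand-in for TriggerEvent.
type event struct {
	Reference string
	Timestamp int64
}

// dedupeAndOrder keeps the earliest event per Reference and returns the
// survivors sorted by Timestamp, breaking ties by Reference for stable output.
func dedupeAndOrder(events []event) []event {
	earliest := make(map[string]event, len(events))
	for _, e := range events {
		if prev, ok := earliest[e.Reference]; !ok || e.Timestamp < prev.Timestamp {
			earliest[e.Reference] = e
		}
	}
	out := make([]event, 0, len(earliest))
	for _, e := range earliest {
		out = append(out, e)
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Timestamp != out[j].Timestamp {
			return out[i].Timestamp < out[j].Timestamp
		}
		return out[i].Reference < out[j].Reference
	})
	return out
}
```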