spec

Package version: v0.1.0-rc5
Warning: This package is not in the latest version of its module.
Published: Aug 21, 2025 License: MIT Imports: 12 Imported by: 2

Documentation

Overview

Package spec defines the core interfaces and abstractions for WombatWisdom components.

WombatWisdom implements a component-based data processing framework with three main patterns:

Standard Pattern

Traditional data processing components:

  • Input: Reads data from external sources
  • Processor: Transforms data between inputs and outputs
  • Output: Writes data to external destinations

Trigger-Retrieval Pattern

Event-driven pattern that separates event detection from data retrieval:

  • TriggerInput: Emits lightweight events referencing data locations
  • RetrievalProcessor: Fetches actual data based on trigger events

This pattern enables:

  • Efficient filtering before expensive operations
  • Event deduplication and ordering
  • Graceful handling of data availability delays
  • Better resource utilization, because expensive retrieval runs only for triggers that pass filtering

Self-Contained Pattern

For streaming systems where events contain the actual data:

  • SelfContainedInput: Events include all necessary data payload

Examples: NATS messages, MQTT payloads, Kafka records.


Constants

const (
	TriggerSourceEventBridge = "eventbridge"
	TriggerSourceS3Polling   = "s3-polling"
	TriggerSourceGenerate    = "generate"
	TriggerSourceSQS         = "sqs"
	TriggerSourceFile        = "file"
)

Trigger source constants name common values returned by TriggerEvent.Source.

const (
	MetadataBucket    = "bucket"
	MetadataKey       = "key"
	MetadataEventName = "event_name"
	MetadataRegion    = "region"
	MetadataTimestamp = "timestamp"
	MetadataSize      = "size"
	MetadataETag      = "etag"
)

Common Metadata Keys

Variables

var ErrAlreadyConnected = errors.New("already connected")
var ErrNoData = errors.New("no data available")
var ErrNotConnected = errors.New("not connected")

Functions

func NoopCallback

func NoopCallback(ctx context.Context, err error) error

Types

type Batch

type Batch interface {
	Messages() iter.Seq2[int, Message]
	Append(msg Message)
}

type Collector

type Collector interface {
	Collect(msg Message) error
	Flush() (Batch, error)
	// Legacy methods for backward compatibility
	Write(msg Message) error
	Disconnect() error
}

Collector collects messages for batch processing. This maintains compatibility with existing input patterns.

func NewSimpleCollector

func NewSimpleCollector() Collector

NewSimpleCollector creates a basic collector implementation.

type Component

type Component interface {
	// Init initializes the component with the given context.
	// This is called once before the component starts processing.
	Init(ctx ComponentContext) error

	// Close releases any resources held by the component.
	// This is called when the component is being shut down.
	Close(ctx ComponentContext) error
}

Component is the base interface for all pipeline components. Every component must be able to initialize and clean up its resources.

type ComponentConstructor

type ComponentConstructor[T Component] func(sys System, cfg Config) (T, error)

ComponentConstructor is a function type for creating components. It mirrors Benthos constructor patterns while injecting the System dependency explicitly.
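A constructor matching ComponentConstructor receives its System and Config as arguments instead of looking them up globally. The sketch below uses a hypothetical echoOutput component and trimmed local stand-ins for the package interfaces (ComponentContext here exposes only Context()); nopSystem and staticConfig are illustrative test doubles.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Trimmed stand-ins for the package's interfaces.
type ComponentContext interface{ Context() context.Context }

type Component interface {
	Init(ctx ComponentContext) error
	Close(ctx ComponentContext) error
}

type System interface {
	Connect(ctx context.Context) error
	Close(ctx context.Context) error
	Client() any
}

type Config interface{ Decode(target any) error }

type ComponentConstructor[T Component] func(sys System, cfg Config) (T, error)

// echoOutput is a hypothetical component used only for illustration.
type echoOutput struct {
	sys    System
	prefix string
}

func (e *echoOutput) Init(ComponentContext) error  { return nil }
func (e *echoOutput) Close(ComponentContext) error { return nil }

type echoConfig struct{ Prefix string }

// newEchoOutput validates its injected System, then decodes its config.
func newEchoOutput(sys System, cfg Config) (*echoOutput, error) {
	if sys == nil {
		return nil, errors.New("echo: system is required")
	}
	var c echoConfig
	if err := cfg.Decode(&c); err != nil {
		return nil, fmt.Errorf("echo: decode config: %w", err)
	}
	return &echoOutput{sys: sys, prefix: c.Prefix}, nil
}

// Compile-time check that newEchoOutput has the constructor shape.
var _ ComponentConstructor[*echoOutput] = newEchoOutput

// nopSystem and staticConfig are minimal test doubles.
type nopSystem struct{}

func (nopSystem) Connect(context.Context) error { return nil }
func (nopSystem) Close(context.Context) error   { return nil }
func (nopSystem) Client() any                   { return nil }

type staticConfig struct{ prefix string }

func (c staticConfig) Decode(target any) error {
	t, ok := target.(*echoConfig)
	if !ok {
		return fmt.Errorf("staticConfig: unsupported target %T", target)
	}
	t.Prefix = c.prefix
	return nil
}

func main() {
	var ctor ComponentConstructor[*echoOutput] = newEchoOutput
	out, err := ctor(nopSystem{}, staticConfig{prefix: "> "})
	if err != nil {
		panic(err)
	}
	fmt.Printf("constructed echoOutput with prefix %q\n", out.prefix)
}
```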

type ComponentContext

type ComponentContext interface {
	Logger
	ExpressionFactory
	MessageFactory
	MetadataFilterFactory

	Context() context.Context

	// Resource management
	Resources() ResourceManager

	// Component access (for cross-component communication)
	Input(name string) (Input, error)
	Output(name string) (Output, error)

	// System access (for shared client usage)
	System(name string) (System, error)
}

type ComponentFactory

type ComponentFactory interface {
	// NewInput creates a new Input component using the provided system and configuration
	NewInput(sys System, cfg Config) (Input, error)

	// NewOutput creates a new Output component using the provided system and configuration
	NewOutput(sys System, cfg Config) (Output, error)
}

ComponentFactory provides methods to create component instances. This interface enables Benthos-compatible component construction while maintaining the System-first architecture.

type ComponentSpec

type ComponentSpec interface {
	// Name returns the component name used for registration
	Name() string

	// Summary returns a brief description of the component
	Summary() string

	// Description returns detailed documentation for the component
	Description() string

	// InputConfigSchema returns the JSON schema for input component configuration
	InputConfigSchema() string

	// OutputConfigSchema returns the JSON schema for output component configuration
	OutputConfigSchema() string

	// SystemConfigSchema returns the JSON schema for system configuration
	SystemConfigSchema() string
}

ComponentSpec defines the schema and metadata for a component type. This enables Benthos-compatible component registration and validation.

type Config

type Config interface {
	Decode(target any) error
}

func NewMapConfig

func NewMapConfig(raw map[string]any) Config

func NewYamlConfig

func NewYamlConfig(raw string, replacements ...string) Config
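Config.Decode fills a caller-supplied struct from raw configuration. The package does not document how NewMapConfig decodes, so the sketch below assumes a hypothetical mapConfig that round-trips through encoding/json; the real implementation may use a different decoder and field-mapping rules.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Config interface{ Decode(target any) error }

// mapConfig is a hypothetical stand-in for NewMapConfig. Decoding goes
// through encoding/json, so target fields follow JSON struct-tag rules.
type mapConfig struct{ raw map[string]any }

func newMapConfig(raw map[string]any) Config { return mapConfig{raw: raw} }

func (c mapConfig) Decode(target any) error {
	b, err := json.Marshal(c.raw)
	if err != nil {
		return fmt.Errorf("config: marshal: %w", err)
	}
	if err := json.Unmarshal(b, target); err != nil {
		return fmt.Errorf("config: unmarshal: %w", err)
	}
	return nil
}

// s3Config is an illustrative component configuration.
type s3Config struct {
	Bucket  string `json:"bucket"`
	Region  string `json:"region"`
	MaxKeys int    `json:"max_keys"`
}

func main() {
	cfg := newMapConfig(map[string]any{"bucket": "logs", "region": "eu-west-1", "max_keys": 100})
	var c s3Config
	if err := cfg.Decode(&c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // {Bucket:logs Region:eu-west-1 MaxKeys:100}
}
```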

type ConfigSchema

type ConfigSchema interface {
	// AddField adds a configuration field to the schema
	AddField(field SchemaField) ConfigSchema

	// ToJSON returns the JSON schema representation
	ToJSON() (string, error)
}

ConfigSchema provides a programmatic way to build component schemas. This is useful for components that need dynamic schema generation.

func NewConfigSchema

func NewConfigSchema() ConfigSchema

NewConfigSchema creates a new configuration schema builder.
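Because AddField returns the ConfigSchema, calls can be chained. The following sketch is a hypothetical builder (schemaBuilder); the JSON layout it emits is an assumption modeled on JSON Schema, not the package's documented output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type SchemaField struct {
	Name        string
	Type        string
	Description string
	Required    bool
	Default     any
	Examples    []any
}

type ConfigSchema interface {
	AddField(field SchemaField) ConfigSchema
	ToJSON() (string, error)
}

// schemaBuilder is a hypothetical ConfigSchema implementation.
type schemaBuilder struct{ fields []SchemaField }

func newConfigSchema() ConfigSchema { return &schemaBuilder{} }

func (s *schemaBuilder) AddField(f SchemaField) ConfigSchema {
	s.fields = append(s.fields, f)
	return s // returning the receiver enables chaining
}

// ToJSON emits an object schema; json.Marshal sorts map keys, so the
// output is deterministic.
func (s *schemaBuilder) ToJSON() (string, error) {
	props := map[string]any{}
	required := []string{}
	for _, f := range s.fields {
		p := map[string]any{"type": f.Type}
		if f.Description != "" {
			p["description"] = f.Description
		}
		if f.Default != nil {
			p["default"] = f.Default
		}
		if len(f.Examples) > 0 {
			p["examples"] = f.Examples
		}
		props[f.Name] = p
		if f.Required {
			required = append(required, f.Name)
		}
	}
	b, err := json.Marshal(map[string]any{"type": "object", "properties": props, "required": required})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := newConfigSchema().
		AddField(SchemaField{Name: "bucket", Type: "string", Required: true}).
		AddField(SchemaField{Name: "region", Type: "string", Default: "us-east-1"}).
		ToJSON()
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```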

type Counter

type Counter interface {
	Inc(delta int64)
}

Counter represents a monotonically increasing counter metric.

type DynamicField

type DynamicField interface {
	String() string
	Int() int
	Bool() bool
	// Legacy methods for backward compatibility
	AsString(msg Message) (string, error)
	AsBool(msg Message) (bool, error)
}

DynamicField represents a field that can be evaluated dynamically. This maintains compatibility with existing component patterns.

type DynamicFieldFactory

type DynamicFieldFactory interface {
	NewDynamicField(expr string) DynamicField
}

DynamicFieldFactory creates dynamic fields from expressions.

type Environment

type Environment interface {
	Logger
	DynamicFieldFactory
	GetString(key string) string
	GetInt(key string) int
	GetBool(key string) bool
}

Environment provides environment variable and dynamic field access. This interface maintains compatibility with existing components.

func NewSimpleEnvironment

func NewSimpleEnvironment() Environment

NewSimpleEnvironment creates a basic environment implementation.

type ExprLangExpressionFactory

type ExprLangExpressionFactory struct{}

ExprLangExpressionFactory implements ExpressionFactory using expr-lang.

func (*ExprLangExpressionFactory) ParseExpression

func (e *ExprLangExpressionFactory) ParseExpression(exprStr string) (Expression, error)

type Expression

type Expression interface {
	EvalString(ctx ExpressionContext) (string, error)
	EvalInt(ctx ExpressionContext) (int, error)
	EvalBool(ctx ExpressionContext) (bool, error)
}

type ExpressionContext

type ExpressionContext map[string]any

func MessageExpressionContext

func MessageExpressionContext(msg Message) ExpressionContext

type ExpressionFactory

type ExpressionFactory interface {
	ParseExpression(expr string) (Expression, error)
}

func NewExprLangExpressionFactory

func NewExprLangExpressionFactory() ExpressionFactory

NewExprLangExpressionFactory creates a new expression factory using expr-lang.

type Gauge

type Gauge interface {
	Set(value float64)
}

Gauge represents a gauge metric that can go up and down.

type Input

type Input interface {
	Component

	// Read retrieves a batch of data from the external source.
	// Returns the data batch and a callback to acknowledge processing.
	Read(ctx ComponentContext) (Batch, ProcessedCallback, error)
}

Input represents a component that reads data from external sources. Examples include database readers, file parsers, and message queue consumers.

type Logger

type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

type Message

type Message interface {
	SetMetadata(key string, value any)
	SetRaw(b []byte)

	Raw() ([]byte, error)
	Metadata() iter.Seq2[string, any]
}

func NewBytesMessage

func NewBytesMessage(data []byte) Message

NewBytesMessage creates a simple message from raw bytes, intended for testing.

type MessageFactory

type MessageFactory interface {
	NewBatch() Batch
	NewMessage() Message
}

type Metadata

type Metadata interface {
	Keys() iter.Seq[string]
	Get(key string) any
}

func NewMapMetadata

func NewMapMetadata(data map[string]any) Metadata

type MetadataFilter

type MetadataFilter interface {
	Include(key string) bool
}

type MetadataFilterFactory

type MetadataFilterFactory interface {
	BuildMetadataFilter(patterns []string, invert bool) (MetadataFilter, error)
}
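BuildMetadataFilter turns a list of patterns into a MetadataFilter, with invert flipping the result. The package does not document the pattern syntax, so this sketch assumes shell-style globs matched with path.Match; buildMetadataFilter and globFilter are hypothetical names.

```go
package main

import (
	"fmt"
	"path"
)

type MetadataFilter interface{ Include(key string) bool }

// globFilter is a hypothetical MetadataFilter using path.Match globs.
type globFilter struct {
	patterns []string
	invert   bool
}

func buildMetadataFilter(patterns []string, invert bool) (MetadataFilter, error) {
	for _, p := range patterns {
		// path.Match reports ErrBadPattern for malformed patterns.
		if _, err := path.Match(p, ""); err != nil {
			return nil, fmt.Errorf("metadata filter: pattern %q: %w", p, err)
		}
	}
	return &globFilter{patterns: patterns, invert: invert}, nil
}

// Include reports whether key matches any pattern, negated when invert is set.
func (f *globFilter) Include(key string) bool {
	matched := false
	for _, p := range f.patterns {
		if ok, _ := path.Match(p, key); ok {
			matched = true
			break
		}
	}
	return matched != f.invert
}

func main() {
	f, err := buildMetadataFilter([]string{"s3_*", "etag"}, false)
	if err != nil {
		panic(err)
	}
	for _, key := range []string{"s3_bucket", "etag", "region"} {
		fmt.Println(key, f.Include(key))
	}
}
```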

type Metrics

type Metrics interface {
	Counter(name string) Counter
	Gauge(name string) Gauge
	Timer(name string) Timer
}

Metrics provides a telemetry collection interface. This is a placeholder for future metrics implementation.

type Output

type Output interface {
	Component

	// Write sends a batch of data to the external destination.
	// The batch should be written atomically where possible.
	Write(ctx ComponentContext, batch Batch) error
}

Output represents a component that writes data to external destinations. Examples include database writers, file exporters, and message queue producers.

type ProcessedCallback

type ProcessedCallback func(ctx context.Context, err error) error

ProcessedCallback is the signature of a callback invoked after a message or message batch has been processed. The err argument reports whether processing succeeded; a nil err indicates success. TODO: document what happens if the callback itself returns an error.

type Processor

type Processor interface {
	Component

	// Process transforms the input batch and returns the processed result.
	// Returns the transformed batch and a callback to acknowledge processing.
	Process(ctx ComponentContext, batch Batch) (Batch, ProcessedCallback, error)
}

Processor represents a component that transforms data between inputs and outputs. Examples include data filters, enrichers, parsers, and format converters.

type ResourceManager

type ResourceManager interface {
	Logger() Logger

	// System returns a shared system instance by name
	System(name string) (System, error)

	// RegisterSystem registers a system instance for sharing
	RegisterSystem(name string, sys System) error

	// Context returns the base context for operations
	Context() context.Context

	// Metrics returns a metrics interface (placeholder for future implementation)
	Metrics() Metrics
}

ResourceManager provides access to shared resources and services. It is the equivalent of Benthos's service.Resources, but designed for the System-first architecture.

func NewResourceManager

func NewResourceManager(ctx context.Context, logger Logger) ResourceManager

NewResourceManager creates a new resource manager instance.

type RetrievalProcessor

type RetrievalProcessor interface {
	Component

	// Retrieve fetches actual data based on the provided trigger events.
	// Can filter triggers before retrieval to optimize performance.
	Retrieve(ctx ComponentContext, triggers TriggerBatch) (Batch, ProcessedCallback, error)
}

RetrievalProcessor fetches actual data based on trigger events. Enables filtering and validation before expensive retrieval operations.

This component works with TriggerInput to implement the trigger-retrieval pattern:

  1. Receives lightweight trigger events
  2. Filters and validates triggers before retrieval
  3. Fetches the actual data only when needed
  4. Returns the retrieved data as a standard batch
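The filter-then-fetch steps above can be sketched as follows. This is a deliberate simplification, assuming a hypothetical retriever that omits ComponentContext, Batch, and ProcessedCallback, and reads from an in-memory map standing in for an expensive store such as S3; the suffix filter is likewise illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// TriggerEvent mirrors the interface in package spec.
type TriggerEvent interface {
	Source() string
	Reference() string
	Metadata() map[string]any
	Timestamp() int64
}

type trigger struct {
	source, ref string
	meta        map[string]any
	ts          int64
}

func (t trigger) Source() string           { return t.source }
func (t trigger) Reference() string        { return t.ref }
func (t trigger) Metadata() map[string]any { return t.meta }
func (t trigger) Timestamp() int64         { return t.ts }

// retriever is a hypothetical, simplified RetrievalProcessor.
type retriever struct {
	store   map[string][]byte // stands in for an expensive data store
	suffix  string            // only references ending in suffix are fetched
	fetches int               // counts expensive fetch calls
}

func (r *retriever) Retrieve(triggers []TriggerEvent) ([][]byte, error) {
	var out [][]byte
	for _, t := range triggers {
		// Filter cheaply on the reference before any retrieval.
		if !strings.HasSuffix(t.Reference(), r.suffix) {
			continue
		}
		// Fetch data only for triggers that passed the filter.
		r.fetches++
		data, ok := r.store[t.Reference()]
		if !ok {
			return nil, fmt.Errorf("retrieve %q: object not found", t.Reference())
		}
		out = append(out, data)
	}
	// A real implementation would wrap these payloads in a Batch.
	return out, nil
}

func main() {
	r := &retriever{
		store:  map[string][]byte{"logs/a.json": []byte(`{"n":1}`), "logs/b.csv": []byte("n\n2")},
		suffix: ".json",
	}
	triggers := []TriggerEvent{
		trigger{source: "s3-polling", ref: "logs/a.json"},
		trigger{source: "s3-polling", ref: "logs/b.csv"},
	}
	data, err := r.Retrieve(triggers)
	if err != nil {
		panic(err)
	}
	fmt.Printf("retrieved %d object(s) with %d fetch(es)\n", len(data), r.fetches)
}
```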

Examples: S3 object retrievers, database record fetchers, API data pullers.

type SchemaField

type SchemaField struct {
	Name        string
	Type        string
	Description string
	Required    bool
	Default     any
	Examples    []any
}

SchemaField represents a configuration field with validation rules.

type SelfContainedInput

type SelfContainedInput interface {
	Input // Inherits standard Input behavior
}

SelfContainedInput represents inputs where the trigger IS the data. Used for streaming systems where events contain the actual data payload.

Unlike the trigger-retrieval pattern, these inputs don't need separate data fetching because the event notification contains all necessary data.

Examples: NATS messages, MQTT payloads, Kafka records, Redis streams.

type System

type System interface {
	Connect(ctx context.Context) error
	Close(ctx context.Context) error
	Client() any
}

type SystemConstructor

type SystemConstructor func(cfg Config) (System, error)

SystemConstructor is a function type for creating systems.

type SystemFactory

type SystemFactory interface {
	// NewSystem creates a new System instance from raw configuration
	NewSystem(cfg Config) (System, error)
}

SystemFactory creates System instances from configuration.

type Timer

type Timer interface {
	Record(duration float64)
}

Timer represents a timer metric for measuring durations.

type TriggerBatch

type TriggerBatch interface {
	// Triggers returns all trigger events in this batch.
	Triggers() []TriggerEvent

	// Append adds a new trigger event to this batch.
	Append(trigger TriggerEvent)
}

TriggerBatch contains a collection of lightweight trigger events. Used to group related triggers for efficient processing.

func NewTriggerBatch

func NewTriggerBatch() TriggerBatch

NewTriggerBatch creates a new empty trigger batch.

type TriggerEvent

type TriggerEvent interface {
	// Source identifies the trigger mechanism that generated this event.
	// Examples: "eventbridge", "s3-polling", "webhook", "generate"
	Source() string

	// Reference provides the data location or identifier for retrieval.
	// Examples: S3 object key, database record ID, API endpoint URL
	Reference() string

	// Metadata contains trigger-specific context and additional parameters.
	// This can include filtering criteria, authentication tokens, or processing hints.
	Metadata() map[string]any

	// Timestamp returns when the trigger was generated (Unix timestamp).
	// Used for ordering, deduplication, and expiration logic.
	Timestamp() int64
}

TriggerEvent represents a lightweight reference to data that can be retrieved. This is the core abstraction of the trigger-retrieval pattern, providing just enough information to identify and locate data without fetching it.
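Timestamp and Reference are enough to deduplicate and order triggers before retrieval. The following is a sketch with a hypothetical dedupeAndOrder helper and a local event type: it keeps the earliest event per (Source, Reference) pair and returns the survivors oldest-first. Keying on both fields, rather than Reference alone, is a design assumption.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// TriggerEvent mirrors the interface in package spec.
type TriggerEvent interface {
	Source() string
	Reference() string
	Metadata() map[string]any
	Timestamp() int64
}

// event is a hypothetical TriggerEvent implementation.
type event struct {
	source, ref string
	meta        map[string]any
	ts          int64
}

func (e event) Source() string           { return e.source }
func (e event) Reference() string        { return e.ref }
func (e event) Metadata() map[string]any { return e.meta }
func (e event) Timestamp() int64         { return e.ts }

// dedupeAndOrder keeps the earliest event for each (Source, Reference)
// pair and returns the survivors sorted by timestamp.
func dedupeAndOrder(events []TriggerEvent) []TriggerEvent {
	type key struct{ source, ref string }
	earliest := make(map[key]TriggerEvent)
	for _, e := range events {
		k := key{e.Source(), e.Reference()}
		if prev, ok := earliest[k]; !ok || e.Timestamp() < prev.Timestamp() {
			earliest[k] = e
		}
	}
	out := make([]TriggerEvent, 0, len(earliest))
	for _, e := range earliest {
		out = append(out, e)
	}
	// Map iteration order is random, so sort with deterministic tie-breaks.
	slices.SortFunc(out, func(a, b TriggerEvent) int {
		if c := cmp.Compare(a.Timestamp(), b.Timestamp()); c != 0 {
			return c
		}
		if c := cmp.Compare(a.Reference(), b.Reference()); c != 0 {
			return c
		}
		return cmp.Compare(a.Source(), b.Source())
	})
	return out
}

func main() {
	events := []TriggerEvent{
		event{source: "sqs", ref: "obj/a", ts: 30},
		event{source: "sqs", ref: "obj/b", ts: 10},
		event{source: "sqs", ref: "obj/a", ts: 20}, // earlier duplicate of obj/a
	}
	for _, e := range dedupeAndOrder(events) {
		fmt.Println(e.Timestamp(), e.Reference())
	}
}
```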

func NewTriggerEvent

func NewTriggerEvent(source, reference string, metadata map[string]any) TriggerEvent

NewTriggerEvent creates a new trigger event.

type TriggerInput

type TriggerInput interface {
	Component

	// ReadTriggers returns a batch of trigger events referencing available data.
	// These are lightweight references, not the actual data.
	ReadTriggers(ctx ComponentContext) (TriggerBatch, ProcessedCallback, error)
}

TriggerInput emits lightweight trigger events that reference data locations without fetching the actual data. Designed for event-driven architectures.

This pattern separates event detection from data retrieval, enabling:

  • Efficient filtering before expensive operations
  • Event deduplication and ordering
  • Graceful handling of data availability delays

Examples: EventBridge listeners, S3 notifications, webhook receivers.
