Documentation ¶
Overview ¶
Package dataflow provides a lightweight data pipeline framework for ETL tasks. It implements a Source → Processor → Sink pattern using Go channels.
Index ¶
- Variables
- type BaseProcessor
- type Closer
- type ComponentInfo
- type ComponentSpec
- type ConcurrencyCap
- type ConcurrencyCapable
- type Flow
- func (f *Flow[T]) Build() error
- func (f *Flow[T]) Close() error
- func (f *Flow[T]) IsRunning() bool
- func (f *Flow[T]) Metrics() metrics.Summary
- func (f *Flow[T]) Run(ctx context.Context) error
- func (f *Flow[T]) SetPrometheusCollector(collector metrics.PrometheusCollector)
- func (f *Flow[T]) Stats() Stats
- type FlowConfig
- type MetricsRecorderAware
- type Processor
- type Registry
- func (r *Registry[T]) GetProcessor(name string) (Processor[T], bool)
- func (r *Registry[T]) GetSink(name string) (Sink[T], bool)
- func (r *Registry[T]) GetSource(name string) (Source[T], bool)
- func (r *Registry[T]) ListProcessors() []string
- func (r *Registry[T]) ListSinks() []string
- func (r *Registry[T]) ListSources() []string
- func (r *Registry[T]) RegisterProcessor(name string, builder func() Processor[T])
- func (r *Registry[T]) RegisterSink(name string, builder func() Sink[T])
- func (r *Registry[T]) RegisterSource(name string, builder func() Source[T])
- type Sink
- type Source
- type StatelessProcessor
- type Stats
- type StatsCounter
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrComponentNotFound is returned when a component is not registered.
	ErrComponentNotFound = errors.New("component not found")
	// ErrInvalidConfig is returned when configuration is invalid.
	ErrInvalidConfig = errors.New("invalid config")
	// ErrFlowNotBuilt is returned when Run() is called before Build().
	ErrFlowNotBuilt = errors.New("flow not built, call Build() first")
	// ErrFlowAlreadyRunning is returned when Run() is called while flow is running.
	ErrFlowAlreadyRunning = errors.New("flow already running")
	// ErrSourceRequired is returned when source is not configured.
	ErrSourceRequired = errors.New("source is required")
	// ErrSinkRequired is returned when sink is not configured.
	ErrSinkRequired = errors.New("sink is required")
	// ErrFlowNameRequired is returned when flow name is empty.
	ErrFlowNameRequired = errors.New("flow name is required")
)
Sentinel errors for common failure cases.
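Sentinel errors are usually matched with errors.Is rather than by comparing message strings. The sketch below redeclares two of the sentinels locally so that it compiles on its own; the lookup and classify helpers are hypothetical, and whether the package wraps its sentinels with extra context is an assumption. errors.Is matches in both the wrapped and the unwrapped case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors, redeclared here
// only so that the sketch is self-contained.
var (
	ErrFlowNotBuilt      = errors.New("flow not built, call Build() first")
	ErrComponentNotFound = errors.New("component not found")
)

// lookup is a hypothetical helper that wraps a sentinel with extra context.
func lookup(name string) error {
	return fmt.Errorf("source %q: %w", name, ErrComponentNotFound)
}

// classify maps an error to a short label by matching sentinels with errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrComponentNotFound):
		return "not-found"
	case errors.Is(err, ErrFlowNotBuilt):
		return "not-built"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(lookup("orders"))) // prints "not-found"
	fmt.Println(classify(ErrFlowNotBuilt))  // prints "not-built"
}
```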
Functions ¶
This section is empty.
Types ¶
type BaseProcessor ¶
type BaseProcessor[T any] struct{}
BaseProcessor provides a default implementation of ConcurrencyCapable. It declares that the component does NOT support concurrency (safe by default).
func (*BaseProcessor[T]) ConcurrencyCap ¶
func (p *BaseProcessor[T]) ConcurrencyCap() ConcurrencyCap
ConcurrencyCap returns the default concurrency capability (not supported).
type Closer ¶
type Closer interface {
Close() error
}
Closer is an optional interface for components that need resource cleanup. If a component implements Closer, Flow will call Close() when the flow ends.
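An optional interface like this is typically detected with a type assertion. The sketch below is not the Flow implementation: closeAll, fileSink, and plainSink are hypothetical names, and joining the errors with errors.Join (Go 1.20+) is an assumption about how cleanup failures might be reported.

```go
package main

import (
	"errors"
	"fmt"
)

// Closer mirrors the interface documented above.
type Closer interface {
	Close() error
}

// fileSink is a hypothetical component that holds a resource.
type fileSink struct{ closed bool }

// Close releases the (imaginary) resource.
func (s *fileSink) Close() error {
	s.closed = true
	return nil
}

// plainSink is a hypothetical component with nothing to clean up.
type plainSink struct{}

// closeAll calls Close on every component that implements Closer and
// joins any errors; components without Close are skipped.
func closeAll(components ...any) error {
	var errs []error
	for _, c := range components {
		if closer, ok := c.(Closer); ok {
			if err := closer.Close(); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errors.Join(errs...)
}

func main() {
	fs := &fileSink{}
	err := closeAll(fs, plainSink{})
	fmt.Println(fs.closed, err) // prints "true <nil>"
}
```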
type ComponentInfo ¶
type ComponentInfo struct {
// FlowName is the name of the flow
FlowName string
// ComponentName is the component name (e.g., "source", "processor[0]", "sink")
ComponentName string
// ComponentType is the component type: "source", "processor", or "sink"
ComponentType string
}
ComponentInfo contains the context information passed to components.
type ComponentSpec ¶
type ComponentSpec struct {
// Name is the component name used for Registry lookup.
Name string `yaml:"name" json:"name"`
// Concurrency is the optional concurrency override for processors.
// Only applies to processors that declare concurrency support.
// - nil or not set: use the component's declared default
// - 1: force single goroutine (disable concurrency)
// - N > 1: use N workers (no upper limit, user decides)
// Recommended: set based on CPU cores and whether the component is I/O bound.
// Ignored for source and sink.
Concurrency *int `yaml:"concurrency" json:"concurrency"`
// Config is the component-specific configuration passed to Init.
Config map[string]interface{} `yaml:"config" json:"config"`
}
ComponentSpec defines a component specification.
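Because Concurrency is a pointer, an omitted value can be told apart from an explicit one after decoding. The sketch below mirrors the struct locally and decodes it with encoding/json; the component name "filter" and its "field" option are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ComponentSpec mirrors the struct documented above.
type ComponentSpec struct {
	Name        string                 `yaml:"name" json:"name"`
	Concurrency *int                   `yaml:"concurrency" json:"concurrency"`
	Config      map[string]interface{} `yaml:"config" json:"config"`
}

// parseSpec decodes a single component spec from JSON.
func parseSpec(raw string) (ComponentSpec, error) {
	var spec ComponentSpec
	err := json.Unmarshal([]byte(raw), &spec)
	return spec, err
}

func main() {
	// "filter" and its "field" option are hypothetical names.
	withOverride, err := parseSpec(`{"name":"filter","concurrency":4,"config":{"field":"status"}}`)
	if err != nil {
		panic(err)
	}
	withoutOverride, err := parseSpec(`{"name":"filter"}`)
	if err != nil {
		panic(err)
	}

	fmt.Println(*withOverride.Concurrency)          // prints 4
	fmt.Println(withoutOverride.Concurrency == nil) // prints true
}
```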
type ConcurrencyCap ¶
type ConcurrencyCap struct {
// Supported indicates whether the component supports concurrent processing
Supported bool
// SuggestedMax is the suggested maximum concurrency (0 = no limit)
SuggestedMax int
// IsStateful indicates whether the component has internal state
IsStateful bool
}
ConcurrencyCap describes the concurrency capability of a component.
type ConcurrencyCapable ¶
type ConcurrencyCapable interface {
ConcurrencyCap() ConcurrencyCap
}
ConcurrencyCapable is an interface for components to declare their concurrency capability. The framework uses this to decide whether to run the component concurrently.
Design principle: Concurrency capability is decided by the component itself, not by the framework or configuration.
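To make the rule concrete, here is one way a worker count could be derived from a component's ConcurrencyCap and the optional Concurrency override in ComponentSpec. This is a sketch, not the framework's logic: workerCount and its fallback parameter are hypothetical, and treating SuggestedMax as the default when no override is given is an assumption.

```go
package main

import "fmt"

// ConcurrencyCap mirrors the struct documented above.
type ConcurrencyCap struct {
	Supported    bool
	SuggestedMax int
	IsStateful   bool
}

// workerCount is a hypothetical helper. A component that does not declare
// support always gets one worker; otherwise an explicit override wins, then
// SuggestedMax (assumed to act as the default), then the fallback.
func workerCount(cap ConcurrencyCap, override *int, fallback int) int {
	if !cap.Supported {
		return 1 // the override is ignored for components that do not opt in
	}
	if override != nil && *override >= 1 {
		return *override
	}
	if cap.SuggestedMax > 0 {
		return cap.SuggestedMax
	}
	return fallback
}

// intPtr returns a pointer to n, mimicking a decoded *int config field.
func intPtr(n int) *int { return &n }

func main() {
	stateless := ConcurrencyCap{Supported: true}
	stateful := ConcurrencyCap{Supported: false, IsStateful: true}

	fmt.Println(workerCount(stateless, nil, 4))       // prints 4
	fmt.Println(workerCount(stateless, intPtr(8), 4)) // prints 8
	fmt.Println(workerCount(stateful, intPtr(8), 4))  // prints 1
}
```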
type Flow ¶
type Flow[T any] struct {
	// contains filtered or unexported fields
}
Flow is the data pipeline orchestrator. It manages the execution of Source → Processors → Sink.
Example ¶
Example demonstrates basic flow usage.
// Define data type
type Order struct {
ID string `json:"id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
}
// Create registry
registry := NewRegistry[Order]()
// Create and register source with data
source := &MockSource[Order]{
data: []Order{
{ID: "001", Amount: 100.0, Status: "completed"},
{ID: "002", Amount: 200.0, Status: "pending"},
{ID: "003", Amount: 150.0, Status: "completed"},
},
}
sink := &MockSink[Order]{}
registry.RegisterSource("orders", func() Source[Order] {
return source
})
registry.RegisterSink("console", func() Sink[Order] {
return sink
})
// Create flow config
config := &FlowConfig{
Name: "order_processing",
BufferSize: 100,
Source: ComponentSpec{Name: "orders"},
Sink: ComponentSpec{Name: "console"},
}
// Build and run
flow := NewFlow[Order](config, registry)
if err := flow.Build(); err != nil {
fmt.Printf("Build failed: %v\n", err)
return
}
if err := flow.Run(context.Background()); err != nil {
fmt.Printf("Run failed: %v\n", err)
return
}
stats := flow.Stats()
fmt.Printf("Processed %d records\n", stats.TotalIn)
func NewFlow ¶
func NewFlow[T any](config *FlowConfig, registry *Registry[T]) *Flow[T]
NewFlow creates a new Flow instance.
func (*Flow[T]) Metrics ¶
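func (f *Flow[T]) Metrics() metrics.Summary
Metrics returns the metrics summary.
TotalIn is the source's output (the value returned by Read()).
TotalOut is the output of the last component in the pipeline that produces output:
- With processors: the RecordOut of the last processor
- Without processors: the RecordOut of the source
TotalError is the sum of the errors of all components in StatsCounter.
Components holds the per-component breakdown.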
func (*Flow[T]) SetPrometheusCollector ¶
func (f *Flow[T]) SetPrometheusCollector(collector metrics.PrometheusCollector)
SetPrometheusCollector sets the Prometheus metrics collector. Once it is set, the Flow reports metrics to Prometheus in real time while running.
type FlowConfig ¶
type FlowConfig struct {
// Name is the flow name used for logging and monitoring.
Name string `yaml:"name" json:"name"`
// BufferSize is the channel buffer size between components.
BufferSize int `yaml:"buffer_size" json:"buffer_size"`
// Source is the source component configuration.
Source ComponentSpec `yaml:"source" json:"source"`
// Processors is the list of processor configurations (executed in order).
Processors []ComponentSpec `yaml:"processors" json:"processors"`
// Sink is the sink component configuration.
Sink ComponentSpec `yaml:"sink" json:"sink"`
}
FlowConfig defines the configuration for a Flow.
func (*FlowConfig) SetDefaults ¶
func (c *FlowConfig) SetDefaults()
SetDefaults sets default values for the configuration.
func (*FlowConfig) Validate ¶
func (c *FlowConfig) Validate() error
Validate validates the flow configuration.
func (*FlowConfig) ValidateBuild ¶
func (c *FlowConfig) ValidateBuild() error
ValidateBuild validates the configuration during Build. This is called internally by Flow.Build().
type MetricsRecorderAware ¶
type MetricsRecorderAware interface {
SetMetricsRecorder(recorder metrics.Recorder, info ComponentInfo)
}
MetricsRecorderAware is an optional interface for components that need to record custom metrics. If a component implements this interface, Flow will inject the metrics recorder after Init.
type Processor ¶
type Processor[T any] interface {
	// Init initializes the component with JSON configuration.
	// Returns an error if config parsing or validation fails.
	Init(config []byte) error

	// Process reads data from input channel, processes it, and sends to output channel.
	// ctx: context for cancellation and timeout
	// in: input channel to read from (upstream writes to this)
	// out: output channel to write to (downstream reads from this)
	// Returns: error if any
	//
	// Implementation guidelines:
	// - Use for data := range in { ... } to iterate input channel
	// - Can filter data (not send to out)
	// - Can transform 1 record to N records (1:N)
	// - Can merge N records to 1 record (N:1)
	// - DO NOT close out channel (managed by framework)
	// - Return when in channel is closed and all data is processed
	Process(ctx context.Context, in <-chan T, out chan<- T) error
}
Processor is the data processor interface. Processors read data from input channel, process it, and send to output channel.
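A minimal filtering processor that follows the guidelines in the interface comments might look like the sketch below. The interface is mirrored locally so the example is self-contained; minFilter, its "min" option, and the runFilter helper (which stands in for the framework and is the only place the out channel is closed) are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Processor mirrors the interface documented above.
type Processor[T any] interface {
	Init(config []byte) error
	Process(ctx context.Context, in <-chan T, out chan<- T) error
}

// minFilter is a hypothetical processor that drops values below Min.
type minFilter struct {
	Min int `json:"min"`
}

// Init parses the JSON configuration into the filter.
func (f *minFilter) Init(config []byte) error {
	return json.Unmarshal(config, f)
}

// Process forwards values >= Min and returns once in is closed.
// It never closes out; that is left to the caller.
func (f *minFilter) Process(ctx context.Context, in <-chan int, out chan<- int) error {
	for v := range in {
		if v < f.Min {
			continue // filtered: not sent downstream
		}
		select {
		case out <- v:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// runFilter plays the framework's role: it feeds the input, runs the
// processor, closes out afterwards, and collects the results.
func runFilter(p Processor[int], input []int) ([]int, error) {
	in := make(chan int, len(input))
	out := make(chan int, len(input))
	for _, v := range input {
		in <- v
	}
	close(in)
	err := p.Process(context.Background(), in, out)
	close(out)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got, err
}

func main() {
	p := &minFilter{}
	if err := p.Init([]byte(`{"min": 10}`)); err != nil {
		panic(err)
	}
	got, err := runFilter(p, []int{3, 10, 42, 7})
	fmt.Println(got, err) // prints "[10 42] <nil>"
}
```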
type Registry ¶
type Registry[T any] struct {
	// contains filtered or unexported fields
}
Registry manages component registrations.
func NewRegistry ¶
NewRegistry creates a new component registry.
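func (*Registry[T]) GetProcessor ¶
func (r *Registry[T]) GetProcessor(name string) (Processor[T], bool)
GetProcessor returns a new processor instance by name.
func (*Registry[T]) ListProcessors ¶
func (r *Registry[T]) ListProcessors() []string
ListProcessors returns all registered processor names.
func (*Registry[T]) ListSources ¶
func (r *Registry[T]) ListSources() []string
ListSources returns all registered source names.
func (*Registry[T]) RegisterProcessor ¶
func (r *Registry[T]) RegisterProcessor(name string, builder func() Processor[T])
RegisterProcessor registers a processor factory function.
func (*Registry[T]) RegisterSink ¶
func (r *Registry[T]) RegisterSink(name string, builder func() Sink[T])
RegisterSink registers a sink factory function.
func (*Registry[T]) RegisterSource ¶
func (r *Registry[T]) RegisterSource(name string, builder func() Source[T])
RegisterSource registers a source factory function.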
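Registering factory functions instead of instances lets every lookup build a fresh component. The sketch below illustrates that pattern with a simplified, hypothetical factoryRegistry for a single component kind; it is not the Registry implementation, and the mutex, the sorted List output, and overwrite-on-duplicate behavior are assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// factoryRegistry is a hypothetical, simplified stand-in for Registry[T]
// that stores builders for one kind of component C.
type factoryRegistry[C any] struct {
	mu       sync.RWMutex
	builders map[string]func() C
}

// newFactoryRegistry creates an empty registry.
func newFactoryRegistry[C any]() *factoryRegistry[C] {
	return &factoryRegistry[C]{builders: make(map[string]func() C)}
}

// Register stores a builder under name, replacing any previous one.
func (r *factoryRegistry[C]) Register(name string, builder func() C) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.builders[name] = builder
}

// Get invokes the builder registered under name, so each call can
// return a new instance. The bool reports whether name was found.
func (r *factoryRegistry[C]) Get(name string) (C, bool) {
	r.mu.RLock()
	builder, ok := r.builders[name]
	r.mu.RUnlock()
	if !ok {
		var zero C
		return zero, false
	}
	return builder(), true
}

// List returns the registered names in sorted order.
func (r *factoryRegistry[C]) List() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.builders))
	for name := range r.builders {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// counter is a hypothetical stateful component.
type counter struct{ n int }

func main() {
	reg := newFactoryRegistry[*counter]()
	reg.Register("counter", func() *counter { return &counter{} })

	a, _ := reg.Get("counter")
	b, _ := reg.Get("counter")
	a.n++ // mutating one instance does not affect the other

	fmt.Println(a.n, b.n, reg.List()) // prints "1 0 [counter]"
}
```

A builder that returns a shared instance, as the Flow example does with its mock source and sink, defeats this isolation, which is acceptable for a flow that is built once.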
type Sink ¶
type Sink[T any] interface {
	// Init initializes the component with JSON configuration.
	// Returns an error if config parsing or validation fails.
	Init(config []byte) error

	// Consume reads data from input channel and writes to external system.
	// ctx: context for cancellation and timeout
	// in: input channel to read from
	// Returns: error if any
	//
	// Implementation guidelines:
	// - Use for data := range in { ... } to iterate input channel
	// - Implement batch writing for better performance
	// - Implement periodic flush to avoid data accumulation
	// - Return when in channel is closed and all data is written
	// - Handle write failures (retry or log error)
	Consume(ctx context.Context, in <-chan T) error
}
Sink is the data sink interface. Sinks consume data from input channel and write to external systems.
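The batching guideline in the interface comments can be sketched as follows. The interface is mirrored locally; batchSink, its "size" option, and the consumeAll helper are hypothetical, and the "external system" is just an in-memory slice. Periodic flushing and retries are left out to keep the example short.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Sink mirrors the interface documented above.
type Sink[T any] interface {
	Init(config []byte) error
	Consume(ctx context.Context, in <-chan T) error
}

// batchSink is a hypothetical sink that groups records into batches of
// Size and stores each batch in memory instead of writing it anywhere.
type batchSink struct {
	Size    int `json:"size"`
	batches [][]string
}

// Init parses the JSON configuration and validates the batch size.
func (s *batchSink) Init(config []byte) error {
	if err := json.Unmarshal(config, s); err != nil {
		return err
	}
	if s.Size <= 0 {
		return errors.New("size must be positive")
	}
	return nil
}

// Consume collects records into batches, flushing a batch when it is full
// and flushing the remainder when in is closed or ctx is cancelled.
func (s *batchSink) Consume(ctx context.Context, in <-chan string) error {
	batch := make([]string, 0, s.Size)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		s.batches = append(s.batches, batch)
		batch = make([]string, 0, s.Size)
	}
	for {
		select {
		case <-ctx.Done():
			flush()
			return ctx.Err()
		case rec, ok := <-in:
			if !ok {
				flush() // input closed: write what is left
				return nil
			}
			batch = append(batch, rec)
			if len(batch) == s.Size {
				flush()
			}
		}
	}
}

// consumeAll feeds records to a fresh batchSink and returns its batches.
func consumeAll(size int, records []string) ([][]string, error) {
	s := &batchSink{}
	if err := s.Init([]byte(fmt.Sprintf(`{"size": %d}`, size))); err != nil {
		return nil, err
	}
	in := make(chan string, len(records))
	for _, r := range records {
		in <- r
	}
	close(in)
	err := s.Consume(context.Background(), in)
	return s.batches, err
}

func main() {
	batches, err := consumeAll(2, []string{"a", "b", "c", "d", "e"})
	fmt.Println(batches, err) // prints "[[a b] [c d] [e]] <nil>"
}
```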
type Source ¶
type Source[T any] interface {
	// Init initializes the component with JSON configuration.
	// This method only parses config, does NOT establish connections.
	// Returns an error if config parsing or validation fails.
	Init(config []byte) error

	// Read reads data from external system and sends to the output channel.
	// ctx: context for cancellation and timeout
	// out: output channel to send data
	// Returns: number of records sent, error if any
	//
	// Implementation guidelines:
	// - Establish connections in this method, not in Init
	// - Close resources before returning
	// - Check ctx.Done() regularly to support cancellation
	// - Return the count of successfully sent records
	Read(ctx context.Context, out chan<- T) (int64, error)
}
Source is the data source interface. Sources read data from external systems and send to the output channel.
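A small in-memory source shows the cancellation and counting guidelines from the interface comments. The interface is mirrored locally; sliceSource, its "items" option, and the drain helper are hypothetical. Having the caller close the out channel after Read returns is an assumption about how the framework manages channels.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Source mirrors the interface documented above.
type Source[T any] interface {
	Init(config []byte) error
	Read(ctx context.Context, out chan<- T) (int64, error)
}

// sliceSource is a hypothetical source that emits items from its config.
type sliceSource struct {
	Items []string `json:"items"`
}

// Init only parses configuration; no connection is opened here.
func (s *sliceSource) Init(config []byte) error {
	return json.Unmarshal(config, s)
}

// Read sends each item downstream, stops early if ctx is cancelled, and
// returns the number of records actually sent.
func (s *sliceSource) Read(ctx context.Context, out chan<- string) (int64, error) {
	var sent int64
	for _, item := range s.Items {
		if err := ctx.Err(); err != nil {
			return sent, err
		}
		select {
		case <-ctx.Done():
			return sent, ctx.Err()
		case out <- item:
			sent++
		}
	}
	return sent, nil
}

// drain plays the framework's role: it runs Read in a goroutine, closes
// out once Read returns (an assumption), and collects everything sent.
func drain(src Source[string]) ([]string, int64, error) {
	out := make(chan string)
	var (
		n   int64
		err error
	)
	go func() {
		n, err = src.Read(context.Background(), out)
		close(out)
	}()
	var got []string
	for v := range out {
		got = append(got, v)
	}
	return got, n, err
}

func main() {
	src := &sliceSource{}
	if err := src.Init([]byte(`{"items": ["a", "b", "c"]}`)); err != nil {
		panic(err)
	}
	got, n, err := drain(src)
	fmt.Println(got, n, err) // prints "[a b c] 3 <nil>"
}
```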
type StatelessProcessor ¶
type StatelessProcessor[T any] struct{}
StatelessProcessor provides a default implementation for stateless processors. It declares that the component supports concurrency.
func (*StatelessProcessor[T]) ConcurrencyCap ¶
func (p *StatelessProcessor[T]) ConcurrencyCap() ConcurrencyCap
ConcurrencyCap returns concurrency capability (supported, no limit).
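Embedding one of the two helper types is a compact way for a processor to declare its capability. In the sketch below, BaseProcessor and StatelessProcessor are local stand-ins that only set the Supported field (the values of the other fields in the real package are not shown on this page); upperCase, dedupe, and supportsConcurrency are hypothetical.

```go
package main

import "fmt"

// ConcurrencyCap and ConcurrencyCapable mirror the types documented above.
type ConcurrencyCap struct {
	Supported    bool
	SuggestedMax int
	IsStateful   bool
}

type ConcurrencyCapable interface {
	ConcurrencyCap() ConcurrencyCap
}

// BaseProcessor is a local stand-in: concurrency not supported.
type BaseProcessor[T any] struct{}

func (p *BaseProcessor[T]) ConcurrencyCap() ConcurrencyCap {
	return ConcurrencyCap{Supported: false}
}

// StatelessProcessor is a local stand-in: supported, no suggested limit.
type StatelessProcessor[T any] struct{}

func (p *StatelessProcessor[T]) ConcurrencyCap() ConcurrencyCap {
	return ConcurrencyCap{Supported: true, SuggestedMax: 0}
}

// upperCase is a hypothetical stateless processor; embedding
// StatelessProcessor opts it in to concurrent execution.
type upperCase struct {
	StatelessProcessor[string]
}

// dedupe is a hypothetical stateful processor; embedding BaseProcessor
// keeps it on a single goroutine.
type dedupe struct {
	BaseProcessor[string]
	seen map[string]bool
}

// supportsConcurrency reports whether a component declares support.
// Components that do not implement ConcurrencyCapable count as unsupported.
func supportsConcurrency(component any) bool {
	cc, ok := component.(ConcurrencyCapable)
	return ok && cc.ConcurrencyCap().Supported
}

func main() {
	fmt.Println(supportsConcurrency(&upperCase{})) // prints true
	fmt.Println(supportsConcurrency(&dedupe{}))    // prints false
	fmt.Println(supportsConcurrency(struct{}{}))   // prints false
}
```

Because ConcurrencyCap has a pointer receiver, the promoted method is only in the method set of a pointer to the embedding struct, so components should be passed as pointers.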
type Stats ¶
type Stats struct {
// TotalIn is the total number of records read from source.
TotalIn int64
// TotalOut is the total number of records written to sink.
TotalOut int64
// TotalError is the total number of errors encountered.
TotalError int64
// Duration is the total run duration in milliseconds.
Duration int64
}
Stats holds the runtime statistics for a Flow.
type StatsCounter ¶
type StatsCounter struct {
// contains filtered or unexported fields
}
StatsCounter provides thread-safe statistics counting.
func (*StatsCounter) AddError ¶
func (s *StatsCounter) AddError(n int64)
AddError increments the error counter by n.
func (*StatsCounter) AddIn ¶
func (s *StatsCounter) AddIn(n int64)
AddIn increments the input counter by n.
func (*StatsCounter) AddOut ¶
func (s *StatsCounter) AddOut(n int64)
AddOut increments the output counter by n.
func (*StatsCounter) Get ¶
func (s *StatsCounter) Get() Stats
Get returns a snapshot of current statistics.
func (*StatsCounter) SetDuration ¶
func (s *StatsCounter) SetDuration(ms int64)
SetDuration sets the run duration in milliseconds.
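The unexported fields of StatsCounter are not shown, so the following is only one plausible way to get the same thread-safe behavior, using sync/atomic (atomic.Int64 needs Go 1.19+). atomicCounter and countConcurrently are hypothetical names; Stats is mirrored locally.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stats mirrors the struct documented above.
type Stats struct {
	TotalIn    int64
	TotalOut   int64
	TotalError int64
	Duration   int64
}

// atomicCounter is a hypothetical counter with the same method set as
// StatsCounter, backed by atomic integers.
type atomicCounter struct {
	in, out, errs, durationMs atomic.Int64
}

func (c *atomicCounter) AddIn(n int64)        { c.in.Add(n) }
func (c *atomicCounter) AddOut(n int64)       { c.out.Add(n) }
func (c *atomicCounter) AddError(n int64)     { c.errs.Add(n) }
func (c *atomicCounter) SetDuration(ms int64) { c.durationMs.Store(ms) }

// Get returns a snapshot; each field is read atomically, but the four
// fields are not read as a single consistent unit.
func (c *atomicCounter) Get() Stats {
	return Stats{
		TotalIn:    c.in.Load(),
		TotalOut:   c.out.Load(),
		TotalError: c.errs.Load(),
		Duration:   c.durationMs.Load(),
	}
}

// countConcurrently updates one counter from several goroutines and
// returns the final snapshot.
func countConcurrently(workers, perWorker int) Stats {
	var c atomicCounter
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.AddIn(1)
				c.AddOut(1)
			}
		}()
	}
	wg.Wait()
	return c.Get()
}

func main() {
	stats := countConcurrently(8, 1000)
	fmt.Println(stats.TotalIn, stats.TotalOut, stats.TotalError) // prints "8000 8000 0"
}
```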
Directories ¶
| Path | Synopsis |
|---|---|
| | Package app provides the application framework. |
| builtins | |
| processor/aggregate | Package aggregate provides a group-by aggregation processor. |
| processor/condition | Package condition provides a conditional filtering processor. |
| processor/expr | Package expr provides an expression-based filtering processor. |
| processor/jqtransform | Package jqtransform provides a powerful transformation processor based on jq syntax. |
| processor/transform | Package transform provides a field transformation processor. |
| python/processor | Package processor provides a Processor implementation backed by Python scripts. |
| python/runner | Package runner provides a Python subprocess runner. |
| python/sink | Package sink provides a Sink implementation backed by Python scripts. |
| python/source | Package source provides a Source implementation backed by Python scripts. |
| sink/clickhouse | Package clickhouse provides a ClickHouse batch-writing Sink that supports batch writes, periodic flushing, and optional field mapping. |
| sink/collect | Package collect provides a Sink that collects records in memory, mainly for testing. |
| sink/csv | Package csv provides a Sink that writes to CSV files. |
| sink/json | Package json provides a Sink that writes to JSON files. |
| sink/null | Package null provides a Sink that discards data, for performance testing. |
| sink/output | Package output provides a Sink that writes to the console. |
| source/csv | Package csv provides a Source that reads data from CSV files. |
| source/generator | Package generator provides a sequence data generator. |
| source/json | Package json provides a Source that reads data from JSON files. |
| source/kafka | Package kafka provides a Source that consumes messages from a Kafka topic. |
| source/static | Package static provides a static in-memory data source. |
| types | Package types provides shared type definitions for the built-in components. |
| | Package metrics provides Prometheus metrics collection and exposition. |