dataflow

package
v0.1.2
Published: Apr 1, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package dataflow provides a lightweight data pipeline framework for ETL tasks. It implements a Source → Processor → Sink pattern using Go channels.

Constants

This section is empty.

Variables

var (
	// ErrComponentNotFound is returned when a component is not registered.
	ErrComponentNotFound = errors.New("component not found")

	// ErrInvalidConfig is returned when configuration is invalid.
	ErrInvalidConfig = errors.New("invalid config")

	// ErrFlowNotBuilt is returned when Run() is called before Build().
	ErrFlowNotBuilt = errors.New("flow not built, call Build() first")

	// ErrFlowAlreadyRunning is returned when Run() is called while flow is running.
	ErrFlowAlreadyRunning = errors.New("flow already running")

	// ErrSourceRequired is returned when source is not configured.
	ErrSourceRequired = errors.New("source is required")

	// ErrSinkRequired is returned when sink is not configured.
	ErrSinkRequired = errors.New("sink is required")

	// ErrFlowNameRequired is returned when flow name is empty.
	ErrFlowNameRequired = errors.New("flow name is required")
)

Sentinel errors for common failure cases.

Functions

This section is empty.

Types

type BaseProcessor

type BaseProcessor[T any] struct{}

BaseProcessor provides a default implementation of ConcurrencyCapable. It declares that the component does NOT support concurrency (safe by default).

func (*BaseProcessor[T]) ConcurrencyCap

func (p *BaseProcessor[T]) ConcurrencyCap() ConcurrencyCap

ConcurrencyCap returns the default concurrency capability (not supported).

type Closer

type Closer interface {
	Close() error
}

Closer is an optional interface for components that need resource cleanup. If a component implements Closer, Flow will call Close() when the flow ends.
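An optional interface like this is usually detected with a type assertion. The following is a minimal sketch of that pattern, not the package's actual cleanup code: closeAll, fileSink, and plainSink are hypothetical names, and joining the errors with errors.Join (Go 1.20+) is one possible design choice.

```go
package main

import (
	"errors"
	"fmt"
)

// Closer mirrors the interface documented above.
type Closer interface {
	Close() error
}

// fileSink is a hypothetical component that holds a resource.
type fileSink struct{ closed bool }

func (s *fileSink) Close() error {
	s.closed = true
	return nil
}

// plainSink is a hypothetical component with nothing to clean up.
type plainSink struct{}

// closeAll calls Close on every component that implements Closer and
// joins the errors, so one failure does not skip the remaining components.
func closeAll(components ...any) error {
	var errs []error
	for _, c := range components {
		if closer, ok := c.(Closer); ok {
			if err := closer.Close(); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errors.Join(errs...) // nil when no Close call failed
}

func main() {
	fs := &fileSink{}
	err := closeAll(fs, &plainSink{})
	fmt.Println(fs.closed, err) // true <nil>
}
```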

type ComponentInfo

type ComponentInfo struct {
	// FlowName is the name of the flow
	FlowName string
	// ComponentName is the component name (e.g., "source", "processor[0]", "sink")
	ComponentName string
	// ComponentType is the component type: "source", "processor", or "sink"
	ComponentType string
}

ComponentInfo contains the context information passed to a component.

type ComponentSpec

type ComponentSpec struct {
	// Name is the component name used for Registry lookup.
	Name string `yaml:"name" json:"name"`
	// Concurrency is the optional concurrency override for processors.
	// Only applies to processors that declare concurrency support.
	//   - nil or not set: use the component's declared default
	//   - 1: force single goroutine (disable concurrency)
	//   - N > 1: use N workers (no upper limit, user decides)
	// Recommended: set based on CPU cores and whether the component is I/O bound.
	// Ignored for source and sink.
	Concurrency *int `yaml:"concurrency" json:"concurrency"`
	// Config is the component-specific configuration passed to Init.
	Config map[string]interface{} `yaml:"config" json:"config"`
}

ComponentSpec defines a component specification.
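The pointer type of Concurrency is what lets a configuration distinguish "not set" from an explicit value. A small sketch using the JSON tags from the declaration above; the struct is copied locally so the example is self-contained, and the component name "transform" and the config key "field" are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ComponentSpec is copied from the declaration above.
type ComponentSpec struct {
	Name        string                 `yaml:"name" json:"name"`
	Concurrency *int                   `yaml:"concurrency" json:"concurrency"`
	Config      map[string]interface{} `yaml:"config" json:"config"`
}

// parseSpec decodes one component specification from JSON.
func parseSpec(raw string) (ComponentSpec, error) {
	var spec ComponentSpec
	err := json.Unmarshal([]byte(raw), &spec)
	return spec, err
}

// describe reports how the Concurrency field was set.
func describe(spec ComponentSpec) string {
	if spec.Concurrency == nil {
		return spec.Name + ": use the component's declared default"
	}
	return fmt.Sprintf("%s: %d worker(s) requested", spec.Name, *spec.Concurrency)
}

func main() {
	// "transform" and "field" are hypothetical names.
	inputs := []string{
		`{"name": "transform", "config": {"field": "amount"}}`,
		`{"name": "transform", "concurrency": 4}`,
	}
	for _, raw := range inputs {
		spec, err := parseSpec(raw)
		if err != nil {
			fmt.Println("parse failed:", err)
			continue
		}
		fmt.Println(describe(spec))
	}
}
```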

type ConcurrencyCap

type ConcurrencyCap struct {
	// Supported indicates whether the component supports concurrent processing
	Supported bool
	// SuggestedMax is the suggested maximum concurrency (0 = no limit)
	SuggestedMax int
	// IsStateful indicates whether the component has internal state
	IsStateful bool
}

ConcurrencyCap describes the concurrency capability of a component.

type ConcurrencyCapable

type ConcurrencyCapable interface {
	ConcurrencyCap() ConcurrencyCap
}

ConcurrencyCapable is an interface for components to declare their concurrency capability. The framework uses this to decide whether to run the component concurrently.

Design principle: Concurrency capability is decided by the component itself, not by the framework or configuration.

type Flow

type Flow[T any] struct {
	// contains filtered or unexported fields
}

Flow is the data pipeline orchestrator. It manages the execution of Source → Processors → Sink.

Example

Example demonstrates basic flow usage.

// Define data type
type Order struct {
	ID     string  `json:"id"`
	Amount float64 `json:"amount"`
	Status string  `json:"status"`
}

// Create registry
registry := NewRegistry[Order]()

// Create and register source with data
source := &MockSource[Order]{
	data: []Order{
		{ID: "001", Amount: 100.0, Status: "completed"},
		{ID: "002", Amount: 200.0, Status: "pending"},
		{ID: "003", Amount: 150.0, Status: "completed"},
	},
}
sink := &MockSink[Order]{}

registry.RegisterSource("orders", func() Source[Order] {
	return source
})
registry.RegisterSink("console", func() Sink[Order] {
	return sink
})

// Create flow config
config := &FlowConfig{
	Name:       "order_processing",
	BufferSize: 100,
	Source:     ComponentSpec{Name: "orders"},
	Sink:       ComponentSpec{Name: "console"},
}

// Build and run
flow := NewFlow[Order](config, registry)
if err := flow.Build(); err != nil {
	fmt.Printf("Build failed: %v\n", err)
	return
}

if err := flow.Run(context.Background()); err != nil {
	fmt.Printf("Run failed: %v\n", err)
	return
}

stats := flow.Stats()
fmt.Printf("Processed %d records\n", stats.TotalIn)

func NewFlow

func NewFlow[T any](config *FlowConfig, registry *Registry[T]) *Flow[T]

NewFlow creates a new Flow instance.

func (*Flow[T]) Build

func (f *Flow[T]) Build() error

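Build initializes all components. It creates the component instances from the registry and calls each component's Init method. It returns an error if any component cannot be found or fails to initialize.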

func (*Flow[T]) Close

func (f *Flow[T]) Close() error

Close releases the resources held by components. Close() is called on every component that implements the Closer interface.

func (*Flow[T]) IsRunning

func (f *Flow[T]) IsRunning() bool

IsRunning reports whether the flow is currently running.

func (*Flow[T]) Metrics

func (f *Flow[T]) Metrics() metrics.Summary

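Metrics returns a summary of the collected metrics.

TotalIn is the source's output (the value returned by Read()). TotalOut is the output of the last component in the pipeline that produces output:

  • With processors: the RecordOut of the last processor
  • Without processors: the RecordOut of the source

TotalError is the sum of the errors of all components in StatsCounter. Components holds the per-component breakdown.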

func (*Flow[T]) Run

func (f *Flow[T]) Run(ctx context.Context) error

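Run executes the data pipeline. It returns an error if the flow has not been built, is already running, or any component fails. It blocks until all data has been processed or the context is cancelled.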

func (*Flow[T]) SetPrometheusCollector

func (f *Flow[T]) SetPrometheusCollector(collector metrics.PrometheusCollector)

SetPrometheusCollector sets the Prometheus metrics collector. Once it is set, the Flow reports metrics to Prometheus in real time while it runs.

func (*Flow[T]) Stats

func (f *Flow[T]) Stats() Stats

Stats returns the current statistics.

type FlowConfig

type FlowConfig struct {
	// Name is the flow name used for logging and monitoring.
	Name string `yaml:"name" json:"name"`
	// BufferSize is the channel buffer size between components.
	BufferSize int `yaml:"buffer_size" json:"buffer_size"`
	// Source is the source component configuration.
	Source ComponentSpec `yaml:"source" json:"source"`
	// Processors is the list of processor configurations (executed in order).
	Processors []ComponentSpec `yaml:"processors" json:"processors"`
	// Sink is the sink component configuration.
	Sink ComponentSpec `yaml:"sink" json:"sink"`
}

FlowConfig defines the configuration for a Flow.

func (*FlowConfig) SetDefaults

func (c *FlowConfig) SetDefaults()

SetDefaults sets default values for the configuration.

func (*FlowConfig) Validate

func (c *FlowConfig) Validate() error

Validate validates the flow configuration.

func (*FlowConfig) ValidateBuild

func (c *FlowConfig) ValidateBuild() error

ValidateBuild validates the configuration during Build. This is called internally by Flow.Build().

type MetricsRecorderAware

type MetricsRecorderAware interface {
	SetMetricsRecorder(recorder metrics.Recorder, info ComponentInfo)
}

MetricsRecorderAware is an optional interface for components that need to record custom metrics. If a component implements this interface, Flow will inject the metrics recorder after Init.

type Processor

type Processor[T any] interface {
	// Init initializes the component with JSON configuration.
	// Returns an error if config parsing or validation fails.
	Init(config []byte) error

	// Process reads data from input channel, processes it, and sends to output channel.
	// ctx: context for cancellation and timeout
	// in: input channel to read from (upstream writes to this)
	// out: output channel to write to (downstream reads from this)
	// Returns: error if any
	//
	// Implementation guidelines:
	// - Use for data := range in { ... } to iterate input channel
	// - Can filter data (not send to out)
	// - Can transform 1 record to N records (1:N)
	// - Can merge N records to 1 record (N:1)
	// - DO NOT close out channel (managed by framework)
	// - Return when in channel is closed and all data is processed
	Process(ctx context.Context, in <-chan T, out chan<- T) error
}

Processor is the data processor interface. Processors read data from the input channel, process it, and send it to the output channel.

type Registry

type Registry[T any] struct {
	// contains filtered or unexported fields
}

Registry manages component registrations.

func NewRegistry

func NewRegistry[T any]() *Registry[T]

NewRegistry creates a new component registry.

func (*Registry[T]) GetProcessor

func (r *Registry[T]) GetProcessor(name string) (Processor[T], bool)

GetProcessor returns a new processor instance by name.

func (*Registry[T]) GetSink

func (r *Registry[T]) GetSink(name string) (Sink[T], bool)

GetSink returns a new sink instance by name.

func (*Registry[T]) GetSource

func (r *Registry[T]) GetSource(name string) (Source[T], bool)

GetSource returns a new source instance by name.

func (*Registry[T]) ListProcessors

func (r *Registry[T]) ListProcessors() []string

ListProcessors returns all registered processor names.

func (*Registry[T]) ListSinks

func (r *Registry[T]) ListSinks() []string

ListSinks returns all registered sink names.

func (*Registry[T]) ListSources

func (r *Registry[T]) ListSources() []string

ListSources returns all registered source names.

func (*Registry[T]) RegisterProcessor

func (r *Registry[T]) RegisterProcessor(name string, builder func() Processor[T])

RegisterProcessor registers a processor factory function.

func (*Registry[T]) RegisterSink

func (r *Registry[T]) RegisterSink(name string, builder func() Sink[T])

RegisterSink registers a sink factory function.

func (*Registry[T]) RegisterSource

func (r *Registry[T]) RegisterSource(name string, builder func() Source[T])

RegisterSource registers a source factory function.

type Sink

type Sink[T any] interface {
	// Init initializes the component with JSON configuration.
	// Returns an error if config parsing or validation fails.
	Init(config []byte) error

	// Consume reads data from input channel and writes to external system.
	// ctx: context for cancellation and timeout
	// in: input channel to read from
	// Returns: error if any
	//
	// Implementation guidelines:
	// - Use for data := range in { ... } to iterate input channel
	// - Implement batch writing for better performance
	// - Implement periodic flush to avoid data accumulation
	// - Return when in channel is closed and all data is written
	// - Handle write failures (retry or log error)
	Consume(ctx context.Context, in <-chan T) error
}

Sink is the data sink interface. Sinks consume data from the input channel and write it to external systems.

type Source

type Source[T any] interface {
	// Init initializes the component with JSON configuration.
	// This method only parses config, does NOT establish connections.
	// Returns an error if config parsing or validation fails.
	Init(config []byte) error

	// Read reads data from external system and sends to the output channel.
	// ctx: context for cancellation and timeout
	// out: output channel to send data
	// Returns: number of records sent, error if any
	//
	// Implementation guidelines:
	// - Establish connections in this method, not in Init
	// - Close resources before returning
	// - Check ctx.Done() regularly to support cancellation
	// - Return the count of successfully sent records
	Read(ctx context.Context, out chan<- T) (int64, error)
}

Source is the data source interface. Sources read data from external systems and send it to the output channel.

type StatelessProcessor

type StatelessProcessor[T any] struct{}

StatelessProcessor provides a default implementation for stateless processors. It declares that the component supports concurrency.

func (*StatelessProcessor[T]) ConcurrencyCap

func (p *StatelessProcessor[T]) ConcurrencyCap() ConcurrencyCap

ConcurrencyCap returns concurrency capability (supported, no limit).
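Both helper types are presumably meant to be embedded, so that a processor picks up a ConcurrencyCap method without writing one. The sketch below redeclares the two types locally with return values that follow the descriptions above; the exact field values (in particular IsStateful) are assumptions, and upperCase, runningTotal, and supportsConcurrency are hypothetical.

```go
package main

import "fmt"

// ConcurrencyCap and ConcurrencyCapable are copied from the declarations above.
type ConcurrencyCap struct {
	Supported    bool
	SuggestedMax int
	IsStateful   bool
}

type ConcurrencyCapable interface {
	ConcurrencyCap() ConcurrencyCap
}

// Local stand-ins for the two helper types. The returned values follow
// the prose above; the exact fields are an assumption.
type BaseProcessor[T any] struct{}

func (p *BaseProcessor[T]) ConcurrencyCap() ConcurrencyCap {
	return ConcurrencyCap{Supported: false}
}

type StatelessProcessor[T any] struct{}

func (p *StatelessProcessor[T]) ConcurrencyCap() ConcurrencyCap {
	return ConcurrencyCap{Supported: true, SuggestedMax: 0} // 0 = no limit
}

// upperCase (hypothetical) keeps no state, so it embeds StatelessProcessor.
type upperCase struct {
	StatelessProcessor[string]
}

// runningTotal (hypothetical) keeps a sum, so it embeds BaseProcessor.
type runningTotal struct {
	BaseProcessor[float64]
	sum float64
}

// supportsConcurrency checks the promoted method through the interface.
// Components must be passed as pointers, because the helper methods have
// pointer receivers.
func supportsConcurrency(component any) bool {
	c, ok := component.(ConcurrencyCapable)
	return ok && c.ConcurrencyCap().Supported
}

func main() {
	fmt.Println(supportsConcurrency(&upperCase{}))    // true
	fmt.Println(supportsConcurrency(&runningTotal{})) // false
}
```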

type Stats

type Stats struct {
	// TotalIn is the total number of records read from source.
	TotalIn int64
	// TotalOut is the total number of records written to sink.
	TotalOut int64
	// TotalError is the total number of errors encountered.
	TotalError int64
	// Duration is the total run duration in milliseconds.
	Duration int64
}

Stats holds the runtime statistics for a Flow.

type StatsCounter

type StatsCounter struct {
	// contains filtered or unexported fields
}

StatsCounter provides thread-safe statistics counting.

func (*StatsCounter) AddError

func (s *StatsCounter) AddError(n int64)

AddError increments the error counter by n.

func (*StatsCounter) AddIn

func (s *StatsCounter) AddIn(n int64)

AddIn increments the input counter by n.

func (*StatsCounter) AddOut

func (s *StatsCounter) AddOut(n int64)

AddOut increments the output counter by n.

func (*StatsCounter) Get

func (s *StatsCounter) Get() Stats

Get returns a snapshot of current statistics.

func (*StatsCounter) Reset

func (s *StatsCounter) Reset()

Reset resets all counters to zero.

func (*StatsCounter) SetDuration

func (s *StatsCounter) SetDuration(ms int64)

SetDuration sets the run duration in milliseconds.

Directories

Path Synopsis
Package app provides the application framework.
builtins
processor/aggregate
Package aggregate provides a group-by aggregation processor.
processor/condition
Package condition provides a condition-based filtering processor.
processor/expr
Package expr provides an expression-based filtering processor.
processor/jqtransform
Package jqtransform provides a powerful transformation processor based on jq syntax.
processor/transform
Package transform provides a field transformation processor.
python/processor
Package processor provides a Processor implementation backed by Python scripts.
python/runner
Package runner provides a Python subprocess runner.
python/sink
Package sink provides a Sink implementation backed by Python scripts.
python/source
Package source provides a Source implementation backed by Python scripts.
sink/clickhouse
Package clickhouse provides a ClickHouse batch-writing Sink. It supports batch writes, timed flushing, and optional field mapping.
sink/collect
Package collect provides a Sink that collects records in memory, mainly for testing.
sink/csv
Package csv provides a Sink that writes to CSV files.
sink/json
Package json provides a Sink that writes to JSON files.
sink/null
Package null provides a Sink that discards data, for performance testing.
sink/output
Package output provides a Sink that prints to the console.
source/csv
Package csv provides a Source that reads data from CSV files.
source/generator
Package generator provides a sequence data generator.
source/json
Package json provides a Source that reads data from JSON files.
source/kafka
Package kafka provides a Source that consumes messages from a Kafka topic.
source/static
Package static provides a static in-memory data source.
types
Package types provides shared type definitions for the built-in components.
Package metrics provides Prometheus metrics collection and exposure.
