Documentation
Overview
Package pipeline provides a framework for managing data pipelines that move data between PostgreSQL and various `Peer`s (i.e. data sources and destinations), in either direction.
Supported peer types include ClickHouse, HTTP endpoints, Kafka, MQTT, NATS, and gRPC, with extensibility through Go plugins.
It defines a `Connector` interface that all `Peer` types must implement.
Index
- Constants
- Variables
- func ProcessEvent(pl Pipeline, source Source, event cdc.Event, ...)
- func RegisterConnector(name string, c Connector)
- func SetupSinks(ctx context.Context, m *Manager, wg *sync.WaitGroup, pl Pipeline, ...) error
- type Config
- type Connector
- type ConnectorType
- type Manager
- func (m *Manager) AddPeer(connector string, name string) (*Peer, error)
- func (m *Manager) AddSubscription(sourceName, pipelineName string, sinkChannels map[string]chan cdc.Event)
- func (m *Manager) GetPeer(name string) (*Peer, error)
- func (m *Manager) GetSubscriptions(sourceName string) []SourceSubscription
- func (m *Manager) Init(config *Config) error
- func (m *Manager) IsFirstSubscription(sourceName string) bool
- func (m *Manager) Peers() []Peer
- func (m *Manager) RegisterConnectorPlugin(path string, name string) error
- type Peer
- type Pipeline
- type Sink
- type Source
- type SourceSubscription
Constants
const (
ConnectorClickHouse = "clickhouse"
ConnectorDebug = "debug"
ConnectorHTTP = "http"
ConnectorKafka = "kafka"
ConnectorMQTT = "mqtt"
ConnectorNATS = "nats"
ConnectorGRPC = "grpc"
ConnectorPostgres = "postgres"
)
Predefined connectors.
Variables
var (
ErrConnectorTypeMismatch = errors.New("connector type mismatch")
)
Functions
func ProcessEvent
func ProcessEvent(
pl Pipeline,
source Source,
event cdc.Event,
sinkChannels map[string]chan cdc.Event,
)
ProcessEvent handles the processing of a single event.
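The `Source`, `Pipeline` and `Sink` types document the order in which transformations run: source, then pipeline, then sink-specific, then delivery to the sink. Below is a minimal sketch of how a function with `ProcessEvent`'s shape could follow that order. It is not the package's implementation. `Event` and `Transformation` are hypothetical stand-ins for `cdc.Event` and `transform.Transformation`, with transformations modeled as plain functions. `processEvent` and `apply` are illustrative names.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for cdc.Event; Path records which
// transformations touched the event so the ordering is visible.
type Event struct {
	Table string
	Path  string
}

// Transformation is a hypothetical stand-in for transform.Transformation,
// modeled here as a plain function.
type Transformation func(Event) Event

type Source struct {
	Name            string
	Transformations []Transformation
}

type Sink struct {
	Name            string
	Transformations []Transformation
}

type Pipeline struct {
	Name            string
	Sources         []Source
	Transformations []Transformation
	Sinks           []Sink
}

// apply runs the transformations in the order they are listed.
func apply(ev Event, ts []Transformation) Event {
	for _, t := range ts {
		ev = t(ev)
	}
	return ev
}

// processEvent applies source, then pipeline, then sink-specific
// transformations, and sends one copy of the event to each sink's channel.
func processEvent(pl Pipeline, src Source, ev Event, sinkChannels map[string]chan Event) {
	ev = apply(ev, src.Transformations)
	ev = apply(ev, pl.Transformations)
	for _, sink := range pl.Sinks {
		ch, ok := sinkChannels[sink.Name]
		if !ok {
			continue // this sketch skips sinks without a registered channel
		}
		ch <- apply(ev, sink.Transformations) // blocks until the sink has room
	}
}

func main() {
	tag := func(s string) Transformation {
		return func(ev Event) Event { ev.Path += s; return ev }
	}
	src := Source{Name: "pg", Transformations: []Transformation{tag("S")}}
	pl := Pipeline{
		Name:            "orders",
		Sources:         []Source{src},
		Transformations: []Transformation{tag("P")},
		Sinks: []Sink{
			{Name: "kafka", Transformations: []Transformation{tag("K")}},
			{Name: "nats"},
		},
	}
	chans := map[string]chan Event{
		"kafka": make(chan Event, 1),
		"nats":  make(chan Event, 1),
	}
	processEvent(pl, src, Event{Table: "orders"}, chans)
	fmt.Println((<-chans["kafka"]).Path) // SPK
	fmt.Println((<-chans["nats"]).Path)  // SP
}
```

`Event` is passed by value, so each sink receives its own copy after the shared source and pipeline steps. A sink-specific transformation therefore cannot affect what other sinks see.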
func RegisterConnector
func RegisterConnector(name string, c Connector)
RegisterConnector adds a new connector to the registry. The name parameter is used as a key to identify the connector type.
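A registry keyed by connector name can be sketched as a mutex-guarded map. This is an illustration of the pattern, not the package's code. The trimmed `Connector` interface (only `Disconnect`), `registerConnector`, `lookupConnector` and `noop` are hypothetical. Overwriting an existing entry with the same name is a choice made by this sketch; the real function may behave differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Connector is a trimmed-down, hypothetical stand-in for pipeline.Connector.
type Connector interface {
	Disconnect() error
}

var (
	mu       sync.RWMutex
	registry = map[string]Connector{} // connector name (e.g. "kafka") -> implementation
)

// registerConnector stores c under name, replacing any earlier entry.
func registerConnector(name string, c Connector) {
	mu.Lock()
	defer mu.Unlock()
	registry[name] = c
}

// lookupConnector returns the connector registered under name, if any.
func lookupConnector(name string) (Connector, bool) {
	mu.RLock()
	defer mu.RUnlock()
	c, ok := registry[name]
	return c, ok
}

// noop is a hypothetical connector used only for the demonstration.
type noop struct{}

func (noop) Disconnect() error { return nil }

func main() {
	registerConnector("debug", noop{})
	_, ok := lookupConnector("debug")
	fmt.Println(ok) // true
	_, ok = lookupConnector("kafka")
	fmt.Println(ok) // false
}
```

The lock guards against registration and lookup racing with each other. This could matter, for example, if plugins are loaded while pipelines are starting.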
Types
type Config
type Config struct {
Peers []Peer `mapstructure:"peers"`
Pipelines []Pipeline `mapstructure:"pipelines"`
}
func (*Config) GetPipeline
type Connector
type Connector interface {
// Connect initializes the connector with the provided configuration.
// The config parameter is a raw JSON message containing connector-specific settings.
// Additional arguments can be passed via the args parameter.
Connect(config json.RawMessage, args ...any) error
// Pub sends the given CDC event to the connector's destination.
// It returns an error if the publish operation fails.
Pub(event cdc.Event, args ...any) error
// Sub provides a channel for consuming CDC events.
Sub(args ...any) (<-chan cdc.Event, error)
// Type returns the type of the connector (ConnectorTypePub, ConnectorTypeSub, or ConnectorTypePubSub).
Type() ConnectorType
// Disconnect closes the connector's connection.
Disconnect() error
}
A Connector represents a data pipeline component.
type ConnectorType
type ConnectorType int
const (
ConnectorTypeUnknown ConnectorType = iota
ConnectorTypePub // Sink / consumer-only
ConnectorTypeSub // Source / producer-only
ConnectorTypePubSub // Source and sink
)
type Manager
type Manager struct {
// contains filtered or unexported fields
}
Manager handles connectors and peers for data pipeline operations.
func NewManager
func NewManager() *Manager
NewManager returns a new Manager instance with the default connectors.
func (*Manager) AddSubscription
func (m *Manager) AddSubscription(sourceName, pipelineName string, sinkChannels map[string]chan cdc.Event)
AddSubscription adds a new subscription for a source.
func (*Manager) GetSubscriptions
func (m *Manager) GetSubscriptions(sourceName string) []SourceSubscription
GetSubscriptions returns all subscriptions for a source.
func (*Manager) IsFirstSubscription
func (m *Manager) IsFirstSubscription(sourceName string) bool
IsFirstSubscription checks whether this is the first subscription for a source.
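The three subscription methods suggest a per-source list of pipeline subscriptions. Below is a sketch of that bookkeeping, under stated assumptions. `subscriptions` and its `Add`, `Get` and `IsFirst` methods are hypothetical stand-ins for the `Manager` methods. The `SourceSubscription` fields `PipelineName` and `SinkChannels` are guesses that mirror `AddSubscription`'s parameters. The exact semantics of `IsFirstSubscription` are not documented. This sketch assumes it is called right after adding a subscription and reports whether exactly one pipeline is subscribed, so that the source is started only once even when several pipelines share it.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for cdc.Event.
type Event struct{ Table string }

// SourceSubscription uses hypothetical fields taken from AddSubscription's parameters.
type SourceSubscription struct {
	PipelineName string
	SinkChannels map[string]chan Event
}

type subscriptions struct {
	mu       sync.Mutex
	bySource map[string][]SourceSubscription
}

func newSubscriptions() *subscriptions {
	return &subscriptions{bySource: make(map[string][]SourceSubscription)}
}

// Add records that pipelineName consumes events from sourceName.
func (s *subscriptions) Add(sourceName, pipelineName string, sinkChannels map[string]chan Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.bySource[sourceName] = append(s.bySource[sourceName],
		SourceSubscription{PipelineName: pipelineName, SinkChannels: sinkChannels})
}

// Get returns a copy, so callers can range over it without holding the lock.
func (s *subscriptions) Get(sourceName string) []SourceSubscription {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]SourceSubscription(nil), s.bySource[sourceName]...)
}

// IsFirst reports whether exactly one pipeline is subscribed to sourceName.
func (s *subscriptions) IsFirst(sourceName string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.bySource[sourceName]) == 1
}

func main() {
	subs := newSubscriptions()
	subs.Add("pg", "orders", nil)
	fmt.Println(subs.IsFirst("pg")) // true
	subs.Add("pg", "audit", nil)
	fmt.Println(subs.IsFirst("pg"), len(subs.Get("pg"))) // false 2
}
```

Because `Add` and `IsFirst` take the lock separately, two pipelines subscribing concurrently could both see `false`. Having `Add` itself return whether the subscription was the first one would avoid that race.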
type Peer
type Peer struct {
Name string `mapstructure:"name"`
ConnectorName string `mapstructure:"connector"`
// Config contains the connection configuration of the underlying library,
// e.g. github.com/IBM/sarama.Config, github.com/eclipse/paho.mqtt.golang.ClientOptions, etc.
Config map[string]any `mapstructure:"config"`
// Args holds extra arguments for the Connect, Pub, and Sub methods.
Args []any
}
Peer is a data source or destination with an associated connector (e.g. NATS, Kafka, MQTT, ClickHouse).
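`Connector.Connect` takes a `json.RawMessage`, while `Peer.Config` is a `map[string]any`. One natural bridge is to marshal the map to JSON and forward `Peer.Args` as the variadic arguments. The sketch below does this. It assumes the `Manager` wires peers to connectors this way, which is not confirmed above. `connectPeer`, `recorder`, the trimmed `connector` interface and the `verbose` key are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Peer copies the documented fields, without the mapstructure tags.
type Peer struct {
	Name          string
	ConnectorName string
	Config        map[string]any
	Args          []any
}

// connector is a trimmed-down, hypothetical stand-in exposing only Connect.
type connector interface {
	Connect(config json.RawMessage, args ...any) error
}

// connectPeer marshals p.Config to JSON and passes it, with p.Args, to Connect.
func connectPeer(p Peer, c connector) error {
	raw, err := json.Marshal(p.Config)
	if err != nil {
		return fmt.Errorf("peer %q: encoding config: %w", p.Name, err)
	}
	if err := c.Connect(raw, p.Args...); err != nil {
		return fmt.Errorf("peer %q: connect: %w", p.Name, err)
	}
	return nil
}

// recorder captures what Connect received.
type recorder struct {
	raw  json.RawMessage
	args []any
}

func (r *recorder) Connect(config json.RawMessage, args ...any) error {
	r.raw, r.args = config, args
	return nil
}

func main() {
	p := Peer{
		Name:          "debug-peer",
		ConnectorName: "debug",
		Config:        map[string]any{"verbose": true},
		Args:          []any{"extra"},
	}
	r := &recorder{}
	if err := connectPeer(p, r); err != nil {
		panic(err)
	}
	fmt.Println(string(r.raw), r.args) // {"verbose":true} [extra]
}
```

`json.Marshal` rejects values such as channels, functions, or maps with `any` keys. With this approach, such a config surfaces as an error at connect time rather than being silently dropped.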
type Pipeline
type Pipeline struct {
Name string `mapstructure:"name"`
Sources []Source `mapstructure:"sources"`
// Pipeline transformations are applied after source transformations and before sink transformations.
// They apply to all CDC events flowing through the pipeline, from all of its sources to all of its sinks.
Transformations []transform.Transformation `mapstructure:"transformations"`
Sinks []Sink `mapstructure:"sinks"`
}
Pipeline configures a complete data processing pipeline.
type Sink
type Sink struct {
// Name must match one of configured peers
Name string `mapstructure:"name"`
// Sink-specific transformations are applied after source and pipeline transformations, just before the event is sent to this sink.
Transformations []transform.Transformation `mapstructure:"transformations"`
}
Sink is a pipeline output with its transformations.
type Source
type Source struct {
// Name must match one of configured peers
Name string `mapstructure:"name"`
// Source transformations are applied, in the order specified, as soon as a CDC event is received and before any other processing.
Transformations []transform.Transformation `mapstructure:"transformations"`
}
Source is a pipeline input with its transformations.
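Both `Source.Name` and `Sink.Name` must match one of the configured peers. The sketch below checks a whole configuration against that rule and reports every dangling name at once. The types are trimmed stand-ins for the documented ones (transformations omitted). `validate` is hypothetical, and whether and where the package performs this check is not documented here. `errors.Join` requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
)

type Peer struct {
	Name          string
	ConnectorName string
}

type Source struct{ Name string }
type Sink struct{ Name string }

type Pipeline struct {
	Name    string
	Sources []Source
	Sinks   []Sink
}

type Config struct {
	Peers     []Peer
	Pipelines []Pipeline
}

// validate reports every source or sink whose Name has no matching peer.
func validate(cfg Config) error {
	peers := make(map[string]bool, len(cfg.Peers))
	for _, p := range cfg.Peers {
		peers[p.Name] = true
	}
	var errs []error
	for _, pl := range cfg.Pipelines {
		for _, s := range pl.Sources {
			if !peers[s.Name] {
				errs = append(errs, fmt.Errorf("pipeline %q: source %q is not a configured peer", pl.Name, s.Name))
			}
		}
		for _, s := range pl.Sinks {
			if !peers[s.Name] {
				errs = append(errs, fmt.Errorf("pipeline %q: sink %q is not a configured peer", pl.Name, s.Name))
			}
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	cfg := Config{
		Peers: []Peer{{Name: "pg", ConnectorName: "postgres"}, {Name: "bus", ConnectorName: "nats"}},
		Pipelines: []Pipeline{{
			Name:    "orders",
			Sources: []Source{{Name: "pg"}},
			Sinks:   []Sink{{Name: "bus"}, {Name: "warehouse"}},
		}},
	}
	fmt.Println(validate(cfg)) // pipeline "orders": sink "warehouse" is not a configured peer
}
```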
Directories
| Path | Synopsis |
|---|---|
| peer | |
| kafka | Package kafka provides a real-time Kafka-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP. |
| mqtt | Package mqtt provides a real-time MQTT-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP. |
| nats | Package nats provides a real-time NATS-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP. |
| plugin_example | command |
| | Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines. |