pipeline

package
v0.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 24, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package pipeline provides a framework for managing data pipelines that move data between PostgreSQL and various `Peer`s (i.e. data sources and destinations), in either direction.

Supported peer types include ClickHouse, HTTP endpoints, Kafka, MQTT, NATS, and gRPC, and further connectors can be added through Go plugins.

It defines a `Connector` interface that the connector behind every `Peer` must implement.

Index

Constants

const (
	ConnectorClickHouse = "clickhouse"
	ConnectorDebug      = "debug"
	ConnectorHTTP       = "http"
	ConnectorKafka      = "kafka"
	ConnectorMQTT       = "mqtt"
	ConnectorNATS       = "nats"
	ConnectorGRPC       = "grpc"
	ConnectorPostgres   = "postgres"
)

Predefined connectors

Variables

var (
	ErrConnectorTypeMismatch = errors.New("connector type mismatch")
)

Functions

func ProcessEvent

func ProcessEvent(
	pl Pipeline,
	source Source,
	event cdc.Event,
	sinkChannels map[string]chan cdc.Event,
)

ProcessEvent handles the processing of a single CDC event received from the given source.
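The exact steps of ProcessEvent are not documented here. The sketch below shows one plausible fan-out step, assuming an event is a `map[string]any`, a transformation is a plain function that may return nil to drop the event, and `sinkChannels` is keyed by sink name. `Event`, `Transformation`, `Sink`, `clone`, and `fanOut` are hypothetical stand-ins, not the package's `cdc` or `transform` types.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for cdc.Event.
type Event map[string]any

// Transformation is a hypothetical stand-in for transform.Transformation.
type Transformation func(Event) Event

// Sink mirrors the documented Sink struct (name + transformations).
type Sink struct {
	Name            string
	Transformations []Transformation
}

// clone returns a shallow copy so one sink's transformations
// do not mutate the event seen by another sink.
func clone(e Event) Event {
	out := make(Event, len(e))
	for k, v := range e {
		out[k] = v
	}
	return out
}

// fanOut applies each sink's transformations to its own copy of the
// event and sends the result on that sink's channel. Sinks without a
// channel are skipped; a transformation returning nil drops the event
// for that sink only (an assumption of this sketch).
func fanOut(event Event, sinks []Sink, sinkChannels map[string]chan Event) {
	for _, s := range sinks {
		ch, ok := sinkChannels[s.Name]
		if !ok {
			continue
		}
		e := clone(event)
		for _, t := range s.Transformations {
			if e = t(e); e == nil {
				break
			}
		}
		if e != nil {
			ch <- e // blocks until the sink's channel has room
		}
	}
}

func main() {
	markUpper := func(e Event) Event {
		e["op"] = "INSERT"
		return e
	}
	chans := map[string]chan Event{"kafka": make(chan Event, 1), "debug": make(chan Event, 1)}
	sinks := []Sink{{Name: "kafka", Transformations: []Transformation{markUpper}}, {Name: "debug"}}

	fanOut(Event{"op": "insert"}, sinks, chans)
	fmt.Println((<-chans["kafka"])["op"], (<-chans["debug"])["op"]) // INSERT insert
}
```

Cloning per sink is a choice made in this sketch: it keeps one sink's transformations from leaking into the event another sink receives. The copy is shallow, so nested values are still shared.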

func RegisterConnector

func RegisterConnector(name string, c Connector)

RegisterConnector adds a new connector to the registry. The name parameter is used as a key to identify the connector type.
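A minimal sketch of a name-keyed connector registry, assuming connectors are kept in a package-level map guarded by a mutex and that registering an existing name replaces the earlier entry (the docs do not say either way). `Connector` here is a trimmed, hypothetical stand-in for the real interface, and `registerConnector`/`lookupConnector` are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// Connector is a trimmed, hypothetical stand-in for the package's interface.
type Connector interface{ Name() string }

type debugConnector struct{}

func (debugConnector) Name() string { return "debug" }

var (
	registryMu sync.RWMutex
	registry   = map[string]Connector{}
)

// registerConnector stores c under name; a later call with the same
// name replaces the earlier entry (an assumption of this sketch).
func registerConnector(name string, c Connector) {
	registryMu.Lock()
	defer registryMu.Unlock()
	registry[name] = c
}

// lookupConnector returns the connector registered under name, if any.
func lookupConnector(name string) (Connector, bool) {
	registryMu.RLock()
	defer registryMu.RUnlock()
	c, ok := registry[name]
	return c, ok
}

func main() {
	registerConnector("debug", debugConnector{})
	if c, ok := lookupConnector("debug"); ok {
		fmt.Println("found", c.Name())
	}
	_, ok := lookupConnector("kafka")
	fmt.Println("kafka registered:", ok)
}
```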

func SetupSinks

func SetupSinks(
	ctx context.Context,
	m *Manager,
	wg *sync.WaitGroup,
	pl Pipeline,
	sinkChannels map[string]chan cdc.Event,
) error

Types

type Config

type Config struct {
	Peers     []Peer     `mapstructure:"peers"`
	Pipelines []Pipeline `mapstructure:"pipelines"`
}

func (*Config) GetPeer

func (c *Config) GetPeer(peerName string) *Peer

func (*Config) GetPipeline

func (c *Config) GetPipeline(pipelineName string) *Pipeline
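A sketch of name-based lookup, assuming GetPeer and GetPipeline return nil when no entry matches (the docs do not say). The local `Config`, `Peer`, and `Pipeline` types are trimmed, hypothetical copies, and `getPeer`/`getPipeline` are illustrative. Indexing the slice returns a pointer to the stored element rather than to a loop-variable copy.

```go
package main

import "fmt"

type Peer struct {
	Name          string
	ConnectorName string
}

type Pipeline struct {
	Name string
}

type Config struct {
	Peers     []Peer
	Pipelines []Pipeline
}

// getPeer returns a pointer into c.Peers (not to a loop copy), so
// changes through it are visible in the config; nil means "not found".
func (c *Config) getPeer(name string) *Peer {
	for i := range c.Peers {
		if c.Peers[i].Name == name {
			return &c.Peers[i]
		}
	}
	return nil
}

// getPipeline follows the same pattern for pipelines.
func (c *Config) getPipeline(name string) *Pipeline {
	for i := range c.Pipelines {
		if c.Pipelines[i].Name == name {
			return &c.Pipelines[i]
		}
	}
	return nil
}

func main() {
	cfg := &Config{
		Peers:     []Peer{{Name: "pg", ConnectorName: "postgres"}, {Name: "events", ConnectorName: "kafka"}},
		Pipelines: []Pipeline{{Name: "orders"}},
	}
	fmt.Println(cfg.getPeer("events").ConnectorName) // kafka
	fmt.Println(cfg.getPipeline("missing") == nil)   // true
}
```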

type Connector

type Connector interface {
	// Connect initializes the connector with the provided configuration.
	// The config parameter is a raw JSON message containing connector-specific settings.
	// Additional arguments can be passed via the args parameter.
	Connect(config json.RawMessage, args ...any) error

	// Pub sends the given CDC event to the connector's destination.
	// It returns an error if the publish operation fails.
	Pub(event cdc.Event, args ...any) error

	// Sub provides a channel for consuming CDC events.
	Sub(args ...any) (<-chan cdc.Event, error)

	// Type returns the type of the connector (SUB, PUB, or PUBSUB).
	Type() ConnectorType

	// Disconnect closes the connector's connection to its peer.
	Disconnect() error
}

A Connector represents a data pipeline component that acts as a source, a sink, or both, as reported by Type.

type ConnectorType

type ConnectorType int
const (
	ConnectorTypeUnknown ConnectorType = iota
	ConnectorTypePub                   // Sink / consumer-only
	ConnectorTypeSub                   // Source / producer-only
	ConnectorTypePubSub                // Source and sink
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles connectors and peers for data pipeline operations.

func NewManager

func NewManager() *Manager

NewManager returns a new Manager instance with the default connectors.

func (*Manager) AddPeer

func (m *Manager) AddPeer(connector string, name string) (*Peer, error)

AddPeer creates a new Peer with the given name, backed by the named connector, and adds it to the Manager.

func (*Manager) AddSubscription

func (m *Manager) AddSubscription(sourceName, pipelineName string, sinkChannels map[string]chan cdc.Event)

AddSubscription registers the sink channels of the pipeline pipelineName as a new subscription to the source sourceName.

func (*Manager) GetPeer

func (m *Manager) GetPeer(name string) (*Peer, error)

func (*Manager) GetSubscriptions

func (m *Manager) GetSubscriptions(sourceName string) []SourceSubscription

GetSubscriptions returns all subscriptions for a source.

func (*Manager) Init

func (m *Manager) Init(config *Config) error

Init initializes all peers from the configuration.

func (*Manager) IsFirstSubscription

func (m *Manager) IsFirstSubscription(sourceName string) bool

IsFirstSubscription reports whether this is the first subscription for a source.
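A sketch of per-source subscription bookkeeping, under the assumption that one source feeds every pipeline subscribed to it and that "first subscription" means exactly one subscription is registered, so the caller knows to start reading from the source. The `subscriptions` type and its methods are hypothetical; only `SourceSubscription` mirrors a documented type, with `Event` standing in for `cdc.Event`.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for cdc.Event.
type Event map[string]any

// SourceSubscription mirrors the documented struct.
type SourceSubscription struct {
	SinkChannels map[string]chan Event
	PipelineName string
}

// subscriptions is a hypothetical stand-in for the Manager's
// per-source subscription bookkeeping.
type subscriptions struct {
	mu    sync.Mutex
	bySrc map[string][]SourceSubscription
}

func (s *subscriptions) add(source, pipeline string, sinks map[string]chan Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.bySrc == nil {
		s.bySrc = map[string][]SourceSubscription{}
	}
	s.bySrc[source] = append(s.bySrc[source], SourceSubscription{SinkChannels: sinks, PipelineName: pipeline})
}

// get returns a copy so callers can range over it without holding the lock.
func (s *subscriptions) get(source string) []SourceSubscription {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]SourceSubscription(nil), s.bySrc[source]...)
}

// isFirst assumes "first" means exactly one subscription is registered.
func (s *subscriptions) isFirst(source string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.bySrc[source]) == 1
}

func main() {
	var subs subscriptions
	subs.add("pg", "orders", map[string]chan Event{})
	fmt.Println(subs.isFirst("pg")) // true: start the source reader
	subs.add("pg", "audit", map[string]chan Event{})
	fmt.Println(subs.isFirst("pg"), len(subs.get("pg"))) // false 2
}
```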

func (*Manager) Peers

func (m *Manager) Peers() []Peer

func (*Manager) RegisterConnectorPlugin

func (m *Manager) RegisterConnectorPlugin(path string, name string) error

RegisterConnectorPlugin loads and registers a connector plugin from the specified path.
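Go plugins are loaded with the standard library `plugin` package. A minimal sketch, assuming the plugin exports a symbol (named `Connector` here, which is a guess) whose value implements the interface; the real RegisterConnectorPlugin would then presumably register the result under `name`. `loadConnectorPlugin` and the trimmed `Connector` interface are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"plugin"
)

// Connector is a trimmed, hypothetical stand-in for the real interface.
type Connector interface {
	Disconnect() error
}

// loadConnectorPlugin opens a plugin built with `go build -buildmode=plugin`
// and looks up an exported symbol. For an exported variable, Lookup
// returns a pointer to it, so pointer-receiver methods are available.
func loadConnectorPlugin(path, symbol string) (Connector, error) {
	p, err := plugin.Open(path)
	if err != nil {
		return nil, fmt.Errorf("open plugin %s: %w", path, err)
	}
	sym, err := p.Lookup(symbol)
	if err != nil {
		return nil, fmt.Errorf("lookup %s: %w", symbol, err)
	}
	c, ok := sym.(Connector)
	if !ok {
		return nil, errors.New("plugin symbol does not implement Connector")
	}
	return c, nil
}

func main() {
	_, err := loadConnectorPlugin("./does-not-exist.so", "Connector")
	fmt.Println(err != nil) // true: the file does not exist
}
```

Go plugins are only supported on Linux, FreeBSD, and macOS, and the plugin must be built with the same Go toolchain and the same versions of shared packages as the host binary, otherwise `plugin.Open` fails. Method signatures that mention package types, such as `cdc.Event`, must refer to the same package in both binaries for the type assertion to succeed.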

type Peer

type Peer struct {
	Name          string `mapstructure:"name"`
	ConnectorName string `mapstructure:"connector"`
	// Config contains the connection configuration for the underlying client library,
	// e.g. github.com/IBM/sarama.Config or github.com/eclipse/paho.mqtt.golang.ClientOptions.
	Config map[string]any `mapstructure:"config"`
	// Args holds extra arguments passed to the Connect, Pub, and Sub methods.
	Args []any
}

Peer is a data source or destination with an associated connector (e.g. NATS, Kafka, MQTT, ClickHouse).

func (*Peer) Connector

func (p *Peer) Connector() Connector

type Pipeline

type Pipeline struct {
	Name    string   `mapstructure:"name"`
	Sources []Source `mapstructure:"sources"`
	// Pipeline transformations are applied after source transformations and before sink transformations.
	// They apply to every CDC event flowing through the pipeline, from all of its sources to all of its sinks.
	Transformations []transform.Transformation `mapstructure:"transformations"`
	Sinks           []Sink                     `mapstructure:"sinks"`
}

Pipeline configures a complete data processing pipeline.

type Sink

type Sink struct {
	// Name must match one of configured peers
	Name string `mapstructure:"name"`
	// Sink-specific transformations are applied after the source and pipeline transformations, just before the event is sent to this sink.
	Transformations []transform.Transformation `mapstructure:"transformations"`
}

Sink is a pipeline output with its transformations.

type Source

type Source struct {
	// Name must match one of configured peers
	Name string `mapstructure:"name"`
	// Source transformations are applied (in the order specified) as soon as a CDC event is received, before any other processing.
	Transformations []transform.Transformation `mapstructure:"transformations"`
}

Source is a pipeline input with its transformations.
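Taken together, the Source, Pipeline, and Sink field comments define an order: source transformations, then pipeline transformations, then sink transformations. A sketch of that ordering, assuming transformations are plain functions and that returning nil drops the event; `Event`, `Transformation`, `applyAll`, `process`, and `tag` are hypothetical.

```go
package main

import "fmt"

type Event map[string]any

// Transformation is a hypothetical stand-in for transform.Transformation.
type Transformation func(Event) Event

// applyAll runs ts in order, stopping early once the event is dropped (nil).
func applyAll(e Event, ts []Transformation) Event {
	for _, t := range ts {
		if e == nil {
			return nil
		}
		e = t(e)
	}
	return e
}

// process applies the three documented stages in order:
// source, then pipeline, then sink transformations.
func process(e Event, source, pipeline, sink []Transformation) Event {
	e = applyAll(e, source)
	e = applyAll(e, pipeline)
	return applyAll(e, sink)
}

// tag is a hypothetical transformation that appends a label to a trace field.
func tag(label string) Transformation {
	return func(e Event) Event {
		trace, _ := e["trace"].(string)
		e["trace"] = trace + label
		return e
	}
}

func main() {
	out := process(Event{}, []Transformation{tag("S")}, []Transformation{tag("P")}, []Transformation{tag("K")})
	fmt.Println(out["trace"]) // SPK
}
```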

type SourceSubscription

type SourceSubscription struct {
	SinkChannels map[string]chan cdc.Event
	PipelineName string
}

Directories

Path Synopsis
peer
kafka
Package kafka provides a real-time Kafka-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
mqtt
Package mqtt provides a real-time MQTT-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
nats
Package nats provides a real-time NATS-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
pg
plugin_example command
transform
Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines.
