transform

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines. It's inspired by Debezium's [Single Message Transformations (SMTs)](https://docs.confluent.io/platform/current/connect/transforms/overview.html) usage.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config interface {
	// Validate validates the configuration
	Validate() error
	// Type returns the transformation type
	Type() string
}

Config is the interface that all transformations must implement

type ExtractConfig

type ExtractConfig struct {
	Fields []string `json:"fields"`
}

ExtractConfig holds the configuration for the extract transformation

func (*ExtractConfig) Type

func (c *ExtractConfig) Type() string

Type returns the type of the transformation

func (*ExtractConfig) Validate

func (c *ExtractConfig) Validate() error

Validate validates the ExtractConfig

type FilterConfig

type FilterConfig struct {
	TablePattern  string   `json:"tablePattern,omitempty"`
	Tables        []string `json:"tables,omitempty"`
	ExcludeTables []string `json:"excludeTables,omitempty"`
	Operations    []string `json:"operations,omitempty"`
}

func (*FilterConfig) Type

func (c *FilterConfig) Type() string

func (*FilterConfig) Validate

func (c *FilterConfig) Validate() error

type Func

type Func func(*cdc.Event) (*cdc.Event, error)

Func is the signature for all transformation functions

func Extract

func Extract(config *ExtractConfig) Func

Extract creates a Func that extracts specified fields from the CDC event

func Filter

func Filter(config *FilterConfig) Func

func Replace

func Replace(config *ReplaceConfig) Func

Replace creates a Func that performs the configured replacements

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager() *Manager

func (*Manager) Chain

func (m *Manager) Chain(configs []Transformation) (Func, error)

Chain creates a transformation chain from a list of configs

func (*Manager) RegisterBuiltins

func (m *Manager) RegisterBuiltins()

RegisterBuiltins registers all built-in transformations

type RegexReplacement

type RegexReplacement struct {
	Type    string `json:"type"`    // "schema", "table", or "column"
	Pattern string `json:"pattern"` // Regex pattern to match
	Replace string `json:"replace"` // Replacement string (can use regex groups)
}

RegexReplacement defines a regex-based replacement rule

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is a collection of transformation functions

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new transformation registry

func (*Registry) Get

func (r *Registry) Get(name string) (func(Config) Func, error)

Get returns a transformation from the registry

func (*Registry) Register

func (r *Registry) Register(name string, factory func(Config) Func)

Register adds a transformation to the registry

type ReplaceConfig

type ReplaceConfig struct {
	// Schema replacements
	Schemas map[string]string `json:"schemas,omitempty"`

	// Table replacements
	Tables map[string]string `json:"tables,omitempty"`

	// Column/field replacements
	Columns map[string]string `json:"columns,omitempty"`

	// Regex replacements
	Regex []RegexReplacement `json:"regex,omitempty"`
}

ReplaceConfig holds the configuration for the replace transformation

func (*ReplaceConfig) Type

func (c *ReplaceConfig) Type() string

Type returns the type of the transformation

func (*ReplaceConfig) Validate

func (c *ReplaceConfig) Validate() error

Validate validates the ReplaceConfig

type Transformation

type Transformation struct {
	Config map[string]any `mapstructure:"config"`
	Type   string         `mapstructure:"type"`
}

Transformation represents a single transformation step (like Kafka SMT)

func (*Transformation) ToConfig

func (t *Transformation) ToConfig() (Config, error)

Helper method to convert Transformation to transform.Config interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL