Documentation
¶
Overview ¶
Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines. It's inspired by Debezium's [Single Message Transformations (SMTs)](https://docs.confluent.io/platform/current/connect/transforms/overview.html) usage.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config interface {
// Validate validates the configuration
Validate() error
// Type returns the transformation type
Type() string
}
Config is the interface that all transformations must implement
type ExtractConfig ¶
type ExtractConfig struct {
Fields []string `json:"fields"`
}
ExtractConfig holds the configuration for the extract transformation
func (*ExtractConfig) Type ¶
func (c *ExtractConfig) Type() string
Type returns the type of the transformation
func (*ExtractConfig) Validate ¶
func (c *ExtractConfig) Validate() error
Validate validates the ExtractConfig
type FilterConfig ¶
type FilterConfig struct {
TablePattern string `json:"tablePattern,omitempty"`
Tables []string `json:"tables,omitempty"`
ExcludeTables []string `json:"excludeTables,omitempty"`
Operations []string `json:"operations,omitempty"`
}
func (*FilterConfig) Type ¶
func (c *FilterConfig) Type() string
func (*FilterConfig) Validate ¶
func (c *FilterConfig) Validate() error
type Func ¶
Func is the signature for all transformation functions
func Extract ¶
func Extract(config *ExtractConfig) Func
Extract creates a Func that extracts specified fields from the CDC event
func Filter ¶
func Filter(config *FilterConfig) Func
func Replace ¶
func Replace(config *ReplaceConfig) Func
Replace creates a Func that performs the configured replacements
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
func NewManager() *Manager
func (*Manager) Chain ¶
func (m *Manager) Chain(configs []Transformation) (Func, error)
Chain creates a transformation chain from a list of configs
func (*Manager) RegisterBuiltins ¶
func (m *Manager) RegisterBuiltins()
RegisterBuiltins registers all built-in transformations
type RegexReplacement ¶
type RegexReplacement struct {
Type string `json:"type"` // "schema", "table", or "column"
Pattern string `json:"pattern"` // Regex pattern to match
Replace string `json:"replace"` // Replacement string (can use regex groups)
}
RegexReplacement defines a regex-based replacement rule
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is a collection of transformation functions
type ReplaceConfig ¶
type ReplaceConfig struct {
// Schema replacements
Schemas map[string]string `json:"schemas,omitempty"`
// Table replacements
Tables map[string]string `json:"tables,omitempty"`
// Column/field replacements
Columns map[string]string `json:"columns,omitempty"`
// Regex replacements
Regex []RegexReplacement `json:"regex,omitempty"`
}
ReplaceConfig holds the configuration for the replace transformation
func (*ReplaceConfig) Type ¶
func (c *ReplaceConfig) Type() string
Type returns the type of the transformation
func (*ReplaceConfig) Validate ¶
func (c *ReplaceConfig) Validate() error
Validate validates the ReplaceConfig
type Transformation ¶
type Transformation struct {
Config map[string]any `mapstructure:"config"`
Type string `mapstructure:"type"`
}
Transformation represents a single transformation step (like Kafka SMT)
func (*Transformation) ToConfig ¶
func (t *Transformation) ToConfig() (Config, error)
Helper method to convert Transformation to transform.Config interface