Documentation
¶
Overview ¶
Package cdc implements Change Data Capture modules, steps, and triggers.
Index ¶
- func NewBackpressureStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewMonitorStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSchemaHistoryStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSnapshotStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewSourceModule(name string, config map[string]any) (sdk.ModuleInstance, error)
- func NewStartStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewStopStep(name string, _ map[string]any) (sdk.StepInstance, error)
- func NewTrigger(config map[string]any, cb sdk.TriggerCallback) (sdk.TriggerInstance, error)
- func RegisterSource(sourceID string, p CDCProvider) error
- func UnregisterSource(sourceID string)
- type BackpressureMonitor
- type BackpressureStatus
- type BentoProvider
- func (p *BentoProvider) ConfigYAML(sourceID string) (string, error)
- func (p *BentoProvider) Connect(_ context.Context, config SourceConfig) error
- func (p *BentoProvider) Disconnect(_ context.Context, sourceID string) error
- func (p *BentoProvider) RegisterEventHandler(sourceID string, h EventHandler) error
- func (p *BentoProvider) SchemaHistory(_ context.Context, sourceID string, _ string) ([]SchemaVersion, error)
- func (p *BentoProvider) Snapshot(_ context.Context, sourceID string, tables []string) error
- func (p *BentoProvider) Status(_ context.Context, sourceID string) (*CDCStatus, error)
- type CDCProvider
- type CDCStatus
- type DMSClient
- type DMSConfig
- type DMSProvider
- func (p *DMSProvider) Connect(ctx context.Context, config SourceConfig) error
- func (p *DMSProvider) Disconnect(ctx context.Context, sourceID string) error
- func (p *DMSProvider) PauseSource(ctx context.Context, sourceID string) error
- func (p *DMSProvider) RegisterEventHandler(sourceID string, h EventHandler) error
- func (p *DMSProvider) ResumeSource(ctx context.Context, sourceID string) error
- func (p *DMSProvider) SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)
- func (p *DMSProvider) Snapshot(ctx context.Context, sourceID string, tables []string) error
- func (p *DMSProvider) Status(ctx context.Context, sourceID string) (*CDCStatus, error)
- type DebeziumProvider
- func (p *DebeziumProvider) Connect(ctx context.Context, config SourceConfig) error
- func (p *DebeziumProvider) Disconnect(ctx context.Context, sourceID string) error
- func (p *DebeziumProvider) PauseSource(ctx context.Context, sourceID string) error
- func (p *DebeziumProvider) RegisterEventHandler(sourceID string, h EventHandler) error
- func (p *DebeziumProvider) ResumeSource(ctx context.Context, sourceID string) error
- func (p *DebeziumProvider) SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)
- func (p *DebeziumProvider) Snapshot(ctx context.Context, sourceID string, tables []string) error
- func (p *DebeziumProvider) Status(ctx context.Context, sourceID string) (*CDCStatus, error)
- type EventHandler
- type MemoryProvider
- func (p *MemoryProvider) AddSchemaVersion(sourceID string, sv SchemaVersion) error
- func (p *MemoryProvider) Connect(ctx context.Context, config SourceConfig) error
- func (p *MemoryProvider) Disconnect(ctx context.Context, sourceID string) error
- func (p *MemoryProvider) InjectEvent(sourceID string, event map[string]any) error
- func (p *MemoryProvider) RegisterEventHandler(sourceID string, h EventHandler) error
- func (p *MemoryProvider) SchemaHistory(_ context.Context, sourceID string, table string) ([]SchemaVersion, error)
- func (p *MemoryProvider) SetLag(sourceID string, lagBytes, lagSeconds int64) error
- func (p *MemoryProvider) Snapshot(_ context.Context, sourceID string, tables []string) error
- func (p *MemoryProvider) Status(_ context.Context, sourceID string) (*CDCStatus, error)
- type SchemaVersion
- type SourceConfig
- type SourceModule
- type ThrottleableProvider
- type Trigger
- type TriggerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewBackpressureStep ¶
NewBackpressureStep creates a new step.cdc_backpressure instance.
func NewMonitorStep ¶
NewMonitorStep creates a new step.cdc_monitor instance.
func NewSchemaHistoryStep ¶
NewSchemaHistoryStep creates a new step.cdc_schema_history instance.
func NewSnapshotStep ¶
NewSnapshotStep creates a new step.cdc_snapshot instance.
func NewSourceModule ¶
NewSourceModule creates a new CDC source module.
func NewStartStep ¶
NewStartStep creates a new step.cdc_start instance.
func NewStatusStep ¶
NewStatusStep creates a new step.cdc_status instance.
func NewStopStep ¶
NewStopStep creates a new step.cdc_stop instance.
func NewTrigger ¶
func NewTrigger(config map[string]any, cb sdk.TriggerCallback) (sdk.TriggerInstance, error)
NewTrigger creates a new CDC trigger instance.
func RegisterSource ¶
func RegisterSource(sourceID string, p CDCProvider) error
RegisterSource registers a CDC provider in the global registry.
func UnregisterSource ¶
func UnregisterSource(sourceID string)
UnregisterSource removes a source from the global registry.
Types ¶
type BackpressureMonitor ¶
type BackpressureMonitor struct {
ThresholdLagBytes int64
ThresholdLagSeconds int64
WarningMultiplier float64 // warn at this fraction of threshold (e.g. 0.8)
}
BackpressureMonitor evaluates CDC lag against configurable thresholds.
func (*BackpressureMonitor) Evaluate ¶
func (m *BackpressureMonitor) Evaluate(sourceID string, lagBytes, lagSeconds int64) BackpressureStatus
Evaluate returns the backpressure status for the given lag values.
type BackpressureStatus ¶
type BackpressureStatus struct {
SourceID string
LagBytes int64
LagSeconds int64
Status string // "healthy", "warning", "critical"
}
BackpressureStatus holds the lag evaluation result.
type BentoProvider ¶
type BentoProvider struct {
// contains filtered or unexported fields
}
BentoProvider implements CDCProvider by generating Bento input YAML configs and delegating actual stream management to the workflow-plugin-bento engine module.
The data-engineering plugin no longer manages Bento stream lifecycle in-process. Instead, the engine config declares a bento.input module (referenced via options.bento_module) alongside the cdc.source module. The Bento module handles stream execution and publishes events to the engine's EventBus, which the cdc trigger consumes.
func (*BentoProvider) ConfigYAML ¶
func (p *BentoProvider) ConfigYAML(sourceID string) (string, error)
ConfigYAML returns the generated Bento input YAML for a configured CDC source. This can be used to inspect what config was generated for the delegated module.
func (*BentoProvider) Connect ¶
func (p *BentoProvider) Connect(_ context.Context, config SourceConfig) error
Connect generates the Bento CDC input YAML and stores it for delegation. Actual stream management is handled by the bento.input engine module referenced in config.Options["bento_module"].
func (*BentoProvider) Disconnect ¶
func (p *BentoProvider) Disconnect(_ context.Context, sourceID string) error
Disconnect removes the CDC source configuration.
func (*BentoProvider) RegisterEventHandler ¶
func (p *BentoProvider) RegisterEventHandler(sourceID string, h EventHandler) error
RegisterEventHandler stores the event handler for a CDC source. Events are delivered by the delegated bento.input engine module via the engine's EventBus.
func (*BentoProvider) SchemaHistory ¶
func (p *BentoProvider) SchemaHistory(_ context.Context, sourceID string, _ string) ([]SchemaVersion, error)
SchemaHistory returns schema change history. Bento streams do not track DDL history natively, so this method always returns an empty history.
type CDCProvider ¶
type CDCProvider interface {
// Connect establishes a connection and starts the CDC stream.
Connect(ctx context.Context, config SourceConfig) error
// Disconnect stops the CDC stream and releases resources.
Disconnect(ctx context.Context, sourceID string) error
// Status returns the current status of a CDC stream.
Status(ctx context.Context, sourceID string) (*CDCStatus, error)
// Snapshot triggers a full table snapshot for the given tables.
Snapshot(ctx context.Context, sourceID string, tables []string) error
// SchemaHistory returns the schema change history for a table.
SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)
// RegisterEventHandler registers a callback for CDC events from a source stream.
RegisterEventHandler(sourceID string, h EventHandler) error
}
CDCProvider defines the interface for Change Data Capture providers. Implementations: BentoProvider, DebeziumProvider, DMSProvider, MemoryProvider.
func LookupSource ¶
func LookupSource(sourceID string) (CDCProvider, error)
LookupSource finds a running CDC source provider by ID.
type CDCStatus ¶
type CDCStatus struct {
SourceID string `json:"source_id" yaml:"source_id"`
State string `json:"state" yaml:"state"`
Provider string `json:"provider" yaml:"provider"`
LastEvent string `json:"last_event" yaml:"last_event"`
Error string `json:"error,omitempty" yaml:"error,omitempty"`
LagBytes int64 `json:"lag_bytes,omitempty" yaml:"lag_bytes,omitempty"`
LagSeconds int64 `json:"lag_seconds,omitempty" yaml:"lag_seconds,omitempty"`
}
CDCStatus describes the current state of a CDC stream.
type DMSClient ¶
type DMSClient interface {
CreateReplicationTask(ctx context.Context, params *dms.CreateReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.CreateReplicationTaskOutput, error)
StartReplicationTask(ctx context.Context, params *dms.StartReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.StartReplicationTaskOutput, error)
StopReplicationTask(ctx context.Context, params *dms.StopReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.StopReplicationTaskOutput, error)
DescribeReplicationTasks(ctx context.Context, params *dms.DescribeReplicationTasksInput, optFns ...func(*dms.Options)) (*dms.DescribeReplicationTasksOutput, error)
DeleteReplicationTask(ctx context.Context, params *dms.DeleteReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.DeleteReplicationTaskOutput, error)
DescribeTableStatistics(ctx context.Context, params *dms.DescribeTableStatisticsInput, optFns ...func(*dms.Options)) (*dms.DescribeTableStatisticsOutput, error)
}
DMSClient is the interface over AWS DMS SDK calls, enabling mock injection in tests.
type DMSConfig ¶
type DMSConfig struct {
SourceEndpointARN string `json:"source_endpoint_arn" yaml:"source_endpoint_arn"`
TargetEndpointARN string `json:"target_endpoint_arn" yaml:"target_endpoint_arn"`
ReplicationInstanceARN string `json:"replication_instance_arn" yaml:"replication_instance_arn"`
MigrationType string `json:"migration_type" yaml:"migration_type"` // cdc, full-load-and-cdc
}
DMSConfig holds DMS-specific configuration for the provider.
type DMSProvider ¶
type DMSProvider struct {
// contains filtered or unexported fields
}
DMSProvider implements CDCProvider using AWS Database Migration Service. It creates and manages AWS DMS replication tasks via the AWS SDK. The SourceConfig.Connection field is not used; DMS config comes from DMSConfig.
func (*DMSProvider) Connect ¶
func (p *DMSProvider) Connect(ctx context.Context, config SourceConfig) error
Connect creates an AWS DMS replication task and starts it.
func (*DMSProvider) Disconnect ¶
func (p *DMSProvider) Disconnect(ctx context.Context, sourceID string) error
Disconnect stops and deletes the AWS DMS replication task.
func (*DMSProvider) PauseSource ¶
func (p *DMSProvider) PauseSource(ctx context.Context, sourceID string) error
PauseSource pauses the source by stopping the DMS replication task; for DMS, a pause is implemented as a stop.
func (*DMSProvider) RegisterEventHandler ¶
func (p *DMSProvider) RegisterEventHandler(sourceID string, h EventHandler) error
RegisterEventHandler registers a callback for CDC events from an AWS DMS task.
func (*DMSProvider) ResumeSource ¶
func (p *DMSProvider) ResumeSource(ctx context.Context, sourceID string) error
ResumeSource restarts the DMS replication task from where it stopped.
func (*DMSProvider) SchemaHistory ¶
func (p *DMSProvider) SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)
SchemaHistory returns schema change statistics derived from AWS DMS table statistics. DMS tracks DDL counts per table, which are retrieved via DescribeTableStatistics.
type DebeziumProvider ¶
type DebeziumProvider struct {
// contains filtered or unexported fields
}
DebeziumProvider implements CDCProvider via the Kafka Connect REST API. It creates and manages Debezium connectors on an external Kafka Connect cluster. The SourceConfig.Connection field must be the base URL of the Kafka Connect REST API (e.g. "http://localhost:8083").
func (*DebeziumProvider) Connect ¶
func (p *DebeziumProvider) Connect(ctx context.Context, config SourceConfig) error
Connect creates a Debezium connector via POST /connectors on the Kafka Connect API. config.Connection must be the Kafka Connect base URL.
func (*DebeziumProvider) Disconnect ¶
func (p *DebeziumProvider) Disconnect(ctx context.Context, sourceID string) error
Disconnect deletes the Debezium connector via DELETE /connectors/{name}.
func (*DebeziumProvider) PauseSource ¶
func (p *DebeziumProvider) PauseSource(ctx context.Context, sourceID string) error
PauseSource pauses a Debezium connector via PUT /connectors/{name}/pause.
func (*DebeziumProvider) RegisterEventHandler ¶
func (p *DebeziumProvider) RegisterEventHandler(sourceID string, h EventHandler) error
RegisterEventHandler registers a callback for CDC events from a Debezium connector.
func (*DebeziumProvider) ResumeSource ¶
func (p *DebeziumProvider) ResumeSource(ctx context.Context, sourceID string) error
ResumeSource resumes a paused Debezium connector via PUT /connectors/{name}/resume.
func (*DebeziumProvider) SchemaHistory ¶
func (p *DebeziumProvider) SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)
SchemaHistory returns schema change history for a table. NOTE: The Kafka Connect REST API does not expose schema history directly. A full implementation would require consuming the Debezium schema history Kafka topic. This implementation verifies the connector is running and returns an empty history.
type EventHandler ¶
EventHandler is called for each CDC event received from the provider. Implementations must be goroutine-safe.
type MemoryProvider ¶
type MemoryProvider struct {
// contains filtered or unexported fields
}
MemoryProvider is an in-memory CDCProvider implementation intended for testing. It allows injecting synthetic CDC events via InjectEvent and verifying provider behavior without real database connections or external services.
func NewMemoryProvider ¶
func NewMemoryProvider() *MemoryProvider
NewMemoryProvider creates a new MemoryProvider.
func (*MemoryProvider) AddSchemaVersion ¶
func (p *MemoryProvider) AddSchemaVersion(sourceID string, sv SchemaVersion) error
AddSchemaVersion records a synthetic DDL change event for testing schema history.
func (*MemoryProvider) Connect ¶
func (p *MemoryProvider) Connect(ctx context.Context, config SourceConfig) error
Connect registers an in-memory CDC source and starts its event dispatch goroutine.
func (*MemoryProvider) Disconnect ¶
func (p *MemoryProvider) Disconnect(ctx context.Context, sourceID string) error
Disconnect stops the event dispatch goroutine and removes the source.
func (*MemoryProvider) InjectEvent ¶
func (p *MemoryProvider) InjectEvent(sourceID string, event map[string]any) error
InjectEvent injects a synthetic CDC event into the named source's event stream. This is the primary API used by tests to simulate database change events.
func (*MemoryProvider) RegisterEventHandler ¶
func (p *MemoryProvider) RegisterEventHandler(sourceID string, h EventHandler) error
RegisterEventHandler registers a callback for events from a named source.
func (*MemoryProvider) SchemaHistory ¶
func (p *MemoryProvider) SchemaHistory(_ context.Context, sourceID string, table string) ([]SchemaVersion, error)
SchemaHistory returns the recorded DDL change history for a table.
func (*MemoryProvider) SetLag ¶
func (p *MemoryProvider) SetLag(sourceID string, lagBytes, lagSeconds int64) error
SetLag configures simulated CDC lag on a running memory source (for backpressure tests).
type SchemaVersion ¶
type SchemaVersion struct {
Table string `json:"table" yaml:"table"`
Version int64 `json:"version" yaml:"version"`
DDL string `json:"ddl" yaml:"ddl"`
AppliedAt string `json:"applied_at" yaml:"applied_at"`
}
SchemaVersion describes a schema change event for a table.
type SourceConfig ¶
type SourceConfig struct {
Provider string `json:"provider" yaml:"provider"`
SourceID string `json:"source_id" yaml:"source_id"`
SourceType string `json:"source_type" yaml:"source_type"`
Connection string `json:"connection" yaml:"connection"`
Tables []string `json:"tables" yaml:"tables"`
Options map[string]any `json:"options" yaml:"options"`
}
SourceConfig holds configuration for the cdc.source module.
type SourceModule ¶
type SourceModule struct {
// contains filtered or unexported fields
}
SourceModule is a CDC source module that streams change events from a database.
func (*SourceModule) Config ¶
func (m *SourceModule) Config() SourceConfig
Config returns the source configuration.
func (*SourceModule) Init ¶
func (m *SourceModule) Init() error
Init validates the module configuration.
func (*SourceModule) Provider ¶
func (m *SourceModule) Provider() CDCProvider
Provider returns the underlying CDC provider (used by steps).
func (*SourceModule) Start ¶
func (m *SourceModule) Start(ctx context.Context) error
Start initializes the CDC provider connection and registers the module in the global registry. The module mutex is NOT held during provider.Connect to avoid blocking Provider() and Config() accessors during potentially long network round-trips. m.provider and m.config are immutable after NewSourceModule, so no lock is needed to read them.
type ThrottleableProvider ¶
type ThrottleableProvider interface {
PauseSource(ctx context.Context, sourceID string) error
ResumeSource(ctx context.Context, sourceID string) error
}
ThrottleableProvider is an optional extension of CDCProvider for providers that support pausing and resuming the CDC stream.
type Trigger ¶
type Trigger struct {
// contains filtered or unexported fields
}
Trigger implements sdk.TriggerInstance for trigger.cdc. It registers an EventHandler on the named cdc.source provider and fires the workflow callback for each matching CDC change event.
The trigger filters events by table name and action (INSERT/UPDATE/DELETE) when configured. An empty tables list matches all tables, and an empty actions list matches all actions.
type TriggerConfig ¶
type TriggerConfig struct {
// SourceID references a running cdc.source module by source_id.
SourceID string `json:"source_id" yaml:"source_id"`
// Tables filters events to the given table names. Empty = all tables.
Tables []string `json:"tables" yaml:"tables"`
// Actions filters events by DML operation type (INSERT, UPDATE, DELETE). Empty = all.
Actions []string `json:"actions" yaml:"actions"`
}
TriggerConfig holds configuration for the CDC trigger.