Documentation
¶
Overview ¶
Package cdc provides Change Data Capture (CDC) functionality for real-time data replication.
Index ¶
- type BatchEventHandler
- type CDCConfig
- type CDCConnector
- type ChangeEvent
- type Checkpoint
- type CheckpointStorage
- type Checkpointer
- type ColumnInfo
- type ComponentHealth
- type ConnectorType
- type DeadLetterQueue
- type DeadLetterStats
- type EventFilter
- type EventHandler
- type EventMetrics
- type EventProcessor
- type FilterCondition
- type HandlerMetrics
- type HealthCheckConfig
- type HealthMonitor
- type HealthStatus
- type JSONMessageDeserializer
- type JSONMessageSerializer
- type KafkaConfig
- type KafkaConsumer
- func (kc *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error
- func (kc *KafkaConsumer) Close() error
- func (kc *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (kc *KafkaConsumer) GetMetrics() KafkaMetrics
- func (kc *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error
- func (kc *KafkaConsumer) Subscribe(topics []string, handler EventHandler) error
- type KafkaMessage
- type KafkaMetrics
- type KafkaProducer
- func (kp *KafkaProducer) Close() error
- func (kp *KafkaProducer) Connect() error
- func (kp *KafkaProducer) GetMetrics() KafkaMetrics
- func (kp *KafkaProducer) ProduceEvent(ctx context.Context, event ChangeEvent) error
- func (kp *KafkaProducer) ProduceEvents(ctx context.Context, events []ChangeEvent) error
- type Manager
- type ManagerConfig
- type ManagerStatus
- type MemoryCheckpointStorage
- type MemoryDeadLetterQueue
- type MessageDeserializer
- type MessageSerializer
- type MongoChangeEvent
- type MongoDBConfig
- type MongoDBConnector
- func (c *MongoDBConnector) Acknowledge(position Position) error
- func (c *MongoDBConnector) Connect(config CDCConfig) error
- func (c *MongoDBConnector) GetPosition() Position
- func (c *MongoDBConnector) Health() HealthStatus
- func (c *MongoDBConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
- func (c *MongoDBConnector) Stop() error
- func (c *MongoDBConnector) Subscribe(collections []string) error
- type MySQLConfig
- type MySQLConnector
- func (c *MySQLConnector) Acknowledge(position Position) error
- func (c *MySQLConnector) Connect(config CDCConfig) error
- func (c *MySQLConnector) GetPosition() Position
- func (c *MySQLConnector) Health() HealthStatus
- func (c *MySQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
- func (c *MySQLConnector) Stop() error
- func (c *MySQLConnector) Subscribe(tables []string) error
- type Namespace
- type OperationType
- type Position
- type PostgreSQLConfig
- type PostgreSQLConnector
- func (c *PostgreSQLConnector) Acknowledge(position Position) error
- func (c *PostgreSQLConnector) Connect(config CDCConfig) error
- func (c *PostgreSQLConnector) GetPosition() Position
- func (c *PostgreSQLConnector) Health() HealthStatus
- func (c *PostgreSQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
- func (c *PostgreSQLConnector) Stop() error
- func (c *PostgreSQLConnector) Subscribe(tables []string) error
- type ProcessingTask
- type RetryPolicy
- type SchemaChange
- type SourceInfo
- type StreamMetrics
- type StreamProcessor
- func (sp *StreamProcessor) AddFilter(filter EventFilter)
- func (sp *StreamProcessor) GetDeadLetterQueueStats() DeadLetterStats
- func (sp *StreamProcessor) GetMetrics() StreamMetrics
- func (sp *StreamProcessor) ProcessEvent(ctx context.Context, event ChangeEvent) error
- func (sp *StreamProcessor) ProcessEvents(ctx context.Context, events []ChangeEvent) error
- func (sp *StreamProcessor) RegisterBatchHandler(pattern string, handler BatchEventHandler)
- func (sp *StreamProcessor) RegisterHandler(pattern string, handler EventHandler)
- func (sp *StreamProcessor) Start(ctx context.Context) error
- func (sp *StreamProcessor) Stop() error
- type StreamWorker
- type StreamingConfig
- type TableSchema
- type TransactionInfo
- type TruncatedArray
- type UpdateDescription
- type WorkerMetrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchEventHandler ¶
type BatchEventHandler func(ctx context.Context, events []ChangeEvent) error
BatchEventHandler is a function type for handling batches of change events
type CDCConfig ¶
type CDCConfig struct {
Type ConnectorType `json:"type" yaml:"type"`
ConnectionStr string `json:"connection_string" yaml:"connection_string"`
Database string `json:"database" yaml:"database"`
Tables []string `json:"tables" yaml:"tables"`
StartPosition *Position `json:"start_position,omitempty" yaml:"start_position,omitempty"`
BatchSize int `json:"batch_size" yaml:"batch_size"`
PollInterval time.Duration `json:"poll_interval" yaml:"poll_interval"`
BufferSize int `json:"buffer_size" yaml:"buffer_size"`
Options map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}
CDCConfig contains configuration for CDC connectors
type CDCConnector ¶
type CDCConnector interface {
// Connect establishes connection to the data source
Connect(config CDCConfig) error
// Subscribe starts listening to changes on specified tables/collections
Subscribe(tables []string) error
// ReadChanges returns a channel of change events
ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
// GetPosition returns the current replication position
GetPosition() Position
// Acknowledge confirms processing of events up to the given position
Acknowledge(position Position) error
// Stop gracefully shuts down the connector
Stop() error
// Health returns the health status of the connector
Health() HealthStatus
}
CDCConnector defines the interface for Change Data Capture connectors
type ChangeEvent ¶
type ChangeEvent struct {
ID string `json:"id"`
Operation OperationType `json:"operation"`
Database string `json:"database"`
Table string `json:"table"`
Schema string `json:"schema,omitempty"`
Before map[string]interface{} `json:"before,omitempty"`
After map[string]interface{} `json:"after,omitempty"`
Timestamp time.Time `json:"timestamp"`
Position Position `json:"position"`
TransactionID string `json:"transaction_id,omitempty"`
// Additional metadata
Metadata map[string]interface{} `json:"metadata,omitempty"`
Source SourceInfo `json:"source"`
}
ChangeEvent represents a single change event from the database
func (*ChangeEvent) ConvertToRecord ¶
func (ce *ChangeEvent) ConvertToRecord() (*models.Record, error)
ConvertToRecord converts the ChangeEvent into a unified Nebula record (*models.Record).
type Checkpoint ¶
type Checkpoint struct {
ID string `json:"id"`
Position Position `json:"position"`
Timestamp time.Time `json:"timestamp"`
EventCount int64 `json:"event_count"`
ProcessedAt time.Time `json:"processed_at"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Checkpoint represents a savepoint in the event stream
type CheckpointStorage ¶
type CheckpointStorage interface {
Save(checkpoint Checkpoint) error
Load() (Checkpoint, error)
Delete(checkpointID string) error
List() ([]Checkpoint, error)
}
CheckpointStorage defines the interface for checkpoint storage
type Checkpointer ¶
type Checkpointer struct {
// contains filtered or unexported fields
}
Checkpointer manages checkpoints for stream processing
func NewCheckpointer ¶
func NewCheckpointer(storage CheckpointStorage, interval time.Duration, logger *zap.Logger) *Checkpointer
NewCheckpointer creates a new checkpointer
func (*Checkpointer) Checkpoint ¶
func (c *Checkpointer) Checkpoint() error
Checkpoint saves the current processing state
func (*Checkpointer) GetLastCheckpoint ¶
func (c *Checkpointer) GetLastCheckpoint() Checkpoint
GetLastCheckpoint returns the last saved checkpoint
type ColumnInfo ¶
ColumnInfo describes a single column of a PostgreSQL table; a TableSchema holds one ColumnInfo per column.
type ComponentHealth ¶
type ComponentHealth struct {
Name string `json:"name"`
Status string `json:"status"`
LastCheck time.Time `json:"last_check"`
FailureCount int `json:"failure_count"`
SuccessCount int `json:"success_count"`
LastError string `json:"last_error,omitempty"`
ResponseTime time.Duration `json:"response_time"`
}
ComponentHealth tracks the health of a single component
type ConnectorType ¶
type ConnectorType string
ConnectorType represents the type of CDC connector
const (
	ConnectorPostgreSQL ConnectorType = "postgresql"
	ConnectorMySQL      ConnectorType = "mysql"
	ConnectorMongoDB    ConnectorType = "mongodb"
	ConnectorKafka      ConnectorType = "kafka"
)
type DeadLetterQueue ¶
type DeadLetterQueue interface {
Send(task ProcessingTask, err error) error
Read(limit int) ([]ProcessingTask, error)
Acknowledge(taskID string) error
GetStats() DeadLetterStats
}
DeadLetterQueue defines the interface for handling failed events
type DeadLetterStats ¶
type DeadLetterStats struct {
TotalEvents int64 `json:"total_events"`
PendingEvents int64 `json:"pending_events"`
ProcessedEvents int64 `json:"processed_events"`
OldestEvent time.Time `json:"oldest_event"`
LastAdded time.Time `json:"last_added"`
}
DeadLetterStats contains statistics for the dead letter queue
type EventFilter ¶
type EventFilter struct {
IncludeTables []string `json:"include_tables,omitempty"`
ExcludeTables []string `json:"exclude_tables,omitempty"`
Operations []OperationType `json:"operations,omitempty"`
Conditions []FilterCondition `json:"conditions,omitempty"`
}
EventFilter defines filtering criteria for change events
func (*EventFilter) AddCondition ¶
func (f *EventFilter) AddCondition(field, operator string, value interface{})
AddCondition adds a filter condition
func (*EventFilter) ShouldInclude ¶
func (f *EventFilter) ShouldInclude(event ChangeEvent) bool
ShouldInclude reports whether the event should be included according to the filter's criteria.
type EventHandler ¶
type EventHandler func(ctx context.Context, event ChangeEvent) error
EventHandler is a function type for handling change events
type EventMetrics ¶
type EventMetrics struct {
EventsReceived int64 `json:"events_received"`
EventsProcessed int64 `json:"events_processed"`
EventsFiltered int64 `json:"events_filtered"`
EventsErrored int64 `json:"events_errored"`
ProcessingLatency time.Duration `json:"processing_latency"`
ThroughputRPS float64 `json:"throughput_rps"`
LastEventTime time.Time `json:"last_event_time"`
BacklogSize int64 `json:"backlog_size"`
}
EventMetrics contains metrics for CDC operations
type EventProcessor ¶
type EventProcessor interface {
Process(ctx context.Context, event ChangeEvent) error
ProcessBatch(ctx context.Context, events []ChangeEvent) error
Stop() error
}
EventProcessor defines the interface for processing change events
type FilterCondition ¶
type FilterCondition struct {
Field string `json:"field"`
Operator string `json:"operator"` // eq, ne, lt, gt, in, like, etc.
Value interface{} `json:"value"`
}
FilterCondition represents a single filtering condition
type HandlerMetrics ¶
type HandlerMetrics struct {
EventsProcessed int64 `json:"events_processed"`
EventsErrored int64 `json:"events_errored"`
AverageLatency time.Duration `json:"average_latency"`
LastExecution time.Time `json:"last_execution"`
}
HandlerMetrics contains metrics for a specific handler
type HealthCheckConfig ¶
type HealthCheckConfig struct {
Enabled bool `json:"enabled"`
Interval time.Duration `json:"interval"`
Timeout time.Duration `json:"timeout"`
FailureThreshold int `json:"failure_threshold"`
SuccessThreshold int `json:"success_threshold"`
}
HealthCheckConfig contains health monitoring configuration
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor monitors the health of CDC components
func NewHealthMonitor ¶
func NewHealthMonitor(config HealthCheckConfig, manager *Manager, logger *zap.Logger) *HealthMonitor
NewHealthMonitor creates a new health monitor
func (*HealthMonitor) GetComponentHealth ¶
func (hm *HealthMonitor) GetComponentHealth() map[string]*ComponentHealth
GetComponentHealth returns the health of all components
type HealthStatus ¶
type HealthStatus struct {
Status string `json:"status"`
Message string `json:"message,omitempty"`
LastEvent time.Time `json:"last_event"`
EventCount int64 `json:"event_count"`
ErrorCount int64 `json:"error_count"`
Lag time.Duration `json:"lag,omitempty"`
Details map[string]interface{} `json:"details,omitempty"`
}
HealthStatus represents the health of a CDC connector
func (HealthStatus) IsHealthy ¶
func (h HealthStatus) IsHealthy() bool
IsHealthy reports whether the health status indicates that the connector is healthy.
type JSONMessageDeserializer ¶
type JSONMessageDeserializer struct{}
JSONMessageDeserializer provides JSON deserialization
func (*JSONMessageDeserializer) ContentType ¶
func (d *JSONMessageDeserializer) ContentType() string
ContentType returns the content type
func (*JSONMessageDeserializer) Deserialize ¶
func (d *JSONMessageDeserializer) Deserialize(data []byte) (ChangeEvent, error)
Deserialize deserializes JSON data to a ChangeEvent
type JSONMessageSerializer ¶
type JSONMessageSerializer struct{}
JSONMessageSerializer provides JSON serialization
func (*JSONMessageSerializer) ContentType ¶
func (s *JSONMessageSerializer) ContentType() string
ContentType returns the content type
func (*JSONMessageSerializer) Serialize ¶
func (s *JSONMessageSerializer) Serialize(event ChangeEvent) ([]byte, error)
Serialize serializes a ChangeEvent to JSON
func (*JSONMessageSerializer) SerializeKey ¶
func (s *JSONMessageSerializer) SerializeKey(event ChangeEvent) ([]byte, error)
SerializeKey serializes the event key
type KafkaConfig ¶
type KafkaConfig struct {
Brokers []string `json:"brokers"`
SecurityProtocol string `json:"security_protocol"`
SASLMechanism string `json:"sasl_mechanism"`
SASLUsername string `json:"sasl_username"`
SASLPassword string `json:"sasl_password"`
EnableTLS bool `json:"enable_tls"`
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
// Producer settings
ProducerAcks string `json:"producer_acks"` // all, 1, 0
ProducerRetries int `json:"producer_retries"`
ProducerBatchSize int `json:"producer_batch_size"`
ProducerLingerMS int `json:"producer_linger_ms"`
ProducerCompression string `json:"producer_compression"` // none, gzip, snappy, lz4
EnableIdempotence bool `json:"enable_idempotence"`
TransactionalID string `json:"transactional_id"`
// Consumer settings
ConsumerGroupID string `json:"consumer_group_id"`
AutoOffsetReset string `json:"auto_offset_reset"` // earliest, latest
EnableAutoCommit bool `json:"enable_auto_commit"`
SessionTimeoutMS int `json:"session_timeout_ms"`
HeartbeatIntervalMS int `json:"heartbeat_interval_ms"`
MaxPollRecords int `json:"max_poll_records"`
// Topic settings
TopicPrefix string `json:"topic_prefix"`
TopicSuffix string `json:"topic_suffix"`
TopicMapping map[string]string `json:"topic_mapping"`
DefaultTopic string `json:"default_topic"`
// Message settings
MessageFormat string `json:"message_format"` // json, avro, protobuf
IncludeSchema bool `json:"include_schema"`
CompressionType string `json:"compression_type"`
// Exactly-once settings
ExactlyOnce bool `json:"exactly_once"`
TransactionTimeoutMS int `json:"transaction_timeout_ms"`
}
KafkaConfig contains Kafka-specific configuration
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
KafkaConsumer provides Kafka CDC event consumption
func NewKafkaConsumer ¶
func NewKafkaConsumer(config KafkaConfig, logger *zap.Logger) *KafkaConsumer
NewKafkaConsumer creates a new Kafka consumer
func (*KafkaConsumer) Cleanup ¶
func (kc *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup implements sarama.ConsumerGroupHandler
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close() error
Close closes the Kafka consumer
func (*KafkaConsumer) ConsumeClaim ¶
func (kc *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim implements sarama.ConsumerGroupHandler
func (*KafkaConsumer) GetMetrics ¶
func (kc *KafkaConsumer) GetMetrics() KafkaMetrics
GetMetrics returns Kafka consumer metrics
func (*KafkaConsumer) Setup ¶
func (kc *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error
Setup implements sarama.ConsumerGroupHandler
func (*KafkaConsumer) Subscribe ¶
func (kc *KafkaConsumer) Subscribe(topics []string, handler EventHandler) error
Subscribe subscribes to Kafka topics and starts consuming
type KafkaMessage ¶
type KafkaMessage struct {
Key string `json:"key"`
Value ChangeEvent `json:"value"`
Headers map[string]string `json:"headers"`
Timestamp time.Time `json:"timestamp"`
}
KafkaMessage represents a message sent to/from Kafka
type KafkaMetrics ¶
type KafkaMetrics struct {
MessagesProduced int64 `json:"messages_produced"`
MessagesConsumed int64 `json:"messages_consumed"`
MessagesFailed int64 `json:"messages_failed"`
MessagesRetried int64 `json:"messages_retried"`
BytesProduced int64 `json:"bytes_produced"`
BytesConsumed int64 `json:"bytes_consumed"`
ProducerLatency time.Duration `json:"producer_latency"`
ConsumerLatency time.Duration `json:"consumer_latency"`
TransactionsCommitted int64 `json:"transactions_committed"`
TransactionsAborted int64 `json:"transactions_aborted"`
LastProducedTime time.Time `json:"last_produced_time"`
LastConsumedTime time.Time `json:"last_consumed_time"`
}
KafkaMetrics contains Kafka-specific metrics
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
KafkaProducer provides Kafka integration for CDC events with exactly-once semantics
func NewKafkaProducer ¶
func NewKafkaProducer(config KafkaConfig, logger *zap.Logger) *KafkaProducer
NewKafkaProducer creates a new Kafka producer
func (*KafkaProducer) Close ¶
func (kp *KafkaProducer) Close() error
Close closes the Kafka producer
func (*KafkaProducer) Connect ¶
func (kp *KafkaProducer) Connect() error
Connect establishes a connection to Kafka.
func (*KafkaProducer) GetMetrics ¶
func (kp *KafkaProducer) GetMetrics() KafkaMetrics
GetMetrics returns Kafka producer metrics
func (*KafkaProducer) ProduceEvent ¶
func (kp *KafkaProducer) ProduceEvent(ctx context.Context, event ChangeEvent) error
ProduceEvent produces a single CDC event to Kafka
func (*KafkaProducer) ProduceEvents ¶
func (kp *KafkaProducer) ProduceEvents(ctx context.Context, events []ChangeEvent) error
ProduceEvents produces multiple CDC events to Kafka with exactly-once semantics
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates multiple CDC connectors and event processing
func NewManager ¶
func NewManager(config ManagerConfig, logger *zap.Logger) *Manager
NewManager creates a new CDC manager
func (*Manager) AddConnector ¶
AddConnector adds a new CDC connector
func (*Manager) GetStatus ¶
func (m *Manager) GetStatus() ManagerStatus
GetStatus returns the current status of the CDC manager
func (*Manager) RemoveConnector ¶
RemoveConnector removes a CDC connector
type ManagerConfig ¶
type ManagerConfig struct {
// Connector configurations
Connectors map[string]CDCConfig `json:"connectors"`
// Stream processing configuration
Streaming StreamingConfig `json:"streaming"`
// Kafka configuration
Kafka KafkaConfig `json:"kafka"`
EnableKafka bool `json:"enable_kafka"`
// Health monitoring
HealthCheck HealthCheckConfig `json:"health_check"`
// Global settings
GlobalTimeout time.Duration `json:"global_timeout"`
RetryPolicy RetryPolicy `json:"retry_policy"`
// Monitoring and metrics
MetricsEnabled bool `json:"metrics_enabled"`
MetricsPort int `json:"metrics_port"`
}
ManagerConfig contains configuration for the CDC manager
type ManagerStatus ¶
type ManagerStatus struct {
Running bool `json:"running"`
StartTime time.Time `json:"start_time"`
Uptime time.Duration `json:"uptime"`
Connectors map[string]HealthStatus `json:"connectors"`
StreamProcessor *StreamMetrics `json:"stream_processor,omitempty"`
KafkaProducer *KafkaMetrics `json:"kafka_producer,omitempty"`
ComponentHealth map[string]*ComponentHealth `json:"component_health"`
OverallHealth string `json:"overall_health"`
}
ManagerStatus represents the overall status of the CDC manager
type MemoryCheckpointStorage ¶
type MemoryCheckpointStorage struct {
// contains filtered or unexported fields
}
MemoryCheckpointStorage provides in-memory checkpoint storage
func NewMemoryCheckpointStorage ¶
func NewMemoryCheckpointStorage() *MemoryCheckpointStorage
NewMemoryCheckpointStorage creates a new in-memory checkpoint storage
func (*MemoryCheckpointStorage) Delete ¶
func (m *MemoryCheckpointStorage) Delete(checkpointID string) error
Delete deletes a checkpoint
func (*MemoryCheckpointStorage) List ¶
func (m *MemoryCheckpointStorage) List() ([]Checkpoint, error)
List lists all checkpoints
func (*MemoryCheckpointStorage) Load ¶
func (m *MemoryCheckpointStorage) Load() (Checkpoint, error)
Load loads the latest checkpoint
func (*MemoryCheckpointStorage) Save ¶
func (m *MemoryCheckpointStorage) Save(checkpoint Checkpoint) error
Save saves a checkpoint
type MemoryDeadLetterQueue ¶
type MemoryDeadLetterQueue struct {
// contains filtered or unexported fields
}
MemoryDeadLetterQueue provides in-memory dead letter queue implementation
func NewMemoryDeadLetterQueue ¶
func NewMemoryDeadLetterQueue(maxSize int) *MemoryDeadLetterQueue
NewMemoryDeadLetterQueue creates a new in-memory dead letter queue
func (*MemoryDeadLetterQueue) Acknowledge ¶
func (dlq *MemoryDeadLetterQueue) Acknowledge(taskID string) error
Acknowledge marks a task as processed
func (*MemoryDeadLetterQueue) GetStats ¶
func (dlq *MemoryDeadLetterQueue) GetStats() DeadLetterStats
GetStats returns dead letter queue statistics
func (*MemoryDeadLetterQueue) Read ¶
func (dlq *MemoryDeadLetterQueue) Read(limit int) ([]ProcessingTask, error)
Read retrieves pending tasks from the dead letter queue
func (*MemoryDeadLetterQueue) Send ¶
func (dlq *MemoryDeadLetterQueue) Send(task ProcessingTask, err error) error
Send adds a failed task to the dead letter queue
type MessageDeserializer ¶
type MessageDeserializer interface {
Deserialize(data []byte) (ChangeEvent, error)
ContentType() string
}
MessageDeserializer defines the interface for message deserialization
type MessageSerializer ¶
type MessageSerializer interface {
Serialize(event ChangeEvent) ([]byte, error)
SerializeKey(event ChangeEvent) ([]byte, error)
ContentType() string
}
MessageSerializer defines the interface for message serialization
type MongoChangeEvent ¶
type MongoChangeEvent struct {
ID bson.Raw `bson:"_id"`
OperationType string `bson:"operationType"`
ClusterTime primitive.Timestamp `bson:"clusterTime"`
FullDocument map[string]interface{} `bson:"fullDocument,omitempty"`
FullDocumentBeforeChange map[string]interface{} `bson:"fullDocumentBeforeChange,omitempty"`
DocumentKey map[string]interface{} `bson:"documentKey,omitempty"`
UpdateDescription *UpdateDescription `bson:"updateDescription,omitempty"`
Namespace Namespace `bson:"ns"`
TxnNumber *int64 `bson:"txnNumber,omitempty"`
SessionID *bson.Raw `bson:"lsid,omitempty"`
}
MongoChangeEvent represents a MongoDB change event
type MongoDBConfig ¶
type MongoDBConfig struct {
ResumeToken string `json:"resume_token,omitempty"`
StartAtOperationTime *primitive.Timestamp `json:"start_at_operation_time,omitempty"`
FullDocument string `json:"full_document"` // updateLookup, default
FullDocumentBeforeChange string `json:"full_document_before_change"` // whenAvailable, required, off
BatchSize int32 `json:"batch_size"`
MaxAwaitTime time.Duration `json:"max_await_time"`
Collation *options.Collation `json:"collation,omitempty"`
IncludeOperationTypes []string `json:"include_operation_types,omitempty"`
}
MongoDBConfig contains MongoDB-specific configuration
type MongoDBConnector ¶
type MongoDBConnector struct {
// contains filtered or unexported fields
}
MongoDBConnector implements CDC for MongoDB using change streams
func NewMongoDBConnector ¶
func NewMongoDBConnector(logger *zap.Logger) *MongoDBConnector
NewMongoDBConnector creates a new MongoDB CDC connector
func (*MongoDBConnector) Acknowledge ¶
func (c *MongoDBConnector) Acknowledge(position Position) error
Acknowledge confirms processing of events up to the given position
func (*MongoDBConnector) Connect ¶
func (c *MongoDBConnector) Connect(config CDCConfig) error
Connect establishes a connection to MongoDB and sets up change streams.
func (*MongoDBConnector) GetPosition ¶
func (c *MongoDBConnector) GetPosition() Position
GetPosition returns the current change stream position
func (*MongoDBConnector) Health ¶
func (c *MongoDBConnector) Health() HealthStatus
Health returns the health status of the connector
func (*MongoDBConnector) ReadChanges ¶
func (c *MongoDBConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
ReadChanges returns a channel of change events
func (*MongoDBConnector) Stop ¶
func (c *MongoDBConnector) Stop() error
Stop gracefully shuts down the connector
func (*MongoDBConnector) Subscribe ¶
func (c *MongoDBConnector) Subscribe(collections []string) error
Subscribe starts listening to changes on the specified collections.
type MySQLConfig ¶
type MySQLConfig struct {
ServerID uint32 `json:"server_id"`
StartPosition string `json:"start_position,omitempty"`
Flavor string `json:"flavor"` // mysql or mariadb
GTIDEnabled bool `json:"gtid_enabled"`
HeartbeatPeriod time.Duration `json:"heartbeat_period"`
ReadTimeout time.Duration `json:"read_timeout"`
UseDecimal bool `json:"use_decimal"`
IgnoreJSONDecodeError bool `json:"ignore_json_decode_error"`
}
MySQLConfig contains MySQL-specific configuration
type MySQLConnector ¶
type MySQLConnector struct {
// contains filtered or unexported fields
}
MySQLConnector implements CDC for MySQL using binary log replication
func NewMySQLConnector ¶
func NewMySQLConnector(logger *zap.Logger) *MySQLConnector
NewMySQLConnector creates a new MySQL CDC connector
func (*MySQLConnector) Acknowledge ¶
func (c *MySQLConnector) Acknowledge(position Position) error
Acknowledge confirms processing of events up to the given position
func (*MySQLConnector) Connect ¶
func (c *MySQLConnector) Connect(config CDCConfig) error
Connect establishes a connection to MySQL and sets up binary log replication.
func (*MySQLConnector) GetPosition ¶
func (c *MySQLConnector) GetPosition() Position
GetPosition returns the current replication position
func (*MySQLConnector) Health ¶
func (c *MySQLConnector) Health() HealthStatus
Health returns the health status of the connector
func (*MySQLConnector) ReadChanges ¶
func (c *MySQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
ReadChanges returns a channel of change events
func (*MySQLConnector) Stop ¶
func (c *MySQLConnector) Stop() error
Stop gracefully shuts down the connector
func (*MySQLConnector) Subscribe ¶
func (c *MySQLConnector) Subscribe(tables []string) error
Subscribe starts listening to changes on the specified tables.
type OperationType ¶
type OperationType string
OperationType represents the type of database operation
const (
	OperationInsert OperationType = "INSERT"
	OperationUpdate OperationType = "UPDATE"
	OperationDelete OperationType = "DELETE"
	OperationDDL    OperationType = "DDL"    // Schema changes
	OperationCommit OperationType = "COMMIT" // Transaction commit
)
type Position ¶
type Position struct {
Type string `json:"type"`
Value interface{} `json:"value"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Position represents a replication position in the change stream
type PostgreSQLConfig ¶
type PostgreSQLConfig struct {
SlotName string `json:"slot_name"`
Publication string `json:"publication"`
StartLSN string `json:"start_lsn,omitempty"`
TempSlot bool `json:"temp_slot"`
PluginName string `json:"plugin_name"`
StatusInterval int `json:"status_interval"`
}
PostgreSQLConfig contains PostgreSQL-specific configuration
type PostgreSQLConnector ¶
type PostgreSQLConnector struct {
// contains filtered or unexported fields
}
PostgreSQLConnector implements CDC for PostgreSQL using logical replication
func NewPostgreSQLConnector ¶
func NewPostgreSQLConnector(logger *zap.Logger) *PostgreSQLConnector
NewPostgreSQLConnector creates a new PostgreSQL CDC connector
func (*PostgreSQLConnector) Acknowledge ¶
func (c *PostgreSQLConnector) Acknowledge(position Position) error
Acknowledge confirms processing of events up to the given position
func (*PostgreSQLConnector) Connect ¶
func (c *PostgreSQLConnector) Connect(config CDCConfig) error
Connect establishes a connection to PostgreSQL and sets up logical replication.
func (*PostgreSQLConnector) GetPosition ¶
func (c *PostgreSQLConnector) GetPosition() Position
GetPosition returns the current replication position
func (*PostgreSQLConnector) Health ¶
func (c *PostgreSQLConnector) Health() HealthStatus
Health returns the health status of the connector
func (*PostgreSQLConnector) ReadChanges ¶
func (c *PostgreSQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
ReadChanges returns a channel of change events
func (*PostgreSQLConnector) Stop ¶
func (c *PostgreSQLConnector) Stop() error
Stop gracefully shuts down the connector
func (*PostgreSQLConnector) Subscribe ¶
func (c *PostgreSQLConnector) Subscribe(tables []string) error
Subscribe starts listening to changes on the specified tables.
type ProcessingTask ¶
type ProcessingTask struct {
Events []ChangeEvent
Handler string
Partition int
Timestamp time.Time
RetryCount int
MaxRetries int
}
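ProcessingTask represents a batch of change events assigned to a named handler, together with its partition, creation timestamp, and retry state (RetryCount out of MaxRetries).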
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int `json:"max_retries"`
InitialBackoff time.Duration `json:"initial_backoff"`
MaxBackoff time.Duration `json:"max_backoff"`
Multiplier float64 `json:"multiplier"`
}
RetryPolicy defines retry behavior
type SchemaChange ¶
type SchemaChange struct {
Type string `json:"type"` // CREATE, ALTER, DROP
Object string `json:"object"` // TABLE, INDEX, etc.
Name string `json:"name"`
Statement string `json:"statement"`
Before map[string]interface{} `json:"before,omitempty"`
After map[string]interface{} `json:"after,omitempty"`
}
SchemaChange represents a DDL change event
type SourceInfo ¶
type SourceInfo struct {
Name string `json:"name"`
Database string `json:"database"`
Table string `json:"table"`
ConnectorType ConnectorType `json:"connector_type"`
Version string `json:"version,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
SourceInfo contains information about the data source
type StreamMetrics ¶
type StreamMetrics struct {
EventsReceived int64 `json:"events_received"`
EventsProcessed int64 `json:"events_processed"`
EventsFiltered int64 `json:"events_filtered"`
EventsErrored int64 `json:"events_errored"`
EventsRetried int64 `json:"events_retried"`
BatchesProcessed int64 `json:"batches_processed"`
ProcessingLatency time.Duration `json:"processing_latency"`
ThroughputRPS float64 `json:"throughput_rps"`
BacklogSize int64 `json:"backlog_size"`
LastProcessedTime time.Time `json:"last_processed_time"`
// Per-handler metrics
HandlerMetrics map[string]HandlerMetrics `json:"handler_metrics"`
}
StreamMetrics contains metrics for stream processing
type StreamProcessor ¶
type StreamProcessor struct {
// contains filtered or unexported fields
}
StreamProcessor handles real-time processing of CDC events
func NewStreamProcessor ¶
func NewStreamProcessor(config StreamingConfig, logger *zap.Logger) *StreamProcessor
NewStreamProcessor creates a new stream processor
func (*StreamProcessor) AddFilter ¶
func (sp *StreamProcessor) AddFilter(filter EventFilter)
AddFilter adds an event filter
func (*StreamProcessor) GetDeadLetterQueueStats ¶
func (sp *StreamProcessor) GetDeadLetterQueueStats() DeadLetterStats
GetDeadLetterQueueStats returns dead letter queue statistics
func (*StreamProcessor) GetMetrics ¶
func (sp *StreamProcessor) GetMetrics() StreamMetrics
GetMetrics returns current stream processing metrics
func (*StreamProcessor) ProcessEvent ¶
func (sp *StreamProcessor) ProcessEvent(ctx context.Context, event ChangeEvent) error
ProcessEvent processes a single event
func (*StreamProcessor) ProcessEvents ¶
func (sp *StreamProcessor) ProcessEvents(ctx context.Context, events []ChangeEvent) error
ProcessEvents processes multiple events
func (*StreamProcessor) RegisterBatchHandler ¶
func (sp *StreamProcessor) RegisterBatchHandler(pattern string, handler BatchEventHandler)
RegisterBatchHandler registers a batch event handler
func (*StreamProcessor) RegisterHandler ¶
func (sp *StreamProcessor) RegisterHandler(pattern string, handler EventHandler)
RegisterHandler registers an event handler for a specific event type or table
func (*StreamProcessor) Start ¶
func (sp *StreamProcessor) Start(ctx context.Context) error
Start starts the stream processor
func (*StreamProcessor) Stop ¶
func (sp *StreamProcessor) Stop() error
Stop stops the stream processor gracefully
type StreamWorker ¶
type StreamWorker struct {
// contains filtered or unexported fields
}
StreamWorker processes events in parallel
type StreamingConfig ¶
type StreamingConfig struct {
MaxBatchSize int `json:"max_batch_size"`
BatchTimeout time.Duration `json:"batch_timeout"`
MaxRetries int `json:"max_retries"`
RetryBackoff time.Duration `json:"retry_backoff"`
DeadLetterQueue string `json:"dead_letter_queue,omitempty"`
ParallelWorkers int `json:"parallel_workers"`
OrderingKey string `json:"ordering_key,omitempty"`
CompressionType string `json:"compression_type,omitempty"`
ExactlyOnce bool `json:"exactly_once"`
}
StreamingConfig contains configuration for event streaming
func (*StreamingConfig) Validate ¶
func (c *StreamingConfig) Validate() error
Validate validates the streaming configuration
type TableSchema ¶
type TableSchema struct {
Name string
Columns []ColumnInfo
PrimaryKey []string
LastUpdated time.Time
}
TableSchema represents PostgreSQL table schema information
type TransactionInfo ¶
type TransactionInfo struct {
ID string `json:"id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
EventCount int `json:"event_count"`
Size int64 `json:"size"`
}
TransactionInfo contains information about database transactions
type TruncatedArray ¶
TruncatedArray describes an array field that was truncated by a MongoDB update; it appears in UpdateDescription.TruncatedArrays.
type UpdateDescription ¶
type UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields,omitempty"`
RemovedFields []string `bson:"removedFields,omitempty"`
TruncatedArrays []TruncatedArray `bson:"truncatedArrays,omitempty"`
}
UpdateDescription contains information about updated fields