Versions in this module Expand all Collapse all v1 v1.1.0 Mar 15, 2026 v1.0.1 Nov 22, 2025 Changes in this version + type Checkpoint struct + LinkID string + Metadata map[string]string + Partition int32 + SourceOffset int64 + TargetOffset int64 + Timestamp time.Time + Topic string + type ClusterConfig struct + BootstrapServers string + Brokers []string + ClusterID string + ConnectionTimeout time.Duration + MaxRetries int + RequestTimeout time.Duration + RetryBackoff time.Duration + Security *SecurityConfig + func DefaultClusterConfig() ClusterConfig + func (cc *ClusterConfig) Clone() ClusterConfig + func (cc *ClusterConfig) Validate() error + type FailoverConfig struct + AutoFailback bool + Enabled bool + FailbackDelayMs int64 + FailoverThreshold int64 + FailoverTimeoutMs int64 + MaxConsecutiveFailures int + NotificationEmail string + NotificationWebhook string + func DefaultFailoverConfig() *FailoverConfig + type FailoverEvent struct + Duration time.Duration + ErrorMessage string + ID string + LinkID string + OffsetMappings map[string]*OffsetMapping + Reason string + SourceClusterID string + Success bool + TargetClusterID string + Timestamp time.Time + Type string + type FilterConfig struct + Enabled bool + ExcludePatterns []string + FilterByHeader map[string]string + IncludePatterns []string + MaxTimestamp time.Time + MinTimestamp time.Time + type Manager interface + Close func() error + CreateLink func(config *ReplicationLink) error + DeleteLink func(linkID string) error + Failback func(linkID string) (*FailoverEvent, error) + Failover func(linkID string) (*FailoverEvent, error) + GetCheckpoint func(linkID, topic string, partition int32) (*Checkpoint, error) + GetHealth func(linkID string) (*ReplicationHealth, error) + GetLink func(linkID string) (*ReplicationLink, error) + GetMetrics func(linkID string) (*ReplicationMetrics, error) + ListLinks func() ([]*ReplicationLink, error) + PauseLink func(linkID string) error + ResumeLink func(linkID string) error + SetCheckpoint func(checkpoint *Checkpoint) error + StartLink func(linkID string) error + StopLink func(linkID string) error + UpdateLink func(linkID string, config *ReplicationLink) error + func NewManager(storage Storage) Manager + type OffsetMapping struct + LastUpdated time.Time + LinkID string + Mappings map[int64]int64 + Partition int32 + Topic string + type PartitionReplicationMetrics struct + BytesReplicated int64 + Errors int64 + Lag int64 + LastReplicatedAt time.Time + MessagesReplicated int64 + Partition int32 + SourceOffset int64 + TargetOffset int64 + Topic string + type ReplicationConfig struct + BatchSize int + BufferSize int + CheckpointIntervalMs int64 + CompressionType string + ConcurrentPartitions int + EnableCompression bool + EnableExactlyOnce bool + EnableIdempotence bool + FetchWaitMaxMs int + HeartbeatIntervalMs int64 + MaxBytes int64 + MaxMessages int + MinBytes int + SyncIntervalMs int64 + ThrottleRateBytesPerSec int64 + func DefaultReplicationConfig() ReplicationConfig + func (rc *ReplicationConfig) Validate() error + type ReplicationHealth struct + CheckpointHealthy bool + ErrorRateHealthy bool + Issues []string + LastHealthCheck time.Time + ReplicationLagHealthy bool + SourceClusterReachable bool + Status string + TargetClusterReachable bool + Warnings []string + type ReplicationLink struct + Config ReplicationConfig + CreatedAt time.Time + FailoverConfig *FailoverConfig + Filter *FilterConfig + Health *ReplicationHealth + ID string + Metrics *ReplicationMetrics + Name string + SourceCluster ClusterConfig + StartedAt time.Time + Status ReplicationStatus + StoppedAt time.Time + TargetCluster ClusterConfig + TopicConfig map[string]*TopicReplicationConfig + TopicPrefix string + Topics []string + Transform *TransformConfig + Type ReplicationType + UpdatedAt time.Time + func (rl *ReplicationLink) Clone() *ReplicationLink + func (rl *ReplicationLink) Validate() error + type ReplicationMetrics struct + AverageReplicationLag int64 + BytesPerSecond float64 + ConsecutiveFailures int + ErrorsPerSecond float64 + LastCheckpoint time.Time + LastSuccessfulReplication time.Time + MaxReplicationLag int64 + MessagesPerSecond float64 + PartitionMetrics map[string]*PartitionReplicationMetrics + ReplicationLag int64 + TotalBytesReplicated int64 + TotalErrors int64 + TotalMessagesReplicated int64 + UptimeSeconds int64 + type ReplicationStatus string + const ReplicationStatusActive + const ReplicationStatusFailed + const ReplicationStatusInitializing + const ReplicationStatusPaused + const ReplicationStatusStopped + type ReplicationType string + const ReplicationTypeActiveActive + const ReplicationTypeActivePassive + const ReplicationTypeStar + type SecurityConfig struct + EnableTLS bool + SASLMechanism string + SASLPassword string + SASLUsername string + TLSCAFile string + TLSCertFile string + TLSKeyFile string + TLSSkipVerify bool + func (sc *SecurityConfig) Validate() error + type Storage interface + DeleteLink func(linkID string) error + ListLinks func() ([]*ReplicationLink, error) + LoadCheckpoint func(linkID, topic string, partition int32) (*Checkpoint, error) + LoadLink func(linkID string) (*ReplicationLink, error) + LoadOffsetMapping func(linkID, topic string, partition int32) (*OffsetMapping, error) + SaveCheckpoint func(checkpoint *Checkpoint) error + SaveLink func(link *ReplicationLink) error + SaveOffsetMapping func(mapping *OffsetMapping) error + func NewMemoryStorage() Storage + type StreamHandler struct + func NewStreamHandler(link *ReplicationLink, checkpointStore Storage) (*StreamHandler, error) + func (h *StreamHandler) Start() error + func (h *StreamHandler) Stop() error + type TopicReplicationConfig struct + CompressionType string + Enabled bool + PreservePartitionCount bool + PreserveTimestamps bool + Priority int + ReplicateDeletes bool + SourceTopic string + TargetTopic string + type TransformConfig struct + AddHeaders map[string]string + Enabled bool + HeaderTransforms map[string]string + KeyTransform string + RemoveHeaders []string + ValueTransform string