Documentation
¶
Index ¶
- type Checkpoint
- type ClusterConfig
- type FailoverConfig
- type FailoverEvent
- type FilterConfig
- type Manager
- type OffsetMapping
- type PartitionReplicationMetrics
- type ReplicationConfig
- type ReplicationHealth
- type ReplicationLink
- type ReplicationMetrics
- type ReplicationStatus
- type ReplicationType
- type SecurityConfig
- type Storage
- type StreamHandler
- type TopicReplicationConfig
- type TransformConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
type Checkpoint struct {
// LinkID is the replication link ID
LinkID string
// Topic is the topic name
Topic string
// Partition is the partition ID
Partition int32
// SourceOffset is the offset in the source cluster
SourceOffset int64
// TargetOffset is the offset in the target cluster
TargetOffset int64
// Timestamp is when the checkpoint was created
Timestamp time.Time
// Metadata contains additional checkpoint metadata
Metadata map[string]string
}
Checkpoint represents a replication checkpoint
type ClusterConfig ¶
type ClusterConfig struct {
// ClusterID is the unique identifier for the cluster
ClusterID string
// Brokers is the list of broker addresses (host:port)
Brokers []string
// BootstrapServers is a comma-separated list of broker addresses
BootstrapServers string
// Security contains authentication and encryption settings
Security *SecurityConfig
// ConnectionTimeout is the timeout for establishing connections
ConnectionTimeout time.Duration
// RequestTimeout is the timeout for individual requests
RequestTimeout time.Duration
// RetryBackoff is the backoff duration between retries
RetryBackoff time.Duration
// MaxRetries is the maximum number of retries
MaxRetries int
}
ClusterConfig represents connection details for a cluster
func DefaultClusterConfig ¶
func DefaultClusterConfig() ClusterConfig
DefaultClusterConfig returns default cluster configuration
func (*ClusterConfig) Clone ¶
func (cc *ClusterConfig) Clone() ClusterConfig
Clone creates a deep copy of the cluster configuration
func (*ClusterConfig) Validate ¶
func (cc *ClusterConfig) Validate() error
Validate validates the cluster configuration
type FailoverConfig ¶
type FailoverConfig struct {
// Enabled indicates if automatic failover is enabled
Enabled bool
// FailoverThreshold is the lag threshold that triggers failover
FailoverThreshold int64
// FailoverTimeoutMs is the timeout before declaring failure
FailoverTimeoutMs int64
// MaxConsecutiveFailures is the max failures before failover
MaxConsecutiveFailures int
// AutoFailback enables automatic failback when primary recovers
AutoFailback bool
// FailbackDelayMs is the delay before attempting failback
FailbackDelayMs int64
// NotificationWebhook is a webhook URL for failover notifications
NotificationWebhook string
// NotificationEmail is an email address for failover notifications
NotificationEmail string
}
FailoverConfig contains automatic failover settings
func DefaultFailoverConfig ¶
func DefaultFailoverConfig() *FailoverConfig
DefaultFailoverConfig returns default failover configuration
type FailoverEvent ¶
type FailoverEvent struct {
// ID is the unique identifier for this event
ID string
// LinkID is the replication link ID
LinkID string
// Type is the event type (failover, failback, manual-switch)
Type string
// Reason is why the failover occurred
Reason string
// SourceClusterID is the cluster being failed over from
SourceClusterID string
// TargetClusterID is the cluster being failed over to
TargetClusterID string
// Timestamp is when the event occurred
Timestamp time.Time
// Success indicates if the failover was successful
Success bool
// ErrorMessage contains error details if failed
ErrorMessage string
// OffsetMappings contains offset mappings at time of failover
OffsetMappings map[string]*OffsetMapping
// Duration is how long the failover took
Duration time.Duration
}
FailoverEvent represents a failover event
type FilterConfig ¶
type FilterConfig struct {
// Enabled indicates if filtering is enabled
Enabled bool
// IncludePatterns are regex patterns for messages to include
IncludePatterns []string
// ExcludePatterns are regex patterns for messages to exclude
ExcludePatterns []string
// FilterByHeader filters messages based on header values
FilterByHeader map[string]string
// MinTimestamp filters messages older than this timestamp
MinTimestamp time.Time
// MaxTimestamp filters messages newer than this timestamp
MaxTimestamp time.Time
}
FilterConfig defines message filtering rules
type Manager ¶
type Manager interface {
// CreateLink creates a new replication link
CreateLink(config *ReplicationLink) error
// DeleteLink deletes a replication link
DeleteLink(linkID string) error
// UpdateLink updates a replication link configuration
UpdateLink(linkID string, config *ReplicationLink) error
// GetLink retrieves a replication link
GetLink(linkID string) (*ReplicationLink, error)
// ListLinks lists all replication links
ListLinks() ([]*ReplicationLink, error)
// StartLink starts a replication link
StartLink(linkID string) error
// StopLink stops a replication link
StopLink(linkID string) error
// PauseLink pauses a replication link
PauseLink(linkID string) error
// ResumeLink resumes a paused replication link
ResumeLink(linkID string) error
// GetMetrics retrieves metrics for a replication link
GetMetrics(linkID string) (*ReplicationMetrics, error)
// GetHealth retrieves health status for a replication link
GetHealth(linkID string) (*ReplicationHealth, error)
// Failover triggers a manual failover
Failover(linkID string) (*FailoverEvent, error)
// Failback triggers a manual failback
Failback(linkID string) (*FailoverEvent, error)
// GetCheckpoint retrieves the checkpoint for a topic-partition
GetCheckpoint(linkID, topic string, partition int32) (*Checkpoint, error)
// SetCheckpoint sets the checkpoint for a topic-partition
SetCheckpoint(checkpoint *Checkpoint) error
// Close closes the replication manager
Close() error
}
Manager manages all cross-datacenter replication links
func NewManager ¶
NewManager creates a new replication link manager
type OffsetMapping ¶
type OffsetMapping struct {
// LinkID is the replication link ID
LinkID string
// Topic is the topic name
Topic string
// Partition is the partition ID
Partition int32
// Mappings maps source offsets to target offsets
Mappings map[int64]int64
// LastUpdated is when the mapping was last updated
LastUpdated time.Time
}
OffsetMapping maps offsets between source and target clusters
type PartitionReplicationMetrics ¶
type PartitionReplicationMetrics struct {
// Topic is the topic name
Topic string
// Partition is the partition ID
Partition int32
// SourceOffset is the current offset in source cluster
SourceOffset int64
// TargetOffset is the current offset in target cluster
TargetOffset int64
// Lag is the replication lag for this partition
Lag int64
// MessagesReplicated is the total messages replicated
MessagesReplicated int64
// BytesReplicated is the total bytes replicated
BytesReplicated int64
// LastReplicatedAt is when the last message was replicated
LastReplicatedAt time.Time
// Errors is the number of errors for this partition
Errors int64
}
PartitionReplicationMetrics contains per-partition replication metrics
type ReplicationConfig ¶
type ReplicationConfig struct {
// MaxBytes is the maximum bytes to fetch per request
MaxBytes int64
// MaxMessages is the maximum messages to fetch per request
MaxMessages int
// FetchWaitMaxMs is the max time to wait for fetch
FetchWaitMaxMs int
// MinBytes is the minimum bytes to accumulate before fetch
MinBytes int
// BatchSize is the batch size for replication
BatchSize int
// BufferSize is the internal buffer size
BufferSize int
// ConcurrentPartitions is the number of partitions to replicate concurrently
ConcurrentPartitions int
// EnableCompression enables compression for replication traffic
EnableCompression bool
// CompressionType is the compression algorithm (gzip, snappy, lz4, zstd)
CompressionType string
// ThrottleRateBytesPerSec limits replication throughput (0 = unlimited)
ThrottleRateBytesPerSec int64
// CheckpointIntervalMs is how often to checkpoint offsets
CheckpointIntervalMs int64
// SyncIntervalMs is how often to sync metadata
SyncIntervalMs int64
// HeartbeatIntervalMs is how often to send heartbeats
HeartbeatIntervalMs int64
// EnableExactlyOnce enables exactly-once semantics
EnableExactlyOnce bool
// EnableIdempotence enables idempotent producer
EnableIdempotence bool
}
ReplicationConfig contains performance tuning parameters
func DefaultReplicationConfig ¶
func DefaultReplicationConfig() ReplicationConfig
DefaultReplicationConfig returns default replication configuration
func (*ReplicationConfig) Validate ¶
func (rc *ReplicationConfig) Validate() error
Validate validates the replication configuration
type ReplicationHealth ¶
type ReplicationHealth struct {
// Status is the overall health status
Status string // "healthy", "degraded", "unhealthy"
// LastHealthCheck is when health was last checked
LastHealthCheck time.Time
// SourceClusterReachable indicates if source cluster is reachable
SourceClusterReachable bool
// TargetClusterReachable indicates if target cluster is reachable
TargetClusterReachable bool
// ReplicationLagHealthy indicates if lag is within acceptable range
ReplicationLagHealthy bool
// ErrorRateHealthy indicates if error rate is acceptable
ErrorRateHealthy bool
// CheckpointHealthy indicates if checkpointing is working
CheckpointHealthy bool
// Issues contains descriptions of any health issues
Issues []string
// Warnings contains non-critical warnings
Warnings []string
}
ReplicationHealth represents health status of a replication link
type ReplicationLink ¶
type ReplicationLink struct {
// ID is the unique identifier for this replication link
ID string
// Name is a human-readable name for this link
Name string
// Type defines the replication topology
Type ReplicationType
// SourceCluster is the source cluster configuration
SourceCluster ClusterConfig
// TargetCluster is the target cluster configuration
TargetCluster ClusterConfig
// Topics is the list of topics to replicate (empty = all topics)
Topics []string
// TopicPrefix is the prefix to add to replicated topic names (optional)
TopicPrefix string
// TopicConfig contains per-topic replication settings
TopicConfig map[string]*TopicReplicationConfig
// Filter defines message filtering rules
Filter *FilterConfig
// Transform defines message transformation rules
Transform *TransformConfig
// Config contains replication performance tuning
Config ReplicationConfig
// Status is the current status of the replication link
Status ReplicationStatus
// CreatedAt is when the link was created
CreatedAt time.Time
// UpdatedAt is when the link was last updated
UpdatedAt time.Time
// StartedAt is when replication started
StartedAt time.Time
// StoppedAt is when replication stopped
StoppedAt time.Time
// Metrics contains replication metrics
Metrics *ReplicationMetrics
// Health contains health status information
Health *ReplicationHealth
// FailoverConfig contains automatic failover settings
FailoverConfig *FailoverConfig
}
ReplicationLink represents a replication connection between two clusters
func (*ReplicationLink) Clone ¶
func (rl *ReplicationLink) Clone() *ReplicationLink
Clone creates a deep copy of the replication link
func (*ReplicationLink) Validate ¶
func (rl *ReplicationLink) Validate() error
Validate validates the replication link configuration
type ReplicationMetrics ¶
type ReplicationMetrics struct {
// TotalMessagesReplicated is the total number of messages replicated
TotalMessagesReplicated int64
// TotalBytesReplicated is the total bytes replicated
TotalBytesReplicated int64
// MessagesPerSecond is the current replication rate
MessagesPerSecond float64
// BytesPerSecond is the current byte rate
BytesPerSecond float64
// ReplicationLag is the lag in milliseconds
ReplicationLag int64
// AverageReplicationLag is the average lag over time
AverageReplicationLag int64
// MaxReplicationLag is the maximum observed lag
MaxReplicationLag int64
// TotalErrors is the total number of errors
TotalErrors int64
// ErrorsPerSecond is the current error rate
ErrorsPerSecond float64
// LastCheckpoint is the timestamp of the last checkpoint
LastCheckpoint time.Time
// LastSuccessfulReplication is the timestamp of last successful replication
LastSuccessfulReplication time.Time
// PartitionMetrics contains per-partition metrics
PartitionMetrics map[string]*PartitionReplicationMetrics
// ConsecutiveFailures is the number of consecutive failures
ConsecutiveFailures int
// UptimeSeconds is the total uptime in seconds
UptimeSeconds int64
}
ReplicationMetrics contains replication metrics and statistics
type ReplicationStatus ¶
type ReplicationStatus string
ReplicationStatus represents the current state of a replication link
const ( // ReplicationStatusActive indicates replication is running normally ReplicationStatusActive ReplicationStatus = "active" // ReplicationStatusPaused indicates replication is temporarily paused ReplicationStatusPaused ReplicationStatus = "paused" // ReplicationStatusFailed indicates replication has encountered errors ReplicationStatusFailed ReplicationStatus = "failed" // ReplicationStatusStopped indicates replication has been stopped ReplicationStatusStopped ReplicationStatus = "stopped" // ReplicationStatusInitializing indicates initial sync is in progress ReplicationStatusInitializing ReplicationStatus = "initializing" )
type ReplicationType ¶
type ReplicationType string
ReplicationType defines the type of cross-datacenter replication topology
const ( // ReplicationTypeActivePassive is unidirectional replication from source to target ReplicationTypeActivePassive ReplicationType = "active-passive" // ReplicationTypeActiveActive is bidirectional replication between clusters ReplicationTypeActiveActive ReplicationType = "active-active" // ReplicationTypeStar is hub-and-spoke replication with one central cluster ReplicationTypeStar ReplicationType = "star" )
type SecurityConfig ¶
type SecurityConfig struct {
// EnableTLS enables TLS encryption
EnableTLS bool
// TLSCertFile is the path to the TLS certificate file
TLSCertFile string
// TLSKeyFile is the path to the TLS key file
TLSKeyFile string
// TLSCAFile is the path to the CA certificate file
TLSCAFile string
// TLSSkipVerify skips TLS certificate verification (insecure)
TLSSkipVerify bool
// SASLMechanism is the SASL authentication mechanism (PLAIN, SCRAM-SHA-256, etc.)
SASLMechanism string
// SASLUsername is the SASL username
SASLUsername string
// SASLPassword is the SASL password
SASLPassword string
}
SecurityConfig contains security settings for cluster connections
func (*SecurityConfig) Validate ¶
func (sc *SecurityConfig) Validate() error
Validate validates the security configuration
type Storage ¶
type Storage interface {
// SaveLink saves a replication link
SaveLink(link *ReplicationLink) error
// LoadLink loads a replication link
LoadLink(linkID string) (*ReplicationLink, error)
// DeleteLink deletes a replication link
DeleteLink(linkID string) error
// ListLinks lists all replication links
ListLinks() ([]*ReplicationLink, error)
// SaveCheckpoint saves a checkpoint
SaveCheckpoint(checkpoint *Checkpoint) error
// LoadCheckpoint loads a checkpoint
LoadCheckpoint(linkID, topic string, partition int32) (*Checkpoint, error)
// SaveOffsetMapping saves an offset mapping
SaveOffsetMapping(mapping *OffsetMapping) error
// LoadOffsetMapping loads an offset mapping
LoadOffsetMapping(linkID, topic string, partition int32) (*OffsetMapping, error)
}
Storage interface for persisting replication state
func NewMemoryStorage ¶
func NewMemoryStorage() Storage
NewMemoryStorage creates a new in-memory storage
type StreamHandler ¶
type StreamHandler struct {
// contains filtered or unexported fields
}
StreamHandler handles the replication stream for a single link
func NewStreamHandler ¶
func NewStreamHandler(link *ReplicationLink, checkpointStore Storage) (*StreamHandler, error)
NewStreamHandler creates a new stream handler for a replication link
func (*StreamHandler) Start ¶
func (h *StreamHandler) Start() error
Start starts the replication stream
type TopicReplicationConfig ¶
type TopicReplicationConfig struct {
// SourceTopic is the name of the topic in the source cluster
SourceTopic string
// TargetTopic is the name of the topic in the target cluster
TargetTopic string
// Enabled indicates if replication is enabled for this topic
Enabled bool
// ReplicateDeletes determines if delete markers are replicated
ReplicateDeletes bool
// PreservePartitionCount maintains the same partition count
PreservePartitionCount bool
// PreserveTimestamps maintains original message timestamps
PreserveTimestamps bool
// CompressionType specifies compression for replicated messages
CompressionType string
// Priority sets the replication priority (higher = more resources)
Priority int
}
TopicReplicationConfig contains per-topic replication settings
type TransformConfig ¶
type TransformConfig struct {
// Enabled indicates if transformation is enabled
Enabled bool
// HeaderTransforms are header transformation rules
HeaderTransforms map[string]string
// KeyTransform is a transformation expression for message keys
KeyTransform string
// ValueTransform is a transformation expression for message values
ValueTransform string
// AddHeaders adds static headers to all messages
AddHeaders map[string]string
// RemoveHeaders removes headers from messages
RemoveHeaders []string
}
TransformConfig defines message transformation rules