Versions in this module Expand all Collapse all v1 v1.1.0 Mar 15, 2026 v1.0.1 Nov 22, 2025 Changes in this version + type Config struct + FetchBackoffMs int64 + FetchMaxBytes int + FetchMaxWaitMs int + FetchMinBytes int + FetcherInterval time.Duration + HighWaterMarkCheckpointIntervalMs int64 + MaxInflightFetches int + NumFetcherThreads int + ReplicaIDForFollower ReplicaID + ReplicaLagMaxMessages int64 + ReplicaLagTimeMaxMs int64 + func DefaultConfig() *Config + func (c *Config) Clone() *Config + func (c *Config) Validate() error + type ErrorCode int + const ErrorNone + const ErrorNotLeader + const ErrorOffsetOutOfRange + const ErrorPartitionNotFound + const ErrorReplicaNotInReplicas + const ErrorStaleEpoch + const ErrorUnknown + func (e ErrorCode) String() string + type FailoverCoordinator struct + func NewFailoverCoordinator(config *Config, brokerID ReplicaID, metadataClient MetadataClient, ...) *FailoverCoordinator + func (fc *FailoverCoordinator) GetMetrics() FailoverMetrics + func (fc *FailoverCoordinator) RegisterPartition(topic string, partitionID int) error + func (fc *FailoverCoordinator) Start() error + func (fc *FailoverCoordinator) Stop() error + type FailoverMetrics struct + AverageFailoverTime time.Duration + FailedFailovers int64 + LastFailoverTime time.Time + SuccessfulFailovers int64 + TotalFailovers int64 + type FailureDetector struct + func (fd *FailureDetector) HasFailed(brokerID ReplicaID) bool + func (fd *FailureDetector) RecordHeartbeat(brokerID ReplicaID) + type FetchRequest struct + FetchOffset Offset + LeaderEpoch int64 + MaxBytes int + MaxWaitMs int + MinBytes int + PartitionID int + ReplicaID ReplicaID + Topic string + type FetchResponse struct + Error error + ErrorCode ErrorCode + HighWaterMark Offset + LeaderEpoch int64 + LogEndOffset Offset + Messages []*Message + PartitionID int + Topic string + type Fetcher struct + func NewFetcher(config *Config, replicaID ReplicaID, storage StorageWriter, ...) *Fetcher + func (f *Fetcher) AddPartition(topic string, partitionID int, leaderID ReplicaID, leaderEpoch int64) error + func (f *Fetcher) GetAllMetrics() map[string]ReplicationMetrics + func (f *Fetcher) GetPartitionCount() int + func (f *Fetcher) GetReplicationLag(topic string, partitionID int) (int64, error) + func (f *Fetcher) RemovePartition(topic string, partitionID int) error + func (f *Fetcher) Start() error + func (f *Fetcher) Stop() error + func (f *Fetcher) UpdateLeader(topic string, partitionID int, leaderID ReplicaID, leaderEpoch int64) error + type FollowerReplicator struct + func NewFollowerReplicator(config *Config, topic string, partitionID int, replicaID ReplicaID, ...) *FollowerReplicator + func (fr *FollowerReplicator) GetFetchLag() int64 + func (fr *FollowerReplicator) GetHighWaterMark() Offset + func (fr *FollowerReplicator) GetLogEndOffset() Offset + func (fr *FollowerReplicator) GetMetrics() ReplicationMetrics + func (fr *FollowerReplicator) Start() error + func (fr *FollowerReplicator) Stop() error + func (fr *FollowerReplicator) UpdateLeader(leaderID ReplicaID, leaderEpoch int64) error + type GlobalMetricsSnapshot struct + AvgFailoverTimeMs int64 + AvgReplicationLagMessages int64 + FailedFailovers int64 + FollowerPartitions int32 + LeaderPartitions int32 + MaxReplicationLagMessages int64 + PartitionsUnderReplicated int32 + TotalFailovers int64 + TotalFetchBytes int64 + TotalFetchErrors int64 + TotalFetchRequests int64 + TotalISRExpands int64 + TotalISRShrinks int64 + TotalPartitions int32 + type GlobalReplicationMetrics struct + AvgFailoverTimeMs atomic.Int64 + AvgReplicationLagMessages atomic.Int64 + FailedFailovers atomic.Int64 + FollowerPartitions atomic.Int32 + LeaderPartitions atomic.Int32 + MaxReplicationLagMessages atomic.Int64 + PartitionsUnderReplicated atomic.Int32 + TotalFailovers atomic.Int64 + TotalFetchBytes atomic.Int64 + TotalFetchErrors atomic.Int64 + TotalFetchRequests atomic.Int64 + TotalISRExpands atomic.Int64 + TotalISRShrinks atomic.Int64 + TotalPartitions atomic.Int32 + type HeartbeatClient interface + SendHeartbeat func(ctx context.Context, brokerID ReplicaID) error + type ISRChangeNotification struct + NewISR []ReplicaID + OldISR []ReplicaID + PartitionID int + Reason string + Timestamp time.Time + Topic string + type LeaderClient interface + Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error) + GetLeaderEndpoint func(topic string, partitionID int) (string, error) + type LeaderReplicator struct + func NewLeaderReplicator(config *Config, topic string, partitionID int, leaderEpoch int64, ...) *LeaderReplicator + func (lr *LeaderReplicator) GetHighWaterMark() Offset + func (lr *LeaderReplicator) GetISR() []ReplicaID + func (lr *LeaderReplicator) GetMetrics() ReplicationMetrics + func (lr *LeaderReplicator) GetPartitionState() *PartitionReplicationState + func (lr *LeaderReplicator) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error) + func (lr *LeaderReplicator) ISRChangeChannel() <-chan *ISRChangeNotification + func (lr *LeaderReplicator) Start() error + func (lr *LeaderReplicator) Stop() error + func (lr *LeaderReplicator) UpdateLogEndOffset(newLEO Offset) + type LogReconciliation struct + func NewLogReconciliation(topic string, partitionID int, replicaID ReplicaID, leaderEpoch int64, ...) *LogReconciliation + func (lr *LogReconciliation) GetDivergencePoint(ctx context.Context, localLEO Offset, leaderLEO Offset) (Offset, error) + func (lr *LogReconciliation) ReconcileAsFollower(ctx context.Context, leaderEpoch int64) error + func (lr *LogReconciliation) ReconcileAsNewLeader(ctx context.Context) error + func (lr *LogReconciliation) ResetToOffset(ctx context.Context, offset Offset) error + func (lr *LogReconciliation) ValidateConsistency(ctx context.Context) error + type Message struct + Headers map[string][]byte + Key []byte + Offset Offset + Timestamp int64 + Value []byte + type MetadataClient interface + GetPartitionISR func(topic string, partitionID int) ([]ReplicaID, error) + GetPartitionLeader func(topic string, partitionID int) (ReplicaID, int64, error) + GetPartitionReplicas func(topic string, partitionID int) ([]ReplicaID, error) + UpdatePartitionISR func(ctx context.Context, topic string, partitionID int, newISR []ReplicaID) error + UpdatePartitionLeader func(ctx context.Context, topic string, partitionID int, newLeader ReplicaID, ...) error + type MetricsCollector struct + func NewMetricsCollector() *MetricsCollector + func (mc *MetricsCollector) ComputeGlobalMetrics() + func (mc *MetricsCollector) GetAllPartitionMetrics() map[string]*PartitionMetrics + func (mc *MetricsCollector) GetGlobalMetrics() *GlobalReplicationMetrics + func (mc *MetricsCollector) GetGlobalSnapshot() GlobalMetricsSnapshot + func (mc *MetricsCollector) GetPartitionMetrics(topic string, partitionID int) *PartitionMetrics + func (mc *MetricsCollector) RegisterPartition(topic string, partitionID int) *PartitionMetrics + func (mc *MetricsCollector) UnregisterPartition(topic string, partitionID int) + type Offset int64 + type PartitionFailoverState struct + type PartitionMetrics struct + FailoverCount atomic.Int64 + FetchBytesTotal atomic.Int64 + FetchErrorCount atomic.Int64 + FetchRequestCount atomic.Int64 + HighWaterMark atomic.Int64 + ISRExpandCount atomic.Int64 + ISRShrinkCount atomic.Int64 + ISRSize atomic.Int32 + LastFailoverTime atomic.Int64 + LastFetchLatencyMs atomic.Int64 + LastFetchTime atomic.Int64 + LastISRChange atomic.Int64 + LeaderEpoch atomic.Int64 + LogEndOffset atomic.Int64 + PartitionID int + ReplicationLagMessages atomic.Int64 + ReplicationLagMs atomic.Int64 + Topic string + func (pm *PartitionMetrics) GetSnapshot() PartitionMetricsSnapshot + func (pm *PartitionMetrics) RecordFailover(newEpoch int64) + func (pm *PartitionMetrics) RecordFetch(bytesRead int, latencyMs int64, err error) + func (pm *PartitionMetrics) RecordISRChange(newSize int, isShrink bool) + func (pm *PartitionMetrics) UpdateFromReplicationMetrics(rm ReplicationMetrics) + func (pm *PartitionMetrics) UpdateOffsets(leo, hw Offset) + type PartitionMetricsSnapshot struct + FailoverCount int64 + FetchBytesTotal int64 + FetchErrorCount int64 + FetchRequestCount int64 + HighWaterMark int64 + ISRExpandCount int64 + ISRShrinkCount int64 + ISRSize int32 + LastFailoverTime time.Time + LastFetchLatencyMs int64 + LastFetchTime time.Time + LastISRChange time.Time + LeaderEpoch int64 + LogEndOffset int64 + PartitionID int + ReplicationLagMessages int64 + ReplicationLagMs int64 + Topic string + type PartitionReplicationState struct + HighWaterMark Offset + ISR []ReplicaID + Leader ReplicaID + LeaderEpoch int64 + LogEndOffset Offset + PartitionID int + ReplicaStates map[ReplicaID]*ReplicationState + Replicas []ReplicaID + Topic string + type PartitionRole int + const RoleFollower + const RoleLeader + type ReplicaID uint64 + type ReplicationManager struct + func NewReplicationManager(config *Config, topic string, partitionID int, replicaID ReplicaID, ...) *ReplicationManager + func (rm *ReplicationManager) BecomeFollower(leaderID ReplicaID, leaderEpoch int64) error + func (rm *ReplicationManager) BecomeLeader(leaderEpoch int64, replicas []ReplicaID) error + func (rm *ReplicationManager) GetHighWaterMark() (Offset, error) + func (rm *ReplicationManager) GetISR() ([]ReplicaID, error) + func (rm *ReplicationManager) GetMetrics() ReplicationMetrics + func (rm *ReplicationManager) HandleFetchRequest(req *FetchRequest) (*FetchResponse, error) + func (rm *ReplicationManager) IsLeader() bool + func (rm *ReplicationManager) NotifyReplication() + func (rm *ReplicationManager) Start() error + func (rm *ReplicationManager) Stop() error + func (rm *ReplicationManager) UpdateLogEndOffset(newLEO Offset) error + func (rm *ReplicationManager) WaitForISR(ctx context.Context, offset Offset, timeoutMs int32) error + type ReplicationMetrics struct + FetchBytesRate float64 + FetchRequestRate float64 + ISRExpandRate float64 + ISRShrinkRate float64 + LastFetchLatencyMs int64 + ReplicationLagMessages int64 + ReplicationLagMs int64 + type ReplicationState struct + FetchLag int64 + HighWaterMark Offset + InSync bool + LastCaughtUpTime time.Time + LastFetchTime time.Time + LogEndOffset Offset + ReplicaID ReplicaID + type StorageAdapter interface + type StorageMessage struct + Headers map[string][]byte + Key []byte + Offset Offset + Timestamp int64 + Value []byte + type StorageReader interface + GetHighWaterMark func(partition int) (Offset, error) + GetLogEndOffset func(partition int) (Offset, error) + ReadRange func(ctx context.Context, partition int, startOffset, endOffset Offset) ([]*StorageMessage, error) + type StorageWriter interface + AppendMessages func(ctx context.Context, partition int, messages []*StorageMessage) error + GetLogEndOffset func(partition int) (Offset, error) + SetHighWaterMark func(partition int, hw Offset) error + Truncate func(partition int, offset Offset) error