Documentation
¶
Index ¶
- Constants
- Variables
- func DeriveNodeID(id string) uint64
- func JoinRoleViolationCount() uint64
- type Engine
- func (e *Engine) AddLearner(ctx context.Context, id string, address string, prevIndex uint64) (uint64, error)
- func (e *Engine) AddVoter(ctx context.Context, id string, address string, prevIndex uint64) (uint64, error)
- func (e *Engine) AppliedIndex() uint64
- func (e *Engine) CheckServing(ctx context.Context) error
- func (e *Engine) Close() error
- func (e *Engine) Configuration(ctx context.Context) (raftengine.Configuration, error)
- func (e *Engine) DispatchDropCount() uint64
- func (e *Engine) DispatchErrorCount() uint64
- func (e *Engine) DispatchErrorCountsByCode() map[string]uint64
- func (e *Engine) EncryptionScanner() encryption.EncryptionRelevantScanner
- func (e *Engine) LastQuorumAck() monoclock.Instant
- func (e *Engine) Leader() raftengine.LeaderInfo
- func (e *Engine) LeaseDuration() time.Duration
- func (e *Engine) LinearizableRead(ctx context.Context) (uint64, error)
- func (e *Engine) PromoteLearner(ctx context.Context, id string, prevIndex uint64, minAppliedIndex uint64, ...) (uint64, error)
- func (e *Engine) Propose(ctx context.Context, data []byte) (*raftengine.ProposalResult, error)
- func (e *Engine) ProposeAdmin(ctx context.Context, data []byte) (*raftengine.ProposalResult, error)
- func (e *Engine) RegisterLeaderAcquiredCallback(fn func()) (deregister func())
- func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func())
- func (e *Engine) RemoveServer(ctx context.Context, id string, prevIndex uint64) (uint64, error)
- func (e *Engine) State() raftengine.State
- func (e *Engine) Status() raftengine.Status
- func (e *Engine) StepQueueFullCount() uint64
- func (e *Engine) TransferLeadership(ctx context.Context) error
- func (e *Engine) TransferLeadershipToServer(ctx context.Context, id string, address string) error
- func (e *Engine) VerifyLeader(ctx context.Context) error
- type Factory
- type FactoryConfig
- type GRPCTransport
- func (t *GRPCTransport) Close() error
- func (t *GRPCTransport) Dispatch(ctx context.Context, msg raftpb.Message) error
- func (t *GRPCTransport) DispatchSnapshotSpool(ctx context.Context, msg raftpb.Message, spool *snapshotSpool) error
- func (t *GRPCTransport) Register(server grpc.ServiceRegistrar)
- func (t *GRPCTransport) RemovePeer(nodeID uint64)
- func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb.EtcdRaftAck, error)
- func (t *GRPCTransport) SendSnapshot(stream pb.EtcdRaft_SendSnapshotServer) error
- func (t *GRPCTransport) SetFSMPayloadOpener(fn func(index uint64) (io.ReadCloser, error))
- func (t *GRPCTransport) SetFSMPayloadReader(fn func(index uint64) ([]byte, error))
- func (t *GRPCTransport) SetFSMSnapDir(dir string)
- func (t *GRPCTransport) SetHandler(handler MessageHandler)
- func (t *GRPCTransport) SetSpoolDir(dir string)
- func (t *GRPCTransport) UpsertPeer(peer Peer)
- type MessageHandler
- type MigrationStats
- type OpenConfig
- type Peer
- type RaftCutoverIndex
- type Snapshot
- type StateMachine
Constants ¶
const ( SuffrageVoter = "voter" SuffrageLearner = "learner" )
Suffrage values match the strings emitted by configurationFromConfState and surfaced by raftengine.Server.Suffrage / pb.RaftAdminMember.suffrage.
Variables ¶
var ( // ErrFSMSnapshotFileCRC is returned when the on-disk CRC32C footer does not // match the computed checksum — the .fsm file is corrupt. ErrFSMSnapshotFileCRC = errors.New("fsm snapshot: file CRC32C mismatch (file corrupt)") // ErrFSMSnapshotTokenCRC is returned when the footer and the token CRC // disagree before restore — the metadata is suspect; do not auto-rewrite. ErrFSMSnapshotTokenCRC = errors.New("fsm snapshot: token CRC32C mismatch (metadata corrupt)") // ErrFSMSnapshotNotFound is returned when the expected .fsm file is absent. ErrFSMSnapshotNotFound = errors.New("fsm snapshot: file not found") // ErrFSMSnapshotTooSmall is returned when the file is shorter than the // minimum valid .fsm size (0 or more bytes payload + 4 bytes CRC footer). ErrFSMSnapshotTooSmall = errors.New("fsm snapshot: file too small to contain footer") // ErrFSMSnapshotTokenInvalid is returned when the token bytes cannot be // decoded (wrong length or magic prefix). ErrFSMSnapshotTokenInvalid = errors.New("fsm snapshot: token format invalid") // ErrFSMSnapshotTooLarge is returned when the payload exceeds fsmMaxInMemPayload. // Callers should switch to the streaming path (openFSMSnapshotPayloadReader). ErrFSMSnapshotTooLarge = errors.New("fsm snapshot: payload exceeds maximum in-memory size limit") )
var ErrEncryptionApply = encryption.ErrEncryptionApply
ErrEncryptionApply is re-exported from internal/encryption so callers in this package can errors.Is-match the FSM-side haltApplyResponse without taking a kv-package import (which would create a cycle: kv → internal/raftengine/etcd → kv tests). The canonical definition lives in internal/encryption/errors.go; see that file's doc for the §6.3 fail-closed contract.
var ErrEnvelopeCutoverInProgress = errors.New("raftengine/etcd: raft envelope cutover barrier open; retry shortly")
ErrEnvelopeCutoverInProgress is returned by Engine.Propose (and the coordinator wrap-on-propose path) while the §7.1 proposal-quiescence barrier owned by EnableRaftEnvelope is open: new user proposals are rejected with this error so the barrier can drain the in-flight set and atomically flip wrap-on-propose without the leader admitting a plaintext proposal at `index > raftEnvelopeCutoverIndex`.
The barrier admits exactly two source classes:
- The cutover entry itself, proposed by EnableRaftEnvelope (source = "encryption_admin") — without this exemption the barrier would deadlock on its own cutover proposal.
- ConfChange-time RegisterEncryptionWriter proposals (Stage 7c §3.1) which also pass source = "encryption_admin" because a new member joining mid-barrier must still be able to register its writer-registry entry.
All other Propose calls receive ErrEnvelopeCutoverInProgress during the barrier window. Caller policy: surface this as a retryable error to the client — the barrier completes in O(in-flight-drain-time) and the next attempt will succeed under the new (wrapped) regime.
var ErrRaftUnwrapFailed = errors.New("raftengine/etcd: raft envelope unwrap failed; halting apply")
ErrRaftUnwrapFailed is returned by applyNormalEntry when the pre-apply hook cannot unwrap a §4.2 raft envelope (GCM tag mismatch, missing DEK in the local keystore, malformed envelope, or active tampering).
Per design §6.3 this is a process-fatal event, NOT a recoverable Apply error. applyCommitted propagates the error up to runLoop, which exits via the engine's existing fatal-error path. The failing entry's index is NOT advanced through setApplied — the next restart must replay the same entry, not skip it. Silently skipping would let the local FSM diverge from peers that DID successfully unwrap and apply, breaking the consistency invariant the integrity tag was added to detect.
Operator response: investigate sidecar / Raft-log divergence (§5.5 of the encryption design doc) or KEK custody (§9.3); a supervised restart with a corrected sidecar is the only safe recovery path.
Functions ¶
func DeriveNodeID ¶
func JoinRoleViolationCount ¶
func JoinRoleViolationCount() uint64
JoinRoleViolationCount returns the cumulative count of --raftJoinAsLearner alarms that have fired since process start. See docs/design/2026_04_26_proposed_raft_learner.md §4.5.
Types ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func Open ¶
func Open(ctx context.Context, cfg OpenConfig) (*Engine, error)
Open starts the etcd/raft backend.
Single-node bootstrap waits for local leadership so callers can use the engine immediately. Multi-node startup returns after the local node is running; leadership is established asynchronously through raft transport.
func (*Engine) AddLearner ¶
func (e *Engine) AddLearner(ctx context.Context, id string, address string, prevIndex uint64) (uint64, error)
AddLearner attaches a non-voting replica. The learner is added via V1 ConfChangeAddLearnerNode and starts receiving log entries immediately, but does not contribute to the voter quorum until promoted with PromoteLearner.
func (*Engine) AppliedIndex ¶
AppliedIndex returns the highest log index applied to the local FSM. Suitable for callers that need a non-blocking read fence equivalent to what LinearizableRead would have returned, paired with an external quorum confirmation (e.g. a valid lease).
Lock-free: reads the mirrored atomic.Uint64 written by the run loop's apply path (and by Restore's snapshot installation), so the lease-read fast path does not contend with refreshStatus's write lock under high read concurrency.
func (*Engine) Configuration ¶
func (e *Engine) Configuration(ctx context.Context) (raftengine.Configuration, error)
func (*Engine) DispatchDropCount ¶
DispatchDropCount returns the total number of outbound raft messages dropped before hitting the transport because the per-peer normal or heartbeat channel was full. Monotonic across the life of the engine. Surfaced to Prometheus via the monitoring package so the hot-path dashboard can graph stepCh saturation alongside LinearizableRead rate (see the "Hot Path" row in monitoring/grafana/dashboards/elastickv-redis-summary.json).
func (*Engine) DispatchErrorCount ¶
DispatchErrorCount returns the total number of outbound raft dispatches that reached the transport but failed (network errors, remote shutdown, etc.). Monotonic across the life of the engine.
func (*Engine) DispatchErrorCountsByCode ¶
DispatchErrorCountsByCode returns a snapshot of dispatch-error counts keyed by grpc status code ("Unavailable", "DeadlineExceeded", "ResourceExhausted", ...). Sum of values equals DispatchErrorCount(). A separate breakdown is needed to tell whether failures are peer-down (Unavailable), leader under load (DeadlineExceeded), or flow-control (ResourceExhausted). Returns an empty map when e is nil. Safe for concurrent callers; the returned map is a copy.
func (*Engine) EncryptionScanner ¶
func (e *Engine) EncryptionScanner() encryption.EncryptionRelevantScanner
EncryptionScanner returns the engine's encryption.EncryptionRelevantScanner implementation. The returned value is safe to call only after Open() has populated MemoryStorage from the on-disk WAL + snapshot; calling it before that point yields a nil-storage error from HasEncryptionRelevantEntryInRange.
Used by main.go's startup-guard phase (Stage 6C-2d) to invoke encryption.GuardSidecarBehindRaftLog against the engine's applied index and the sidecar's raft_applied_index.
Nil-receiver-safe: a nil *Engine returns a zero-value encryptionScanner whose HasEncryptionRelevantEntryInRange will surface the nil-storage error. This matches the rest of the package's defensive nil-receiver posture (Status() / AppliedIndex() return zero values on nil receivers) and lets call sites in the lifecycle that may forward a maybe-nil engine pointer avoid a panic during startup/refusal triage.
func (*Engine) LastQuorumAck ¶
LastQuorumAck returns the monotonic-raw instant by which a majority of followers most recently responded to the leader, or the zero Instant when no such observation exists (follower / candidate / startup).
Lock-free: reads atomic.Int64 values published by recordQuorumAck (multi-node cluster) or refreshStatus (single-node cluster keeps singleNodeLeaderAckMonoNs alive with monoclock.Now() while leader, so the hot lease-read path performs zero lock work). The monotonic-raw source keeps the lease safe against NTP-slew / wall-clock-step events; see raftengine.LeaseProvider for the correctness contract.
func (*Engine) Leader ¶
func (e *Engine) Leader() raftengine.LeaderInfo
func (*Engine) LeaseDuration ¶
LeaseDuration returns the time during which a lease holder can serve reads from local state without re-confirming leadership via ReadIndex. It is bounded by electionTimeout - leaseSafetyMargin so that the lease expires before a successor leader could realistically be elected and accept new writes elsewhere.
func (*Engine) LinearizableRead ¶
func (*Engine) PromoteLearner ¶
func (e *Engine) PromoteLearner(ctx context.Context, id string, prevIndex uint64, minAppliedIndex uint64, skipMinAppliedCheck bool) (uint64, error)
PromoteLearner promotes an existing learner to voter via V1 ConfChangeAddNode. The minAppliedIndex precondition rejects the call if the learner's leader-side Progress.Match has not reached that index yet, so an operator who promotes too eagerly gets a clean FailedPrecondition error instead of a silent quorum stall.
minAppliedIndex == 0 is rejected unless skipMinAppliedCheck is also true. Set skipMinAppliedCheck only when the catch-up has been confirmed out-of-band.
func (*Engine) Propose ¶
func (e *Engine) Propose(ctx context.Context, data []byte) (*raftengine.ProposalResult, error)
func (*Engine) ProposeAdmin ¶
func (e *Engine) ProposeAdmin(ctx context.Context, data []byte) (*raftengine.ProposalResult, error)
ProposeAdmin drives a control-plane proposal that must remain admissible across the §7.1 quiescence barrier — currently the EnableRaftEnvelope cutover entry and ConfChange-time RegisterEncryptionWriter proposals (see raftengine.Proposer's contract for the full exempt set).
The barrier exemption is the SOLE divergence from Propose: a higher-layer wrap (kv.wrappedProposer) applies its wrap closure to both methods identically, so admin entries landing above the raft-envelope cutover still carry the AEAD envelope the §6.3 strict-`>` apply hook expects. Only the cutover marker itself is cleartext, and it bypasses the wrap layer at the call site (raw engine reference), not at the method level.
In the current build ProposeAdmin is operationally identical to Propose; Stage 6E-2d adds the barrier check on Propose only. The two methods are kept distinct from the outset so the migration of call sites (this PR) lands ahead of the behaviour change (6E-2d) — calling Propose from an exempt site today is silently wrong tomorrow.
func (*Engine) RegisterLeaderAcquiredCallback ¶
func (e *Engine) RegisterLeaderAcquiredCallback(fn func()) (deregister func())
RegisterLeaderAcquiredCallback registers fn to fire every time the local node's Raft state transitions INTO leader (initial election win, re-election after partition heal, leadership transfer target completion). Callbacks fire on the previous!=Leader → status==Leader edge in refreshStatus, after e.isLeader has been published, so a callback that reads engine.State() observes StateLeader.
Use case: per-shard policy that needs to audit a freshly-acquired leadership ("am I still allowed to be leader of this group?"). SQS HT-FIFO leadership-refusal (§8 of the split-queue FIFO design) hangs off this hook to TransferLeadership when the binary lacks the htfifo capability but a partitioned queue is mapped to this Raft group.
Callbacks run synchronously from refreshStatus and MUST be non-blocking — same contract as RegisterLeaderLossCallback. A callback wanting to do real work (e.g. enumerate the catalog, call TransferLeadership) MUST offload to a goroutine.
A panic inside a callback is contained and logged so a bug in one holder cannot crash the engine or break other callbacks.
The returned deregister function removes this specific registration and is safe to call multiple times.
func (*Engine) RegisterLeaderLossCallback ¶
func (e *Engine) RegisterLeaderLossCallback(fn func()) (deregister func())
RegisterLeaderLossCallback registers fn to fire every time the local node's Raft state transitions out of leader (CheckQuorum step-down, graceful transfer completion, partition-induced demotion) and also on shutdown() while the node was still leader. Callbacks are NOT fired at the moment a transfer starts (LeadTransferee != 0); they only fire once the transfer completes and state flips to follower. Lease-read callers use this to invalidate cached lease state so the next read takes the slow path.
Callbacks run synchronously from refreshStatus / shutdown / fail and MUST be non-blocking (each should be a fast, lock-free invalidation). A panic inside a callback is contained and logged so a bug in one holder cannot crash the engine or break other callbacks. LeaseRead also guards its fast path on engine.State() == StateLeader so the small window between the transition and this callback completing cannot serve stale reads.
The returned deregister function removes this specific registration and is safe to call multiple times. Long-lived callers (coordinators whose lifetime matches the engine's) may ignore it; shorter-lived callers MUST invoke it to avoid accumulating dead callbacks in the engine's slice.
func (*Engine) RemoveServer ¶
func (*Engine) State ¶
func (e *Engine) State() raftengine.State
func (*Engine) Status ¶
func (e *Engine) Status() raftengine.Status
func (*Engine) StepQueueFullCount ¶
StepQueueFullCount returns the total number of inbound raft messages that could not be enqueued into stepCh because the channel was at capacity. This is the "etcd raft inbound step queue is full" signal from the task description: a spike indicates the local raft loop is starved, usually by something blocking the apply path such as the pre-#560 rawKeyTypeAt seek storm.
func (*Engine) TransferLeadershipToServer ¶
type Factory ¶
type Factory struct {
// contains filtered or unexported fields
}
Factory creates etcd raft engine instances.
func NewFactory ¶
func NewFactory(cfg FactoryConfig) *Factory
NewFactory returns a Factory with the given etcd-specific settings.
func (*Factory) Create ¶
func (f *Factory) Create(cfg raftengine.FactoryConfig) (*raftengine.FactoryResult, error)
func (*Factory) EngineType ¶
type FactoryConfig ¶
type FactoryConfig struct {
TickInterval time.Duration
HeartbeatTick int
ElectionTick int
MaxSizePerMsg uint64
MaxInflightMsg int
}
FactoryConfig holds etcd-specific engine parameters.
type GRPCTransport ¶
type GRPCTransport struct {
pb.UnimplementedEtcdRaftServer
// contains filtered or unexported fields
}
func NewGRPCTransport ¶
func NewGRPCTransport(peers []Peer) *GRPCTransport
func (*GRPCTransport) Close ¶
func (t *GRPCTransport) Close() error
func (*GRPCTransport) DispatchSnapshotSpool ¶
func (*GRPCTransport) Register ¶
func (t *GRPCTransport) Register(server grpc.ServiceRegistrar)
func (*GRPCTransport) RemovePeer ¶
func (t *GRPCTransport) RemovePeer(nodeID uint64)
func (*GRPCTransport) Send ¶
func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb.EtcdRaftAck, error)
func (*GRPCTransport) SendSnapshot ¶
func (t *GRPCTransport) SendSnapshot(stream pb.EtcdRaft_SendSnapshotServer) error
func (*GRPCTransport) SetFSMPayloadOpener ¶
func (t *GRPCTransport) SetFSMPayloadOpener(fn func(index uint64) (io.ReadCloser, error))
SetFSMPayloadOpener registers the callback used by the bridge mode to stream FSM snapshot payloads directly from disk without materialising the full payload in memory.
func (*GRPCTransport) SetFSMPayloadReader ¶
func (t *GRPCTransport) SetFSMPayloadReader(fn func(index uint64) ([]byte, error))
func (*GRPCTransport) SetFSMSnapDir ¶
func (t *GRPCTransport) SetFSMSnapDir(dir string)
func (*GRPCTransport) SetHandler ¶
func (t *GRPCTransport) SetHandler(handler MessageHandler)
func (*GRPCTransport) SetSpoolDir ¶
func (t *GRPCTransport) SetSpoolDir(dir string)
func (*GRPCTransport) UpsertPeer ¶
func (t *GRPCTransport) UpsertPeer(peer Peer)
type MigrationStats ¶
func MigrateFSMStore ¶
func MigrateFSMStore(storePath string, destDataDir string, peers []Peer) (*MigrationStats, error)
type OpenConfig ¶
type OpenConfig struct {
NodeID uint64
LocalID string
LocalAddress string
DataDir string
Peers []Peer
Bootstrap bool
// JoinAsLearner mirrors raftengine.FactoryConfig.JoinAsLearner. The
// engine watches every post-apply ConfState and emits an
// ERROR-level alarm whenever the local node is in Voters while
// JoinAsLearner is true. The check is post-apply only; the node
// continues running.
JoinAsLearner bool
Transport *GRPCTransport
TickInterval time.Duration
ElectionTick int
HeartbeatTick int
StateMachine StateMachine
MaxSizePerMsg uint64
// MaxInflightMsg controls how many MsgApp messages Raft may have in-flight
// per peer before waiting for an acknowledgement (Raft-level flow control).
// It also sets the per-peer dispatch channel capacity, so total buffered
// memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
// Default: 256. Increase for deeper pipelining on high-bandwidth links;
// lower in memory-constrained clusters.
MaxInflightMsg int
// RaftCipher carries the AES-GCM Cipher used by the §4.2 raft
// envelope pre-apply hook. nil disables the hook (Stage 3
// default — production wiring lands once Stage 6's cluster
// flag flow is in place).
RaftCipher *encryption.Cipher
// RaftCutoverIndex returns the §7.1 Phase 2 raft envelope
// cutover index. Only entries whose Raft log index is strictly
// greater than this value go through Unwrap. nil ⇒ no cutover
// has been observed yet, equivalent to "raft envelope hook
// off".
RaftCutoverIndex RaftCutoverIndex
}
type Peer ¶
type Peer struct {
NodeID uint64
ID string
Address string
// Suffrage is a v2-peers-file artifact only. It is NOT consulted by
// confStateForPeers (cold bootstrap is voter-only), and it is NOT
// consulted by configurationFromConfState (hot path reads
// ConfState.Voters / ConfState.Learners directly). Its sole role is
// to round-trip the v2 peers file across restarts so the operator
// view of the peers file is consistent with the eventual ConfState
// view after WAL replay. The empty string and SuffrageVoter are
// equivalent. See docs/design/2026_04_26_proposed_raft_learner.md §4.3.
Suffrage string
}
func ParsePeers ¶
type RaftCutoverIndex ¶
type RaftCutoverIndex func() uint64
RaftCutoverIndex returns the §7.1 Phase 2 cutover Raft index. Entries with index strictly greater than the returned value carry raft-envelope-wrapped fsm payloads; entries at or below the cutover are cleartext. The returned value is read on every applyNormalEntry, so implementations should be lock-free (atomic.Uint64.Load) — the engine does not synchronize the read.
The Stage 3 default (when OpenConfig.RaftCutoverIndex is nil) is `^uint64(0)` (no entry's index is greater) so the unwrap path is inert until Stage 6 wires the sidecar's raft_envelope_cutover_index in.
type Snapshot ¶
type Snapshot = raftengine.Snapshot
Snapshot is an alias for the shared raftengine.Snapshot interface.
type StateMachine ¶
type StateMachine = raftengine.StateMachine
StateMachine is an alias for the shared raftengine.StateMachine interface.