Documentation
¶
Index ¶
- Constants
- Variables
- func Is(err, target error) bool
- func IsFatal(err error) bool
- func IsNoLeaderError(err error) bool
- func IsNoQuorumError(err error) bool
- func IsNotLeaderError(err error) bool
- func IsRetryable(err error) bool
- func IsStaleTermError(err error) bool
- func NewNoLeaderError() *errors.ForgeError
- func NewNoQuorumError(required, available int) *errors.ForgeError
- func NewNotLeaderError(nodeID string, leaderID string) *errors.ForgeError
- func NewStaleTermError(current, received uint64) *errors.ForgeError
- func NewTimeoutError(operation string) *errors.ForgeError
- type AdminAPIConfig
- type AdvancedConfig
- type AppendEntriesRequest
- type AppendEntriesResponse
- type BadgerOptions
- type BatchOp
- type BatchOpType
- type BoltOptions
- type ClusterInfo
- type ClusterManager
- type Command
- type Config
- type ConfigOption
- func WithBindAddress(addr string, port int) ConfigOption
- func WithClusterID(id string) ConfigOption
- func WithConfig(cfg Config) ConfigOption
- func WithDiscoveryType(discoveryType string) ConfigOption
- func WithMTLS(caFile string) ConfigOption
- func WithNodeID(id string) ConfigOption
- func WithPeers(peers []PeerConfig) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithStoragePath(path string) ConfigOption
- func WithStorageType(storageType string) ConfigOption
- func WithTLS(certFile, keyFile string) ConfigOption
- func WithTransportType(transportType string) ConfigOption
- type ConsensusEvent
- type ConsensusEventType
- type ConsensusService
- type ConsensusStats
- type Discovery
- type DiscoveryConfig
- type DiscoveryEvent
- type DiscoveryEventType
- type ElectionConfig
- type EntryType
- type EventsConfig
- type HealthCheck
- type HealthConfig
- type HealthStatus
- type HealthStatusEvent
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type KeyValue
- type LeaderElectedEvent
- type LogEntry
- type LoggingConfig
- type MembershipChangedEvent
- type Message
- type MessageType
- type MetricsConfig
- type NodeChangeEvent
- type NodeChangeType
- type NodeInfo
- type NodeRole
- type NodeStatus
- type ObservabilityConfig
- type PeerConfig
- type QuorumStatusEvent
- type RaftConfig
- type RaftNode
- type RaftStats
- type RequestVoteRequest
- type RequestVoteResponse
- type ResilienceConfig
- type RoleChangedEvent
- type SecurityConfig
- type Snapshot
- type SnapshotEvent
- type SnapshotMetadata
- type StateMachine
- type Storage
- type StorageConfig
- type TracingConfig
- type Transport
- type TransportConfig
Constants ¶
const ( ErrCodeNotLeader = "CONSENSUS_NOT_LEADER" ErrCodeNoLeader = "CONSENSUS_NO_LEADER" ErrCodeNotStarted = "CONSENSUS_NOT_STARTED" ErrCodeAlreadyStarted = "CONSENSUS_ALREADY_STARTED" ErrCodeNodeNotFound = "CONSENSUS_NODE_NOT_FOUND" ErrCodeClusterNotFound = "CONSENSUS_CLUSTER_NOT_FOUND" ErrCodeNoQuorum = "CONSENSUS_NO_QUORUM" ErrCodeInvalidTerm = "CONSENSUS_INVALID_TERM" ErrCodeStaleTerm = "CONSENSUS_STALE_TERM" ErrCodeLogInconsistent = "CONSENSUS_LOG_INCONSISTENT" ErrCodeSnapshotFailed = "CONSENSUS_SNAPSHOT_FAILED" ErrCodeCompactionFailed = "CONSENSUS_COMPACTION_FAILED" ErrCodeElectionTimeout = "CONSENSUS_ELECTION_TIMEOUT" ErrCodeInvalidPeer = "CONSENSUS_INVALID_PEER" ErrCodePeerExists = "CONSENSUS_PEER_EXISTS" ErrCodePeerNotFound = "CONSENSUS_PEER_NOT_FOUND" ErrCodeInsufficientPeers = "CONSENSUS_INSUFFICIENT_PEERS" ErrCodeNoAvailablePeers = "CONSENSUS_NO_AVAILABLE_PEERS" ErrCodeFailoverFailed = "CONSENSUS_FAILOVER_FAILED" ErrCodeOperationFailed = "CONSENSUS_OPERATION_FAILED" ErrCodeReplicationFailed = "CONSENSUS_REPLICATION_FAILED" ErrCodeTimeout = "CONSENSUS_TIMEOUT" ErrCodeConnectionFailed = "CONSENSUS_CONNECTION_FAILED" ErrCodeStaleRead = "CONSENSUS_STALE_READ" ErrCodeRateLimitExceeded = "CONSENSUS_RATE_LIMIT_EXCEEDED" ErrCodeAuthenticationFailed = "CONSENSUS_AUTHENTICATION_FAILED" ErrCodePoolExhausted = "CONSENSUS_POOL_EXHAUSTED" ErrCodeConnectionTimeout = "CONSENSUS_CONNECTION_TIMEOUT" )
Consensus error codes.
Variables ¶
var ( // ErrNotLeader indicates the node is not the leader. ErrNotLeader = &errors.ForgeError{Code: ErrCodeNotLeader, Message: "node is not the leader"} // ErrNoLeader indicates there is no leader. ErrNoLeader = &errors.ForgeError{Code: ErrCodeNoLeader, Message: "no leader available"} // ErrNotStarted indicates the service is not started. ErrNotStarted = &errors.ForgeError{Code: ErrCodeNotStarted, Message: "consensus service not started"} // ErrAlreadyStarted indicates the service is already started. ErrAlreadyStarted = &errors.ForgeError{Code: ErrCodeAlreadyStarted, Message: "consensus service already started"} // ErrNodeNotFound indicates a node was not found. ErrNodeNotFound = &errors.ForgeError{Code: ErrCodeNodeNotFound, Message: "node not found"} // ErrClusterNotFound indicates a cluster was not found. ErrClusterNotFound = &errors.ForgeError{Code: ErrCodeClusterNotFound, Message: "cluster not found"} ErrStorageUnavailable = &errors.ForgeError{Code: ErrCodeStorageUnavailable, Message: "storage unavailable"} ErrTransportUnavailable = &errors.ForgeError{Code: ErrCodeTransportUnavailable, Message: "transport unavailable"} ErrDiscoveryUnavailable = &errors.ForgeError{Code: ErrCodeDiscoveryUnavailable, Message: "discovery service unavailable"} // ErrNoQuorum indicates no quorum is available. ErrNoQuorum = &errors.ForgeError{Code: ErrCodeNoQuorum, Message: "no quorum available"} // ErrInvalidTerm indicates an invalid term. ErrInvalidTerm = &errors.ForgeError{Code: ErrCodeInvalidTerm, Message: "invalid term"} // ErrStaleTerm indicates a stale term. ErrStaleTerm = &errors.ForgeError{Code: ErrCodeStaleTerm, Message: "stale term"} // ErrLogInconsistent indicates log inconsistency. ErrLogInconsistent = &errors.ForgeError{Code: ErrCodeLogInconsistent, Message: "log inconsistent"} // ErrSnapshotFailed indicates snapshot operation failed. 
ErrSnapshotFailed = &errors.ForgeError{Code: ErrCodeSnapshotFailed, Message: "snapshot operation failed"} // ErrCompactionFailed indicates compaction failed. ErrCompactionFailed = &errors.ForgeError{Code: ErrCodeCompactionFailed, Message: "compaction failed"} // ErrElectionTimeout indicates election timeout. ErrElectionTimeout = &errors.ForgeError{Code: ErrCodeElectionTimeout, Message: "election timeout"} // ErrInvalidPeer indicates an invalid peer. ErrInvalidPeer = &errors.ForgeError{Code: ErrCodeInvalidPeer, Message: "invalid peer"} // ErrPeerExists indicates peer already exists. ErrPeerExists = &errors.ForgeError{Code: ErrCodePeerExists, Message: "peer already exists"} // ErrPeerNotFound indicates peer not found. ErrPeerNotFound = &errors.ForgeError{Code: ErrCodePeerNotFound, Message: "peer not found"} // ErrInsufficientPeers indicates insufficient peers. ErrInsufficientPeers = &errors.ForgeError{Code: ErrCodeInsufficientPeers, Message: "insufficient peers"} // ErrNoAvailablePeers indicates no available peers for operation. ErrNoAvailablePeers = &errors.ForgeError{Code: ErrCodeNoAvailablePeers, Message: "no available peers"} // ErrFailoverFailed indicates failover operation failed. ErrFailoverFailed = &errors.ForgeError{Code: ErrCodeFailoverFailed, Message: "failover operation failed"} // ErrOperationFailed indicates a generic operation failure. ErrOperationFailed = &errors.ForgeError{Code: ErrCodeOperationFailed, Message: "operation failed"} // ErrReplicationFailed indicates replication operation failed. ErrReplicationFailed = &errors.ForgeError{Code: ErrCodeReplicationFailed, Message: "replication failed"} // ErrTimeout indicates operation timeout. ErrTimeout = &errors.ForgeError{Code: ErrCodeTimeout, Message: "operation timeout"} // ErrConnectionFailed indicates connection failure. ErrConnectionFailed = &errors.ForgeError{Code: ErrCodeConnectionFailed, Message: "connection failed"} // ErrStaleRead indicates a stale read attempt. 
ErrStaleRead = &errors.ForgeError{Code: ErrCodeStaleRead, Message: "stale read"} // ErrRateLimitExceeded indicates rate limit exceeded. ErrRateLimitExceeded = &errors.ForgeError{Code: ErrCodeRateLimitExceeded, Message: "rate limit exceeded"} // ErrAuthenticationFailed indicates authentication failure. ErrAuthenticationFailed = &errors.ForgeError{Code: ErrCodeAuthenticationFailed, Message: "authentication failed"} // ErrPoolExhausted indicates connection pool exhausted. ErrPoolExhausted = &errors.ForgeError{Code: ErrCodePoolExhausted, Message: "connection pool exhausted"} // ErrConnectionTimeout indicates connection timeout. ErrConnectionTimeout = &errors.ForgeError{Code: ErrCodeConnectionTimeout, Message: "connection timeout"} // ErrInvalidConfig indicates invalid configuration. ErrInvalidConfig = errors.ErrInvalidConfigSentinel )
Sentinel errors for use with errors.Is.
Functions ¶
func IsNoLeaderError ¶
func IsNoLeaderError(err error) bool
IsNoLeaderError returns true if the error is a no leader error.
func IsNoQuorumError ¶
func IsNoQuorumError(err error) bool
IsNoQuorumError returns true if the error is a no quorum error.
func IsNotLeaderError ¶
func IsNotLeaderError(err error) bool
IsNotLeaderError returns true if the error is a not leader error.
func IsRetryable ¶
func IsRetryable(err error) bool
IsRetryable returns true if the error is retryable.
func IsStaleTermError ¶
func IsStaleTermError(err error) bool
IsStaleTermError returns true if the error is a stale term error.
func NewNoLeaderError ¶
func NewNoLeaderError() *errors.ForgeError
NewNoLeaderError creates a no leader error.
func NewNoQuorumError ¶
func NewNoQuorumError(required, available int) *errors.ForgeError
NewNoQuorumError creates a no quorum error with context.
func NewNotLeaderError ¶
func NewNotLeaderError(nodeID string, leaderID string) *errors.ForgeError
NewNotLeaderError creates a not leader error with context.
func NewStaleTermError ¶
func NewStaleTermError(current, received uint64) *errors.ForgeError
NewStaleTermError creates a stale term error with context.
func NewTimeoutError ¶
func NewTimeoutError(operation string) *errors.ForgeError
NewTimeoutError creates a timeout error.
Types ¶
type AdminAPIConfig ¶
type AdminAPIConfig struct {
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
PathPrefix string `default:"/consensus" json:"path_prefix" yaml:"path_prefix"`
EnableAuth bool `default:"false" json:"enable_auth" yaml:"enable_auth"`
APIKey string `json:"api_key" yaml:"api_key"`
}
AdminAPIConfig contains admin API configuration.
type AdvancedConfig ¶
type AdvancedConfig struct {
EnableAutoSnapshot bool `default:"true" json:"enable_auto_snapshot" yaml:"enable_auto_snapshot"`
EnableAutoCompaction bool `default:"true" json:"enable_auto_compaction" yaml:"enable_auto_compaction"`
CompactionInterval time.Duration `default:"1h" json:"compaction_interval" yaml:"compaction_interval"`
MaxMemoryUsage int64 `default:"1073741824" json:"max_memory_usage" yaml:"max_memory_usage"` // 1GB
GCInterval time.Duration `default:"5m" json:"gc_interval" yaml:"gc_interval"`
EnableReadIndex bool `default:"true" json:"enable_read_index" yaml:"enable_read_index"`
EnableLeasedReads bool `default:"true" json:"enable_leased_reads" yaml:"enable_leased_reads"`
}
AdvancedConfig contains advanced settings.
type AppendEntriesRequest ¶
type AppendEntriesRequest struct {
Term uint64 `json:"term"`
LeaderID string `json:"leader_id"`
PrevLogIndex uint64 `json:"prev_log_index"`
PrevLogTerm uint64 `json:"prev_log_term"`
Entries []LogEntry `json:"entries"`
LeaderCommit uint64 `json:"leader_commit"`
}
AppendEntriesRequest represents an AppendEntries RPC request.
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
Term uint64 `json:"term"`
Success bool `json:"success"`
MatchIndex uint64 `json:"match_index"`
NodeID string `json:"node_id"`
}
AppendEntriesResponse represents an AppendEntries RPC response.
type BadgerOptions ¶
type BadgerOptions struct {
ValueLogMaxEntries uint32 `default:"1000000" json:"value_log_max_entries" yaml:"value_log_max_entries"`
MemTableSize int64 `default:"67108864" json:"mem_table_size" yaml:"mem_table_size"` // 64MB
NumMemTables int `default:"5" json:"num_mem_tables" yaml:"num_mem_tables"`
NumLevelZeroTables int `default:"5" json:"num_level_zero_tables" yaml:"num_level_zero_tables"`
NumCompactors int `default:"4" json:"num_compactors" yaml:"num_compactors"`
}
BadgerOptions contains BadgerDB-specific options.
type BatchOp ¶
type BatchOp struct {
Type BatchOpType
Key []byte
Value []byte
}
BatchOp represents a batch operation.
type BatchOpType ¶
type BatchOpType int
BatchOpType represents the type of batch operation.
const ( // BatchOpSet is a set operation. BatchOpSet BatchOpType = iota // BatchOpDelete is a delete operation. BatchOpDelete )
type BoltOptions ¶
type BoltOptions struct {
NoSync bool `default:"false" json:"no_sync" yaml:"no_sync"`
NoGrowSync bool `default:"false" json:"no_grow_sync" yaml:"no_grow_sync"`
InitialMmapSize int `default:"0" json:"initial_mmap_size" yaml:"initial_mmap_size"`
Timeout time.Duration `default:"1s" json:"timeout" yaml:"timeout"`
}
BoltOptions contains BoltDB-specific options.
type ClusterInfo ¶
type ClusterInfo struct {
ID string `json:"id"`
Leader string `json:"leader"`
Term uint64 `json:"term"`
Nodes []NodeInfo `json:"nodes"`
TotalNodes int `json:"total_nodes"`
ActiveNodes int `json:"active_nodes"`
HasQuorum bool `json:"has_quorum"`
CommitIndex uint64 `json:"commit_index"`
LastApplied uint64 `json:"last_applied"`
}
ClusterInfo represents information about the cluster.
type ClusterManager ¶
type ClusterManager interface {
// GetNodes returns all nodes in the cluster
GetNodes() []NodeInfo
// GetNode returns information about a specific node
GetNode(nodeID string) (*NodeInfo, error)
// AddNode adds a node to the cluster
AddNode(nodeID, address string, port int) error
// RemoveNode removes a node from the cluster
RemoveNode(nodeID string) error
// UpdateNode updates node information
UpdateNode(nodeID string, info NodeInfo) error
// GetLeader returns the current leader node
GetLeader() *NodeInfo
// HasQuorum returns true if the cluster has quorum
HasQuorum() bool
// GetClusterSize returns the size of the cluster
GetClusterSize() int
// GetHealthyNodes returns the number of healthy nodes
GetHealthyNodes() int
}
ClusterManager defines the interface for cluster management.
type Config ¶
type Config struct {
// Node configuration
NodeID string `json:"node_id" yaml:"node_id"`
ClusterID string `json:"cluster_id" yaml:"cluster_id"`
BindAddr string `default:"0.0.0.0" json:"bind_addr" yaml:"bind_addr"`
BindPort int `default:"7000" json:"bind_port" yaml:"bind_port"`
// Peers - initial cluster members
Peers []PeerConfig `json:"peers" yaml:"peers"`
// Raft configuration
Raft RaftConfig `json:"raft" yaml:"raft"`
// Transport configuration
Transport TransportConfig `json:"transport" yaml:"transport"`
// Discovery configuration
Discovery DiscoveryConfig `json:"discovery" yaml:"discovery"`
// Storage configuration
Storage StorageConfig `json:"storage" yaml:"storage"`
// Election configuration
Election ElectionConfig `json:"election" yaml:"election"`
// Health check configuration
Health HealthConfig `json:"health" yaml:"health"`
// Observability configuration
Observability ObservabilityConfig `json:"observability" yaml:"observability"`
// Security configuration
Security SecurityConfig `json:"security" yaml:"security"`
// Resilience configuration
Resilience ResilienceConfig `json:"resilience" yaml:"resilience"`
// Admin API configuration
AdminAPI AdminAPIConfig `json:"admin_api" yaml:"admin_api"`
// Events configuration
Events EventsConfig `json:"events" yaml:"events"`
// Advanced settings
Advanced AdvancedConfig `json:"advanced" yaml:"advanced"`
// Internal flag
RequireConfig bool `json:"-" yaml:"-"`
}
Config contains all consensus configuration.
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a functional option for Config.
func WithBindAddress ¶
func WithBindAddress(addr string, port int) ConfigOption
WithBindAddress sets the bind address and port.
func WithConfig ¶
func WithConfig(cfg Config) ConfigOption
WithConfig sets the entire config (for backward compatibility).
func WithDiscoveryType ¶
func WithDiscoveryType(discoveryType string) ConfigOption
WithDiscoveryType sets the discovery type.
func WithMTLS ¶
func WithMTLS(caFile string) ConfigOption
WithMTLS enables mutual TLS with the given CA file.
func WithPeers ¶
func WithPeers(peers []PeerConfig) ConfigOption
WithPeers sets the initial peer list.
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
WithRequireConfig sets whether config is required from ConfigManager.
func WithStoragePath ¶
func WithStoragePath(path string) ConfigOption
WithStoragePath sets the storage path.
func WithStorageType ¶
func WithStorageType(storageType string) ConfigOption
WithStorageType sets the storage type.
func WithTLS ¶
func WithTLS(certFile, keyFile string) ConfigOption
WithTLS enables TLS with the given cert and key files.
func WithTransportType ¶
func WithTransportType(transportType string) ConfigOption
WithTransportType sets the transport type.
type ConsensusEvent ¶
type ConsensusEvent struct {
Type ConsensusEventType `json:"type"`
NodeID string `json:"node_id"`
ClusterID string `json:"cluster_id"`
Data map[string]any `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
ConsensusEvent represents a consensus event.
type ConsensusEventType ¶
type ConsensusEventType string
ConsensusEventType represents the type of consensus event.
const ( // Node lifecycle events. ConsensusEventNodeStarted ConsensusEventType = "consensus.node.started" ConsensusEventNodeStopped ConsensusEventType = "consensus.node.stopped" ConsensusEventNodeJoined ConsensusEventType = "consensus.node.joined" ConsensusEventNodeLeft ConsensusEventType = "consensus.node.left" ConsensusEventNodeFailed ConsensusEventType = "consensus.node.failed" ConsensusEventNodeRecovered ConsensusEventType = "consensus.node.recovered" // Leadership events. ConsensusEventLeaderElected ConsensusEventType = "consensus.leader.elected" ConsensusEventLeaderStepDown ConsensusEventType = "consensus.leader.stepdown" ConsensusEventLeaderTransfer ConsensusEventType = "consensus.leader.transfer" ConsensusEventLeaderLost ConsensusEventType = "consensus.leader.lost" // Role change events. ConsensusEventRoleChanged ConsensusEventType = "consensus.role.changed" ConsensusEventBecameFollower ConsensusEventType = "consensus.role.follower" ConsensusEventBecameCandidate ConsensusEventType = "consensus.role.candidate" ConsensusEventBecameLeader ConsensusEventType = "consensus.role.leader" // Cluster events. ConsensusEventClusterFormed ConsensusEventType = "consensus.cluster.formed" ConsensusEventClusterUpdated ConsensusEventType = "consensus.cluster.updated" ConsensusEventQuorumAchieved ConsensusEventType = "consensus.cluster.quorum.achieved" ConsensusEventQuorumLost ConsensusEventType = "consensus.cluster.quorum.lost" ConsensusEventMembershipChanged ConsensusEventType = "consensus.cluster.membership.changed" // Log events. ConsensusEventLogAppended ConsensusEventType = "consensus.log.appended" ConsensusEventLogCommitted ConsensusEventType = "consensus.log.committed" ConsensusEventLogCompacted ConsensusEventType = "consensus.log.compacted" ConsensusEventLogTruncated ConsensusEventType = "consensus.log.truncated" // Snapshot events. 
ConsensusEventSnapshotStarted ConsensusEventType = "consensus.snapshot.started" ConsensusEventSnapshotCompleted ConsensusEventType = "consensus.snapshot.completed" ConsensusEventSnapshotFailed ConsensusEventType = "consensus.snapshot.failed" ConsensusEventSnapshotRestored ConsensusEventType = "consensus.snapshot.restored" // Health events. ConsensusEventHealthy ConsensusEventType = "consensus.health.healthy" ConsensusEventUnhealthy ConsensusEventType = "consensus.health.unhealthy" ConsensusEventDegraded ConsensusEventType = "consensus.health.degraded" ConsensusEventRecovering ConsensusEventType = "consensus.health.recovering" // Configuration events. ConsensusEventConfigUpdated ConsensusEventType = "consensus.config.updated" ConsensusEventConfigReloaded ConsensusEventType = "consensus.config.reloaded" )
type ConsensusService ¶
type ConsensusService interface {
// Start starts the consensus service
Start(ctx context.Context) error
// Stop stops the consensus service
Stop(ctx context.Context) error
// IsLeader returns true if this node is the leader
IsLeader() bool
// GetLeader returns the current leader node ID
GetLeader() string
// GetRole returns the current role of this node
GetRole() NodeRole
// GetTerm returns the current term
GetTerm() uint64
// Apply applies a command to the state machine
Apply(ctx context.Context, cmd Command) error
// Read performs a consistent read operation
Read(ctx context.Context, query any) (any, error)
// GetStats returns consensus statistics
GetStats() ConsensusStats
// HealthCheck performs a health check
HealthCheck(ctx context.Context) error
// GetHealthStatus returns detailed health status
GetHealthStatus(ctx context.Context) HealthStatus
// GetClusterInfo returns cluster information
GetClusterInfo() ClusterInfo
// AddNode adds a node to the cluster
AddNode(ctx context.Context, nodeID, address string, port int) error
// RemoveNode removes a node from the cluster
RemoveNode(ctx context.Context, nodeID string) error
// TransferLeadership transfers leadership to another node
TransferLeadership(ctx context.Context, targetNodeID string) error
// StepDown causes the leader to step down
StepDown(ctx context.Context) error
// Snapshot creates a snapshot
Snapshot(ctx context.Context) error
// UpdateConfig updates the configuration
UpdateConfig(ctx context.Context, config Config) error
}
ConsensusService defines the interface for the consensus service.
type ConsensusStats ¶
type ConsensusStats struct {
NodeID string `json:"node_id"`
ClusterID string `json:"cluster_id"`
Role NodeRole `json:"role"`
Status NodeStatus `json:"status"`
Term uint64 `json:"term"`
LeaderID string `json:"leader_id"`
CommitIndex uint64 `json:"commit_index"`
LastApplied uint64 `json:"last_applied"`
LastLogIndex uint64 `json:"last_log_index"`
LastLogTerm uint64 `json:"last_log_term"`
ClusterSize int `json:"cluster_size"`
HealthyNodes int `json:"healthy_nodes"`
HasQuorum bool `json:"has_quorum"`
ElectionsTotal int64 `json:"elections_total"`
ElectionsFailed int64 `json:"elections_failed"`
OperationsTotal int64 `json:"operations_total"`
OperationsFailed int64 `json:"operations_failed"`
OperationsPerSec float64 `json:"operations_per_sec"`
AverageLatencyMs float64 `json:"average_latency_ms"`
ErrorRate float64 `json:"error_rate"`
LogEntries int64 `json:"log_entries"`
SnapshotsTotal int64 `json:"snapshots_total"`
LastSnapshotTime time.Time `json:"last_snapshot_time"`
Uptime time.Duration `json:"uptime"`
StartTime time.Time `json:"start_time"`
}
ConsensusStats represents consensus statistics.
type Discovery ¶
type Discovery interface {
// Start starts the discovery service
Start(ctx context.Context) error
// Stop stops the discovery service
Stop(ctx context.Context) error
// Register registers this node with the discovery service
Register(ctx context.Context, node NodeInfo) error
// Unregister unregisters this node from the discovery service
Unregister(ctx context.Context) error
// GetNodes returns all discovered nodes
GetNodes(ctx context.Context) ([]NodeInfo, error)
// Watch watches for node changes
Watch(ctx context.Context) (<-chan NodeChangeEvent, error)
}
Discovery defines the interface for service discovery.
type DiscoveryConfig ¶
type DiscoveryConfig struct {
Type string `default:"static" json:"type" yaml:"type"` // static, dns, consul, etcd, kubernetes
Endpoints []string `json:"endpoints" yaml:"endpoints"`
Namespace string `default:"default" json:"namespace" yaml:"namespace"`
ServiceName string `default:"forge-consensus" json:"service_name" yaml:"service_name"`
RefreshInterval time.Duration `default:"30s" json:"refresh_interval" yaml:"refresh_interval"`
Timeout time.Duration `default:"10s" json:"timeout" yaml:"timeout"`
TTL time.Duration `default:"60s" json:"ttl" yaml:"ttl"`
EnableWatch bool `default:"true" json:"enable_watch" yaml:"enable_watch"`
}
DiscoveryConfig contains service discovery configuration.
type DiscoveryEvent ¶
type DiscoveryEvent struct {
Type DiscoveryEventType `json:"type"`
NodeID string `json:"node_id"`
Address string `json:"address"`
Port int `json:"port"`
Timestamp time.Time `json:"timestamp"`
}
DiscoveryEvent represents a discovery event.
type DiscoveryEventType ¶
type DiscoveryEventType string
DiscoveryEventType represents the type of discovery event.
const ( // DiscoveryEventTypeJoin indicates a node joined. DiscoveryEventTypeJoin DiscoveryEventType = "join" // DiscoveryEventTypeLeave indicates a node left. DiscoveryEventTypeLeave DiscoveryEventType = "leave" // DiscoveryEventTypeUpdate indicates a node updated. DiscoveryEventTypeUpdate DiscoveryEventType = "update" )
type ElectionConfig ¶
type ElectionConfig struct {
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
RandomizedTimeout bool `default:"true" json:"randomized_timeout" yaml:"randomized_timeout"`
PreVote bool `default:"true" json:"pre_vote" yaml:"pre_vote"`
PriorityElection bool `default:"false" json:"priority_election" yaml:"priority_election"`
Priority int `default:"0" json:"priority" yaml:"priority"`
StepDownOnRemove bool `default:"true" json:"step_down_on_remove" yaml:"step_down_on_remove"`
LeaderStickiness time.Duration `default:"10s" json:"leader_stickiness" yaml:"leader_stickiness"`
}
ElectionConfig contains leader election configuration.
type EventsConfig ¶
type EventsConfig struct {
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
EmitLeaderChange bool `default:"true" json:"emit_leader_change" yaml:"emit_leader_change"`
EmitNodeEvents bool `default:"true" json:"emit_node_events" yaml:"emit_node_events"`
EmitClusterEvents bool `default:"true" json:"emit_cluster_events" yaml:"emit_cluster_events"`
}
EventsConfig contains events configuration.
type HealthCheck ¶
type HealthCheck struct {
Name string `json:"name"`
Healthy bool `json:"healthy"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
CheckedAt time.Time `json:"checked_at"`
}
HealthCheck represents a single health check result.
type HealthConfig ¶
type HealthConfig struct {
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
CheckInterval time.Duration `default:"10s" json:"check_interval" yaml:"check_interval"`
Timeout time.Duration `default:"5s" json:"timeout" yaml:"timeout"`
UnhealthyThreshold int `default:"3" json:"unhealthy_threshold" yaml:"unhealthy_threshold"`
HealthyThreshold int `default:"2" json:"healthy_threshold" yaml:"healthy_threshold"`
}
HealthConfig contains health check configuration.
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
Status string `json:"status"` // "healthy", "degraded", "unhealthy"
Leader bool `json:"leader"`
HasQuorum bool `json:"has_quorum"`
TotalNodes int `json:"total_nodes"`
ActiveNodes int `json:"active_nodes"`
Details []HealthCheck `json:"details"`
LastCheck time.Time `json:"last_check"`
Checks map[string]any `json:"checks"`
}
HealthStatus represents the health status of the consensus system.
type HealthStatusEvent ¶
type HealthStatusEvent struct {
Status string `json:"status"` // "healthy", "unhealthy", "degraded"
Details string `json:"details,omitempty"`
ChecksFailed []string `json:"checks_failed,omitempty"`
}
HealthStatusEvent contains data for health status events.
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct {
Term uint64 `json:"term"`
LeaderID string `json:"leader_id"`
LastIncludedIndex uint64 `json:"last_included_index"`
LastIncludedTerm uint64 `json:"last_included_term"`
Offset uint64 `json:"offset"`
Data []byte `json:"data"`
Done bool `json:"done"`
}
InstallSnapshotRequest represents an InstallSnapshot RPC request.
type InstallSnapshotResponse ¶
InstallSnapshotResponse represents an InstallSnapshot RPC response.
type LeaderElectedEvent ¶
type LeaderElectedEvent struct {
LeaderID string `json:"leader_id"`
Term uint64 `json:"term"`
VotesReceived int `json:"votes_received"`
TotalVotes int `json:"total_votes"`
ElectionDuration time.Duration `json:"election_duration"`
}
LeaderElectedEvent contains data for leader election events.
type LogEntry ¶
type LogEntry struct {
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Type EntryType `json:"type"`
Data []byte `json:"data"`
Created time.Time `json:"created"`
}
LogEntry represents a log entry in the replicated log.
type LoggingConfig ¶
type LoggingConfig struct {
Level string `default:"info" json:"level" yaml:"level"`
EnableStructured bool `default:"true" json:"enable_structured" yaml:"enable_structured"`
LogRaftDetails bool `default:"false" json:"log_raft_details" yaml:"log_raft_details"`
}
LoggingConfig contains logging configuration.
type MembershipChangedEvent ¶
type MembershipChangedEvent struct {
Action string `json:"action"` // "added", "removed", "updated"
NodeID string `json:"affected_node_id"`
OldMembers []string `json:"old_members"`
NewMembers []string `json:"new_members"`
}
MembershipChangedEvent contains data for membership change events.
type Message ¶
type Message struct {
Type MessageType `json:"type"`
From string `json:"from"`
To string `json:"to"`
Payload any `json:"payload"`
Timestamp int64 `json:"timestamp"`
}
Message represents a message sent between nodes.
type MessageType ¶
type MessageType string
MessageType represents the type of message.
const ( // MessageTypeAppendEntries is for AppendEntries RPC. MessageTypeAppendEntries MessageType = "append_entries" // MessageTypeAppendEntriesResponse is for AppendEntries response. MessageTypeAppendEntriesResponse MessageType = "append_entries_response" // MessageTypeRequestVote is for RequestVote RPC. MessageTypeRequestVote MessageType = "request_vote" // MessageTypeRequestVoteResponse is for RequestVote response. MessageTypeRequestVoteResponse MessageType = "request_vote_response" // MessageTypeInstallSnapshot is for InstallSnapshot RPC. MessageTypeInstallSnapshot MessageType = "install_snapshot" // MessageTypeInstallSnapshotResponse is for InstallSnapshot response. MessageTypeInstallSnapshotResponse MessageType = "install_snapshot_response" // MessageTypeHeartbeat is for heartbeat messages. MessageTypeHeartbeat MessageType = "heartbeat" )
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool `default:"true" json:"enabled" yaml:"enabled"`
CollectionInterval time.Duration `default:"15s" json:"collection_interval" yaml:"collection_interval"`
Namespace string `default:"forge_consensus" json:"namespace" yaml:"namespace"`
EnableDetailedMetrics bool `default:"true" json:"enable_detailed_metrics" yaml:"enable_detailed_metrics"`
}
MetricsConfig contains metrics configuration.
type NodeChangeEvent ¶
type NodeChangeEvent struct {
Type NodeChangeType `json:"type"`
Node NodeInfo `json:"node"`
}
NodeChangeEvent represents a node change event.
type NodeChangeType ¶
type NodeChangeType string
NodeChangeType represents the type of node change.
const ( // NodeChangeTypeAdded indicates a node was added. NodeChangeTypeAdded NodeChangeType = "added" // NodeChangeTypeRemoved indicates a node was removed. NodeChangeTypeRemoved NodeChangeType = "removed" // NodeChangeTypeUpdated indicates a node was updated. NodeChangeTypeUpdated NodeChangeType = "updated" )
type NodeInfo ¶
type NodeInfo struct {
ID string `json:"id"`
Address string `json:"address"`
Port int `json:"port"`
Role NodeRole `json:"role"`
Status NodeStatus `json:"status"`
Term uint64 `json:"term"`
LastHeartbeat time.Time `json:"last_heartbeat"`
Metadata map[string]any `json:"metadata"`
}
NodeInfo represents information about a node.
type NodeRole ¶
type NodeRole string
NodeRole represents the role of a node in the consensus system.
type NodeStatus ¶
type NodeStatus string
NodeStatus represents the status of a node.
const ( // StatusActive indicates the node is active and healthy. StatusActive NodeStatus = "active" // StatusInactive indicates the node is inactive. StatusInactive NodeStatus = "inactive" // StatusSuspected indicates the node is suspected of failure. StatusSuspected NodeStatus = "suspected" // StatusFailed indicates the node has failed. StatusFailed NodeStatus = "failed" )
type ObservabilityConfig ¶
// ObservabilityConfig groups the metrics, tracing and logging sub-configs.
//
// Each section is a value, not a pointer. A section missing from decoded
// input is therefore left at its zero value (e.g. Metrics.Enabled == false)
// unless a separate defaults pass fills it in.
type ObservabilityConfig struct {
Metrics MetricsConfig `json:"metrics" yaml:"metrics"`
Tracing TracingConfig `json:"tracing" yaml:"tracing"`
Logging LoggingConfig `json:"logging" yaml:"logging"`
}
ObservabilityConfig contains observability configuration.
type PeerConfig ¶
// PeerConfig identifies one cluster peer by ID and network location. WithPeers
// accepts a []PeerConfig.
type PeerConfig struct {
ID string `json:"id" yaml:"id"`
// Host and port are separate fields. Presumably they are passed through to
// Transport.AddPeer(nodeID, address, port); confirm.
Address string `json:"address" yaml:"address"`
Port int `json:"port" yaml:"port"`
}
PeerConfig represents a cluster peer.
type QuorumStatusEvent ¶
// QuorumStatusEvent reports the cluster's quorum health.
//
// NOTE(review): HasQuorum is stored, not derived. Presumably
// HasQuorum == (HealthyNodes >= RequiredForQuorum), but nothing here enforces
// it; confirm with the publisher.
type QuorumStatusEvent struct {
HasQuorum bool `json:"has_quorum"`
TotalNodes int `json:"total_nodes"`
HealthyNodes int `json:"healthy_nodes"`
RequiredForQuorum int `json:"required_for_quorum"`
}
QuorumStatusEvent contains data for quorum status events.
type RaftConfig ¶
// RaftConfig holds Raft tuning parameters. Defaults are recorded in the
// `default` struct tags and apply only if the config loader honors them.
//
// NOTE(review): this declaration enforces no invariants, such as
// ElectionTimeoutMin <= ElectionTimeoutMax, or HeartbeatInterval being well
// below ElectionTimeoutMin (defaults 1s vs 5s). Validate at load time.
type RaftConfig struct {
// Presumably the leader's heartbeat period (default 1s).
HeartbeatInterval time.Duration `default:"1s" json:"heartbeat_interval" yaml:"heartbeat_interval"`
// Election timeout bounds (defaults 5s and 10s). Presumably each election
// picks a random timeout in [Min, Max]; confirm.
ElectionTimeoutMin time.Duration `default:"5s" json:"election_timeout_min" yaml:"election_timeout_min"`
ElectionTimeoutMax time.Duration `default:"10s" json:"election_timeout_max" yaml:"election_timeout_max"`
// Snapshot triggers: a 30m interval, and a threshold of 10000 (presumably log
// entries since the last snapshot; confirm).
SnapshotInterval time.Duration `default:"30m" json:"snapshot_interval" yaml:"snapshot_interval"`
SnapshotThreshold uint64 `default:"10000" json:"snapshot_threshold" yaml:"snapshot_threshold"`
// Presumably the number of log entries cached in memory (default 1024).
LogCacheSize int `default:"1024" json:"log_cache_size" yaml:"log_cache_size"`
// NOTE(review): MaxAppendEntries (64) and ReplicationBatchSize (100) both look
// like per-RPC batch limits. Which one governs is up to the implementation; confirm.
MaxAppendEntries int `default:"64" json:"max_append_entries" yaml:"max_append_entries"`
// Presumably the log entries kept after a snapshot, so lagging followers can
// catch up without InstallSnapshot (default 10000); confirm.
TrailingLogs uint64 `default:"10000" json:"trailing_logs" yaml:"trailing_logs"`
ReplicationBatchSize int `default:"100" json:"replication_batch_size" yaml:"replication_batch_size"`
// Default 500ms.
LeaderLeaseTimeout time.Duration `default:"500ms" json:"leader_lease_timeout" yaml:"leader_lease_timeout"`
// Presumably enables the pre-vote phase signalled by RequestVoteRequest.PreVote (default true).
PreVote bool `default:"true" json:"pre_vote" yaml:"pre_vote"`
// Default true. Presumably makes a leader step down when it cannot reach a quorum; confirm.
CheckQuorum bool `default:"true" json:"check_quorum" yaml:"check_quorum"`
// Default false, so presumably replication pipelining is on unless this is set.
DisablePipeline bool `default:"false" json:"disable_pipeline" yaml:"disable_pipeline"`
}
RaftConfig contains Raft-specific configuration.
type RaftNode ¶
// RaftNode is the contract a Raft participant implements. It covers
// lifecycle control, read-only state queries, command/log submission, the
// three Raft RPC handlers, and peer registration.
type RaftNode interface {
	// Lifecycle.

	// Start starts the node.
	Start(ctx context.Context) error
	// Stop stops the node.
	Stop(ctx context.Context) error

	// State queries.

	// IsLeader reports whether this node is currently the leader.
	IsLeader() bool
	// GetLeader returns the ID of the current leader.
	GetLeader() string
	// GetTerm returns the current term.
	GetTerm() uint64
	// GetRole returns this node's current role.
	GetRole() NodeRole
	// GetCommitIndex returns the current commit index.
	GetCommitIndex() uint64
	// GetStats returns the node's Raft statistics.
	GetStats() RaftStats

	// Commands and log entries.

	// Propose submits a new command to the cluster.
	Propose(ctx context.Context, command []byte) error
	// Apply applies a single log entry.
	Apply(ctx context.Context, entry LogEntry) error

	// RPC handlers.

	// AppendEntries serves an incoming AppendEntries RPC.
	AppendEntries(ctx context.Context, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
	// RequestVote serves an incoming RequestVote RPC.
	RequestVote(ctx context.Context, req *RequestVoteRequest) (*RequestVoteResponse, error)
	// InstallSnapshot serves an incoming InstallSnapshot RPC.
	InstallSnapshot(ctx context.Context, req *InstallSnapshotRequest) (*InstallSnapshotResponse, error)

	// Membership.

	// AddPeer registers peerID with this node. It reports no error.
	AddPeer(peerID string)
}
RaftNode defines the interface for Raft node operations.
type RaftStats ¶
// RaftStats is the statistics record returned by RaftNode.GetStats.
type RaftStats struct {
NodeID string `json:"node_id"`
Role NodeRole `json:"role"`
Term uint64 `json:"term"`
// Leader ID. Presumably empty when no leader is known; confirm.
Leader string `json:"leader"`
// Raft log positions: commit index, last applied index, and the index and
// term of the last log entry.
CommitIndex uint64 `json:"commit_index"`
LastApplied uint64 `json:"last_applied"`
LastLogIndex uint64 `json:"last_log_index"`
LastLogTerm uint64 `json:"last_log_term"`
// Candidate this node voted for. Presumably empty if it has not voted in the current term.
VotedFor string `json:"voted_for"`
// Presumably the time of the last heartbeat seen. Zero value if none; confirm.
LastHeartbeat time.Time `json:"last_heartbeat"`
}
RaftStats represents Raft node statistics.
type RequestVoteRequest ¶
// RequestVoteRequest is the argument passed to RaftNode.RequestVote.
type RequestVoteRequest struct {
// Presumably the candidate's current term.
Term uint64 `json:"term"`
CandidateID string `json:"candidate_id"`
// Index and term of the candidate's last log entry. Presumably used by the
// voter's log up-to-date check; confirm.
LastLogIndex uint64 `json:"last_log_index"`
LastLogTerm uint64 `json:"last_log_term"`
// Marks a pre-vote probe (see RaftConfig.PreVote). Presumably it must not
// change the voter's term or recorded vote; confirm.
PreVote bool `json:"pre_vote"`
}
RequestVoteRequest represents a RequestVote RPC request.
type RequestVoteResponse ¶
// RequestVoteResponse is the result returned by RaftNode.RequestVote.
type RequestVoteResponse struct {
// Presumably the responder's current term, so a stale candidate can step down.
Term uint64 `json:"term"`
VoteGranted bool `json:"vote_granted"`
// Presumably the ID of the responding node.
NodeID string `json:"node_id"`
}
RequestVoteResponse represents a RequestVote RPC response.
type ResilienceConfig ¶
// ResilienceConfig holds retry and circuit-breaker settings. The `default`
// struct tags record the intended defaults.
type ResilienceConfig struct {
EnableRetry bool `default:"true" json:"enable_retry" yaml:"enable_retry"`
// Default 3. Not visible here whether this counts retries after the first
// attempt or total attempts; confirm.
MaxRetries int `default:"3" json:"max_retries" yaml:"max_retries"`
// Presumably exponential backoff: RetryDelay (100ms) is multiplied by
// RetryBackoffFactor (2.0) on each attempt and capped at MaxRetryDelay (5s); confirm.
RetryDelay time.Duration `default:"100ms" json:"retry_delay" yaml:"retry_delay"`
RetryBackoffFactor float64 `default:"2.0" json:"retry_backoff_factor" yaml:"retry_backoff_factor"`
MaxRetryDelay time.Duration `default:"5s" json:"max_retry_delay" yaml:"max_retry_delay"`
// Presumably the breaker opens after CircuitBreakerThreshold (5) failures and
// stays open for CircuitBreakerTimeout (30s); confirm.
EnableCircuitBreaker bool `default:"true" json:"enable_circuit_breaker" yaml:"enable_circuit_breaker"`
CircuitBreakerThreshold int `default:"5" json:"circuit_breaker_threshold" yaml:"circuit_breaker_threshold"`
CircuitBreakerTimeout time.Duration `default:"30s" json:"circuit_breaker_timeout" yaml:"circuit_breaker_timeout"`
}
ResilienceConfig contains resilience configuration.
type RoleChangedEvent ¶
// RoleChangedEvent carries the data for a role-change event.
//
// NOTE(review): OldRole and NewRole are plain strings, not NodeRole.
// Presumably they hold string(NodeRole) values; confirm.
type RoleChangedEvent struct {
OldRole string `json:"old_role"`
NewRole string `json:"new_role"`
// Term associated with the change. Presumably the term in effect after it; confirm.
Term uint64 `json:"term"`
// Optional explanation. Omitted from JSON when empty.
Reason string `json:"reason,omitempty"`
}
RoleChangedEvent contains data for role change events.
type SecurityConfig ¶
// SecurityConfig holds TLS, mutual-TLS and payload-encryption settings. Every
// boolean switch defaults to false.
//
// NOTE(review): EncryptionKey is an exported field with json/yaml tags.
// Marshaling this struct (logging, admin/debug endpoints, config dumps)
// therefore emits the key in plaintext. Redact it before any such output.
type SecurityConfig struct {
EnableTLS bool `default:"false" json:"enable_tls" yaml:"enable_tls"`
// Presumably also needs CAFile to verify peer certificates (see WithMTLS(caFile)).
EnableMTLS bool `default:"false" json:"enable_mtls" yaml:"enable_mtls"`
// File paths for the node certificate, its private key and the CA bundle.
// See WithTLS(certFile, keyFile) and WithMTLS(caFile).
CertFile string `json:"cert_file" yaml:"cert_file"`
KeyFile string `json:"key_file" yaml:"key_file"`
CAFile string `json:"ca_file" yaml:"ca_file"`
// Presumably disables certificate verification. Keep false outside tests.
SkipVerify bool `default:"false" json:"skip_verify" yaml:"skip_verify"`
EnableEncryption bool `default:"false" json:"enable_encryption" yaml:"enable_encryption"`
// Secret material: it is marshaled like any other field (see the NOTE on this type).
EncryptionKey string `json:"encryption_key" yaml:"encryption_key"`
}
SecurityConfig contains security configuration.
type Snapshot ¶
// Snapshot is a point-in-time capture of the state machine. It is produced by
// StateMachine.Snapshot and consumed by StateMachine.Restore. The whole
// payload is held in memory in Data.
type Snapshot struct {
// Presumably the index and term of the last log entry covered by the snapshot.
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Data []byte `json:"data"`
// NOTE(review): nothing ties Size to len(Data). Presumably they are equal; confirm.
Size int64 `json:"size"`
Created time.Time `json:"created"`
// Hash algorithm and encoding are not specified here; confirm with the producer.
Checksum string `json:"checksum"`
}
Snapshot represents a point-in-time snapshot of the state machine.
type SnapshotEvent ¶
// SnapshotEvent carries the data for a snapshot event.
type SnapshotEvent struct {
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Size int64 `json:"size"`
// Omitted from JSON when zero. time.Duration encodes as an integer count of nanoseconds.
Duration time.Duration `json:"duration,omitempty"`
// Error message, if any. Omitted from JSON when empty (presumably meaning success).
Error string `json:"error,omitempty"`
}
SnapshotEvent contains data for snapshot events.
type SnapshotMetadata ¶
// SnapshotMetadata has the same fields as Snapshot except Data. It describes
// a snapshot without carrying its payload.
type SnapshotMetadata struct {
Index uint64 `json:"index"`
Term uint64 `json:"term"`
Size int64 `json:"size"`
Created time.Time `json:"created"`
Checksum string `json:"checksum"`
}
SnapshotMetadata contains metadata about a snapshot.
type StateMachine ¶
// StateMachine is the replicated application state. Log entries are applied
// one at a time, the whole state can be captured and restored through
// Snapshot values, and Query serves read-only lookups.
type StateMachine interface {
	// Query answers a read-only request against the current state.
	Query(query any) (any, error)

	// Apply applies one log entry to the state.
	Apply(entry LogEntry) error

	// Snapshot captures the current state.
	Snapshot() (*Snapshot, error)
	// Restore restores the state from snapshot.
	Restore(snapshot *Snapshot) error
}
StateMachine defines the interface for the replicated state machine.
type Storage ¶
// Storage is the persistent key-value store interface. Methods are grouped as
// lifecycle, single-key access, and multi-key access.
type Storage interface {
	// Lifecycle.

	// Start starts the backend.
	Start(ctx context.Context) error
	// Stop stops the backend.
	Stop(ctx context.Context) error
	// Close closes the backend.
	// NOTE(review): the relationship between Stop and Close (whether both are
	// required, and in which order) is not specified here; confirm.
	Close() error

	// Single-key access.

	// Get returns the value stored under key. The error for a missing key is
	// implementation-defined.
	Get(key []byte) ([]byte, error)
	// Set stores value under key.
	Set(key, value []byte) error
	// Delete removes key.
	Delete(key []byte) error

	// Multi-key access.

	// Batch executes ops as one batch. Atomicity is not specified here.
	Batch(ops []BatchOp) error
	// GetRange returns the key-value pairs between start and end. Whether end
	// is inclusive is not specified here.
	GetRange(start, end []byte) ([]KeyValue, error)
}
Storage defines the interface for persistent storage.
type StorageConfig ¶
// StorageConfig selects and tunes the persistent storage backend. Defaults
// are recorded in the `default` struct tags.
type StorageConfig struct {
// NOTE(review): Type lists four backends, but only Badger and Bolt have option
// structs below. Confirm how pebble/postgres are configured.
Type string `default:"badger" json:"type" yaml:"type"` // badger, boltdb, pebble, postgres
Path string `default:"./data/consensus" json:"path" yaml:"path"`
// Default true. Presumably each write is synced to disk before returning; confirm.
SyncWrites bool `default:"true" json:"sync_writes" yaml:"sync_writes"`
// Presumably a write batch is flushed at MaxBatchSize ops (1000) or after
// MaxBatchDelay (10ms), whichever comes first; confirm.
MaxBatchSize int `default:"1000" json:"max_batch_size" yaml:"max_batch_size"`
MaxBatchDelay time.Duration `default:"10ms" json:"max_batch_delay" yaml:"max_batch_delay"`
CompactOnStart bool `default:"false" json:"compact_on_start" yaml:"compact_on_start"`
// BadgerDB-specific options (presumably read only when Type is "badger").
BadgerOptions BadgerOptions `json:"badger_options" yaml:"badger_options"`
// BoltDB-specific options (presumably read only when Type is "boltdb").
BoltOptions BoltOptions `json:"bolt_options" yaml:"bolt_options"`
}
StorageConfig contains storage backend configuration.
type TracingConfig ¶
// TracingConfig configures tracing. It is disabled by default.
type TracingConfig struct {
Enabled bool `default:"false" json:"enabled" yaml:"enabled"`
ServiceName string `default:"forge-consensus" json:"service_name" yaml:"service_name"`
// Presumably the fraction of traces sampled, in [0, 1]; the default 0.1 would mean 10%. Confirm.
SampleRate float64 `default:"0.1" json:"sample_rate" yaml:"sample_rate"`
}
TracingConfig contains tracing configuration.
type Transport ¶
// Transport moves consensus messages between peers. Methods are grouped as
// lifecycle, peer registry, and messaging.
type Transport interface {
	// Start starts the transport.
	Start(ctx context.Context) error
	// Stop stops the transport.
	Stop(ctx context.Context) error
	// GetAddress reports the local address.
	GetAddress() string

	// AddPeer registers nodeID as reachable at address and port.
	AddPeer(nodeID, address string, port int) error
	// RemovePeer unregisters nodeID.
	RemovePeer(nodeID string) error

	// Send delivers message to target (presumably a node ID registered via AddPeer).
	Send(ctx context.Context, target string, message any) error
	// Receive returns the channel on which inbound messages arrive.
	Receive() <-chan Message
}
Transport defines the interface for network transport.
type TransportConfig ¶
// TransportConfig tunes the network transport. Defaults are recorded in the
// `default` struct tags.
//
// NOTE(review): there are four separate timeouts. Timeout is 10s (presumably
// per request), ConnectionTimeout is 5s (presumably for dialing),
// KeepAliveTimeout is 10s, and IdleTimeout is 5m. Confirm which applies where.
type TransportConfig struct {
Type string `default:"grpc" json:"type" yaml:"type"` // grpc, tcp
MaxMessageSize int `default:"4194304" json:"max_message_size" yaml:"max_message_size"` // 4 MiB (4 * 1024 * 1024 bytes)
Timeout time.Duration `default:"10s" json:"timeout" yaml:"timeout"`
KeepAlive bool `default:"true" json:"keep_alive" yaml:"keep_alive"`
KeepAliveInterval time.Duration `default:"30s" json:"keep_alive_interval" yaml:"keep_alive_interval"`
KeepAliveTimeout time.Duration `default:"10s" json:"keep_alive_timeout" yaml:"keep_alive_timeout"`
// Presumably a cap on concurrent connections (default 100).
MaxConnections int `default:"100" json:"max_connections" yaml:"max_connections"`
ConnectionTimeout time.Duration `default:"5s" json:"connection_timeout" yaml:"connection_timeout"`
IdleTimeout time.Duration `default:"5m" json:"idle_timeout" yaml:"idle_timeout"`
EnableCompression bool `default:"true" json:"enable_compression" yaml:"enable_compression"`
// Default 6. The valid range depends on the compressor, which is not visible here; confirm.
CompressionLevel int `default:"6" json:"compression_level" yaml:"compression_level"`
EnableMultiplexing bool `default:"true" json:"enable_multiplexing" yaml:"enable_multiplexing"`
}
TransportConfig contains transport layer configuration.