Documentation ¶
Index ¶
- Constants
- Variables
- func ValidRole(role string) bool
- type CompactionBridge
- func (b *CompactionBridge) BatchFileOps(ctx context.Context, registers []compaction.CompactedFile, ...) error
- func (b *CompactionBridge) DeleteCompactedSource(ctx context.Context, path, reason string) error
- func (b *CompactionBridge) RegisterCompactedFile(ctx context.Context, file compaction.CompactedFile) error
- type CompactorFailoverConfig
- type CompactorFailoverManager
- func (m *CompactorFailoverManager) SetCallbacks(onStart func(oldCompactorID, newCompactorID string), ...)
- func (m *CompactorFailoverManager) Start(ctx context.Context) error
- func (m *CompactorFailoverManager) Stats() map[string]interface{}
- func (m *CompactorFailoverManager) Stop() error
- func (m *CompactorFailoverManager) TriggerManualFailover() error
- type Coordinator
- func (c *Coordinator) AcceptReplicationConnection(conn net.Conn, syncReq *replication.ReplicateSync) error
- func (c *Coordinator) AddNodeViaRaft(node *Node) error
- func (c *Coordinator) BatchFileOpsInManifest(ops []raft.BatchFileOp) error
- func (c *Coordinator) Close() error
- func (c *Coordinator) DeleteFileFromManifest(path, reason string) error
- func (c *Coordinator) GetActiveCompactorID() string
- func (c *Coordinator) GetCapabilities() RoleCapabilities
- func (c *Coordinator) GetFileEntry(path string) (*raft.FileEntry, bool)
- func (c *Coordinator) GetFileManifest() []*raft.FileEntry
- func (c *Coordinator) GetFileManifestByDatabase(database string) []*raft.FileEntry
- func (c *Coordinator) GetHealthChecker() *HealthChecker
- func (c *Coordinator) GetLocalNode() *Node
- func (c *Coordinator) GetRaftNode() *raft.Node
- func (c *Coordinator) GetRegistry() *Registry
- func (c *Coordinator) GetReplicationStats() map[string]interface{}
- func (c *Coordinator) GetRole() NodeRole
- func (c *Coordinator) GetRouter() *Router
- func (c *Coordinator) IsActiveCompactor() bool
- func (c *Coordinator) IsLeader() bool
- func (c *Coordinator) IsPrimaryWriter() bool
- func (c *Coordinator) IsRunning() bool
- func (c *Coordinator) LeaderAddr() string
- func (c *Coordinator) LocalNodeID() string
- func (c *Coordinator) RegisterFileInManifest(file raft.FileEntry) error
- func (c *Coordinator) RemoveNodeViaRaft(nodeID string) error
- func (c *Coordinator) ReplicationCatchUpStatus() map[string]int64
- func (c *Coordinator) ReplicationReady() bool
- func (c *Coordinator) Role() string
- func (c *Coordinator) SetCompactorCallbacks(onBecome, onLose func())
- func (c *Coordinator) SetIngestBuffer(buffer *ingest.ArrowBuffer)
- func (c *Coordinator) SetStorageBackend(backend storage.Backend)
- func (c *Coordinator) SetWAL(walWriter *wal.Writer)
- func (c *Coordinator) Start() error
- func (c *Coordinator) StartReplication() error
- func (c *Coordinator) Status() map[string]interface{}
- func (c *Coordinator) Stop() error
- func (c *Coordinator) StopReplication()
- func (c *Coordinator) UpdateFileInManifest(file raft.FileEntry) error
- func (c *Coordinator) UpdateNodeStateViaRaft(nodeID string, state NodeState) error
- type CoordinatorConfig
- type CoordinatorFileRegistrar
- func (r *CoordinatorFileRegistrar) RegisterFile(database, measurement, path string, partitionTime time.Time, sizeBytes int64, ...)
- func (r *CoordinatorFileRegistrar) Start(parentCtx context.Context)
- func (r *CoordinatorFileRegistrar) Stats() map[string]int64
- func (r *CoordinatorFileRegistrar) Stop()
- type HealthChecker
- func (h *HealthChecker) CheckNow(nodeID string) bool
- func (h *HealthChecker) GetCheckInterval() time.Duration
- func (h *HealthChecker) GetUnhealthyThreshold() int
- func (h *HealthChecker) IsRunning() bool
- func (h *HealthChecker) SetRaftFSM(fsm *raft.ClusterFSM)
- func (h *HealthChecker) Start()
- func (h *HealthChecker) Status() map[string]interface{}
- func (h *HealthChecker) Stop()
- type HealthCheckerConfig
- type LoadBalanceStrategy
- type Node
- func (n *Node) Clone() *Node
- func (n *Node) GetCapabilities() RoleCapabilities
- func (n *Node) GetFailedChecks() int
- func (n *Node) GetLastHeartbeat() time.Time
- func (n *Node) GetState() NodeState
- func (n *Node) GetStats() NodeStats
- func (n *Node) GetWriterState() WriterState
- func (n *Node) IsAvailable() bool
- func (n *Node) IsHealthy() bool
- func (n *Node) IsPrimaryWriter() bool
- func (n *Node) MarkJoined()
- func (n *Node) ProcessHealthCheckResult(healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition
- func (n *Node) RecordFailedCheck() int
- func (n *Node) RecordHeartbeat(stats NodeStats)
- func (n *Node) SetAddresses(coordinatorAddr, apiAddr string)
- func (n *Node) SetVersion(version string)
- func (n *Node) SetWriterState(state WriterState)
- func (n *Node) UpdateState(state NodeState)
- type NodeEventCallback
- type NodeRole
- type NodeState
- type NodeStats
- type Registry
- func (r *Registry) Count() int
- func (r *Registry) CountByRole(role NodeRole) int
- func (r *Registry) CountHealthy() int
- func (r *Registry) Get(nodeID string) (*Node, bool)
- func (r *Registry) GetAll() []*Node
- func (r *Registry) GetAvailable() []*Node
- func (r *Registry) GetByRole(role NodeRole) []*Node
- func (r *Registry) GetByState(state NodeState) []*Node
- func (r *Registry) GetCompactors() []*Node
- func (r *Registry) GetHealthy() []*Node
- func (r *Registry) GetLastHeartbeat(nodeID string) time.Time
- func (r *Registry) GetPrimaryWriter() *Node
- func (r *Registry) GetReaders() []*Node
- func (r *Registry) GetStandbyWriters() []*Node
- func (r *Registry) GetWriters() []*Node
- func (r *Registry) Local() *Node
- func (r *Registry) NotifyHealthy(node *Node)
- func (r *Registry) NotifyUnhealthy(node *Node)
- func (r *Registry) ProcessHealthCheck(nodeID string, healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition
- func (r *Registry) RecordHeartbeat(nodeID string, stats NodeStats) bool
- func (r *Registry) Register(node *Node) error
- func (r *Registry) SetCallbacks(onJoined NodeEventCallback, onLeft NodeEventCallback, ...)
- func (r *Registry) Summary() map[string]int
- func (r *Registry) Unregister(nodeID string)
- func (r *Registry) UpdateNodeState(nodeID string, state NodeState)
- type RegistryConfig
- type RoleCapabilities
- type Router
- func (r *Router) CanRouteLocally(isWrite bool) bool
- func (r *Router) GetActiveConnections(nodeID string) int64
- func (r *Router) RouteQuery(ctx context.Context, req *http.Request) (*http.Response, error)
- func (r *Router) RouteWrite(ctx context.Context, req *http.Request) (*http.Response, error)
- func (r *Router) Stats() map[string]interface{}
- type RouterConfig
- type StateTransition
- type WriterFailoverConfig
- type WriterFailoverManager
- func (m *WriterFailoverManager) HandleWriterUnhealthy(node *Node)
- func (m *WriterFailoverManager) SetCallbacks(onStart func(oldPrimaryID, newPrimaryID string), ...)
- func (m *WriterFailoverManager) Start(ctx context.Context) error
- func (m *WriterFailoverManager) Stats() map[string]interface{}
- func (m *WriterFailoverManager) Stop() error
- func (m *WriterFailoverManager) TriggerManualFailover() error
- type WriterState
Constants ¶
const DefaultCallbackTimeout = 5 * time.Second
DefaultCallbackTimeout is the default timeout for callback execution.
const DefaultMaxNodes = 1000
DefaultMaxNodes is the default maximum number of nodes allowed in the registry.
Variables ¶
var (
	// ErrLicenseRequired indicates that an enterprise license is required for clustering.
	ErrLicenseRequired = errors.New("enterprise license required for clustering")
	// ErrClusteringFeatureRequired indicates the license doesn't include the clustering feature.
	ErrClusteringFeatureRequired = errors.New("license does not include clustering feature")
	// ErrInvalidRole indicates an invalid node role was specified.
	ErrInvalidRole = errors.New("invalid cluster role")
	// ErrAlreadyRunning indicates the coordinator is already running.
	ErrAlreadyRunning = errors.New("cluster coordinator already running")
	// ErrNotRunning indicates the coordinator is not running.
	ErrNotRunning = errors.New("cluster coordinator not running")
	// ErrNodeNotFound indicates the requested node was not found in the registry.
	ErrNodeNotFound = errors.New("node not found")
	// ErrNodeAlreadyExists indicates a node with the same ID already exists.
	ErrNodeAlreadyExists = errors.New("node already exists")
	// ErrClusterNotEnabled indicates clustering is not enabled in configuration.
	ErrClusterNotEnabled = errors.New("clustering is not enabled")
	// ErrIngestNotAllowed indicates this node role cannot accept writes.
	ErrIngestNotAllowed = errors.New("this node role does not accept writes")
	// ErrQueryNotAllowed indicates this node role cannot execute queries.
	ErrQueryNotAllowed = errors.New("this node role does not execute queries")
	// ErrCompactionNotAllowed indicates this node role cannot run compaction.
	ErrCompactionNotAllowed = errors.New("this node role does not run compaction")
	// ErrTooManyNodes indicates the registry has reached its maximum node capacity.
	ErrTooManyNodes = errors.New("maximum number of cluster nodes reached")
	// ErrCoreLimitExceeded indicates adding this node would exceed the licensed core limit.
	ErrCoreLimitExceeded = errors.New("cluster core limit exceeded")
)
Cluster-specific errors.
var (
	// ErrNoWriterAvailable indicates no healthy writer node is available.
	ErrNoWriterAvailable = fmt.Errorf("no healthy writer node available")
	// ErrNoReaderAvailable indicates no healthy reader node is available.
	ErrNoReaderAvailable = fmt.Errorf("no healthy reader node available")
	// ErrRoutingFailed indicates the request could not be routed after all retries.
	ErrRoutingFailed = fmt.Errorf("routing failed after all retries")
	// ErrLocalNodeCanHandle indicates the local node can handle this request directly.
	ErrLocalNodeCanHandle = fmt.Errorf("local node can handle request")
	// ErrNotCoordinator indicates this node is not the cluster coordinator.
	ErrNotCoordinator = fmt.Errorf("this node is not the cluster coordinator")
)
Router errors.
var ErrLeaderUnreachable = errors.New("forward apply: leader address not in registry")
ErrLeaderUnreachable is returned when the leader's ID is known but the registry doesn't have a coordinator address for it (e.g. the leader just left the registry mid-flight). Caller should retry.
var ErrNoLeaderKnown = errors.New("forward apply: no leader currently known")
ErrNoLeaderKnown is returned by forwardApplyToLeader when the local Raft node hasn't observed a leader yet (e.g. during election or right after startup before the first heartbeat). Callers should treat this as a transient retry condition.
Functions ¶
func ValidRole ¶
func ValidRole(role string) bool
ValidRole reports whether role is a valid cluster role string.
Types ¶
type CompactionBridge ¶
type CompactionBridge struct {
// contains filtered or unexported fields
}
CompactionBridge adapts a bridgeCoordinator to compaction.ManifestBridge.
func NewCompactionBridge ¶
func NewCompactionBridge(coord bridgeCoordinator) *CompactionBridge
NewCompactionBridge constructs a bridge bound to the given coordinator.
Panics if coord is nil — construction-time failure is louder than a runtime nil-check and makes the invariant visible at the call site. Production wiring in main.go always passes a non-nil coordinator.
func (*CompactionBridge) BatchFileOps ¶
func (b *CompactionBridge) BatchFileOps(ctx context.Context, registers []compaction.CompactedFile, deletes []compaction.DeleteSourceOp) error
BatchFileOps groups all register and delete operations for one compaction manifest into a single Raft log entry via the underlying coordinator. Reduces Raft traffic from O(N) applies to 1 per manifest.
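As a hedged usage sketch (only the method signature above is from the package; the CompactedFile and DeleteSourceOp field names below are assumptions):

// Hypothetical post-compaction commit: one Raft apply per manifest.
func commitManifest(ctx context.Context, b *CompactionBridge,
	output compaction.CompactedFile, sources []string, jobID string) error {
	deletes := make([]compaction.DeleteSourceOp, 0, len(sources))
	for _, src := range sources {
		deletes = append(deletes, compaction.DeleteSourceOp{
			Path:   src,                   // assumed field name
			Reason: "compaction:" + jobID, // mirrors the documented reason format
		})
	}
	// One call, one Raft log entry, instead of len(sources)+1 applies.
	return b.BatchFileOps(ctx, []compaction.CompactedFile{output}, deletes)
}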
func (*CompactionBridge) DeleteCompactedSource ¶
func (b *CompactionBridge) DeleteCompactedSource(ctx context.Context, path, reason string) error
DeleteCompactedSource appends a CommandDeleteFile to the Raft log via the underlying coordinator. Same forwarding semantics as RegisterCompactedFile: leader applies directly, non-leader forwards.
reason is a human-readable string for operator debugging; Phase 4 always passes "compaction:<job_id>" so operators can correlate deletions with the compaction job that drove them.
func (*CompactionBridge) RegisterCompactedFile ¶
func (b *CompactionBridge) RegisterCompactedFile(ctx context.Context, file compaction.CompactedFile) error
RegisterCompactedFile appends a CommandRegisterFile to the Raft log via the underlying coordinator. On the leader the command is applied directly; on a non-leader the coordinator forwards over the peer protocol to the current leader (see forward_apply.go). Either way the caller observes a successful return only after Raft has committed.
The bridge translates the cluster-agnostic compaction.CompactedFile shape into raft.FileEntry, stamping OriginNodeID with the local node ID so Phase 2/3 readers know to pull the bytes from this compactor.
type CompactorFailoverConfig ¶
type CompactorFailoverConfig struct {
// Registry provides access to cluster node state.
Registry *Registry
// RaftNode for applying AssignCompactor commands via consensus.
RaftNode *arcRaft.Node
// RaftFSM to read the current activeCompactorID.
RaftFSM *arcRaft.ClusterFSM
// CheckInterval is how often the leader checks compactor health.
// Default: 10s (compaction is less latency-sensitive than writes).
CheckInterval time.Duration
// FailoverTimeout bounds the Raft Apply for AssignCompactor.
FailoverTimeout time.Duration
// CooldownPeriod prevents repeated failovers within this window.
CooldownPeriod time.Duration
// UnhealthyThreshold is consecutive unhealthy checks before triggering
// failover. At 10s interval, threshold=3 means ~30s detection time.
UnhealthyThreshold int
Logger zerolog.Logger
}
CompactorFailoverConfig holds configuration for the compactor failover manager.
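A minimal construction sketch. Apart from the documented 10s CheckInterval default, the values below are illustrative assumptions, not package defaults:

mgr := NewCompactorFailoverManager(&CompactorFailoverConfig{
	Registry:           registry,
	RaftNode:           raftNode,
	RaftFSM:            fsm,
	CheckInterval:      10 * time.Second, // documented default
	FailoverTimeout:    5 * time.Second,  // assumed bound on the Raft apply
	CooldownPeriod:     time.Minute,      // assumed
	UnhealthyThreshold: 3,                // ~30s detection at a 10s interval
	Logger:             logger,
})
if err := mgr.Start(ctx); err != nil {
	return err
}
defer mgr.Stop()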
type CompactorFailoverManager ¶
type CompactorFailoverManager struct {
// contains filtered or unexported fields
}
CompactorFailoverManager monitors compactor health and reassigns the compactor lease on failure. Only active on the Raft leader.
func NewCompactorFailoverManager ¶
func NewCompactorFailoverManager(cfg *CompactorFailoverConfig) *CompactorFailoverManager
NewCompactorFailoverManager creates a new compactor failover manager.
func (*CompactorFailoverManager) SetCallbacks ¶
func (m *CompactorFailoverManager) SetCallbacks(
	onStart func(oldCompactorID, newCompactorID string),
	onComplete func(newCompactorID string, success bool),
)
SetCallbacks sets callbacks for failover events.
func (*CompactorFailoverManager) Start ¶
func (m *CompactorFailoverManager) Start(ctx context.Context) error
Start begins the compactor failover manager.
func (*CompactorFailoverManager) Stats ¶
func (m *CompactorFailoverManager) Stats() map[string]interface{}
Stats returns failover manager statistics.
func (*CompactorFailoverManager) Stop ¶
func (m *CompactorFailoverManager) Stop() error
Stop gracefully shuts down the compactor failover manager.
func (*CompactorFailoverManager) TriggerManualFailover ¶
func (m *CompactorFailoverManager) TriggerManualFailover() error
TriggerManualFailover triggers a manual compactor failover.
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(cfg *CoordinatorConfig) (*Coordinator, error)
NewCoordinator creates a new cluster coordinator. Returns an error if the license is invalid or missing the clustering feature.
func (*Coordinator) AcceptReplicationConnection ¶
func (c *Coordinator) AcceptReplicationConnection(conn net.Conn, syncReq *replication.ReplicateSync) error
AcceptReplicationConnection handles a replication connection from a reader. This is called when a reader sends a MsgReplicateSync message.
func (*Coordinator) AddNodeViaRaft ¶
func (c *Coordinator) AddNodeViaRaft(node *Node) error
AddNodeViaRaft adds a node to the cluster via Raft consensus. Must be called on the leader.
func (*Coordinator) BatchFileOpsInManifest ¶
func (c *Coordinator) BatchFileOpsInManifest(ops []raft.BatchFileOp) error
BatchFileOpsInManifest applies a batch of RegisterFile and DeleteFile operations as a single Raft log entry. On the leader the command is applied directly; on a non-leader it is forwarded to the current leader via the peer protocol. This reduces Raft traffic for compaction manifests from O(N) log entries to 1.
func (*Coordinator) Close ¶
func (c *Coordinator) Close() error
Close implements the shutdown.Shutdownable interface.
func (*Coordinator) DeleteFileFromManifest ¶
func (c *Coordinator) DeleteFileFromManifest(path, reason string) error
DeleteFileFromManifest removes a file from the cluster-wide manifest. Called by retention and compaction cleanup.
Phase 4: same leader-forwarding semantics as RegisterFileInManifest. Non-leader callers no longer silently drop the command.
func (*Coordinator) GetActiveCompactorID ¶
func (c *Coordinator) GetActiveCompactorID() string
GetActiveCompactorID returns the node ID currently holding the compactor lease.
func (*Coordinator) GetCapabilities ¶
func (c *Coordinator) GetCapabilities() RoleCapabilities
GetCapabilities returns the capabilities of the local node.
func (*Coordinator) GetFileEntry ¶
func (c *Coordinator) GetFileEntry(path string) (*raft.FileEntry, bool)
GetFileEntry returns the manifest entry for a given relative path. Returns false if the file is not in the manifest (standalone mode or pre-cluster file).
func (*Coordinator) GetFileManifest ¶
func (c *Coordinator) GetFileManifest() []*raft.FileEntry
GetFileManifest returns the current file manifest from the Raft FSM. Returns nil if Raft is not initialized.
func (*Coordinator) GetFileManifestByDatabase ¶
func (c *Coordinator) GetFileManifestByDatabase(database string) []*raft.FileEntry
GetFileManifestByDatabase returns files for a specific database.
func (*Coordinator) GetHealthChecker ¶
func (c *Coordinator) GetHealthChecker() *HealthChecker
GetHealthChecker returns the health checker.
func (*Coordinator) GetLocalNode ¶
func (c *Coordinator) GetLocalNode() *Node
GetLocalNode returns the local node.
func (*Coordinator) GetRaftNode ¶
func (c *Coordinator) GetRaftNode() *raft.Node
GetRaftNode returns the Raft node (may be nil if Raft is not configured).
func (*Coordinator) GetRegistry ¶
func (c *Coordinator) GetRegistry() *Registry
GetRegistry returns the node registry.
func (*Coordinator) GetReplicationStats ¶
func (c *Coordinator) GetReplicationStats() map[string]interface{}
GetReplicationStats returns replication statistics.
func (*Coordinator) GetRole ¶
func (c *Coordinator) GetRole() NodeRole
GetRole returns the role of the local node.
func (*Coordinator) GetRouter ¶
func (c *Coordinator) GetRouter() *Router
GetRouter returns the request router.
func (*Coordinator) IsActiveCompactor ¶
func (c *Coordinator) IsActiveCompactor() bool
IsActiveCompactor returns true if this node currently holds the compactor lease.
func (*Coordinator) IsLeader ¶
func (c *Coordinator) IsLeader() bool
IsLeader returns true if this node is the Raft leader. Always returns true if Raft is not configured (standalone mode).
func (*Coordinator) IsPrimaryWriter ¶
func (c *Coordinator) IsPrimaryWriter() bool
IsPrimaryWriter implements api.RetentionCoordinator, api.DeleteCoordinator, api.CQCoordinator, and scheduler.WriterGate: reports whether this node is the primary writer and may execute writer-only mutations. When failover is disabled there is no promoted primary — any writer node is authoritative, so we fall back to a role check.
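For example, a writer-only job such as retention enforcement can gate on this method (an illustrative sketch, not the package's actual scheduler wiring):

// Skip writer-only mutations on nodes that aren't the primary writer.
// With failover disabled, any writer node passes this check.
func runWriterOnlyJob(coord *Coordinator, job func() error) error {
	if !coord.IsPrimaryWriter() {
		return nil // standby writers and readers skip the mutation
	}
	return job()
}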
func (*Coordinator) IsRunning ¶
func (c *Coordinator) IsRunning() bool
IsRunning returns true if the coordinator is running.
func (*Coordinator) LeaderAddr ¶
func (c *Coordinator) LeaderAddr() string
LeaderAddr returns the address of the current Raft leader. Returns empty string if Raft is not configured.
func (*Coordinator) LocalNodeID ¶
func (c *Coordinator) LocalNodeID() string
LocalNodeID returns the local cluster node ID. Phase 4 uses this via the CompactionBridge to set OriginNodeID on compacted-file Raft entries so Phase 2/3's multi-peer resolver routes replica pulls back to the compactor that produced the output.
func (*Coordinator) RegisterFileInManifest ¶
func (c *Coordinator) RegisterFileInManifest(file raft.FileEntry) error
RegisterFileInManifest appends a file entry to the cluster-wide manifest via Raft. Returns nil if Raft is not initialized (standalone mode).
Phase 4: when the local node is not the Raft leader, the command is forwarded to the current leader over the peer protocol instead of being silently dropped. Prior to Phase 4 this method returned nil on non-leader, which was a latent data-loss bug — writers that were not the leader would silently lose all their file registrations and the manifest would diverge from storage.
On forwarding failure (no leader known, leader unreachable, or the leader rejects the apply), the error is returned and the caller can retry. The forwarding path is bounded by forwardApplyTimeout so a stuck leader doesn't block the writer flush hot path indefinitely.
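A caller-side retry sketch; the attempt count and backoff are assumptions, and it presumes the forwarding path returns (or wraps) the ErrNoLeaderKnown / ErrLeaderUnreachable sentinels:

func registerWithRetry(c *Coordinator, file raft.FileEntry) error {
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		if err = c.RegisterFileInManifest(file); err == nil {
			return nil
		}
		if errors.Is(err, ErrNoLeaderKnown) || errors.Is(err, ErrLeaderUnreachable) {
			// Transient: election in progress or stale registry entry.
			time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
			continue
		}
		return err // non-transient: surface to the caller
	}
	return err
}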
func (*Coordinator) RemoveNodeViaRaft ¶
func (c *Coordinator) RemoveNodeViaRaft(nodeID string) error
RemoveNodeViaRaft removes a node from the cluster via Raft consensus. It removes the node from both the Raft voting configuration and the cluster FSM state, then unregisters it from the local registry. Must be called on the leader.
func (*Coordinator) ReplicationCatchUpStatus ¶
func (c *Coordinator) ReplicationCatchUpStatus() map[string]int64
ReplicationCatchUpStatus returns the puller's catch-up-specific stats as a JSON-serializable map, or nil when the puller is not running. Intended for /api/v1/cluster/status. The keys mirror what Phase 5 will hard-gate the query path on.
func (*Coordinator) ReplicationReady ¶
func (c *Coordinator) ReplicationReady() bool
ReplicationReady reports whether the Phase 3 catch-up walker has finished its pass over the cluster manifest. Not currently consumed — Phase 5 will use this to hard-gate the query path during startup so readers can't return eventually-consistent results during catch-up. Lands now so the coordinator surface doesn't need another change in Phase 5.
func (*Coordinator) Role ¶
func (c *Coordinator) Role() string
Role implements api.RetentionCoordinator: returns a human-readable role string for log messages.
func (*Coordinator) SetCompactorCallbacks ¶
func (c *Coordinator) SetCompactorCallbacks(onBecome, onLose func())
SetCompactorCallbacks sets the callbacks for dynamic compaction activation. Called from main.go before Start() so the FSM callback has hooks to invoke.
func (*Coordinator) SetIngestBuffer ¶
func (c *Coordinator) SetIngestBuffer(buffer *ingest.ArrowBuffer)
SetIngestBuffer sets the ArrowBuffer for reader nodes to apply replicated entries. This enables query freshness — readers can query unflushed writer data that arrives via WAL replication.
func (*Coordinator) SetStorageBackend ¶
func (c *Coordinator) SetStorageBackend(backend storage.Backend)
SetStorageBackend sets the local storage backend reference. It is required for Enterprise peer replication Phase 2: the fetch handler reads local file bytes via this backend to stream them to pulling peers, and the puller writes received bytes into it. Must be called before Start when peer replication is enabled.
func (*Coordinator) SetWAL ¶
func (c *Coordinator) SetWAL(walWriter *wal.Writer)
SetWAL sets the WAL writer reference for replication. This should be called after the WAL is created but before Start().
func (*Coordinator) Start ¶
func (c *Coordinator) Start() error
Start starts the cluster coordinator.
func (*Coordinator) StartReplication ¶
func (c *Coordinator) StartReplication() error
StartReplication starts WAL replication based on node role.
- Writers start a Sender to stream entries to readers
- Readers start a Receiver to receive entries from the writer
This should be called after Start() and SetWAL().
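The documented call ordering, as a wiring sketch (cfg and walWriter are assumed to exist; error handling abbreviated):

coord, err := NewCoordinator(cfg)
if err != nil {
	return err
}
coord.SetWAL(walWriter) // after the WAL exists, before Start()
if err := coord.Start(); err != nil {
	return err
}
if err := coord.StartReplication(); err != nil { // after Start() and SetWAL()
	return err
}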
func (*Coordinator) Status ¶
func (c *Coordinator) Status() map[string]interface{}
Status returns the cluster status as a map for JSON serialization.
func (*Coordinator) Stop ¶
func (c *Coordinator) Stop() error
Stop stops the cluster coordinator.
func (*Coordinator) StopReplication ¶
func (c *Coordinator) StopReplication()
StopReplication stops WAL replication.
func (*Coordinator) UpdateFileInManifest ¶
func (c *Coordinator) UpdateFileInManifest(file raft.FileEntry) error
UpdateFileInManifest updates an existing file's metadata in the cluster manifest after a partial rewrite that changes size/checksum but keeps the same path. Same leader-forwarding semantics as RegisterFileInManifest.
func (*Coordinator) UpdateNodeStateViaRaft ¶
func (c *Coordinator) UpdateNodeStateViaRaft(nodeID string, state NodeState) error
UpdateNodeStateViaRaft updates a node's state via Raft consensus. Must be called on the leader.
type CoordinatorConfig ¶
type CoordinatorConfig struct {
Config *config.ClusterConfig
LicenseClient *license.Client
Version string // Arc version
APIAddress string // HTTP API address for this node
Logger zerolog.Logger
// Phase 4: when true, the embedded HealthChecker surfaces rate-limited
// Warn logs when the cluster has zero or >1 nodes in RoleCompactor.
// Main wires this to cfg.Cluster.Enabled && cfg.Cluster.ReplicationEnabled &&
// cfg.Compaction.Enabled so the warning only fires in deployments where
// a missing compactor is actually a problem.
WarnIfNoCompactor bool
}
CoordinatorConfig holds configuration for the coordinator.
type CoordinatorFileRegistrar ¶
type CoordinatorFileRegistrar struct {
// contains filtered or unexported fields
}
CoordinatorFileRegistrar implements ingest.FileRegistrar by forwarding file registration events to the cluster coordinator's Raft manifest.
The registration is fully async: RegisterFile enqueues the entry and returns immediately. A background worker drains the queue and calls coordinator.RegisterFileInManifest. This ensures the flush hot path is never blocked on Raft consensus, network I/O, or checksum compute.
The SHA-256 checksum is computed by the flush path on the in-memory Parquet buffer before the backend write and passed to RegisterFile. Peers use it to verify files pulled during Phase 2 replication.
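Inside the package, the non-blocking enqueue this implies looks roughly like the following sketch (the queue and dropped fields and the fileEntry type are hypothetical internals):

func (r *CoordinatorFileRegistrar) enqueue(e fileEntry) {
	select {
	case r.queue <- e:
		// The background worker will call RegisterFileInManifest.
	default:
		// Queue full: drop and count. A later anti-entropy scan
		// (Phase 3+) re-discovers the file, so no data is lost.
		atomic.AddInt64(&r.dropped, 1)
	}
}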
func NewCoordinatorFileRegistrar ¶
func NewCoordinatorFileRegistrar(coord *Coordinator, logger zerolog.Logger) *CoordinatorFileRegistrar
NewCoordinatorFileRegistrar creates a new registrar backed by the coordinator. The coordinator must be started before the registrar is used.
func (*CoordinatorFileRegistrar) RegisterFile ¶
func (r *CoordinatorFileRegistrar) RegisterFile(database, measurement, path string, partitionTime time.Time, sizeBytes int64, sha256 string)
RegisterFile implements ingest.FileRegistrar. Non-blocking: enqueues the registration and returns immediately. If the queue is full, the entry is dropped and a counter is incremented — peer replication will discover it on the next anti-entropy scan (Phase 3+).
sha256 is a hex-encoded SHA-256 of the Parquet file bytes. The caller (arrow_writer.go flush path) computes it on the in-memory buffer before the storage backend write, so it's effectively free.
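On the caller side that amounts to something like this (a sketch; the variable names are illustrative, the crypto/sha256 and encoding/hex calls are standard library):

sum := sha256.Sum256(parquetBytes) // in-memory buffer, before the backend write
registrar.RegisterFile(database, measurement, relPath,
	partitionTime, int64(len(parquetBytes)), hex.EncodeToString(sum[:]))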
func (*CoordinatorFileRegistrar) Start ¶
func (r *CoordinatorFileRegistrar) Start(parentCtx context.Context)
Start launches the background worker that drains the registration queue.
func (*CoordinatorFileRegistrar) Stats ¶
func (r *CoordinatorFileRegistrar) Stats() map[string]int64
Stats returns a snapshot of registrar metrics.
func (*CoordinatorFileRegistrar) Stop ¶
func (r *CoordinatorFileRegistrar) Stop()
Stop signals the worker to exit, drains any pending entries, and waits for the worker to finish. Best-effort drain: entries still in-flight after the drain timeout are discarded (they can be recovered by anti-entropy in a future phase).
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker performs periodic health checks on cluster nodes. It monitors node heartbeats and updates node state based on check results.
func NewHealthChecker ¶
func NewHealthChecker(cfg *HealthCheckerConfig) *HealthChecker
NewHealthChecker creates a new health checker.
func (*HealthChecker) CheckNow ¶
func (h *HealthChecker) CheckNow(nodeID string) bool
CheckNow performs an immediate health check on a specific node. This is useful for on-demand checks outside the regular interval.
func (*HealthChecker) GetCheckInterval ¶
func (h *HealthChecker) GetCheckInterval() time.Duration
GetCheckInterval returns the configured check interval.
func (*HealthChecker) GetUnhealthyThreshold ¶
func (h *HealthChecker) GetUnhealthyThreshold() int
GetUnhealthyThreshold returns the configured unhealthy threshold.
func (*HealthChecker) IsRunning ¶
func (h *HealthChecker) IsRunning() bool
IsRunning returns true if the health checker is running.
func (*HealthChecker) SetRaftFSM ¶
func (h *HealthChecker) SetRaftFSM(fsm *raft.ClusterFSM)
SetRaftFSM wires the Raft FSM so checkCompactorElected can check the Phase 5 active compactor lease. Called from coordinator wiring.
checkCompactorElected enforces the Phase 4 "exactly one compactor" invariant via rate-limited Warn logs. Two failure modes:
- Zero compactors: the operator forgot to set ARC_CLUSTER_ROLE=compactor on any node. Compacted files will never be registered in the Raft manifest and the cluster will slowly accumulate small source files.
- Multiple compactors: the operator set RoleCompactor on more than one node, re-introducing the shared-storage duplicate-output bug that Phase 4 is designed to prevent.
Both cases are non-fatal: Arc keeps running and queries still work, but the cluster is in a degraded state that requires operator attention. Warn (not Error) is the right level because the operator chose the configuration; we're flagging it, not failing on it.
Design note on rate limiting: the check always walks the registry (cheap, it's an in-memory map) and decides the message first, then applies the rate limit to the log emission only. The "correct config" branch therefore unconditionally resets both timers, so when the cluster transitions broken → correct → broken, the second "broken" warning fires immediately instead of waiting out the full minute from the first one. See the sketch after this note.
The two failure modes have separate timers so a cluster flapping between zero and two compactors surfaces both warnings within the same minute. Sharing a single timer would let whichever warning lost the CAS race silence the other for the whole interval; an operator tailing the log could see only "no compactor" while "multiple compactors" was the more recent condition.
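The decide-first, rate-limit-the-emission shape is roughly this (a sketch with hypothetical timer fields; the real implementation's CAS-based timers are simplified to plain fields updated from the single health-loop goroutine):

func (h *HealthChecker) checkCompactorSketch(now time.Time) {
	n := len(h.registry.GetByRole(RoleCompactor)) // cheap in-memory walk, always runs
	switch {
	case n == 1:
		// Correct config: reset both timers so the next broken state
		// warns immediately rather than waiting out a stale window.
		h.lastWarnNone, h.lastWarnMulti = time.Time{}, time.Time{}
	case n == 0:
		if now.Sub(h.lastWarnNone) >= time.Minute {
			h.lastWarnNone = now
			h.logger.Warn().Msg("cluster has no compactor node")
		}
	default: // n > 1
		if now.Sub(h.lastWarnMulti) >= time.Minute {
			h.lastWarnMulti = now
			h.logger.Warn().Int("count", n).Msg("cluster has multiple compactor nodes")
		}
	}
}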
func (*HealthChecker) Start ¶
func (h *HealthChecker) Start()
Start starts the health checker background loop.
func (*HealthChecker) Status ¶
func (h *HealthChecker) Status() map[string]interface{}
Status returns the health checker status for monitoring.
func (*HealthChecker) Stop ¶
func (h *HealthChecker) Stop()
Stop stops the health checker background loop.
type HealthCheckerConfig ¶
type HealthCheckerConfig struct {
Registry *Registry
CheckInterval time.Duration // How often to check nodes (default: 5s)
CheckTimeout time.Duration // Timeout for each check (default: 3s)
UnhealthyThreshold int // Failed checks before marking unhealthy (default: 3)
// Phase 4: when true, the health loop logs rate-limited Warn messages
// if zero or >1 nodes have RoleCompactor. Set from main.go based on
// cluster + replication + compaction config. Zero value (false) means
// OSS / standalone and the check is skipped.
WarnIfNoCompactor bool
Logger zerolog.Logger
}
HealthCheckerConfig holds configuration for the health checker.
type LoadBalanceStrategy ¶
type LoadBalanceStrategy string
LoadBalanceStrategy defines how to select nodes for routing.
const (
	// LoadBalanceRoundRobin selects nodes in round-robin order.
	LoadBalanceRoundRobin LoadBalanceStrategy = "round_robin"
	// LoadBalanceLeastConnections selects the node with fewest active connections.
	LoadBalanceLeastConnections LoadBalanceStrategy = "least_connections"
	// LoadBalanceRandom selects a random healthy node.
	LoadBalanceRandom LoadBalanceStrategy = "random"
)
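Illustrative use when filling in a RouterConfig (documented below); the values other than Strategy are assumptions:

cfg := &RouterConfig{
	Strategy:  LoadBalanceLeastConnections, // prefer the least-loaded node
	Timeout:   10 * time.Second,
	Retries:   2,
	Registry:  registry,
	LocalNode: localNode,
	Logger:    logger,
}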
type Node ¶
type Node struct {
// Identity
ID string `json:"id"` // Unique node identifier
Name string `json:"name"` // Human-readable name
Role NodeRole `json:"role"` // Node role (writer, reader, compactor, standalone)
ClusterName string `json:"cluster_name"` // Name of the cluster this node belongs to
// Network
Address string `json:"address"` // Coordinator address (host:port for inter-node communication)
APIAddress string `json:"api_address"` // HTTP API address (host:port for client requests)
// State
State NodeState `json:"state"` // Current health state
// Health tracking
LastHeartbeat time.Time `json:"last_heartbeat"` // Time of last successful heartbeat
LastHealthy time.Time `json:"last_healthy"` // Time node was last known healthy
FailedChecks int `json:"failed_checks"` // Consecutive failed health checks
// Metadata
Version string `json:"version"` // Arc version running on this node
StartedAt time.Time `json:"started_at"` // When the node process started
JoinedAt time.Time `json:"joined_at"` // When the node joined the cluster
// Writer state (primary/standby) - only relevant for writer nodes
WriterSt WriterState `json:"writer_state,omitempty"`
// Runtime stats (updated via heartbeat)
Stats NodeStats `json:"stats"`
// contains filtered or unexported fields
}
Node represents a node in the Arc cluster.
func (*Node) Clone ¶
func (n *Node) Clone() *Node
Clone returns a copy of the node.
func (*Node) GetCapabilities ¶
func (n *Node) GetCapabilities() RoleCapabilities
GetCapabilities returns the capabilities for this node's role.
func (*Node) GetFailedChecks ¶
func (n *Node) GetFailedChecks() int
GetFailedChecks returns the number of consecutive failed health checks.
func (*Node) GetLastHeartbeat ¶
func (n *Node) GetLastHeartbeat() time.Time
GetLastHeartbeat returns the time of the last successful heartbeat.
func (*Node) GetState ¶
func (n *Node) GetState() NodeState
GetState returns the node's current health state.
func (*Node) GetStats ¶
func (n *Node) GetStats() NodeStats
GetStats returns the node's runtime statistics.
func (*Node) GetWriterState ¶
func (n *Node) GetWriterState() WriterState
GetWriterState returns the writer state of this node.
func (*Node) IsAvailable ¶
func (n *Node) IsAvailable() bool
IsAvailable returns true if the node can accept requests. A node is available if it's healthy or just joining (warming up).
func (*Node) IsHealthy ¶
func (n *Node) IsHealthy() bool
IsHealthy returns true if the node is in the healthy state.
func (*Node) IsPrimaryWriter ¶
func (n *Node) IsPrimaryWriter() bool
IsPrimaryWriter returns true if this node is the primary writer.
func (*Node) MarkJoined ¶
func (n *Node) MarkJoined()
MarkJoined marks the node as having joined the cluster.
func (*Node) ProcessHealthCheckResult ¶
func (n *Node) ProcessHealthCheckResult(healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition
ProcessHealthCheckResult atomically processes a health check result and returns any state transition that occurred. This prevents TOCTOU race conditions by performing the check and update under a single lock.
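A sketch of why the single critical section matters (the mutex and the StateTransition From/To fields are assumptions here): without it, two concurrent checks could both read the same counter, both cross the threshold, and emit duplicate transitions.

func (n *Node) processSketch(healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition {
	n.mu.Lock() // one lock covers both the counter update and the transition
	defer n.mu.Unlock()
	if healthy {
		n.FailedChecks = 0
		if n.State != StateHealthy {
			old := n.State
			n.State = StateHealthy
			return &StateTransition{From: old, To: StateHealthy} // fields assumed
		}
		return nil
	}
	n.FailedChecks++
	switch {
	case n.FailedChecks >= deadThreshold && n.State != StateDead:
		old := n.State
		n.State = StateDead
		return &StateTransition{From: old, To: StateDead}
	case n.FailedChecks >= unhealthyThreshold && n.State == StateHealthy:
		n.State = StateUnhealthy
		return &StateTransition{From: StateHealthy, To: StateUnhealthy}
	}
	return nil
}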
func (*Node) RecordFailedCheck ¶
func (n *Node) RecordFailedCheck() int
RecordFailedCheck records a failed health check. Returns the new count of consecutive failed checks.
func (*Node) RecordHeartbeat ¶
func (n *Node) RecordHeartbeat(stats NodeStats)
RecordHeartbeat records a successful heartbeat from the node.
func (*Node) SetAddresses ¶
func (n *Node) SetAddresses(coordinatorAddr, apiAddr string)
SetAddresses sets the node's network addresses.
func (*Node) SetVersion ¶
func (n *Node) SetVersion(version string)
SetVersion sets the node's Arc version.
func (*Node) SetWriterState ¶
func (n *Node) SetWriterState(state WriterState)
SetWriterState sets the writer state of this node.
func (*Node) UpdateState ¶
func (n *Node) UpdateState(state NodeState)
UpdateState updates the node state.
type NodeEventCallback ¶
type NodeEventCallback func(*Node)
NodeEventCallback is called when node events occur.
type NodeRole ¶
type NodeRole string
NodeRole represents the role of a node in the cluster. Each role has specific capabilities that determine what operations the node can perform.
const (
	// RoleStandalone is the default single-node deployment mode (OSS-compatible).
	// Can perform all operations: ingest, query, and compact.
	RoleStandalone NodeRole = "standalone"
	// RoleWriter handles ingestion, WAL, and flushes to shared storage.
	// Can also execute queries for monitoring and validation.
	RoleWriter NodeRole = "writer"
	// RoleReader is a query-only node that reads from shared storage.
	// Cannot ingest data or run compaction jobs.
	RoleReader NodeRole = "reader"
	// RoleCompactor runs background compaction and maintenance tasks.
	// Cannot ingest data or serve queries.
	RoleCompactor NodeRole = "compactor"
)
func ParseRole ¶
func ParseRole(role string) NodeRole
ParseRole parses a string into a NodeRole. Returns RoleStandalone if the role is empty or invalid.
func (NodeRole) GetCapabilities ¶
func (r NodeRole) GetCapabilities() RoleCapabilities
GetCapabilities returns the capabilities for this role.
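For example (ARC_CLUSTER_ROLE is the environment variable mentioned in the HealthChecker docs; the gating shown is illustrative):

role := ParseRole(os.Getenv("ARC_CLUSTER_ROLE")) // empty/invalid -> RoleStandalone
caps := role.GetCapabilities()
if !caps.CanIngest {
	// e.g. reject writes on reader/compactor nodes with ErrIngestNotAllowed
}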
type NodeState ¶
type NodeState string
NodeState represents the health state of a node in the cluster.
const (
	// StateUnknown is the initial state before health is determined.
	StateUnknown NodeState = "unknown"
	// StateHealthy indicates the node is operating normally.
	StateHealthy NodeState = "healthy"
	// StateUnhealthy indicates the node has failed health checks but may recover.
	StateUnhealthy NodeState = "unhealthy"
	// StateDead indicates the node has been unreachable for an extended period.
	StateDead NodeState = "dead"
	// StateJoining indicates the node is joining the cluster.
	StateJoining NodeState = "joining"
	// StateLeaving indicates the node is gracefully leaving the cluster.
	StateLeaving NodeState = "leaving"
)
type NodeStats ¶
type NodeStats struct {
CPUUsage float64 `json:"cpu_usage"` // CPU usage percentage (0-100)
MemoryUsage float64 `json:"memory_usage"` // Memory usage percentage (0-100)
IngestRate int64 `json:"ingest_rate"` // Records ingested per second
QueryRate int64 `json:"query_rate"` // Queries executed per second
StorageUsed int64 `json:"storage_used"` // Bytes used in storage
Connections int `json:"connections"` // Active HTTP connections
ActiveQueries int `json:"active_queries"` // Currently running queries
CompactionJobs int `json:"compaction_jobs"` // Active compaction jobs (for compactor role)
}
NodeStats contains runtime statistics for a node. These are updated periodically via heartbeat.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages cluster node membership. It provides thread-safe access to the list of nodes in the cluster.
func NewRegistry ¶
func NewRegistry(cfg *RegistryConfig) *Registry
NewRegistry creates a new node registry.
func (*Registry) Count ¶
func (r *Registry) Count() int
Count returns the total number of nodes in the registry.
func (*Registry) CountByRole ¶
func (r *Registry) CountByRole(role NodeRole) int
CountByRole returns the number of nodes with a specific role.
func (*Registry) CountHealthy ¶
func (r *Registry) CountHealthy() int
CountHealthy returns the number of healthy nodes.
func (*Registry) Get ¶
func (r *Registry) Get(nodeID string) (*Node, bool)
Get returns a clone of a node by ID. Returns a cloned node to prevent race conditions from concurrent modifications.
func (*Registry) GetAll ¶
func (r *Registry) GetAll() []*Node
GetAll returns cloned copies of all nodes in the registry.
func (*Registry) GetAvailable ¶
func (r *Registry) GetAvailable() []*Node
GetAvailable returns cloned copies of all available nodes (healthy or joining).
func (*Registry) GetByRole ¶
func (r *Registry) GetByRole(role NodeRole) []*Node
GetByRole returns cloned nodes with a specific role.
func (*Registry) GetByState ¶
func (r *Registry) GetByState(state NodeState) []*Node
GetByState returns cloned nodes with a specific state.
func (*Registry) GetCompactors ¶
func (r *Registry) GetCompactors() []*Node
GetCompactors returns cloned copies of all healthy compactor nodes.
func (*Registry) GetHealthy ¶
func (r *Registry) GetHealthy() []*Node
GetHealthy returns cloned copies of all healthy nodes.
func (*Registry) GetLastHeartbeat ¶
func (r *Registry) GetLastHeartbeat(nodeID string) time.Time
GetLastHeartbeat returns the LastHeartbeat timestamp of the real node in the registry. Returns zero time if the node doesn't exist.
func (*Registry) GetPrimaryWriter ¶
func (r *Registry) GetPrimaryWriter() *Node
GetPrimaryWriter returns the primary writer node, or nil if none exists.
func (*Registry) GetReaders ¶
func (r *Registry) GetReaders() []*Node
GetReaders returns cloned copies of all healthy reader nodes.
func (*Registry) GetStandbyWriters ¶
func (r *Registry) GetStandbyWriters() []*Node
GetStandbyWriters returns healthy standby writer nodes.
func (*Registry) GetWriters ¶
func (r *Registry) GetWriters() []*Node
GetWriters returns cloned copies of all healthy writer nodes.
func (*Registry) Local ¶
func (r *Registry) Local() *Node
Local returns the local node.
func (*Registry) NotifyHealthy ¶
func (r *Registry) NotifyHealthy(node *Node)
NotifyHealthy notifies registered callbacks that a node became healthy.
func (*Registry) NotifyUnhealthy ¶
func (r *Registry) NotifyUnhealthy(node *Node)
NotifyUnhealthy notifies registered callbacks that a node became unhealthy.
func (*Registry) ProcessHealthCheck ¶
func (r *Registry) ProcessHealthCheck(nodeID string, healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition
ProcessHealthCheck runs a health check result against the real node in the registry (not a clone). Returns the state transition if any occurred.
func (*Registry) RecordHeartbeat ¶
func (r *Registry) RecordHeartbeat(nodeID string, stats NodeStats) bool
RecordHeartbeat updates the LastHeartbeat on the real node in the registry (not a clone). This is called from handleHeartbeat when a peer sends us a heartbeat message. Node.RecordHeartbeat is thread-safe via the node's internal mutex.
func (*Registry) Register ¶
func (r *Registry) Register(node *Node) error
Register adds or updates a node in the registry. Returns ErrTooManyNodes if the registry has reached its maximum capacity.
func (*Registry) SetCallbacks ¶
func (r *Registry) SetCallbacks(
	onJoined NodeEventCallback,
	onLeft NodeEventCallback,
	onHealthy NodeEventCallback,
	onUnhealthy NodeEventCallback,
)
SetCallbacks sets event callbacks for node lifecycle events. Callbacks are invoked asynchronously to prevent blocking the registry.
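The asynchronous dispatch presumably looks like this sketch (the goroutine-plus-timeout shape and the helper name are assumptions; DefaultCallbackTimeout is the documented constant):

func fireCallback(cb NodeEventCallback, node *Node) {
	if cb == nil {
		return
	}
	go func() {
		done := make(chan struct{})
		go func() {
			cb(node.Clone()) // hand the callback a clone, not the live node
			close(done)
		}()
		select {
		case <-done:
		case <-time.After(DefaultCallbackTimeout):
			// Callback overran its budget; the registry moves on.
		}
	}()
}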
func (*Registry) Summary ¶
func (r *Registry) Summary() map[string]int
Summary returns node counts keyed by category for monitoring.
func (*Registry) Unregister ¶
func (r *Registry) Unregister(nodeID string)
Unregister removes a node from the registry.
func (*Registry) UpdateNodeState ¶
func (r *Registry) UpdateNodeState(nodeID string, state NodeState)
UpdateNodeState updates the state of the real node in the registry. Called from handleHeartbeat to propagate the peer's self-reported state.
type RegistryConfig ¶
RegistryConfig holds configuration for the node registry.
type RoleCapabilities ¶
type RoleCapabilities struct {
CanIngest bool // Accept write requests (LineProtocol, MessagePack)
CanQuery bool // Execute SQL queries
CanCompact bool // Run compaction jobs
CanCoordinate bool // Participate in cluster coordination (leader election)
}
RoleCapabilities defines what operations each role can perform.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router routes requests to appropriate nodes in the cluster. The coordinator node uses this to forward writes to writers and queries to readers.
func (*Router) CanRouteLocally ¶
func (r *Router) CanRouteLocally(isWrite bool) bool
CanRouteLocally returns true if the local node can handle the given request type.
func (*Router) GetActiveConnections ¶
func (r *Router) GetActiveConnections(nodeID string) int64
GetActiveConnections returns the number of active connections to a node.
func (*Router) RouteQuery ¶
func (r *Router) RouteQuery(ctx context.Context, req *http.Request) (*http.Response, error)
RouteQuery routes a query request to an appropriate reader node. Returns ErrLocalNodeCanHandle if this node can process the query directly.
func (*Router) RouteWrite ¶
func (r *Router) RouteWrite(ctx context.Context, req *http.Request) (*http.Response, error)
RouteWrite routes a write request to an appropriate writer node. Returns ErrLocalNodeCanHandle if this node can process the write directly.
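Caller-side handling for the routing methods above, as a sketch (assumes router, ctx, and req are in scope inside an HTTP handler):

resp, err := router.RouteWrite(ctx, req)
switch {
case err == nil:
	defer resp.Body.Close()
	// proxy the response back to the client
case errors.Is(err, ErrLocalNodeCanHandle):
	// this node has CanIngest: process the write locally
case errors.Is(err, ErrNoWriterAvailable), errors.Is(err, ErrRoutingFailed):
	// retries exhausted: return 503 to the client
default:
	// unexpected routing error
}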
type RouterConfig ¶
type RouterConfig struct {
// Timeout for forwarded requests
Timeout time.Duration
// Number of retries for failed forwards
Retries int
// Load balancing strategy
Strategy LoadBalanceStrategy
// Registry for looking up nodes
Registry *Registry
// LocalNode is this node's identity
LocalNode *Node
// Logger for routing events
Logger zerolog.Logger
}
RouterConfig holds configuration for the request router.
type StateTransition ¶
StateTransition represents a node state change.
type WriterFailoverConfig ¶
type WriterFailoverConfig struct {
// Registry provides access to cluster node state
Registry *Registry
// RaftNode for applying promote/demote commands via consensus
RaftNode *arcRaft.Node
// HealthCheckInterval is how often to check writer health
HealthCheckInterval time.Duration
// FailoverTimeout is the timeout for the entire failover operation
FailoverTimeout time.Duration
// CooldownPeriod prevents repeated failovers within this window
CooldownPeriod time.Duration
// UnhealthyThreshold is consecutive unhealthy checks before triggering failover
UnhealthyThreshold int
// Logger for failover events
Logger zerolog.Logger
}
WriterFailoverConfig holds configuration for the writer failover manager.
type WriterFailoverManager ¶
type WriterFailoverManager struct {
// contains filtered or unexported fields
}
WriterFailoverManager monitors writer health and promotes standby writers when the primary fails. Only the Raft leader runs active health checks.
func NewWriterFailoverManager ¶
func NewWriterFailoverManager(cfg *WriterFailoverConfig) *WriterFailoverManager
NewWriterFailoverManager creates a new writer failover manager.
func (*WriterFailoverManager) HandleWriterUnhealthy ¶
func (m *WriterFailoverManager) HandleWriterUnhealthy(node *Node)
HandleWriterUnhealthy is called when a writer node becomes unhealthy. This is invoked by the registry's onNodeUnhealthy callback.
func (*WriterFailoverManager) SetCallbacks ¶
func (m *WriterFailoverManager) SetCallbacks(
	onStart func(oldPrimaryID, newPrimaryID string),
	onComplete func(newPrimaryID string, success bool),
)
SetCallbacks sets callbacks for failover events.
func (*WriterFailoverManager) Start ¶
func (m *WriterFailoverManager) Start(ctx context.Context) error
Start begins the writer failover manager.
func (*WriterFailoverManager) Stats ¶
func (m *WriterFailoverManager) Stats() map[string]interface{}
Stats returns failover manager statistics.
func (*WriterFailoverManager) Stop ¶
func (m *WriterFailoverManager) Stop() error
Stop gracefully shuts down the writer failover manager.
func (*WriterFailoverManager) TriggerManualFailover ¶
func (m *WriterFailoverManager) TriggerManualFailover() error
TriggerManualFailover triggers a manual failover from the current primary.
type WriterState ¶
type WriterState string
WriterState represents whether a writer node is primary or standby.
const (
	// WriterStatePrimary indicates this writer is the active primary accepting writes.
	WriterStatePrimary WriterState = "primary"
	// WriterStateStandby indicates this writer is a hot standby ready for promotion.
	WriterStateStandby WriterState = "standby"
	// WriterStateNone indicates the node is not a writer or has no assigned writer state.
	WriterStateNone WriterState = ""
)
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| filereplication | Package filereplication implements peer-to-peer Parquet file replication for Arc Enterprise clusters without shared storage. |