cluster

package
v0.0.0-...-706b979
Published: May 4, 2026 License: AGPL-3.0 Imports: 30 Imported by: 0

Documentation


Constants

const DefaultCallbackTimeout = 5 * time.Second

DefaultCallbackTimeout is the default timeout for callback execution.

const DefaultMaxNodes = 1000

DefaultMaxNodes is the default maximum number of nodes allowed in the registry.

Variables

var (
	// ErrLicenseRequired indicates that an enterprise license is required for clustering.
	ErrLicenseRequired = errors.New("enterprise license required for clustering")

	// ErrClusteringFeatureRequired indicates the license doesn't include the clustering feature.
	ErrClusteringFeatureRequired = errors.New("license does not include clustering feature")

	// ErrInvalidRole indicates an invalid node role was specified.
	ErrInvalidRole = errors.New("invalid cluster role")

	// ErrAlreadyRunning indicates the coordinator is already running.
	ErrAlreadyRunning = errors.New("cluster coordinator already running")

	// ErrNotRunning indicates the coordinator is not running.
	ErrNotRunning = errors.New("cluster coordinator not running")

	// ErrNodeNotFound indicates the requested node was not found in the registry.
	ErrNodeNotFound = errors.New("node not found")

	// ErrNodeAlreadyExists indicates a node with the same ID already exists.
	ErrNodeAlreadyExists = errors.New("node already exists")

	// ErrClusterNotEnabled indicates clustering is not enabled in configuration.
	ErrClusterNotEnabled = errors.New("clustering is not enabled")

	// ErrIngestNotAllowed indicates this node role cannot accept writes.
	ErrIngestNotAllowed = errors.New("this node role does not accept writes")

	// ErrQueryNotAllowed indicates this node role cannot execute queries.
	ErrQueryNotAllowed = errors.New("this node role does not execute queries")

	// ErrCompactionNotAllowed indicates this node role cannot run compaction.
	ErrCompactionNotAllowed = errors.New("this node role does not run compaction")

	// ErrTooManyNodes indicates the registry has reached its maximum node capacity.
	ErrTooManyNodes = errors.New("maximum number of cluster nodes reached")

	// ErrCoreLimitExceeded indicates adding this node would exceed the licensed core limit.
	ErrCoreLimitExceeded = errors.New("cluster core limit exceeded")
)

Cluster-specific errors.

var (
	// ErrNoWriterAvailable indicates no healthy writer node is available.
	ErrNoWriterAvailable = fmt.Errorf("no healthy writer node available")

	// ErrNoReaderAvailable indicates no healthy reader node is available.
	ErrNoReaderAvailable = fmt.Errorf("no healthy reader node available")

	// ErrRoutingFailed indicates the request could not be routed after all retries.
	ErrRoutingFailed = fmt.Errorf("routing failed after all retries")

	// ErrLocalNodeCanHandle indicates the local node can handle this request directly.
	ErrLocalNodeCanHandle = fmt.Errorf("local node can handle request")

	// ErrNotCoordinator indicates this node is not the cluster coordinator.
	ErrNotCoordinator = fmt.Errorf("this node is not the cluster coordinator")
)

Router errors.

var ErrLeaderUnreachable = errors.New("forward apply: leader address not in registry")

ErrLeaderUnreachable is returned when the leader's ID is known but the registry doesn't have a coordinator address for it (e.g. the leader just left the registry mid-flight). Caller should retry.

var ErrNoLeaderKnown = errors.New("forward apply: no leader currently known")

ErrNoLeaderKnown is returned by forwardApplyToLeader when the local Raft node hasn't observed a leader yet (e.g. during election or right after startup before the first heartbeat). Callers should treat this as a transient retry condition.

Functions

func ValidRole

func ValidRole(role string) bool

ValidRole returns true if the role string represents a valid role.
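A minimal sketch of how such a validator is typically implemented, using the role names from this package's NodeRole constants (the function names here are illustrative, not the package's real internals):

```go
package main

import "fmt"

// validRoles mirrors the NodeRole constants defined by this package.
var validRoles = map[string]bool{
	"standalone": true,
	"writer":     true,
	"reader":     true,
	"compactor":  true,
}

// ValidRoleSketch reports whether role names a recognized cluster role.
func ValidRoleSketch(role string) bool {
	return validRoles[role]
}

// ParseRoleSketch falls back to "standalone" for empty or unknown input,
// matching the documented ParseRole behavior.
func ParseRoleSketch(role string) string {
	if ValidRoleSketch(role) {
		return role
	}
	return "standalone"
}

func main() {
	fmt.Println(ValidRoleSketch("writer"), ParseRoleSketch("bogus"))
}
```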

Types

type CompactionBridge

type CompactionBridge struct {
	// contains filtered or unexported fields
}

CompactionBridge adapts a bridgeCoordinator to compaction.ManifestBridge.

func NewCompactionBridge

func NewCompactionBridge(coord bridgeCoordinator) *CompactionBridge

NewCompactionBridge constructs a bridge bound to the given coordinator.

Panics if coord is nil — construction-time failure is louder than a runtime nil-check and makes the invariant visible at the call site. Production wiring in main.go always passes a non-nil coordinator.

func (*CompactionBridge) BatchFileOps

func (b *CompactionBridge) BatchFileOps(ctx context.Context, registers []compaction.CompactedFile, deletes []compaction.DeleteSourceOp) error

BatchFileOps groups all register and delete operations for one compaction manifest into a single Raft log entry via the underlying coordinator. Reduces Raft traffic from O(N) applies to 1 per manifest.
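The O(N)-to-1 reduction can be illustrated with a toy coordinator that counts Raft applies; the names (fileOp, applyCounter) are illustrative stand-ins, not this package's internals:

```go
package main

import "fmt"

// fileOp stands in for a single register or delete operation
// from one compaction manifest.
type fileOp struct {
	kind string // "register" or "delete"
	path string
}

// applyCounter counts how many Raft log entries a coordinator would commit.
type applyCounter struct{ applies int }

// applyEach commits one log entry per operation: O(N) applies.
func (c *applyCounter) applyEach(ops []fileOp) {
	for range ops {
		c.applies++
	}
}

// applyBatch commits the whole manifest as a single log entry: 1 apply.
func (c *applyCounter) applyBatch(ops []fileOp) {
	if len(ops) > 0 {
		c.applies++
	}
}

func main() {
	ops := []fileOp{
		{"register", "db/m/a.parquet"},
		{"register", "db/m/b.parquet"},
		{"delete", "db/m/old1.parquet"},
		{"delete", "db/m/old2.parquet"},
	}
	var naive, batched applyCounter
	naive.applyEach(ops)
	batched.applyBatch(ops)
	fmt.Println(naive.applies, batched.applies) // 4 Raft entries vs 1
}
```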

func (*CompactionBridge) DeleteCompactedSource

func (b *CompactionBridge) DeleteCompactedSource(ctx context.Context, path, reason string) error

DeleteCompactedSource appends a CommandDeleteFile to the Raft log via the underlying coordinator. Same forwarding semantics as RegisterCompactedFile: leader applies directly, non-leader forwards.

reason is a human-readable string for operator debugging; Phase 4 always passes "compaction:<job_id>" so operators can correlate deletions with the compaction job that drove them.

func (*CompactionBridge) RegisterCompactedFile

func (b *CompactionBridge) RegisterCompactedFile(ctx context.Context, file compaction.CompactedFile) error

RegisterCompactedFile appends a CommandRegisterFile to the Raft log via the underlying coordinator. On the leader the command is applied directly; on a non-leader the coordinator forwards over the peer protocol to the current leader (see forward_apply.go). Either way the caller observes a successful return only after Raft has committed.

The bridge translates the cluster-agnostic compaction.CompactedFile shape into raft.FileEntry, stamping OriginNodeID with the local node ID so Phase 2/3 readers know to pull the bytes from this compactor.

type CompactorFailoverConfig

type CompactorFailoverConfig struct {
	// Registry provides access to cluster node state.
	Registry *Registry

	// RaftNode for applying AssignCompactor commands via consensus.
	RaftNode *arcRaft.Node

	// RaftFSM to read the current activeCompactorID.
	RaftFSM *arcRaft.ClusterFSM

	// CheckInterval is how often the leader checks compactor health.
	// Default: 10s (compaction is less latency-sensitive than writes).
	CheckInterval time.Duration

	// FailoverTimeout bounds the Raft Apply for AssignCompactor.
	FailoverTimeout time.Duration

	// CooldownPeriod prevents repeated failovers within this window.
	CooldownPeriod time.Duration

	// UnhealthyThreshold is consecutive unhealthy checks before triggering
	// failover. At 10s interval, threshold=3 means ~30s detection time.
	UnhealthyThreshold int

	Logger zerolog.Logger
}

CompactorFailoverConfig holds configuration for the compactor failover manager.

type CompactorFailoverManager

type CompactorFailoverManager struct {
	// contains filtered or unexported fields
}

CompactorFailoverManager monitors compactor health and reassigns the compactor lease on failure. Only active on the Raft leader.

func NewCompactorFailoverManager

func NewCompactorFailoverManager(cfg *CompactorFailoverConfig) *CompactorFailoverManager

NewCompactorFailoverManager creates a new compactor failover manager.

func (*CompactorFailoverManager) SetCallbacks

func (m *CompactorFailoverManager) SetCallbacks(
	onStart func(oldCompactorID, newCompactorID string),
	onComplete func(newCompactorID string, success bool),
)

SetCallbacks sets callbacks for failover events.

func (*CompactorFailoverManager) Start

func (m *CompactorFailoverManager) Start() error

Start begins the compactor failover manager.

func (*CompactorFailoverManager) Stats

func (m *CompactorFailoverManager) Stats() map[string]interface{}

Stats returns failover manager statistics.

func (*CompactorFailoverManager) Stop

func (m *CompactorFailoverManager) Stop() error

Stop gracefully shuts down the compactor failover manager.

func (*CompactorFailoverManager) TriggerManualFailover

func (m *CompactorFailoverManager) TriggerManualFailover() error

TriggerManualFailover triggers a manual compactor failover.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(cfg *CoordinatorConfig) (*Coordinator, error)

NewCoordinator creates a new cluster coordinator. Returns an error if the license is invalid or missing the clustering feature.

func (*Coordinator) AcceptReplicationConnection

func (c *Coordinator) AcceptReplicationConnection(conn net.Conn, syncReq *replication.ReplicateSync) error

AcceptReplicationConnection handles a replication connection from a reader. This is called when a reader sends a MsgReplicateSync message.

func (*Coordinator) AddNodeViaRaft

func (c *Coordinator) AddNodeViaRaft(node *Node) error

AddNodeViaRaft adds a node to the cluster via Raft consensus. Must be called on the leader.

func (*Coordinator) BatchFileOpsInManifest

func (c *Coordinator) BatchFileOpsInManifest(ops []raft.BatchFileOp) error

BatchFileOpsInManifest applies a batch of RegisterFile and DeleteFile operations as a single Raft log entry. On the leader the command is applied directly; on a non-leader it is forwarded to the current leader via the peer protocol. This reduces Raft traffic for compaction manifests from O(N) log entries to 1.

func (*Coordinator) Close

func (c *Coordinator) Close() error

Close implements the shutdown.Shutdownable interface.

func (*Coordinator) DeleteFileFromManifest

func (c *Coordinator) DeleteFileFromManifest(path, reason string) error

DeleteFileFromManifest removes a file from the cluster-wide manifest. Called by retention and compaction cleanup.

Phase 4: same leader-forwarding semantics as RegisterFileInManifest. Non-leader callers no longer silently drop the command.

func (*Coordinator) GetActiveCompactorID

func (c *Coordinator) GetActiveCompactorID() string

GetActiveCompactorID returns the node ID currently holding the compactor lease.

func (*Coordinator) GetCapabilities

func (c *Coordinator) GetCapabilities() RoleCapabilities

GetCapabilities returns the capabilities of the local node.

func (*Coordinator) GetFileEntry

func (c *Coordinator) GetFileEntry(path string) (*raft.FileEntry, bool)

GetFileEntry returns the manifest entry for a given relative path. Returns false if the file is not in the manifest (standalone mode or pre-cluster file).

func (*Coordinator) GetFileManifest

func (c *Coordinator) GetFileManifest() []*raft.FileEntry

GetFileManifest returns the current file manifest from the Raft FSM. Returns nil if Raft is not initialized.

func (*Coordinator) GetFileManifestByDatabase

func (c *Coordinator) GetFileManifestByDatabase(database string) []*raft.FileEntry

GetFileManifestByDatabase returns files for a specific database.

func (*Coordinator) GetHealthChecker

func (c *Coordinator) GetHealthChecker() *HealthChecker

GetHealthChecker returns the health checker.

func (*Coordinator) GetLocalNode

func (c *Coordinator) GetLocalNode() *Node

GetLocalNode returns the local node.

func (*Coordinator) GetRaftNode

func (c *Coordinator) GetRaftNode() *raft.Node

GetRaftNode returns the Raft node (may be nil if Raft is not configured).

func (*Coordinator) GetRegistry

func (c *Coordinator) GetRegistry() *Registry

GetRegistry returns the node registry.

func (*Coordinator) GetReplicationStats

func (c *Coordinator) GetReplicationStats() map[string]interface{}

GetReplicationStats returns replication statistics.

func (*Coordinator) GetRole

func (c *Coordinator) GetRole() NodeRole

GetRole returns the role of the local node.

func (*Coordinator) GetRouter

func (c *Coordinator) GetRouter() *Router

GetRouter returns the request router.

func (*Coordinator) IsActiveCompactor

func (c *Coordinator) IsActiveCompactor() bool

IsActiveCompactor returns true if this node currently holds the compactor lease.

func (*Coordinator) IsLeader

func (c *Coordinator) IsLeader() bool

IsLeader returns true if this node is the Raft leader. Always returns true if Raft is not configured (standalone mode).

func (*Coordinator) IsPrimaryWriter

func (c *Coordinator) IsPrimaryWriter() bool

IsPrimaryWriter implements api.RetentionCoordinator, api.DeleteCoordinator, api.CQCoordinator, and scheduler.WriterGate: reports whether this node is the primary writer and may execute writer-only mutations. When failover is disabled there is no promoted primary — any writer node is authoritative, so we fall back to a role check.
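The documented decision (promoted primary when failover is enabled, role check otherwise) can be sketched as a pure function. This is an assumption-laden sketch: the parameter names are invented, and whether standalone counts as authoritative is inferred from its all-capabilities role:

```go
package main

import "fmt"

// isPrimaryWriterSketch mirrors the documented fallback: with failover
// enabled only the promoted primary may run writer-only mutations; with
// failover disabled any writer-capable node is authoritative.
func isPrimaryWriterSketch(failoverEnabled, promotedPrimary bool, role string) bool {
	if failoverEnabled {
		return promotedPrimary
	}
	return role == "writer" || role == "standalone"
}

func main() {
	// Standby writer under failover: not primary.
	fmt.Println(isPrimaryWriterSketch(true, false, "writer"))
	// Failover disabled: the role check wins.
	fmt.Println(isPrimaryWriterSketch(false, false, "writer"))
}
```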

func (*Coordinator) IsRunning

func (c *Coordinator) IsRunning() bool

IsRunning returns true if the coordinator is running.

func (*Coordinator) LeaderAddr

func (c *Coordinator) LeaderAddr() string

LeaderAddr returns the address of the current Raft leader. Returns empty string if Raft is not configured.

func (*Coordinator) LocalNodeID

func (c *Coordinator) LocalNodeID() string

LocalNodeID returns the local cluster node ID. Phase 4 uses this via the CompactionBridge to set OriginNodeID on compacted-file Raft entries so Phase 2/3's multi-peer resolver routes replica pulls back to the compactor that produced the output.

func (*Coordinator) RegisterFileInManifest

func (c *Coordinator) RegisterFileInManifest(file raft.FileEntry) error

RegisterFileInManifest appends a file entry to the cluster-wide manifest via Raft. Returns nil if Raft is not initialized (standalone mode).

Phase 4: when the local node is not the Raft leader, the command is forwarded to the current leader over the peer protocol instead of being silently dropped. Prior to Phase 4 this method returned nil on non-leader, which was a latent data-loss bug — writers that were not the leader would silently lose all their file registrations and the manifest would diverge from storage.

On forwarding failure (no leader known, leader unreachable, or the leader rejects the apply), the error is returned and the caller can retry. The forwarding path is bounded by forwardApplyTimeout so a stuck leader doesn't block the writer flush hot path indefinitely.

func (*Coordinator) RemoveNodeViaRaft

func (c *Coordinator) RemoveNodeViaRaft(nodeID string) error

RemoveNodeViaRaft removes a node from the cluster via Raft consensus. It removes the node from both the Raft voting configuration and the cluster FSM state, then unregisters it from the local registry. Must be called on the leader.

func (*Coordinator) ReplicationCatchUpStatus

func (c *Coordinator) ReplicationCatchUpStatus() map[string]int64

ReplicationCatchUpStatus returns the puller's catch-up-specific stats as a JSON-serializable map, or nil when the puller is not running. Intended for /api/v1/cluster/status. The keys mirror what Phase 5 will hard-gate the query path on.

func (*Coordinator) ReplicationReady

func (c *Coordinator) ReplicationReady() bool

ReplicationReady reports whether the Phase 3 catch-up walker has finished its pass over the cluster manifest. Not currently consumed — Phase 5 will use this to hard-gate the query path during startup so readers can't return eventually-consistent results during catch-up. Lands now so the coordinator surface doesn't need another change in Phase 5.

func (*Coordinator) Role

func (c *Coordinator) Role() string

Role implements api.RetentionCoordinator: returns a human-readable role string for log messages.

func (*Coordinator) SetCompactorCallbacks

func (c *Coordinator) SetCompactorCallbacks(onBecome, onLose func())

SetCompactorCallbacks sets the callbacks for dynamic compaction activation. Called from main.go before Start() so the FSM callback has hooks to invoke.

func (*Coordinator) SetIngestBuffer

func (c *Coordinator) SetIngestBuffer(buffer *ingest.ArrowBuffer)

SetIngestBuffer sets the ArrowBuffer for reader nodes to apply replicated entries. This enables query freshness — readers can query unflushed writer data that arrives via WAL replication.

func (*Coordinator) SetStorageBackend

func (c *Coordinator) SetStorageBackend(backend storage.Backend)

SetStorageBackend sets the local storage backend reference. It is required for Enterprise peer replication Phase 2: the fetch handler reads local file bytes via this backend to stream them to pulling peers, and the puller writes received bytes into it. Must be called before Start when peer replication is enabled.

func (*Coordinator) SetWAL

func (c *Coordinator) SetWAL(walWriter *wal.Writer)

SetWAL sets the WAL writer reference for replication. This should be called after the WAL is created but before Start().

func (*Coordinator) Start

func (c *Coordinator) Start() error

Start starts the cluster coordinator.

func (*Coordinator) StartReplication

func (c *Coordinator) StartReplication() error

StartReplication starts WAL replication based on node role:

  • Writers start a Sender to stream entries to readers.
  • Readers start a Receiver to receive entries from the writer.

This should be called after Start() and SetWAL().

func (*Coordinator) Status

func (c *Coordinator) Status() map[string]interface{}

Status returns the cluster status as a map for JSON serialization.

func (*Coordinator) Stop

func (c *Coordinator) Stop() error

Stop stops the cluster coordinator.

func (*Coordinator) StopReplication

func (c *Coordinator) StopReplication()

StopReplication stops WAL replication.

func (*Coordinator) UpdateFileInManifest

func (c *Coordinator) UpdateFileInManifest(file raft.FileEntry) error

UpdateFileInManifest updates an existing file's metadata in the cluster manifest after a partial rewrite that changes size/checksum but keeps the same path. Same leader-forwarding semantics as RegisterFileInManifest.

func (*Coordinator) UpdateNodeStateViaRaft

func (c *Coordinator) UpdateNodeStateViaRaft(nodeID string, state NodeState) error

UpdateNodeStateViaRaft updates a node's state via Raft consensus. Must be called on the leader.

type CoordinatorConfig

type CoordinatorConfig struct {
	Config        *config.ClusterConfig
	LicenseClient *license.Client
	Version       string // Arc version
	APIAddress    string // HTTP API address for this node
	Logger        zerolog.Logger

	// Phase 4: when true, the embedded HealthChecker surfaces rate-limited
	// Warn logs when the cluster has zero or >1 nodes in RoleCompactor.
	// Main wires this to cfg.Cluster.Enabled && cfg.Cluster.ReplicationEnabled &&
	// cfg.Compaction.Enabled so the warning only fires in deployments where
	// a missing compactor is actually a problem.
	WarnIfNoCompactor bool
}

CoordinatorConfig holds configuration for the coordinator.

type CoordinatorFileRegistrar

type CoordinatorFileRegistrar struct {
	// contains filtered or unexported fields
}

CoordinatorFileRegistrar implements ingest.FileRegistrar by forwarding file registration events to the cluster coordinator's Raft manifest.

The registration is fully async: RegisterFile enqueues the entry and returns immediately. A background worker drains the queue and calls coordinator.RegisterFileInManifest. This ensures the flush hot path is never blocked on Raft consensus, network I/O, or checksum compute.

The SHA-256 checksum is computed by the flush path on the in-memory Parquet buffer before the backend write and passed to RegisterFile. Peers use it to verify files pulled during Phase 2 replication.
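The queue-and-worker shape described above can be sketched with a buffered channel and a non-blocking send; asyncRegistrar and its fields are illustrative, not the package's actual types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// asyncRegistrar sketches the pattern: enqueue never blocks the flush
// hot path; a background worker drains the queue.
type asyncRegistrar struct {
	queue   chan string
	dropped atomic.Int64
	applied atomic.Int64
	wg      sync.WaitGroup
}

func newAsyncRegistrar(depth int) *asyncRegistrar {
	return &asyncRegistrar{queue: make(chan string, depth)}
}

// RegisterFile is non-blocking: a full queue drops the entry and bumps
// a counter instead of stalling the caller.
func (r *asyncRegistrar) RegisterFile(path string) {
	select {
	case r.queue <- path:
	default:
		r.dropped.Add(1)
	}
}

// Start launches the worker that would call RegisterFileInManifest.
func (r *asyncRegistrar) Start() {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for range r.queue {
			r.applied.Add(1) // stand-in for the Raft apply
		}
	}()
}

// Stop closes the queue and waits for the worker to drain it.
func (r *asyncRegistrar) Stop() {
	close(r.queue)
	r.wg.Wait()
}

func main() {
	r := newAsyncRegistrar(16)
	r.Start()
	for i := 0; i < 10; i++ {
		r.RegisterFile(fmt.Sprintf("db/m/file-%d.parquet", i))
	}
	r.Stop()
	fmt.Println(r.applied.Load(), r.dropped.Load())
}
```

Dropping under pressure rather than blocking is the key design choice: the flush hot path stays fast, and a later anti-entropy pass repairs any gaps.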

func NewCoordinatorFileRegistrar

func NewCoordinatorFileRegistrar(coord *Coordinator, logger zerolog.Logger) *CoordinatorFileRegistrar

NewCoordinatorFileRegistrar creates a new registrar backed by the coordinator. The coordinator must be started before the registrar is used.

func (*CoordinatorFileRegistrar) RegisterFile

func (r *CoordinatorFileRegistrar) RegisterFile(database, measurement, path string, partitionTime time.Time, sizeBytes int64, sha256 string)

RegisterFile implements ingest.FileRegistrar. Non-blocking: enqueues the registration and returns immediately. If the queue is full, the entry is dropped and a counter is incremented — peer replication will discover it on the next anti-entropy scan (Phase 3+).

sha256 is a hex-encoded SHA-256 of the Parquet file bytes. The caller (arrow_writer.go flush path) computes it on the in-memory buffer before the storage backend write, so it's effectively free.
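Computing that hex digest on an in-memory buffer is straightforward with the standard library; checksumBuffer is a hypothetical helper name:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// checksumBuffer computes the hex SHA-256 of an in-memory buffer, the
// same shape of value RegisterFile expects. Doing it before the backend
// write means the bytes are still hot in memory, so no second read.
func checksumBuffer(buf []byte) string {
	sum := sha256.Sum256(buf)
	return hex.EncodeToString(sum[:])
}

func main() {
	parquetBytes := []byte("pretend these are Parquet file bytes")
	fmt.Println(checksumBuffer(parquetBytes))
}
```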

func (*CoordinatorFileRegistrar) Start

func (r *CoordinatorFileRegistrar) Start(parentCtx context.Context)

Start launches the background worker that drains the registration queue.

func (*CoordinatorFileRegistrar) Stats

func (r *CoordinatorFileRegistrar) Stats() map[string]int64

Stats returns a snapshot of registrar metrics.

func (*CoordinatorFileRegistrar) Stop

func (r *CoordinatorFileRegistrar) Stop()

Stop signals the worker to exit, drains any pending entries, and waits for the worker to finish. Best-effort drain: entries still in-flight after the drain timeout are discarded (they can be recovered by anti-entropy in a future phase).

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker performs periodic health checks on cluster nodes. It monitors node heartbeats and updates node state based on check results.

func NewHealthChecker

func NewHealthChecker(cfg *HealthCheckerConfig) *HealthChecker

NewHealthChecker creates a new health checker.

func (*HealthChecker) CheckNow

func (h *HealthChecker) CheckNow(nodeID string) bool

CheckNow performs an immediate health check on a specific node. This is useful for on-demand checks outside the regular interval.

func (*HealthChecker) GetCheckInterval

func (h *HealthChecker) GetCheckInterval() time.Duration

GetCheckInterval returns the configured check interval.

func (*HealthChecker) GetUnhealthyThreshold

func (h *HealthChecker) GetUnhealthyThreshold() int

GetUnhealthyThreshold returns the configured unhealthy threshold.

func (*HealthChecker) IsRunning

func (h *HealthChecker) IsRunning() bool

IsRunning returns true if the health checker is running.

func (*HealthChecker) SetRaftFSM

func (h *HealthChecker) SetRaftFSM(fsm *raft.ClusterFSM)

SetRaftFSM wires the Raft FSM so checkCompactorElected can check the Phase 5 active compactor lease. Called from coordinator wiring.

checkCompactorElected enforces the Phase 4 "exactly one compactor" invariant via rate-limited Warn logs. Two failure modes:

  • Zero compactors: operator forgot to set ARC_CLUSTER_ROLE=compactor on any node. Compacted files will never be registered in the Raft manifest and the cluster will slowly accumulate small source files.
  • Multiple compactors: operator set RoleCompactor on more than one node, re-introducing the shared-storage duplicate-output bug that Phase 4 is designed to prevent.

Both cases are non-fatal — Arc keeps running, queries still work, but the cluster is in a degraded state that requires operator attention. A Warn log (not Error) is the right level because the operator chose the configuration; we're flagging it, not failing on it.

Design note on rate limiting: we always walk the registry (cheap — it's an in-memory map) and decide the message FIRST, then apply the rate-limit to the LOG EMISSION only. This way, the "correct config" branch unconditionally resets both timers — so when the cluster transitions broken → correct → broken, the second "broken" warning fires immediately instead of waiting out the full minute from the first one.

The two failure modes have SEPARATE timers so a cluster flapping between 0 and 2 compactors surfaces both warnings within the same minute. Sharing a single timer would let whichever warning lost the CAS race silence the other for the whole interval — an operator watching a log-tail could see only "no compactor" while "multiple compactors" was the more recent condition.

func (*HealthChecker) Start

func (h *HealthChecker) Start()

Start starts the health checker background loop.

func (*HealthChecker) Status

func (h *HealthChecker) Status() map[string]interface{}

Status returns the health checker status for monitoring.

func (*HealthChecker) Stop

func (h *HealthChecker) Stop()

Stop stops the health checker.

type HealthCheckerConfig

type HealthCheckerConfig struct {
	Registry           *Registry
	CheckInterval      time.Duration // How often to check nodes (default: 5s)
	CheckTimeout       time.Duration // Timeout for each check (default: 3s)
	UnhealthyThreshold int           // Failed checks before marking unhealthy (default: 3)
	// Phase 4: when true, the health loop logs rate-limited Warn messages
	// if zero or >1 nodes have RoleCompactor. Set from main.go based on
	// cluster + replication + compaction config. Zero value (false) means
	// OSS / standalone and the check is skipped.
	WarnIfNoCompactor bool
	Logger            zerolog.Logger
}

HealthCheckerConfig holds configuration for the health checker.

type LoadBalanceStrategy

type LoadBalanceStrategy string

LoadBalanceStrategy defines how to select nodes for routing.

const (
	// LoadBalanceRoundRobin selects nodes in round-robin order.
	LoadBalanceRoundRobin LoadBalanceStrategy = "round_robin"

	// LoadBalanceLeastConnections selects the node with fewest active connections.
	LoadBalanceLeastConnections LoadBalanceStrategy = "least_connections"

	// LoadBalanceRandom selects a random healthy node.
	LoadBalanceRandom LoadBalanceStrategy = "random"
)
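A concurrency-safe round-robin picker, the simplest of the three strategies, can be sketched with an atomic counter; roundRobin is an illustrative type, not the Router's internal implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin implements the LoadBalanceRoundRobin strategy over a fixed
// node list; an atomic counter makes concurrent picks safe.
type roundRobin struct {
	nodes []string
	next  atomic.Uint64
}

// pick returns the next node in rotation, or "" when no node is available.
func (r *roundRobin) pick() string {
	if len(r.nodes) == 0 {
		return ""
	}
	i := r.next.Add(1) - 1
	return r.nodes[i%uint64(len(r.nodes))]
}

func main() {
	r := &roundRobin{nodes: []string{"reader-1", "reader-2", "reader-3"}}
	for i := 0; i < 4; i++ {
		fmt.Println(r.pick()) // rotates through readers, then wraps
	}
}
```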

type Node

type Node struct {
	// Identity
	ID          string   `json:"id"`           // Unique node identifier
	Name        string   `json:"name"`         // Human-readable name
	Role        NodeRole `json:"role"`         // Node role (writer, reader, compactor, standalone)
	ClusterName string   `json:"cluster_name"` // Name of the cluster this node belongs to

	// Network
	Address    string `json:"address"`     // Coordinator address (host:port for inter-node communication)
	APIAddress string `json:"api_address"` // HTTP API address (host:port for client requests)

	// State
	State NodeState `json:"state"` // Current health state

	// Health tracking
	LastHeartbeat time.Time `json:"last_heartbeat"` // Time of last successful heartbeat
	LastHealthy   time.Time `json:"last_healthy"`   // Time node was last known healthy
	FailedChecks  int       `json:"failed_checks"`  // Consecutive failed health checks

	// Metadata
	Version   string    `json:"version"`    // Arc version running on this node
	StartedAt time.Time `json:"started_at"` // When the node process started
	JoinedAt  time.Time `json:"joined_at"`  // When the node joined the cluster

	// Writer state (primary/standby) - only relevant for writer nodes
	WriterSt WriterState `json:"writer_state,omitempty"`

	// Runtime stats (updated via heartbeat)
	Stats NodeStats `json:"stats"`
	// contains filtered or unexported fields
}

Node represents a node in the Arc cluster.

func NewNode

func NewNode(id, name string, role NodeRole, clusterName string) *Node

NewNode creates a new Node with the given parameters.

func (*Node) Clone

func (n *Node) Clone() *Node

Clone returns a deep copy of the node (without the mutex).

func (*Node) GetCapabilities

func (n *Node) GetCapabilities() RoleCapabilities

GetCapabilities returns the capabilities for this node's role.

func (*Node) GetFailedChecks

func (n *Node) GetFailedChecks() int

GetFailedChecks returns the number of consecutive failed health checks.

func (*Node) GetLastHeartbeat

func (n *Node) GetLastHeartbeat() time.Time

GetLastHeartbeat returns the time of the last successful heartbeat.

func (*Node) GetState

func (n *Node) GetState() NodeState

GetState returns the current node state.

func (*Node) GetStats

func (n *Node) GetStats() NodeStats

GetStats returns a copy of the node's current stats.

func (*Node) GetWriterState

func (n *Node) GetWriterState() WriterState

GetWriterState returns the writer state of this node.

func (*Node) IsAvailable

func (n *Node) IsAvailable() bool

IsAvailable returns true if the node can accept requests. A node is available if it's healthy or just joining (warming up).

func (*Node) IsHealthy

func (n *Node) IsHealthy() bool

IsHealthy returns true if the node is in a healthy state.

func (*Node) IsPrimaryWriter

func (n *Node) IsPrimaryWriter() bool

IsPrimaryWriter returns true if this node is the primary writer.

func (*Node) MarkJoined

func (n *Node) MarkJoined()

MarkJoined marks the node as having joined the cluster.

func (*Node) ProcessHealthCheckResult

func (n *Node) ProcessHealthCheckResult(healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition

ProcessHealthCheckResult atomically processes a health check result and returns any state transition that occurred. This prevents TOCTOU race conditions by performing the check and update under a single lock.
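The single-lock check-and-update idea can be sketched with a small state machine; healthNode, its string states, and processCheck are illustrative stand-ins for the real Node and its NodeState values:

```go
package main

import (
	"fmt"
	"sync"
)

// healthNode sketches ProcessHealthCheckResult: the failed-check counter
// and the state transition are read and written under ONE lock, so two
// concurrent checkers can't both observe "threshold-1" and race.
type healthNode struct {
	mu           sync.Mutex
	state        string
	failedChecks int
}

// processCheck applies one health-check result and returns the new state.
func (n *healthNode) processCheck(healthy bool, unhealthyAt, deadAt int) string {
	n.mu.Lock()
	defer n.mu.Unlock()
	if healthy {
		n.failedChecks = 0
		n.state = "healthy"
		return n.state
	}
	n.failedChecks++
	switch {
	case n.failedChecks >= deadAt:
		n.state = "dead"
	case n.failedChecks >= unhealthyAt:
		n.state = "unhealthy"
	}
	return n.state
}

func main() {
	n := &healthNode{state: "healthy"}
	for i := 0; i < 3; i++ {
		n.processCheck(false, 3, 6) // three failures cross the threshold
	}
	fmt.Println(n.state)
	n.processCheck(true, 3, 6) // one success resets the counter
	fmt.Println(n.state, n.failedChecks)
}
```

Had the check ("at threshold?") and the update ("mark unhealthy") used separate lock acquisitions, two checkers could interleave between them; holding one lock across both is exactly the TOCTOU fix the doc describes.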

func (*Node) RecordFailedCheck

func (n *Node) RecordFailedCheck() int

RecordFailedCheck records a failed health check. Returns the new count of consecutive failed checks.

func (*Node) RecordHeartbeat

func (n *Node) RecordHeartbeat(stats NodeStats)

RecordHeartbeat records a successful heartbeat from the node.

func (*Node) SetAddresses

func (n *Node) SetAddresses(coordinatorAddr, apiAddr string)

SetAddresses sets the node's network addresses.

func (*Node) SetVersion

func (n *Node) SetVersion(version string)

SetVersion sets the node's Arc version.

func (*Node) SetWriterState

func (n *Node) SetWriterState(state WriterState)

SetWriterState sets the writer state of this node.

func (*Node) UpdateState

func (n *Node) UpdateState(state NodeState)

UpdateState updates the node state.

type NodeEventCallback

type NodeEventCallback func(*Node)

NodeEventCallback is called when node events occur.

type NodeRole

type NodeRole string

NodeRole represents the role of a node in the cluster. Each role has specific capabilities that determine what operations the node can perform.

const (
	// RoleStandalone is the default single-node deployment mode (OSS-compatible).
	// Can perform all operations: ingest, query, and compact.
	RoleStandalone NodeRole = "standalone"

	// RoleWriter handles ingestion, WAL, and flushes to shared storage.
	// Can also execute queries for monitoring and validation.
	RoleWriter NodeRole = "writer"

	// RoleReader is a query-only node that reads from shared storage.
	// Cannot ingest data or run compaction jobs.
	RoleReader NodeRole = "reader"

	// RoleCompactor runs background compaction and maintenance tasks.
	// Cannot ingest data or serve queries.
	RoleCompactor NodeRole = "compactor"
)

func AllRoles

func AllRoles() []NodeRole

AllRoles returns all valid node roles.

func ParseRole

func ParseRole(role string) NodeRole

ParseRole parses a string into a NodeRole. Returns RoleStandalone if the role is empty or invalid.

func (NodeRole) GetCapabilities

func (r NodeRole) GetCapabilities() RoleCapabilities

GetCapabilities returns the capabilities for this role.

func (NodeRole) IsValid

func (r NodeRole) IsValid() bool

IsValid returns true if the role is a recognized value.

func (NodeRole) String

func (r NodeRole) String() string

String returns the string representation of the role.

type NodeState

type NodeState string

NodeState represents the health state of a node in the cluster.

const (
	// StateUnknown is the initial state before health is determined.
	StateUnknown NodeState = "unknown"

	// StateHealthy indicates the node is operating normally.
	StateHealthy NodeState = "healthy"

	// StateUnhealthy indicates the node has failed health checks but may recover.
	StateUnhealthy NodeState = "unhealthy"

	// StateDead indicates the node has been unreachable for an extended period.
	StateDead NodeState = "dead"

	// StateJoining indicates the node is joining the cluster.
	StateJoining NodeState = "joining"

	// StateLeaving indicates the node is gracefully leaving the cluster.
	StateLeaving NodeState = "leaving"
)

func (NodeState) String

func (s NodeState) String() string

String returns the string representation of the state.

type NodeStats

type NodeStats struct {
	CPUUsage       float64 `json:"cpu_usage"`       // CPU usage percentage (0-100)
	MemoryUsage    float64 `json:"memory_usage"`    // Memory usage percentage (0-100)
	IngestRate     int64   `json:"ingest_rate"`     // Records ingested per second
	QueryRate      int64   `json:"query_rate"`      // Queries executed per second
	StorageUsed    int64   `json:"storage_used"`    // Bytes used in storage
	Connections    int     `json:"connections"`     // Active HTTP connections
	ActiveQueries  int     `json:"active_queries"`  // Currently running queries
	CompactionJobs int     `json:"compaction_jobs"` // Active compaction jobs (for compactor role)
}

NodeStats contains runtime statistics for a node. These are updated periodically via heartbeat.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages cluster node membership. It provides thread-safe access to the list of nodes in the cluster.

func NewRegistry

func NewRegistry(cfg *RegistryConfig) *Registry

NewRegistry creates a new node registry.

func (*Registry) Count

func (r *Registry) Count() int

Count returns the total number of registered nodes.

func (*Registry) CountByRole

func (r *Registry) CountByRole(role NodeRole) int

CountByRole returns the number of nodes with a specific role.

func (*Registry) CountHealthy

func (r *Registry) CountHealthy() int

CountHealthy returns the number of healthy nodes.

func (*Registry) Get

func (r *Registry) Get(nodeID string) (*Node, bool)

Get returns a clone of a node by ID. Returns a cloned node to prevent race conditions from concurrent modifications.

func (*Registry) GetAll

func (r *Registry) GetAll() []*Node

GetAll returns cloned copies of all registered nodes.

func (*Registry) GetAvailable

func (r *Registry) GetAvailable() []*Node

GetAvailable returns cloned copies of all available nodes (healthy or joining).

func (*Registry) GetByRole

func (r *Registry) GetByRole(role NodeRole) []*Node

GetByRole returns cloned nodes with a specific role.

func (*Registry) GetByState

func (r *Registry) GetByState(state NodeState) []*Node

GetByState returns cloned nodes with a specific state.

func (*Registry) GetCompactors

func (r *Registry) GetCompactors() []*Node

GetCompactors returns cloned copies of all healthy compactor nodes.

func (*Registry) GetHealthy

func (r *Registry) GetHealthy() []*Node

GetHealthy returns cloned copies of all healthy nodes.

func (*Registry) GetLastHeartbeat

func (r *Registry) GetLastHeartbeat(nodeID string) time.Time

GetLastHeartbeat returns the LastHeartbeat timestamp of the real node in the registry. Returns zero time if the node doesn't exist.

func (*Registry) GetPrimaryWriter

func (r *Registry) GetPrimaryWriter() *Node

GetPrimaryWriter returns the primary writer node, or nil if none exists.

func (*Registry) GetReaders

func (r *Registry) GetReaders() []*Node

GetReaders returns cloned copies of all healthy reader nodes.

func (*Registry) GetStandbyWriters

func (r *Registry) GetStandbyWriters() []*Node

GetStandbyWriters returns healthy standby writer nodes.

func (*Registry) GetWriters

func (r *Registry) GetWriters() []*Node

GetWriters returns cloned copies of all healthy writer nodes.

func (*Registry) Local

func (r *Registry) Local() *Node

Local returns the local node.

func (*Registry) NotifyHealthy

func (r *Registry) NotifyHealthy(node *Node)

NotifyHealthy notifies that a node became healthy.

func (*Registry) NotifyUnhealthy

func (r *Registry) NotifyUnhealthy(node *Node)

NotifyUnhealthy notifies that a node became unhealthy.

func (*Registry) ProcessHealthCheck

func (r *Registry) ProcessHealthCheck(nodeID string, healthy bool, unhealthyThreshold, deadThreshold int) *StateTransition

ProcessHealthCheck applies a health check result to the real node in the registry (not a clone). Returns the state transition if one occurred.

func (*Registry) RecordHeartbeat

func (r *Registry) RecordHeartbeat(nodeID string, stats NodeStats) bool

RecordHeartbeat updates the LastHeartbeat on the real node in the registry (not a clone). This is called from handleHeartbeat when a peer sends us a heartbeat message. Node.RecordHeartbeat is thread-safe via the node's internal mutex.

func (*Registry) Register

func (r *Registry) Register(node *Node) error

Register adds or updates a node in the registry. Returns ErrTooManyNodes if the registry has reached its maximum capacity.

func (*Registry) SetCallbacks

func (r *Registry) SetCallbacks(
	onJoined NodeEventCallback,
	onLeft NodeEventCallback,
	onHealthy NodeEventCallback,
	onUnhealthy NodeEventCallback,
)

SetCallbacks sets event callbacks for node lifecycle events. Callbacks are invoked asynchronously to prevent blocking the registry.

func (*Registry) Summary

func (r *Registry) Summary() map[string]int

Summary returns a summary of the registry state.

func (*Registry) Unregister

func (r *Registry) Unregister(nodeID string)

Unregister removes a node from the registry.

func (*Registry) UpdateNodeState

func (r *Registry) UpdateNodeState(nodeID string, state NodeState)

UpdateNodeState updates the state of the real node in the registry. Called from handleHeartbeat to propagate the peer's self-reported state.

type RegistryConfig

type RegistryConfig struct {
	LocalNode *Node
	MaxNodes  int
	Logger    zerolog.Logger
}

RegistryConfig holds configuration for the node registry.

type RoleCapabilities

type RoleCapabilities struct {
	CanIngest     bool // Accept write requests (LineProtocol, MessagePack)
	CanQuery      bool // Execute SQL queries
	CanCompact    bool // Run compaction jobs
	CanCoordinate bool // Participate in cluster coordination (leader election)
}

RoleCapabilities defines what operations each role can perform.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router routes requests to appropriate nodes in the cluster. The coordinator node uses this to forward writes to writers and queries to readers.

func NewRouter

func NewRouter(cfg *RouterConfig) *Router

NewRouter creates a new request router.

func (*Router) CanRouteLocally

func (r *Router) CanRouteLocally(isWrite bool) bool

CanRouteLocally returns true if the local node can handle the given request type.

func (*Router) GetActiveConnections

func (r *Router) GetActiveConnections(nodeID string) int64

GetActiveConnections returns the number of active connections to a node.

func (*Router) RouteQuery

func (r *Router) RouteQuery(ctx context.Context, req *http.Request) (*http.Response, error)

RouteQuery routes a query request to an appropriate reader node. Returns ErrLocalNodeCanHandle if this node can process the query directly.

func (*Router) RouteWrite

func (r *Router) RouteWrite(ctx context.Context, req *http.Request) (*http.Response, error)

RouteWrite routes a write request to an appropriate writer node. Returns ErrLocalNodeCanHandle if this node can process the write directly.

func (*Router) Stats

func (r *Router) Stats() map[string]interface{}

Stats returns routing statistics.

type RouterConfig

type RouterConfig struct {
	// Timeout for forwarded requests
	Timeout time.Duration

	// Number of retries for failed forwards
	Retries int

	// Load balancing strategy
	Strategy LoadBalanceStrategy

	// Registry for looking up nodes
	Registry *Registry

	// LocalNode is this node's identity
	LocalNode *Node

	// Logger for routing events
	Logger zerolog.Logger
}

RouterConfig holds configuration for the request router.

type StateTransition

type StateTransition struct {
	OldState     NodeState
	NewState     NodeState
	FailedChecks int
}

StateTransition represents a node state change.

type WriterFailoverConfig

type WriterFailoverConfig struct {
	// Registry provides access to cluster node state
	Registry *Registry

	// RaftNode for applying promote/demote commands via consensus
	RaftNode *arcRaft.Node

	// HealthCheckInterval is how often to check writer health
	HealthCheckInterval time.Duration

	// FailoverTimeout is the timeout for the entire failover operation
	FailoverTimeout time.Duration

	// CooldownPeriod prevents repeated failovers within this window
	CooldownPeriod time.Duration

	// UnhealthyThreshold is consecutive unhealthy checks before triggering failover
	UnhealthyThreshold int

	// Logger for failover events
	Logger zerolog.Logger
}

WriterFailoverConfig holds configuration for the writer failover manager.

type WriterFailoverManager

type WriterFailoverManager struct {
	// contains filtered or unexported fields
}

WriterFailoverManager monitors writer health and promotes standby writers when the primary fails. Only the Raft leader runs active health checks.

func NewWriterFailoverManager

func NewWriterFailoverManager(cfg *WriterFailoverConfig) *WriterFailoverManager

NewWriterFailoverManager creates a new writer failover manager.

func (*WriterFailoverManager) HandleWriterUnhealthy

func (m *WriterFailoverManager) HandleWriterUnhealthy(node *Node)

HandleWriterUnhealthy is called when a writer node becomes unhealthy. This is invoked by the registry's onNodeUnhealthy callback.

func (*WriterFailoverManager) SetCallbacks

func (m *WriterFailoverManager) SetCallbacks(
	onStart func(oldPrimaryID, newPrimaryID string),
	onComplete func(newPrimaryID string, success bool),
)

SetCallbacks sets callbacks for failover events.

func (*WriterFailoverManager) Start

func (m *WriterFailoverManager) Start() error

Start begins the writer failover manager.

func (*WriterFailoverManager) Stats

func (m *WriterFailoverManager) Stats() map[string]interface{}

Stats returns failover manager statistics.

func (*WriterFailoverManager) Stop

func (m *WriterFailoverManager) Stop() error

Stop gracefully shuts down the writer failover manager.

func (*WriterFailoverManager) TriggerManualFailover

func (m *WriterFailoverManager) TriggerManualFailover() error

TriggerManualFailover triggers a manual failover from the current primary.

type WriterState

type WriterState string

WriterState represents whether a writer node is primary or standby.

const (
	// WriterStatePrimary indicates this writer is the active primary accepting writes.
	WriterStatePrimary WriterState = "primary"

	// WriterStateStandby indicates this writer is a hot standby ready for promotion.
	WriterStateStandby WriterState = "standby"

	// WriterStateNone indicates the node is not a writer or has no assigned writer state.
	WriterStateNone WriterState = ""
)

Directories

Path Synopsis
Package filereplication implements peer-to-peer Parquet file replication for Arc Enterprise clusters without shared storage.
