filereplication

package
v0.0.0-...-706b979
Published: May 4, 2026 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Overview

Package filereplication implements peer-to-peer Parquet file replication for Arc Enterprise clusters without shared storage. It is the byte-level counterpart to Phase 1's cluster-wide file manifest: when a new file is announced on the Raft log, the puller downloads the bytes from the origin peer over the coordinator TCP protocol, verifies the SHA-256, and writes the file to the local storage backend.

The puller is gated by the Enterprise license (FeatureClustering) and wired by the coordinator when cluster.replication_enabled is true.

Index

Constants

This section is empty.

Variables

var ErrBadOffset = errors.New("filereplication: bad byte offset")

ErrBadOffset is returned by Fetcher implementations when the server rejects the requested byte offset (negative, >= file size, or backend doesn't support seeks). The puller should delete any partial file and retry from zero — not fall through to another peer, since the file exists there and the offset is simply invalid or stale.

var ErrChecksumMismatch = errors.New("filereplication: checksum mismatch")

ErrChecksumMismatch is returned by Fetcher implementations when the bytes pulled from a peer don't match the expected SHA-256 from the manifest. The puller tracks this as a distinct metric and deletes the partial local file before retrying. Unlike ErrFileNotOnPeer, this error does NOT trigger the multi-peer fallback — a corrupt body is a data integrity signal.

var ErrFileNotOnPeer = errors.New("filereplication: file not on peer")

ErrFileNotOnPeer is returned by Fetcher implementations when a peer explicitly reports that it does not hold the requested file (via the ack header Code field, or via a known error string from a Phase 2 peer). The puller treats this as a fallback trigger: the next candidate in the resolver's list is tried before the attempt is considered failed. This is essential for Phase 3 catch-up after a Kubernetes pod rotation where the original writer is gone but other peers still hold the file.
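
A minimal sketch of how these sentinels map onto the recovery actions described above, using errors.Is from the standard library. handlePullError is a hypothetical helper, not part of this package:

func handlePullError(err error) {
	switch {
	case err == nil:
		// Success: the bytes are on disk and the SHA-256 matched.
	case errors.Is(err, filereplication.ErrFileNotOnPeer):
		// Fallback trigger: try the next candidate address from the resolver.
	case errors.Is(err, filereplication.ErrChecksumMismatch):
		// Integrity signal: delete the partial local file and retry from zero.
		// Do not fall through to another peer.
	case errors.Is(err, filereplication.ErrBadOffset):
		// Stale or rejected resume offset: delete the partial file and retry
		// the same peer from offset zero.
	default:
		// Anything else (dial failure, timeout) is a transient error that is
		// retried with backoff.
	}
}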

Functions

This section is empty.

Types

type Config

type Config struct {
	// SelfNodeID is the ID of the local node. Files whose OriginNodeID matches
	// are skipped (the origin already has the bytes).
	SelfNodeID string

	// Backend is the local storage backend. The puller calls Exists to skip
	// already-local files and WriteReader to stream pulled bytes onto disk.
	Backend storage.Backend

	// Fetcher is the network client that actually downloads file bytes from
	// a peer. Injected so tests can use a fake.
	Fetcher Fetcher

	// PeerResolver looks up the coordinator address for a node ID.
	PeerResolver PeerResolver

	// Workers is the number of concurrent pull goroutines. Default: 4.
	Workers int

	// QueueSize is the buffered channel capacity. Enqueues past this limit
	// are dropped and counted. Default: 1024.
	QueueSize int

	// RetryMaxAttempts is the number of immediate retry attempts for a single
	// pull failure before the entry is given up on. Further recovery happens
	// via a later FSM callback or the Phase 3 catch-up scanner. Default: 3.
	RetryMaxAttempts int

	// RetryInitialBackoff is the first retry delay. Doubles on each attempt.
	// Default: 500ms.
	RetryInitialBackoff time.Duration

	// FetchTimeout bounds a single Fetcher.Fetch call. Default: 60s.
	FetchTimeout time.Duration

	// CatchUpQueueHighWater is the queue-depth fraction above which the
	// Phase 3 catch-up walker pauses enqueueing. Keeps the walker from
	// racing ahead of workers and causing drop storms on large manifests.
	// Default: 0.8 (sleep when > 80% full).
	CatchUpQueueHighWater float64

	// Logger receives structured log output.
	Logger zerolog.Logger
}

Config bundles the puller's dependencies and tunables.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults. Callers typically override only the dependencies (Backend, Fetcher, PeerResolver, SelfNodeID, Logger).
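
A hedged wiring sketch: override only the dependencies on top of DefaultConfig, then construct and start the puller. The helper name, the literal node ID, and the import paths of the storage and zerolog packages are illustrative:

func startPuller(ctx context.Context, backend storage.Backend, fetcher filereplication.Fetcher,
	resolver filereplication.PeerResolver, logger zerolog.Logger) (*filereplication.Puller, error) {

	cfg := filereplication.DefaultConfig()
	cfg.SelfNodeID = "node-2" // this node's cluster ID
	cfg.Backend = backend
	cfg.Fetcher = fetcher
	cfg.PeerResolver = resolver
	cfg.Logger = logger

	p, err := filereplication.New(cfg)
	if err != nil {
		return nil, err
	}
	p.Start(ctx) // launches the worker pool; the caller defers p.Stop() on shutdown
	return p, nil
}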

type FetchClient

type FetchClient struct {
	// SelfNodeID is our node ID, embedded in each request's HMAC payload so
	// the origin peer can identify the caller.
	SelfNodeID string
	// ClusterName is our cluster name — included in the HMAC payload to
	// prevent cross-cluster replay.
	ClusterName string
	// SharedSecret is the cluster-wide HMAC key. Required — callers must
	// refuse to construct a FetchClient without it.
	SharedSecret string
	// TLSConfig wraps the outbound dial in TLS when non-nil. Reuses the
	// cluster TLS config (PR #382) so inter-node traffic is end-to-end
	// encrypted under the cluster PKI, independent of the public API cert.
	TLSConfig *tls.Config
	// DialTimeout is the maximum time to establish the TCP (+TLS) connection.
	// Default 10s if zero.
	DialTimeout time.Duration
	// ResponseHeaderTimeout bounds how long we wait for the MsgFetchFileAck
	// header after sending the request. Default 15s if zero. The body stream
	// itself is bounded by the ctx passed to Fetch.
	ResponseHeaderTimeout time.Duration
}

FetchClient is the concrete Fetcher that the puller uses to download file bytes from a peer over the coordinator TCP protocol. It dials a fresh connection per fetch (no pooling in Phase 2), authenticates with HMAC headers, reads the MsgFetchFileAck header, and streams the raw body into the destination writer while computing SHA-256 for verification.

func NewFetchClient

func NewFetchClient(c FetchClient) (*FetchClient, error)

NewFetchClient validates the required fields and returns a ready-to-use client. Peer replication requires the shared secret, so a missing secret is a hard error rather than a fallback to unauthenticated operation.
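
A sketch of constructing the concrete Fetcher. The helper, the literal node and cluster names, and where the secret and TLS config come from are illustrative; in practice they come from the cluster configuration:

func newClusterFetcher(sharedSecret string, clusterTLS *tls.Config) (filereplication.Fetcher, error) {
	return filereplication.NewFetchClient(filereplication.FetchClient{
		SelfNodeID:   "node-2",     // placeholder node ID
		ClusterName:  "arc-prod",   // placeholder cluster name
		SharedSecret: sharedSecret, // required; construction fails without it
		TLSConfig:    clusterTLS,   // nil means a plain TCP dial
		// DialTimeout and ResponseHeaderTimeout fall back to 10s / 15s when zero.
	})
}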

func (*FetchClient) Fetch

func (f *FetchClient) Fetch(ctx context.Context, peerAddr string, entry *raft.FileEntry, dst io.Writer, byteOffset int64, prefixHasher hash.Hash) (int64, error)

Fetch dials the peer, sends a MsgFetchFile request, reads the ack header, streams body bytes into dst (while computing SHA-256), and verifies that the computed hash matches the manifest SHA-256 from entry.SHA256.

byteOffset is the byte position to resume from (0 = full fetch). When byteOffset > 0, prefixHasher must be a hash.Hash pre-fed with bytes [0, byteOffset) from the partial local file. Fetch streams bytes [byteOffset, entry.SizeBytes) through the same hasher and verifies the final digest against entry.SHA256. When byteOffset == 0, prefixHasher must be nil; Fetch creates a fresh sha256 hasher internally.

Returns the number of tail bytes written to dst and any error. On a checksum mismatch the error wraps ErrChecksumMismatch. On a bad-offset rejection from the peer the error wraps ErrBadOffset.

The connection is always closed before returning.
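
A hedged sketch of the resume contract: hash the prefix already on disk, then append the fetched tail through the same hasher. resumePull is a hypothetical helper, and the raft import path is whatever the coordinator already uses:

func resumePull(ctx context.Context, client *filereplication.FetchClient,
	peerAddr, partialPath string, entry *raft.FileEntry) (int64, error) {

	f, err := os.OpenFile(partialPath, os.O_RDWR, 0o644)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	h := sha256.New()
	offset, err := io.Copy(h, f) // feed bytes [0, offset) into the hasher
	if err != nil {
		return 0, err
	}

	// The write cursor now sits at EOF, so the fetched tail is appended.
	// Fetch streams [offset, entry.SizeBytes) through the same hasher and
	// wraps ErrChecksumMismatch or ErrBadOffset on failure.
	return client.Fetch(ctx, peerAddr, entry, f, offset, h)
}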

type Fetcher

type Fetcher interface {
	// Fetch downloads the file (or a tail of it) identified by entry from the
	// given peer address and writes body bytes into dst.
	//
	// byteOffset is the byte position to resume from (0 = full fetch). When
	// byteOffset > 0, prefixHasher must be a SHA-256 hash.Hash pre-fed with bytes
	// [0, byteOffset) from the partial local file. Fetch streams bytes
	// [byteOffset, entry.SizeBytes) through the same hasher and verifies the
	// final hash against entry.SHA256. When byteOffset == 0, prefixHasher must
	// be nil; Fetch creates a fresh hasher internally.
	//
	// Returns (bytesWritten, error). bytesWritten counts only the tail bytes
	// received in this call (not the prefix already on disk).
	Fetch(ctx context.Context, peerAddr string, entry *raft.FileEntry, dst io.Writer, byteOffset int64, prefixHasher hash.Hash) (int64, error)
}

Fetcher is the contract the puller uses to download a single file from a peer. It's an interface rather than a concrete type so the puller can be unit-tested with a fake that returns deterministic bytes/errors without opening real TCP connections.
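
One possible fake for such tests, assuming the test only needs canned bytes or a scripted error:

type fakeFetcher struct {
	body []byte // full file contents the "peer" holds
	err  error  // scripted failure, e.g. ErrFileNotOnPeer
}

func (f *fakeFetcher) Fetch(ctx context.Context, peerAddr string, entry *raft.FileEntry,
	dst io.Writer, byteOffset int64, prefixHasher hash.Hash) (int64, error) {
	if f.err != nil {
		return 0, f.err
	}
	n, err := dst.Write(f.body[byteOffset:])
	return int64(n), err
}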

type PeerResolver

type PeerResolver interface {
	ResolvePeers(originNodeID, path string) []string
}

PeerResolver returns an ordered list of peer coordinator addresses that can serve a given file. The puller tries each address in order until one responds with the file bytes. The first address is typically the origin node (if still healthy) followed by any other healthy peers — that way catch-up after a Kubernetes pod rotation still works when the original writer is gone.

The resolver takes (originNodeID, path) rather than the full FileEntry so the interface stays decoupled from the raft package — any future implementation that wants richer routing (health-aware, latency-aware, shard-aware) only needs these two fields to make its decision.

The puller looks up addresses fresh on every attempt (no caching) so topology changes are picked up automatically. An empty slice means "no known peers" and is treated as a transient failure the puller can retry.

func NewRegistryResolver

func NewRegistryResolver(fn func(originNodeID, path string) []string) PeerResolver

NewRegistryResolver adapts a function that maps (originNodeID, path) to an ordered list of candidate peer addresses into a PeerResolver. Typical usage: return the origin address first (if still healthy) followed by any other healthy peers — that way Phase 3 catch-up still works after a Kubernetes pod rotation when the original writer is gone.
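
A sketch of that typical adapter: origin first if it is still healthy, then the remaining healthy peers. The registry value and its HealthyAddr/HealthyPeers methods are hypothetical stand-ins for whatever membership view the coordinator exposes:

resolver := filereplication.NewRegistryResolver(func(originNodeID, path string) []string {
	var addrs []string
	if addr, ok := registry.HealthyAddr(originNodeID); ok {
		addrs = append(addrs, addr) // origin first, if still healthy
	}
	for id, addr := range registry.HealthyPeers() {
		if id != originNodeID {
			addrs = append(addrs, addr) // then any other healthy peer
		}
	}
	return addrs // empty slice = "no known peers": transient, retried later
})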

type Puller

type Puller struct {
	// contains filtered or unexported fields
}

Puller is the background worker pool that drains FSM file-registration callbacks and pulls missing files from their origin peers.

func New

func New(cfg Config) (*Puller, error)

New constructs a Puller. Does not start background workers — call Start.

func (*Puller) CatchUpCompleted

func (p *Puller) CatchUpCompleted() bool

CatchUpCompleted reports whether the startup catch-up walker has finished. Returns true once RunCatchUp has completed its pass over the manifest (not once all queued pulls have drained — that's a separate signal). Phase 5 will use this to hard-gate the query path during startup.
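
A hypothetical readiness check built on this signal (the query-path gate itself is Phase 5 and not part of this package; errStillCatchingUp is a placeholder):

if !puller.CatchUpCompleted() {
	// The manifest walk hasn't finished; queued pulls may also still be
	// draining, so query results could miss recently replicated files.
	return errStillCatchingUp
}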

func (*Puller) Enqueue

func (p *Puller) Enqueue(entry *raft.FileEntry)

Enqueue submits a file entry for pulling. Non-blocking: if the queue is full, the entry is dropped and totalDropped is incremented. If origin is self, the file already exists locally, or the same path is already enqueued / in-flight (via the inflight set), the entry is counted as a skip and never reaches a worker.

Enqueue is safe to call from the Raft FSM apply callback (which must return quickly): all checks here are O(1) and no I/O happens inline. It is also safe to call from the Phase 3 catch-up walker concurrently with reactive callbacks — the inflight set dedups cross-path races.
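
A sketch of the reactive hookup; OnFileRegistered is a hypothetical name for whatever registration hook the coordinator's FSM exposes:

fsm.OnFileRegistered(func(entry *raft.FileEntry) {
	// O(1), non-blocking: drops (and counts) if the queue is full, skips if
	// the origin is this node or the path is already local or in flight.
	puller.Enqueue(entry)
})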

func (*Puller) RunCatchUp

func (p *Puller) RunCatchUp(ctx context.Context, entries []*raft.FileEntry)

RunCatchUp walks a snapshot of the cluster file manifest and enqueues every entry the local node should hold but doesn't. It is the Phase 3 mechanism that brings a node with a stale or empty local backend back into sync with the manifest: on startup (and only on startup — periodic reconciliation is not in scope for Phase 3), the coordinator hands us the output of fsm.GetAllFiles() and we feed each entry through Enqueue so the regular worker pool pulls the missing bytes from a peer.

RunCatchUp does NOT itself talk to peers or verify files — it relies on Enqueue's existing origin-is-self check, the inflight dedup set (so reactive FSM callbacks can race without double-pulling), and the workers' backend.Exists pre-check. All the actual fetch work flows through the same code path that Phase 2 reactive pulls use, which means the same retry/backoff/checksum/metrics apply automatically.

To avoid a thundering-herd drop storm on large manifests, the feeder sleeps briefly whenever the queue is above CatchUpQueueHighWater (default 80%). This gives the workers time to drain and keeps reactive drops rare. The walker is NOT a hard gate on queries — the release notes document the eventual-consistency window between startup and full drain.

RunCatchUp runs at most once per puller lifecycle: the catchupStartedAt atomic is CAS-guarded, so any later call short-circuits. The method returns when all entries have been processed (enqueued or skipped) or ctx is cancelled; it does not wait for the actual pulls to complete.
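
A sketch of the startup sequence, assuming the coordinator hands over a manifest snapshot via fsm.GetAllFiles() as described above; the goroutine and variable names are illustrative:

puller.Start(ctx)
go func() {
	// Returns once every manifest entry is enqueued or skipped; the queued
	// pulls keep draining in the worker pool afterwards.
	puller.RunCatchUp(ctx, fsm.GetAllFiles())
}()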

func (*Puller) Start

func (p *Puller) Start(parentCtx context.Context)

Start launches the worker pool. Safe to call multiple times — subsequent calls are no-ops.

func (*Puller) Stats

func (p *Puller) Stats() map[string]int64

Stats returns a point-in-time snapshot of the puller's metrics, including Phase 3 catch-up counters.

func (*Puller) Stop

func (p *Puller) Stop()

Stop signals all workers to exit and waits for them to finish. In-flight pulls are cancelled via the shared context. Pending queue entries are dropped (Phase 3 catch-up or a later FSM callback will re-discover them).
