Documentation ¶
Overview ¶
Package network archive layer.
Archive writes Quasar-finalised WAL frames to object storage (S3 or GCS) as per-shard segment files. Each segment is a length-prefixed list of PQ-signed frames that can be replayed for point-in-time recovery.
Segment format: magic "LBN2" (authenticated — body+crc+pubkey signed by Ed25519). Version 1 ("LBN1") existed only during dev and is rejected unconditionally by readers to prevent downgrade attacks. Future format changes MUST bump the magic to "LBN3"/... and version-detect on read; never overload an existing magic — forwards compatibility only.
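A minimal sketch of the version gate described above (names hypothetical; the real reader also validates the CRC and the Ed25519 signature before accepting a segment):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

var errCorrupt = errors.New("segment: corrupt")

// checkMagic gates on the segment magic. Only "LBN2" is accepted; the
// retired dev magic "LBN1" (and anything else) is rejected outright,
// so a downgrade to the unauthenticated format is impossible.
func checkMagic(header []byte) error {
	if len(header) < 4 || !bytes.Equal(header[:4], []byte("LBN2")) {
		return errCorrupt
	}
	return nil
}

func main() {
	fmt.Println(checkMagic([]byte("LBN2\x00\x00"))) // accepted: <nil>
	fmt.Println(checkMagic([]byte("LBN1\x00\x00"))) // rejected: segment: corrupt
}
```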
This file defines the Archive interface and the URL-based backend dispatcher. The consensus-side wiring (witness validator, frame feed) is owned by the core network package; archive is a dumb consumer of already-finalised data.
Package network replicates Base SQLite commits across peers via the luxfi/consensus Quasar engine. One shard = one Quasar engine = one DAG.
See docs/NETWORK.md for the full design. Contract:
- Enabled() — FromEnv toggles off when BASE_NETWORK is unset.
- Start/Stop — lifecycle for all shard engines and transport.
- InstallWALHook — per-connection commit hook; captures frames, submits to the shard engine.
- WriterFor — which pod owns shardID (consistent hash).
- MembersFor — replica set for shardID.
Index ¶
Constants ¶
const (
	DefaultSegmentTargetBytes = 8 * 1024 * 1024
	DefaultFlushInterval      = 10 * time.Second
	DefaultRetryDeadline      = 5 * time.Minute
	DefaultBacklogMaxBytes    = 64 * 1024 * 1024
	DefaultBacklogMaxSegments = 100_000
)
Defaults applied when a field is zero. R6: BacklogMax* cap per-shard in-memory backlog so a hostile / broken storage backend cannot OOM the process. 64 MiB / 100 000 segments is "enough headroom for a real S3 outage" but "small enough that even 32 shards on a 2 GiB pod can't OOM". Operators override via `BASE_SHARD_BACKLOG_MAX` and `BASE_SHARD_BACKLOG_SEGMENTS`.
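The drop-oldest behaviour those caps imply can be sketched as follows (type and field names hypothetical; the real backlog also reports drops via IncDrops):

```go
package main

import "fmt"

// backlog sketches the R6 drop path: once either cap (bytes or
// segment count) is exceeded, the oldest encoded segments are shed.
// Liveness over durability — a broken backend cannot OOM the pod.
type backlog struct {
	segs     [][]byte
	bytes    int
	maxBytes int
	maxSegs  int
	drops    int
}

func (b *backlog) push(seg []byte) {
	b.segs = append(b.segs, seg)
	b.bytes += len(seg)
	for (b.maxBytes > 0 && b.bytes > b.maxBytes) ||
		(b.maxSegs > 0 && len(b.segs) > b.maxSegs) {
		// Shed the oldest segment; operators page on the drop counter.
		b.bytes -= len(b.segs[0])
		b.segs = b.segs[1:]
		b.drops++
	}
}

func main() {
	b := &backlog{maxBytes: 10}
	b.push(make([]byte, 6))
	b.push(make([]byte, 6)) // 12 bytes > cap: oldest is dropped
	fmt.Println(len(b.segs), b.bytes, b.drops) // 1 6 1
}
```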
Variables ¶
var ErrSegmentCorrupt = errors.New("segment: corrupt")
ErrSegmentCorrupt is returned when a segment fails magic, CRC, or signature validation on read.
var ErrSegmentUnsigned = errors.New("segment: no verifier configured")
ErrSegmentUnsigned is returned when a reader has no verifier (nil) or a writer has no signer (nil). Fail-closed — never decode/encode an unauthenticated segment.
var ErrTLSUnpinnedPeer = errors.New("transport: peer SAN not in BASE_PEERS allowlist")
ErrTLSUnpinnedPeer is returned by the verifier when a peer's leaf cert chains to the CA but does not present a SAN listed in AllowedSANs. This is the R5 defence — a compromised neighbor pod with a valid but unexpected identity cannot impersonate the real peers.
Functions ¶
func SetWALSource ¶
func SetWALSource(n Network, src walSource)
SetWALSource installs a capture function used by all future InstallWALHook calls. When unset, frames carry an empty payload (useful for exercising the consensus plane in isolation, e.g. integration tests).
Types ¶
type ApplyFunc ¶
ApplyFunc is called once per finalised frame per shard. Errors are counted in metrics and are not retried; Quasar has already finalised the frame, so the caller owns recovery.
type Archive ¶
type Archive interface {
Append(ctx context.Context, shardID string, seq uint64, frame []byte) error
Range(ctx context.Context, shardID string, fromSeq, toSeq uint64) (iter.Seq2[Frame, error], error)
Close() error
}
Archive is the write+replay interface against object storage.
Append is called per finalised frame by the witness validator loop. It MUST buffer and return quickly; flushing to storage happens asynchronously. A slow backend must back-pressure via metrics, not by blocking Append indefinitely.
Range yields frames in seq order across segments overlapping [fromSeq, toSeq]. Iteration stops at the first error yielded.
func NewArchive ¶
func NewArchive(ctx context.Context, cfg ArchiveConfig, svc string, m *ArchiveMetrics) (Archive, error)
NewArchive dispatches on the URL scheme. Currently supported:
- s3:// → MinIO-protocol S3 (hanzoai/s3 self-hosted or AWS)
- gs:// → Google Cloud Storage
- off → nil Archive, disabled (call sites treat as no-op)
Anything else is a config error.
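The scheme switch can be sketched as follows (backend constructors stubbed as strings; the real function returns an Archive wired to the chosen backend):

```go
package main

import (
	"fmt"
	"strings"
)

// dispatch mirrors NewArchive's URL-scheme switch.
func dispatch(rawURL string) (string, error) {
	switch {
	case rawURL == "off":
		return "disabled", nil // nil Archive; call sites treat as no-op
	case strings.HasPrefix(rawURL, "s3://"):
		return "s3", nil // MinIO-protocol S3 (self-hosted or AWS)
	case strings.HasPrefix(rawURL, "gs://"):
		return "gcs", nil // Google Cloud Storage
	default:
		return "", fmt.Errorf("archive: unsupported URL %q", rawURL)
	}
}

func main() {
	for _, u := range []string{"s3://bucket/prefix", "gs://bucket", "off", "ftp://x"} {
		b, err := dispatch(u)
		fmt.Println(u, "->", b, err)
	}
}
```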
type ArchiveConfig ¶
type ArchiveConfig struct {
// URL is the backend destination: s3://bucket/prefix or
// gs://bucket/prefix. Prefix is optional; if absent, objects land
// at the bucket root.
URL string
// SegmentTargetBytes is the size at which an in-memory segment
// flushes. Defaults to 8 MiB.
SegmentTargetBytes int
// FlushInterval is the max time a partial segment may linger in
// memory before being flushed. Defaults to 10 s.
FlushInterval time.Duration
// RetryDeadline bounds transient retry attempts for a single
// segment flush. Defaults to 5 minutes. After it elapses, the
// segment is retained for the next flush cycle (never dropped).
RetryDeadline time.Duration
// SigningKey is the Ed25519 private key used to sign every segment
// we write. Production callers MUST inject a KMS-held key; when
// unset (tests, dev), the archive writer auto-generates a transient
// key — fine in-process, useless across restarts.
SigningKey ed25519.PrivateKey
// TrustedSegmentKeys are Ed25519 public keys whose segments the
// reader accepts. The local SigningKey's public half is always
// trusted; additional keys cover rotation (PITR against archives
// written under a now-retired key).
TrustedSegmentKeys []ed25519.PublicKey
// BacklogMaxBytes caps the per-shard in-memory backlog of encoded
// segments waiting to upload. Once exceeded, the oldest segments
// are dropped (counted on IncDrops). Zero = unbounded (dev only);
// production callers MUST set this to a sane ceiling.
BacklogMaxBytes int
// BacklogMaxSegments caps the per-shard backlog by segment count.
// Either limit firing triggers the drop path. Zero = unbounded.
BacklogMaxSegments int
}
ArchiveConfig controls segment size + flush cadence. Zero values fall back to defaults (8 MiB, 10 s).
type ArchiveMetrics ¶
type ArchiveMetrics struct {
// SetLagBytes reports the total size of buffered, not-yet-flushed
// frames across all shards. Surfaced as base_archive_lag_bytes.
SetLagBytes func(bytes int64)
// IncFailures increments a failures counter whenever a segment
// exhausts its retry deadline. The segment is re-buffered unless
// the backlog cap is exceeded.
IncFailures func()
// IncDrops increments a drop counter whenever the backlog cap
// sheds the oldest segment. Liveness-over-durability — operators
// should page on this.
IncDrops func()
}
ArchiveMetrics is a callback-based view onto the core Metrics collectors declared in metrics.go. Archive code stays decoupled from the Prometheus registry by going through function hooks — callers bind SetLagBytes to Metrics.ArchiveLagBytes.Set and IncFailures to a counter of their choice. A nil ArchiveMetrics is valid; all three hooks become no-ops.
func BindArchiveMetrics ¶
func BindArchiveMetrics(m *Metrics) *ArchiveMetrics
BindArchiveMetrics builds an ArchiveMetrics that writes through to the core Metrics collectors. Callers pass the returned value into NewArchive. Nil-safe: a nil Metrics yields an ArchiveMetrics whose hooks are all no-ops.
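The nil-safe write-through pattern can be sketched with minimal stand-ins for the two types (the real Metrics fields are Prometheus collectors):

```go
package main

import "fmt"

// Metrics is a pared-down stand-in for the core Metrics type.
type Metrics struct{ lag int64 }

// ArchiveMetrics carries one representative hook.
type ArchiveMetrics struct {
	SetLagBytes func(int64)
}

// bind sketches BindArchiveMetrics: a nil Metrics yields hooks that
// are safe no-ops, so archive code never nil-checks its metrics.
func bind(m *Metrics) *ArchiveMetrics {
	if m == nil {
		return &ArchiveMetrics{SetLagBytes: func(int64) {}}
	}
	return &ArchiveMetrics{SetLagBytes: func(b int64) { m.lag = b }}
}

func main() {
	m := &Metrics{}
	bind(m).SetLagBytes(1 << 20) // writes through to the collector
	bind(nil).SetLagBytes(42)    // no-op, no panic
	fmt.Println(m.lag)           // 1048576
}
```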
type Config ¶
type Config struct {
// Enabled is true iff BASE_NETWORK=quasar.
Enabled bool
// ShardKey names the identity carried by requests (JWT claim or header).
// Required when Enabled.
ShardKey string
// Replication is the number of members holding each shard. 1 = standalone
// DAG (durability via archive), 2 = pair, 3+ = Byzantine-safe quorum.
Replication int
// Peers is the static seed list. In k8s these are pod-ordinal DNS names
// emitted by the operator; in compose they are static service names.
// host:port form, p2p port not HTTP port.
Peers []string
// NodeID is the local member identity. Defaults to $HOSTNAME; overridable
// via BASE_NODE_ID for tests and compose.
NodeID string
// Role is "validator" (default) or "archive". Archive nodes don't vote;
// they subscribe to finalized frames and append to cold storage.
Role NodeRole
// Archive is the cold-storage URL or "off". s3://bucket/prefix,
// gs://bucket/prefix, file://path (dev), off.
Archive string
// ListenHTTP is the Base HTTP listen address. Used only for the
// /-/base/members endpoint by the Gateway; main HTTP comes from core.
ListenHTTP string
// ListenP2P is the Quasar peer-to-peer port.
ListenP2P string
// TLS is the mTLS config for the Quasar p2p transport (R5). When
// unset the transport runs without authentication — only acceptable
// in single-node dev and tests. Production callers inject certs via
// the KMS plugin (base/plugins/kms) or the operator-emitted dev CA.
// See transport_tls.go for the pinning rules.
TLS *TLSConfig
}
Config is the parsed env surface for a Base network member. All fields are immutable after construction; reparsing requires a restart (standard for pod-lifetime config).
func ConfigFromEnv ¶
ConfigFromEnv reads BASE_NETWORK, BASE_SHARD_KEY, BASE_REPLICATION, BASE_PEERS, BASE_NODE_ROLE, BASE_ARCHIVE, BASE_LISTEN_HTTP, BASE_LISTEN_P2P, and BASE_SHARD_BACKLOG_MAX / BASE_SHARD_BACKLOG_SEGMENTS (R6 per-shard backlog caps — the archive config is built separately from these by base/core's startup path). Standalone defaults are safe: no error, Enabled==false.
type Envelope ¶
Envelope is what the transport ships between nodes: a shardID header + the serialisable frame. Signing and transport security live at the transport layer; this struct is the minimal contract.
type Frame ¶
type Frame struct {
ShardID string
Seq uint64
PrevSeq uint64
Timestamp int64
Salt [16]byte
Payload []byte
Cksm [32]byte
Bytes []byte
}
Frame is the unit of replication. It is the serialised form of one committed SQLite transaction for a shard: opaque payload bytes (the WAL segment delta), a monotonic sequence, a random salt, and a checksum.
Frames are content-addressed by (salt, cksm) so that:
- duplicate submissions coalesce in the Quasar DAG,
- the apply callback is trivially idempotent on pod restart,
- any tamper on the wire is caught before apply.
Bytes is the serialised on-wire form (encode output). It is populated lazily by decodeFrame and by the archive layer so that frame replay keeps the original PQ signature intact; local producers use encode() directly.
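The idempotent-apply property of (salt, cksm) content addressing can be sketched as follows (names hypothetical; the real dedup cache survives restarts):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// frameKey content-addresses a frame by (salt, cksm), matching the
// Frame fields above; duplicates coalesce instead of re-applying.
type frameKey struct {
	salt [16]byte
	cksm [32]byte
}

type applier struct {
	seen    map[frameKey]struct{}
	applied int
}

func (a *applier) apply(salt [16]byte, payload []byte) {
	k := frameKey{salt: salt, cksm: sha256.Sum256(payload)}
	if _, dup := a.seen[k]; dup {
		return // duplicate submission: no-op, apply stays idempotent
	}
	a.seen[k] = struct{}{}
	a.applied++
}

func main() {
	a := &applier{seen: map[frameKey]struct{}{}}
	var salt [16]byte
	a.apply(salt, []byte("txn-1"))
	a.apply(salt, []byte("txn-1")) // replayed after a restart: no-op
	fmt.Println("applied:", a.applied) // applied: 1
}
```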
type HookRegisterer ¶
type HookRegisterer interface {
RegisterCommitHook(func() int32)
}
HookRegisterer is the narrow surface we need from a SQLite driver connection: the method modernc.org/sqlite exposes on its *conn. Accepting it through a tiny interface keeps this package decoupled from the concrete driver type and test-friendly.
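The test-friendliness this buys can be sketched with a fake connection (fakeConn is hypothetical; the return-value semantics follow SQLite's commit hook, where non-zero rolls the commit back):

```go
package main

import "fmt"

// HookRegisterer mirrors the package's narrow driver interface.
type HookRegisterer interface {
	RegisterCommitHook(func() int32)
}

// fakeConn is a test double: it records the hook and lets a test
// fire a "commit" without a real SQLite connection.
type fakeConn struct{ hook func() int32 }

func (f *fakeConn) RegisterCommitHook(h func() int32) { f.hook = h }

func main() {
	c := &fakeConn{}
	var commits int
	var reg HookRegisterer = c // the package only sees the interface
	reg.RegisterCommitHook(func() int32 {
		commits++
		return 0 // zero lets the commit proceed
	})
	c.hook() // simulate a committed transaction
	fmt.Println("commits:", commits) // commits: 1
}
```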
type Metrics ¶
type Metrics struct {
Shards prometheus.Counter
ActiveShards prometheus.Gauge
WALLagBytes prometheus.Gauge
HotShards prometheus.Gauge
LeaseContentions prometheus.Counter
ArchiveLagBytes prometheus.Gauge
// ArchiveFailures counts segment flushes that exhausted their
// retry deadline. Incremented once per failure regardless of
// re-buffer outcome.
ArchiveFailures prometheus.Counter
// ArchiveDrops counts segments shed by the per-shard backlog cap
// (R6). Under a healthy backend this stays at zero; non-zero is a
// pageable condition — we're losing archive frames to keep the pod
// alive.
ArchiveDrops prometheus.Counter
FramesSubmitted prometheus.Counter
FramesIngested prometheus.Counter
FramesFinalized prometheus.Counter
FramesDuplicate prometheus.Counter
FramesInvalid prometheus.Counter
// FramesRejectedShardMismatch counts frames dropped because the
// Envelope's routed shard disagreed with the Frame's internal
// shardID (R1 probe). Under healthy operation this stays at zero.
FramesRejectedShardMismatch prometheus.Counter
// FramesRejectedSeqGap counts frames whose height != prevHeight+1
// (R2). The apply path never bumps localSeq from these.
FramesRejectedSeqGap prometheus.Counter
ApplyErrors prometheus.Counter
WALHookErrors prometheus.Counter
WALBytes prometheus.Counter
}
Metrics owns every collector the network package exposes. Callers wire it into their own registry; we never touch prometheus.DefaultRegisterer.
The metric families correspond directly to the design doc:
base_shards             counter  shards assigned to this member
base_active_shards      gauge    shards with a live Quasar engine
base_wal_lag_bytes      gauge    bytes pending archive flush (set via Append)
base_hot_shards         gauge    shards above activity threshold (set by scaler)
base_lease_contentions  counter  times a writer lost a lease race
base_archive_lag_bytes  gauge    mirror of wal_lag for Prom naming parity
Plus per-path counters used by the internal wiring:
base_network_frames_submitted_total
base_network_frames_ingested_total
base_network_frames_finalized_total
base_network_frames_duplicate_total
base_network_frames_invalid_total
base_network_apply_errors_total
base_network_wal_hook_errors_total
base_network_wal_bytes_total
func NewMetrics ¶
func NewMetrics() *Metrics
NewMetrics constructs the collectors. Register them on a caller-owned registry with Register(). Calling NewMetrics twice produces two independent sets — useful for test isolation.
func (*Metrics) Register ¶
func (m *Metrics) Register(r prometheus.Registerer) error
Register adds every collector in m to r. Duplicate registration returns the first error; callers that need per-metric control can register one field at a time.
type Network ¶
type Network interface {
Enabled() bool
Start(ctx context.Context) error
Stop(ctx context.Context) error
// InstallWALHook wires the commit hook on a raw SQLite driver connection
// for the given shard. The hook captures the committed frame, signs it,
// and submits it to the shard's Quasar engine. Apply-on-finalize is
// idempotent by (salt, cksm).
InstallWALHook(rawConn any, shardID string) error
// WriterFor returns the current writer endpoint for shardID and whether
// it is the local pod. Gateway uses this for request routing.
WriterFor(shardID string) (endpoint string, local bool)
// MembersFor returns all replica endpoints for shardID.
MembersFor(shardID string) []string
// Metrics exposes the Prom collectors. Callers register via their own
// registry; we never touch the global default.
Metrics() *Metrics
}
Network is the public API Base uses. Standalone mode returns a no-op implementation from FromEnv; all methods are safe to call.
func FromEnv ¶
FromEnv reads BASE_* env vars. When BASE_NETWORK is empty or "standalone" it returns a no-op implementation with Enabled() == false. On validation error the error is returned so the caller can fail fast.
type NodeID ¶
type NodeID string
NodeID is a stable member identity — in k8s the pod DNS name, in compose the service DNS name, in tests a synthetic string.
type Shard ¶
type Shard struct {
ID string
Members []NodeID
Threshold int
// contains filtered or unexported fields
}
Shard owns the per-shard Quasar engine, the apply loop, and the dedup cache that makes apply idempotent across restarts.
Lifecycle: created lazily on first reference by shard(id), closed by the node on Stop(). One Shard per shardID per node.
type TLSConfig ¶
type TLSConfig struct {
// CACerts are the PEM-encoded CA certificates the transport trusts
// for peer certs. Supplied by the operator from the Base-network
// Secret (KMS-issued) or the self-signed dev CA.
CACerts []byte
// ServerCert / ServerKey are the local pod's TLS identity. Both
// PEM-encoded. Production: issued by KMS; dev: self-signed.
ServerCert []byte
ServerKey []byte
// ClientCert / ClientKey are optional — when the transport opens
// outbound connections it presents these. Typically the same as
// the server cert (pod identity is one cert per pod).
ClientCert []byte
ClientKey []byte
// AllowedSANs is the SAN allowlist: the pod DNS names from
// BASE_PEERS. Any peer cert whose DNSNames slice does not include
// one of these is rejected. Empty slice + non-nil CACerts = "trust
// any cert chained to CA" (dev-only fallback).
AllowedSANs []string
}
TLSConfig holds the transport-level mTLS settings plumbed through Config. Zero values mean "no TLS"; production callers MUST set CACerts + at least one of (ServerCert, ClientCert) to activate mTLS.
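The SAN-pinning rule (R5) reduces to a pure check over the peer cert's DNSNames, sketched here (function name hypothetical; the real check runs inside VerifyPeerCertificate after chain verification):

```go
package main

import (
	"errors"
	"fmt"
)

var errUnpinned = errors.New("transport: peer SAN not in BASE_PEERS allowlist")

// checkSANs sketches the pinning rule: a peer cert that chains to the
// CA must also present at least one allowlisted DNS SAN. An empty
// allowlist falls back to CA-only trust (dev only).
func checkSANs(peerDNSNames, allowed []string) error {
	if len(allowed) == 0 {
		return nil // dev fallback: any cert chained to the CA
	}
	want := make(map[string]bool, len(allowed))
	for _, a := range allowed {
		want[a] = true
	}
	for _, n := range peerDNSNames {
		if want[n] {
			return nil
		}
	}
	return errUnpinned
}

func main() {
	allowed := []string{"base-0.base.svc", "base-1.base.svc"}
	fmt.Println(checkSANs([]string{"base-1.base.svc"}, allowed)) // <nil>
	fmt.Println(checkSANs([]string{"evil.svc"}, allowed))        // rejected
}
```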
func (*TLSConfig) ClientConfig ¶
ClientConfig builds a *tls.Config for outbound connections. It installs the same VerifyPeerCertificate hook as the server side, so SAN pinning works symmetrically.
type Transport ¶
type Transport interface {
// Start begins delivering inbound envelopes to recv. It must be
// idempotent-on-stop (Stop may be called after Start errors).
Start(ctx context.Context, recv func(Envelope)) error
// Publish fans out env to every peer in the local node's router view.
// Best-effort: peers discover missed frames via Quasar's DAG sync on
// reconnect. Implementations should not block for slow peers.
Publish(env Envelope) error
// Stop releases transport resources. Safe to call multiple times.
Stop(ctx context.Context) error
}
Transport is the peer-to-peer plane. Production implementations carry envelopes over QUIC/gRPC with mTLS + PQ-identity. The in-process test transport ships envelopes through Go channels so the integration test runs with no sockets and no Docker.