network

package
v0.51.0 Latest
Published: Apr 25, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package network: archive layer.

Archive writes quasar-finalised WAL frames to object storage (S3 or GCS) as per-shard segment files. Each segment is a length-prefixed list of PQ-signed frames that can be replayed for point-in-time recovery.

Segment format: magic "LBN2" (authenticated — body+crc+pubkey signed by Ed25519). Version 1 ("LBN1") existed only during dev and is rejected unconditionally by readers to prevent downgrade attacks. Future format changes MUST bump the magic to "LBN3"/... and version-detect on read; never overload an existing magic. Format evolution is forward-only.
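
A read-side sketch of the magic check, assuming a hypothetical helper name; the real decoder also verifies the CRC and Ed25519 signature before yielding frames:

func checkSegmentMagic(seg []byte) error {
	if len(seg) < 4 {
		return ErrSegmentCorrupt
	}
	switch string(seg[:4]) {
	case "LBN2":
		return nil // current format: CRC + signature checks follow
	case "LBN1":
		return ErrSegmentCorrupt // dev-only format, rejected to block downgrades
	default:
		return ErrSegmentCorrupt // unknown magic: "LBN3"+ needs its own decode branch
	}
}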

This file defines the Archive interface and the URL-based backend dispatcher. The consensus-side wiring (witness validator, frame feed) is owned by the core network package; archive is a dumb consumer of already-finalised data.

Package network replicates Base SQLite commits across peers via the luxfi/consensus Quasar engine. One shard = one Quasar engine = one DAG.

See docs/NETWORK.md for the full design. Contract:

  • Enabled() — FromEnv toggles off when BASE_NETWORK is unset.
  • Start/Stop — lifecycle for all shard engines and transport.
  • InstallWALHook — per-connection commit hook; captures frames, submits to the shard engine.
  • WriterFor — which pod owns shardID (consistent hash).
  • MembersFor — replica set for shardID.

Index

Constants

const (
	DefaultSegmentTargetBytes = 8 * 1024 * 1024
	DefaultFlushInterval      = 10 * time.Second
	DefaultRetryDeadline      = 5 * time.Minute
	DefaultBacklogMaxBytes    = 64 * 1024 * 1024
	DefaultBacklogMaxSegments = 100_000
)

Defaults applied when a field is zero. R6: the BacklogMax* fields cap the per-shard in-memory backlog so a hostile or broken storage backend cannot OOM the process. 64 MiB / 100 000 segments is "enough headroom for a real S3 outage" but "small enough that even 32 shards on a 2 GiB pod can't OOM". Operators override via `BASE_SHARD_BACKLOG_MAX` and `BASE_SHARD_BACKLOG_SEGMENTS`.

Variables

var ErrSegmentCorrupt = errors.New("segment: corrupt")

ErrSegmentCorrupt is returned when a segment fails magic, CRC, or signature validation on read.

var ErrSegmentUnsigned = errors.New("segment: no verifier configured")

ErrSegmentUnsigned is returned when a reader has no verifier (nil) or a writer has no signer (nil). Fail-closed — never decode/encode an unauthenticated segment.

var ErrTLSUnpinnedPeer = errors.New("transport: peer SAN not in BASE_PEERS allowlist")

ErrTLSUnpinnedPeer is returned by the verifier when a peer's leaf cert chains to the CA but does not present a SAN listed in AllowedSANs. This is the R5 defence — a compromised neighbor pod with a valid but unexpected identity cannot impersonate the real peers.

Functions

func NewDNSMembership added in v0.49.0

func NewDNSMembership(ctx context.Context, self string, seeds []string) *dnsMembership

NewDNSMembership builds a DNS-based Membership. seeds are BASE_PEERS entries as "host" or "host:port" — port is required so the returned NodeIDs are addressable. A seed whose host already resolves to a single literal IP is used as-is.

The initial Members() snapshot contains self, blocking while the first round of DNS resolution completes (bounded by a 2s context). Subsequent refreshes are fully async.
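
A usage sketch; the pod DNS names and p2p port are placeholders:

ctx := context.Background()
m := NewDNSMembership(ctx, "base-0.base.svc:7420", []string{
	"base-0.base.svc:7420",
	"base-1.base.svc:7420",
	"base-2.base.svc:7420",
})
defer m.Close()

fmt.Println(m.Members()) // always includes self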

func NewStaticMembership added in v0.49.0

func NewStaticMembership(self string, members []string) *staticMembership

NewStaticMembership returns a Membership with a fixed member set. self is automatically included if not already present.

func SetWALSource

func SetWALSource(n Network, src walSource)

SetWALSource installs a capture function used by all future InstallWALHook calls. When unset, frames carry an empty payload (useful for exercising the consensus plane in isolation, e.g. integration tests).

Types

type ApplyFunc

type ApplyFunc func(Frame) error

ApplyFunc is called once per finalised frame per shard. Errors are counted in metrics and are not retried — Quasar has already finalised the frame, so the caller owns recovery.

type ApplyKey

type ApplyKey [48]byte

ApplyKey is the (salt, cksm) pair used to dedupe applies.

type Archive

type Archive interface {
	Append(ctx context.Context, shardID string, seq uint64, frame []byte) error
	Range(ctx context.Context, shardID string, fromSeq, toSeq uint64) (iter.Seq2[Frame, error], error)
	Close() error
}

Archive is the write+replay interface against object storage.

Append is called per finalised frame by the witness validator loop. It MUST buffer and return quickly; flushing to storage happens asynchronously. A slow backend must back-pressure via metrics, not by blocking Append indefinitely.

Range yields frames in seq order across segments overlapping [fromSeq, toSeq]. Iteration stops at the first error yielded.
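
A replay sketch for point-in-time recovery; arch, ctx, the shard name, the sequence bounds, and applyFrame are placeholders supplied by the caller:

frames, err := arch.Range(ctx, "tenant-42", 1, 5000)
if err != nil {
	return err
}
for f, err := range frames {
	if err != nil {
		return err // iteration stops at the first yielded error
	}
	if err := applyFrame(f); err != nil {
		return err
	}
}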

func NewArchive

func NewArchive(ctx context.Context, cfg ArchiveConfig, svc string, m *ArchiveMetrics) (Archive, error)

NewArchive dispatches on the URL scheme. Currently supported: s3:// → MinIO-protocol S3 (hanzoai/s3 self-hosted or AWS). gs:// → Google Cloud Storage. off → nil Archive, disabled (call sites treat as no-op). Anything else is a config error.

type ArchiveConfig

type ArchiveConfig struct {
	// URL is the backend destination: s3://bucket/prefix or
	// gs://bucket/prefix. Prefix is optional; if absent, objects land
	// at the bucket root.
	URL string

	// SegmentTargetBytes is the size at which an in-memory segment
	// flushes. Defaults to 8 MiB.
	SegmentTargetBytes int

	// FlushInterval is the max time a partial segment may linger in
	// memory before being flushed. Defaults to 10 s.
	FlushInterval time.Duration

	// RetryDeadline bounds transient retry attempts for a single
	// segment flush. Defaults to 5 minutes. After it elapses, the
	// segment is retained for the next flush cycle (never dropped).
	RetryDeadline time.Duration

	// SigningKey is the Ed25519 private key used to sign every segment
	// we write. Production callers MUST inject a KMS-held key; when
	// unset (tests, dev), the archive writer auto-generates a transient
	// key — fine in-process, useless across restarts.
	SigningKey ed25519.PrivateKey

	// TrustedSegmentKeys are Ed25519 public keys whose segments the
	// reader accepts. The local SigningKey's public half is always
	// trusted; additional keys cover rotation (PITR against archives
	// written under a now-retired key).
	TrustedSegmentKeys []ed25519.PublicKey

	// BacklogMaxBytes caps the per-shard in-memory backlog of encoded
	// segments waiting to upload. Once exceeded, the oldest segments
	// are dropped (counted on IncDrops). Zero = unbounded (dev only);
	// production callers MUST set this to a sane ceiling.
	BacklogMaxBytes int

	// BacklogMaxSegments caps the per-shard backlog by segment count.
	// Either limit firing triggers the drop path. Zero = unbounded.
	BacklogMaxSegments int
}

ArchiveConfig controls segment size, flush cadence, signing, and backlog caps. Zero values fall back to defaults (8 MiB, 10 s).
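
A construction sketch; the bucket/prefix, ctx, and metrics values are placeholders, and production callers would inject a KMS-held SigningKey instead of generating one in-process:

_, key, err := ed25519.GenerateKey(rand.Reader) // dev/test only
if err != nil {
	return err
}
cfg := ArchiveConfig{
	URL:        "s3://base-archive/prod",
	SigningKey: key,
	// SegmentTargetBytes, FlushInterval, RetryDeadline: zero values fall back to the package defaults.
	BacklogMaxBytes:    DefaultBacklogMaxBytes,
	BacklogMaxSegments: DefaultBacklogMaxSegments,
}
arch, err := NewArchive(ctx, cfg, "base", BindArchiveMetrics(metrics))
if err != nil {
	return err
}
defer arch.Close()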

type ArchiveMetrics

type ArchiveMetrics struct {
	// SetLagBytes reports the total size of buffered, not-yet-flushed
	// frames across all shards. Surfaced as base_archive_lag_bytes.
	SetLagBytes func(bytes int64)
	// IncFailures increments a failures counter whenever a segment
	// exhausts its retry deadline. The segment is re-buffered unless
	// the backlog cap is exceeded.
	IncFailures func()
	// IncDrops increments a drop counter whenever the backlog cap
	// sheds the oldest segment. Liveness-over-durability — operators
	// should page on this.
	IncDrops func()
}

ArchiveMetrics is a callback-based view onto the core Metrics collectors declared in metrics.go. Archive code stays decoupled from the Prometheus registry by going through function hooks — callers bind SetLagBytes to Metrics.ArchiveLagBytes.Set and IncFailures to a counter of their choice. A nil ArchiveMetrics is valid; both hooks become no-ops.

func BindArchiveMetrics

func BindArchiveMetrics(m *Metrics) *ArchiveMetrics

BindArchiveMetrics builds an ArchiveMetrics that writes through to the core Metrics collectors. Callers pass the returned value into NewArchive. Nil-safe: a nil Metrics yields a nilled-out ArchiveMetrics whose hooks are no-ops.

type Config

type Config struct {
	// Enabled is true iff BASE_NETWORK=quasar.
	Enabled bool

	// ShardKey names the identity carried by requests (JWT claim or header).
	// Required when Enabled.
	ShardKey string

	// Replication is the number of members holding each shard. 1 = standalone
	// DAG (durability via archive), 2 = pair, 3+ = Byzantine-safe quorum.
	Replication int

	// Peers is the static seed list. In k8s these are pod-ordinal DNS names
	// emitted by the operator; in compose they are static service names.
	// host:port form, p2p port not HTTP port.
	Peers []string

	// NodeID is the local member identity. Defaults to $HOSTNAME; overridable
	// via BASE_NODE_ID for tests and compose.
	NodeID string

	// Role is "validator" (default) or "archive". Archive nodes don't vote;
	// they subscribe to finalized frames and append to cold storage.
	Role NodeRole

	// Archive is the cold-storage URL or "off". s3://bucket/prefix,
	// gs://bucket/prefix, file://path (dev), off.
	Archive string

	// ListenHTTP is the Base HTTP listen address. Used only for the
	// /-/base/members endpoint by the Gateway; main HTTP comes from core.
	ListenHTTP string

	// ListenP2P is the Quasar peer-to-peer port.
	ListenP2P string

	// TLS is the mTLS config for the Quasar p2p transport (R5). When
	// unset the transport runs without authentication — only acceptable
	// in single-node dev and tests. Production callers inject certs via
	// the KMS plugin (base/plugins/kms) or the operator-emitted dev CA.
	// See transport_tls.go for the pinning rules.
	TLS *TLSConfig
}

Config is the parsed env surface for a Base network member. All fields are immutable after construction; reparsing requires a restart (standard for pod-lifetime config).

func ConfigFromEnv

func ConfigFromEnv() (Config, error)

ConfigFromEnv reads BASE_NETWORK, BASE_SHARD_KEY, BASE_REPLICATION, BASE_PEERS, BASE_NODE_ROLE, BASE_ARCHIVE, BASE_LISTEN_HTTP, BASE_LISTEN_P2P, and BASE_SHARD_BACKLOG_MAX / BASE_SHARD_BACKLOG_SEGMENTS (R6 per-shard backlog caps — the archive config is built separately from these by base/core's startup path). Standalone defaults are safe: no error, Enabled==false.

type Envelope

type Envelope struct {
	ShardID string
	Frame   Frame
}

Envelope is what the transport ships between nodes: a shardID header + the serialisable frame. Signing and transport security live at the transport layer; this struct is the minimal contract.

type Frame

type Frame struct {
	ShardID   string
	Seq       uint64
	PrevSeq   uint64
	Timestamp int64
	Salt      [16]byte
	Payload   []byte
	Cksm      [32]byte
	Bytes     []byte
}

Frame is the unit of replication. It is the serialised form of one committed SQLite transaction for a shard: opaque payload bytes (the WAL segment delta), a monotonic sequence, a random salt, and a checksum.

Frames are content-addressed by (salt, cksm) so that:

  • duplicate submissions coalesce in the Quasar DAG,
  • the apply callback is trivially idempotent on pod restart,
  • any tamper on the wire is caught before apply.

Bytes is the serialised on-wire form (encode output). It is populated lazily by decodeFrame and by the archive layer so that frame replay keeps the original PQ signature intact; local producers use encode() directly.

func (Frame) ApplyKey

func (f Frame) ApplyKey() ApplyKey

ApplyKey returns the idempotency key for this frame.

func (Frame) Valid

func (f Frame) Valid() error

Valid returns nil iff the stored checksum matches a freshly computed one. Callers check this before applying a frame that came off the wire.
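
An apply-side sketch combining Valid and ApplyKey; the in-memory map stands in for the shard's persistent dedup cache, and applyFrame is a placeholder for the caller-supplied ApplyFunc:

seen := make(map[ApplyKey]struct{})

applyOnce := func(f Frame) error {
	if err := f.Valid(); err != nil {
		return err // tampered or truncated on the wire
	}
	k := f.ApplyKey()
	if _, dup := seen[k]; dup {
		return nil // duplicate finalisation: idempotent no-op
	}
	seen[k] = struct{}{}
	return applyFrame(f)
}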

type HookRegisterer

type HookRegisterer interface {
	RegisterCommitHook(func() int32)
}

HookRegisterer is the narrow surface we need from a SQLite driver connection: the method modernc.org/sqlite exposes on its *conn. Accepting it through a tiny interface keeps this package decoupled from the concrete driver type and test-friendly.
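
A minimal test fake satisfying HookRegisterer (the type name is illustrative); InstallWALHook can then be exercised without a real driver connection:

type fakeConn struct{ hook func() int32 }

func (c *fakeConn) RegisterCommitHook(fn func() int32) { c.hook = fn }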

type Membership added in v0.49.0

type Membership interface {
	// Members returns the current sorted member list. Includes self.
	// Never nil; a singleton returns a 1-element slice.
	Members() []NodeID

	// Watch returns a channel that emits the member list on every
	// change. The first value is delivered immediately on subscribe.
	// The channel closes when ctx is cancelled.
	Watch(ctx context.Context) <-chan []NodeID

	// Close releases any resources. Safe to call multiple times.
	Close() error
}

Membership is the live view of reachable Base instances. Consumers read a snapshot via Members(), or subscribe to change notifications via Watch(). The self NodeID is always included in the snapshot.
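
A subscription sketch; m is any Membership value and ctx bounds the watch:

go func() {
	for members := range m.Watch(ctx) {
		log.Printf("membership: %d nodes %v", len(members), members)
		// recompute shard placement / routing tables here
	}
}()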

type Metrics

type Metrics struct {
	Shards           prometheus.Counter
	ActiveShards     prometheus.Gauge
	WALLagBytes      prometheus.Gauge
	HotShards        prometheus.Gauge
	LeaseContentions prometheus.Counter
	ArchiveLagBytes  prometheus.Gauge
	// ArchiveFailures counts segment flushes that exhausted their
	// retry deadline. Incremented once per failure regardless of
	// re-buffer outcome.
	ArchiveFailures prometheus.Counter
	// ArchiveDrops counts segments shed by the per-shard backlog cap
	// (R6). Under a healthy backend this stays at zero; non-zero is a
	// pageable condition — we're losing archive frames to keep the pod
	// alive.
	ArchiveDrops prometheus.Counter

	FramesSubmitted prometheus.Counter
	FramesIngested  prometheus.Counter
	FramesFinalized prometheus.Counter
	FramesDuplicate prometheus.Counter
	FramesInvalid   prometheus.Counter
	// FramesRejectedShardMismatch counts frames dropped because the
	// Envelope's routed shard disagreed with the Frame's internal
	// shardID (R1 probe). Under healthy operation this stays at zero.
	FramesRejectedShardMismatch prometheus.Counter
	// FramesRejectedSeqGap counts frames whose height != prevHeight+1
	// (R2). The apply path never bumps localSeq from these.
	FramesRejectedSeqGap prometheus.Counter

	ApplyErrors   prometheus.Counter
	WALHookErrors prometheus.Counter
	WALBytes      prometheus.Counter

	// MembershipSize is the live member count as reported by the
	// Membership watcher. Dashboards alert on unexpected drops
	// (scale-down events show up here before the transport notices).
	MembershipSize prometheus.Gauge
}

Metrics owns every collector the network package exposes. Callers wire it into their own registry; we never touch prometheus.DefaultRegisterer.

The metric families correspond directly to the design doc:

base_shards                counter  shards assigned to this member
base_active_shards         gauge    shards with a live Quasar engine
base_wal_lag_bytes         gauge    bytes pending archive flush (set via Append)
base_hot_shards            gauge    shards above activity threshold (set by scaler)
base_lease_contentions     counter  times a writer lost a lease race
base_archive_lag_bytes     gauge    mirror of wal_lag for Prom naming parity

Plus per-path counters used by the internal wiring:

base_network_frames_submitted_total
base_network_frames_ingested_total
base_network_frames_finalized_total
base_network_frames_duplicate_total
base_network_frames_invalid_total
base_network_apply_errors_total
base_network_wal_hook_errors_total
base_network_wal_bytes_total

func NewMetrics

func NewMetrics() *Metrics

NewMetrics constructs the collectors. Register them on a caller-owned registry with Register(). Calling NewMetrics twice produces two independent sets — useful for test isolation.

func (*Metrics) Register

func (m *Metrics) Register(r prometheus.Registerer) error

Register adds every collector in m to r. Duplicate registration returns the first error; callers that need per-metric control can register one field at a time.
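
A wiring sketch with a caller-owned registry, exposed via the standard promhttp handler (github.com/prometheus/client_golang/prometheus/promhttp):

reg := prometheus.NewRegistry()
m := NewMetrics()
if err := m.Register(reg); err != nil {
	log.Fatal(err)
}
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))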

type Network

type Network interface {
	Enabled() bool

	Start(ctx context.Context) error
	Stop(ctx context.Context) error

	// InstallWALHook wires the commit hook on a raw SQLite driver connection
	// for the given shard. The hook captures the committed frame, signs it,
	// and submits it to the shard's Quasar engine. Apply-on-finalize is
	// idempotent by (salt, cksm).
	InstallWALHook(rawConn any, shardID string) error

	// WriterFor returns the current writer endpoint for shardID and whether
	// it is the local pod. Gateway uses this for request routing.
	WriterFor(shardID string) (endpoint string, local bool)

	// MembersFor returns all replica endpoints for shardID.
	MembersFor(shardID string) []string

	// Metrics exposes the Prom collectors. Callers register via their own
	// registry; we never touch the global default.
	Metrics() *Metrics
}

Network is the public API Base uses. Standalone mode returns a no-op implementation from FromEnv; all methods are safe to call.

func FromEnv

func FromEnv() (Network, error)

FromEnv reads BASE_* env vars. When BASE_NETWORK is empty or "standalone" it returns a no-op implementation with Enabled() == false. On validation error the error is returned so the caller can fail fast.
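
A startup sketch; FromEnv is safe to call unconditionally, the standalone no-op satisfies every method, and the shard key value is a placeholder:

n, err := FromEnv()
if err != nil {
	log.Fatal(err) // malformed BASE_* env: fail fast
}
if err := n.Start(ctx); err != nil {
	log.Fatal(err)
}
defer n.Stop(context.Background())

if n.Enabled() {
	if endpoint, local := n.WriterFor("tenant-42"); !local {
		// route the write to endpoint instead of handling it locally
		_ = endpoint
	}
}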

func New

func New(cfg Config) (Network, error)

New constructs a Network from an already-validated Config. Callers who build a Config by hand use this; most callers use FromEnv.

Singleton (len(Peers) == 0) collapses to the standalone no-op path. A one-pod workload does not need consensus, does not need ZAP, does not start a listener, and does not self-dial. Scaling up to N>1 flips the same binary into a full network node by adding peers.

func NewWithTransport

func NewWithTransport(cfg Config, t Transport) (Network, error)

NewWithTransport constructs a Network with a caller-supplied transport. In-process tests inject a memory transport here; production callers use the default wired by New.

type NodeID

type NodeID string

NodeID is a stable member identity — in k8s the pod DNS name, in compose the service DNS name, in tests a synthetic string.

type NodeRole

type NodeRole string

NodeRole distinguishes voting validators from non-voting archive nodes.

const (
	RoleValidator NodeRole = "validator"
	RoleArchive   NodeRole = "archive"
)

type Resolver added in v0.49.0

type Resolver interface {
	LookupHost(ctx context.Context, host string) ([]string, error)
}

Resolver turns a seed entry into a set of addresses. The default implementation is `(&net.Resolver{}).LookupHost`; tests inject a predictable map.

type Shard

type Shard struct {
	ID        string
	Members   []NodeID
	Threshold int
	// contains filtered or unexported fields
}

Shard owns the per-shard Quasar engine, the apply loop, and the dedup cache that makes apply idempotent across restarts.

Lifecycle: created lazily on first reference by shard(id), closed by the node on Stop(). One Shard per shardID per node.

func (*Shard) LocalSeq

func (s *Shard) LocalSeq() uint64

LocalSeq returns the highest finalised seq this shard has applied locally. Gateways compare this against the client's txseq cookie.
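
A gateway-side read-your-writes sketch; the cookie name and handler shape are assumptions, not the Gateway's actual code:

func waitFresh(s *Shard, w http.ResponseWriter, r *http.Request) bool {
	c, err := r.Cookie("txseq") // cookie name is an assumption
	if err != nil {
		return true // no cookie: nothing to wait for
	}
	want, err := strconv.ParseUint(c.Value, 10, 64)
	if err != nil {
		return true
	}
	if s.LocalSeq() < want {
		http.Error(w, "replica catching up", http.StatusServiceUnavailable)
		return false
	}
	return true
}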

type TLSConfig

type TLSConfig struct {
	// CACerts are the PEM-encoded CA certificates the transport trusts
	// for peer certs. Supplied by the operator from the Base-network
	// Secret (KMS-issued) or the self-signed dev CA.
	CACerts []byte

	// ServerCert / ServerKey are the local pod's TLS identity. Both
	// PEM-encoded. Production: issued by KMS; dev: self-signed.
	ServerCert []byte
	ServerKey  []byte

	// ClientCert / ClientKey are optional — when the transport opens
	// outbound connections it presents these. Typically the same as
	// the server cert (pod identity is one cert per pod).
	ClientCert []byte
	ClientKey  []byte

	// AllowedSANs is the SAN allowlist: the pod DNS names from
	// BASE_PEERS. Any peer cert whose DNSNames slice does not include
	// one of these is rejected. Empty slice + non-nil CACerts = "trust
	// any cert chained to CA" (dev-only fallback).
	AllowedSANs []string
}

TLSConfig holds the transport-level mTLS settings plumbed through Config. Zero values mean "no TLS"; production callers MUST set CACerts + at least one of (ServerCert, ClientCert) to activate mTLS.

func (*TLSConfig) ClientConfig

func (c *TLSConfig) ClientConfig() (*tls.Config, error)

ClientConfig builds a *tls.Config for outbound connections. Same VerifyPeerCertificate hook — pinning works symmetrically.

func (*TLSConfig) Enabled

func (c *TLSConfig) Enabled() bool

Enabled reports whether this config activates TLS. Nil-safe.

func (*TLSConfig) ServerConfig

func (c *TLSConfig) ServerConfig() (*tls.Config, error)

ServerConfig builds a *tls.Config suitable for an mTLS listener. The VerifyPeerCertificate hook rejects any peer whose SAN is not in the allowlist even when the chain verifies.
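
An mTLS wiring sketch; the file paths, SANs, and listen address are placeholders for operator-provisioned material:

ca, _ := os.ReadFile("/etc/base/tls/ca.crt")
cert, _ := os.ReadFile("/etc/base/tls/tls.crt")
key, _ := os.ReadFile("/etc/base/tls/tls.key")

tc := &TLSConfig{
	CACerts:     ca,
	ServerCert:  cert,
	ServerKey:   key,
	ClientCert:  cert, // one cert per pod: same identity inbound and outbound
	ClientKey:   key,
	AllowedSANs: []string{"base-0.base.svc", "base-1.base.svc", "base-2.base.svc"},
}
srvCfg, err := tc.ServerConfig()
if err != nil {
	log.Fatal(err)
}
ln, err := tls.Listen("tcp", ":7420", srvCfg)
if err != nil {
	log.Fatal(err)
}
defer ln.Close()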

type Transport

type Transport interface {
	// Start begins delivering inbound envelopes to recv. It must be
	// idempotent-on-stop (Stop may be called after Start errors).
	Start(ctx context.Context, recv func(Envelope)) error

	// Publish fans out env to every peer in the local node's router view.
	// Best-effort: peers discover missed frames via Quasar's DAG sync on
	// reconnect. Implementations should not block for slow peers.
	Publish(env Envelope) error

	// Stop releases transport resources. Safe to call multiple times.
	Stop(ctx context.Context) error
}

Transport is the peer-to-peer plane. Production implementations carry envelopes over QUIC/gRPC with mTLS + PQ-identity. The in-process test transport ships envelopes through Go channels so the integration test runs with no sockets and no Docker.
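
A channel-backed sketch in the spirit of the in-process test transport described above (an illustration, not the package's actual test implementation):

type chanTransport struct {
	bus  chan Envelope
	stop chan struct{}
	once sync.Once
}

func newChanTransport() *chanTransport {
	return &chanTransport{bus: make(chan Envelope, 64), stop: make(chan struct{})}
}

func (t *chanTransport) Start(ctx context.Context, recv func(Envelope)) error {
	go func() {
		for {
			select {
			case env := <-t.bus:
				recv(env)
			case <-ctx.Done():
				return
			case <-t.stop:
				return
			}
		}
	}()
	return nil
}

func (t *chanTransport) Publish(env Envelope) error {
	select {
	case t.bus <- env:
	default: // never block on a slow consumer; Quasar DAG sync covers the gap
	}
	return nil
}

func (t *chanTransport) Stop(ctx context.Context) error {
	t.once.Do(func() { close(t.stop) })
	return nil
}

A test would inject it via NewWithTransport(cfg, newChanTransport()).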
