wal

package
v1.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

Package wal implements the write-ahead log (WAL) and persistence lifecycle for the registry server. It is extracted from pkg/registry/server as part of the registry monolith decomposition (R6.1).

The WAL closes the data-loss window between snapshot saves. flushSave runs once per saveLoopInterval (5s), so a crash between saves would otherwise drop every mutation made in that window. To prevent this, each low-frequency mutation synchronously records a delta to the WAL. On startup the snapshot is loaded first, and any WAL entries written after it are then replayed on top.

Constants

const MaxWALSize = maxWALSize

MaxWALSize is the upper bound enforced by Append. Exposed for tests.

Variables

var ErrWALFull = errors.New("WAL: at size cap, refusing append")

ErrWALFull is returned by Append when the WAL would exceed MaxWALSize. Callers should log and drop the entry (best-effort durability). The in-memory mutation has still completed, and the next successful flushSave will persist it via the snapshot path and truncate the WAL.
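The log-and-drop policy described above could look roughly like the following. This is a minimal sketch, not the server's actual code: `errWALFull` stands in for `ErrWALFull`, and the `appender` interface, the `recordDelta` helper, the `cappedWAL` fake, and the use of a string in place of a `DeltaEntry` are all hypothetical names introduced so the example runs on its own.

```go
package main

import (
	"errors"
	"log"
)

// errWALFull stands in for wal.ErrWALFull so the sketch is self-contained.
var errWALFull = errors.New("WAL: at size cap, refusing append")

// appender is a hypothetical interface; the real Append takes a DeltaEntry,
// a string stands in for it here.
type appender interface {
	Append(entry string) error
}

// recordDelta reports whether entry reached the WAL. Hitting the size cap is
// logged and swallowed (best-effort durability); any other error is returned.
func recordDelta(w appender, entry string) (bool, error) {
	err := w.Append(entry)
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, errWALFull):
		log.Printf("wal full, dropping %q; the next snapshot will persist it", entry)
		return false, nil
	default:
		return false, err
	}
}

// cappedWAL is a toy appender that refuses entries once max have been written.
type cappedWAL struct{ n, max int }

func (c *cappedWAL) Append(string) error {
	if c.n >= c.max {
		return errWALFull
	}
	c.n++
	return nil
}

func main() {
	w := &cappedWAL{max: 1}
	for _, e := range []string{"register node 7", "hostname node 7"} {
		ok, err := recordDelta(w, e)
		log.Printf("entry=%q persisted=%v err=%v", e, ok, err)
	}
}
```

Using `errors.Is` rather than `==` keeps the check working if the error is ever wrapped on its way up to the caller.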

Functions

This section is empty.

Types

type DeltaEntry

type DeltaEntry struct {
	SeqNo  uint64          `json:"seq_no"`
	Type   DeltaType       `json:"type"`
	NodeID uint32          `json:"node_id,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
}

DeltaEntry records a single state mutation for WAL durability.

type DeltaType

type DeltaType uint8

DeltaType identifies what kind of mutation a delta represents.

const (
	DeltaRegister      DeltaType = 1
	DeltaDeregister    DeltaType = 2
	DeltaHeartbeat     DeltaType = 3
	DeltaTrustAdd      DeltaType = 4
	DeltaTrustRevoke   DeltaType = 5
	DeltaVisibility    DeltaType = 6
	DeltaHostname      DeltaType = 7
	DeltaTags          DeltaType = 8
	DeltaNetworkCreate DeltaType = 9
	DeltaNetworkJoin   DeltaType = 10
	DeltaNetworkLeave  DeltaType = 11
	DeltaKeyRotation   DeltaType = 12
	DeltaTaskExec      DeltaType = 13
	// DeltaNetworkDelete added for WAL wiring. On older binaries that
	// don't recognize this type, replay logs and skips — safe forward-compat.
	DeltaNetworkDelete DeltaType = 14
)
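The forward-compatibility note on DeltaNetworkDelete implies that replay treats an unrecognized type as "log and skip" rather than as a failure. A minimal sketch of such a dispatcher follows. The `registry` type and its `apply` method are hypothetical, and `deltaType` and `deltaEntry` are local stand-ins that only mirror the shapes documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Local stand-ins for the wal package's DeltaType and DeltaEntry.
type deltaType uint8

const (
	deltaDeregister    deltaType = 2
	deltaNetworkDelete deltaType = 14
)

type deltaEntry struct {
	SeqNo uint64          `json:"seq_no"`
	Type  deltaType       `json:"type"`
	Data  json.RawMessage `json:"data,omitempty"`
}

// registry is a hypothetical, heavily simplified in-memory state.
type registry struct {
	nodes    map[uint32]bool
	networks map[uint16]bool
}

// apply mutates r according to e. Unknown types are logged and skipped
// instead of returned as errors, so an older binary can replay a WAL
// written by a newer one.
func (r *registry) apply(e deltaEntry) error {
	switch e.Type {
	case deltaDeregister:
		var d struct {
			NodeID uint32 `json:"node_id"`
		}
		if err := json.Unmarshal(e.Data, &d); err != nil {
			return fmt.Errorf("seq %d: %w", e.SeqNo, err)
		}
		delete(r.nodes, d.NodeID)
	case deltaNetworkDelete:
		var d struct {
			NetworkID uint16 `json:"network_id"`
		}
		if err := json.Unmarshal(e.Data, &d); err != nil {
			return fmt.Errorf("seq %d: %w", e.SeqNo, err)
		}
		delete(r.networks, d.NetworkID)
	default:
		log.Printf("replay: skipping unknown delta type %d (seq %d)", e.Type, e.SeqNo)
	}
	return nil
}

func main() {
	r := &registry{nodes: map[uint32]bool{7: true}, networks: map[uint16]bool{3: true}}
	entries := []deltaEntry{
		{SeqNo: 1, Type: deltaDeregister, Data: json.RawMessage(`{"node_id":7}`)},
		{SeqNo: 2, Type: 99}, // a type this binary does not know
	}
	for _, e := range entries {
		if err := r.apply(e); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println(len(r.nodes), len(r.networks))
}
```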

type DeregisterDelta

type DeregisterDelta struct {
	NodeID uint32 `json:"node_id"`
}

DeregisterDelta marks a node as removed.

type KeyRotationDelta

type KeyRotationDelta struct {
	NodeID       uint32 `json:"node_id"`
	NewPublicKey string `json:"new_public_key"` // base64
	RotatedAt    string `json:"rotated_at"`     // RFC3339
}

KeyRotationDelta records a key rotation so the new pubkey survives a crash.

type NetworkCreateDelta

type NetworkCreateDelta struct {
	NetworkID     uint16 `json:"network_id"`
	Name          string `json:"name"`
	JoinRule      string `json:"join_rule"`
	Token         string `json:"token,omitempty"`
	AdminToken    string `json:"admin_token,omitempty"`
	Enterprise    bool   `json:"enterprise,omitempty"`
	CreatorNodeID uint32 `json:"creator_node_id,omitempty"`
	CreatedAt     string `json:"created_at"` // RFC3339
}

NetworkCreateDelta captures the parameters of a freshly created network. The member set is empty at creation; the creator is added by a separate join delta if applicable.

type NetworkDeleteDelta

type NetworkDeleteDelta struct {
	NetworkID uint16 `json:"network_id"`
}

NetworkDeleteDelta marks a network as removed.

type NetworkMembershipDelta

type NetworkMembershipDelta struct {
	NodeID    uint32 `json:"node_id"`
	NetworkID uint16 `json:"network_id"`
}

NetworkMembershipDelta covers both join and leave; the direction is encoded in the DeltaType (DeltaNetworkJoin / DeltaNetworkLeave).

type RegisterDelta

type RegisterDelta struct {
	NodeID    uint32   `json:"node_id"`
	Owner     string   `json:"owner,omitempty"`
	PublicKey string   `json:"public_key"` // base64
	RealAddr  string   `json:"real_addr,omitempty"`
	Hostname  string   `json:"hostname,omitempty"`
	LANAddrs  []string `json:"lan_addrs,omitempty"`
	Version   string   `json:"version,omitempty"`
	CreatedAt string   `json:"created_at,omitempty"` // RFC3339
}

RegisterDelta carries the full state needed to recreate a new-node registration. It is captured at the new-node assignment site (handleRegister) so that a crash before flushSave doesn't reassign the same identity to a different node_id.

type Snapshot

type Snapshot struct {
	Version            int                                         `json:"version"`
	NextNode           uint32                                      `json:"next_node"`
	NextNet            uint16                                      `json:"next_net"`
	Nodes              map[string]*SnapshotNode                    `json:"nodes"`
	Networks           map[string]*SnapshotNet                     `json:"networks"`
	TrustPairs         []string                                    `json:"trust_pairs,omitempty"`
	PubKeyIdx          map[string]uint32                           `json:"pub_key_idx,omitempty"`
	HandshakeInbox     map[string][]*trustpkg.HandshakeRelayMsg    `json:"handshake_inbox,omitempty"`
	HandshakeResponses map[string][]*trustpkg.HandshakeResponseMsg `json:"handshake_responses,omitempty"`
	InviteInbox        map[string][]*membpkg.NetworkInvite         `json:"invite_inbox,omitempty"`
	// Dashboard stats persistence (explicit counters for validation)
	TotalRequests int64  `json:"total_requests,omitempty"`
	TotalNodes    int    `json:"total_nodes,omitempty"`
	OnlineNodes   int    `json:"online_nodes,omitempty"`
	TrustLinks    int    `json:"trust_links,omitempty"`
	UniqueTags    int    `json:"unique_tags,omitempty"`
	TaskExecutors int    `json:"task_executors,omitempty"`
	StartTime     string `json:"start_time,omitempty"` // RFC3339 format
	// Restart events: unix-millis of each process start after first. Lets the
	// dashboard show brief redeploy "blips" while preserving cumulative uptime.
	RestartEvents     []int64                        `json:"restart_events,omitempty"`
	DowntimeIntervals [][2]int64                     `json:"downtime_intervals,omitempty"`
	LastHeartbeat     int64                          `json:"last_heartbeat,omitempty"`
	ProbeStates       map[string]*dashpkg.ProbeState `json:"probe_states,omitempty"`
	// Time-series history for dashboard charts
	HourlyHistory    []dashpkg.StatsSample                   `json:"hourly_history,omitempty"`
	DailyHistory     []dashpkg.StatsSample                   `json:"daily_history,omitempty"`
	NetHourlyHistory map[string][]dashpkg.NetworkSampleEntry `json:"net_hourly_history,omitempty"`
	NetDailyHistory  map[string][]dashpkg.NetworkSampleEntry `json:"net_daily_history,omitempty"`
	// Audit log persistence (most recent entries, capped at maxAuditEntries)
	AuditLog []auditpkg.Entry `json:"audit_log,omitempty"`
	// Enterprise config persistence
	IDPConfig      *wire.BlueprintIdentityProvider `json:"idp_config,omitempty"`
	AuditExportCfg *wire.BlueprintAuditExport      `json:"audit_export_config,omitempty"`
	RBACPreAssign  map[string][]wire.BlueprintRole `json:"rbac_pre_assign,omitempty"` // networkID -> roles
	// Integrity: SHA256 hex digest of all fields except Checksum
	Checksum string `json:"checksum,omitempty"`
}

Snapshot is the JSON-serializable full registry state written by flushSave and read by load. Version 0 is the legacy (pre-checksum) format; version 1 includes the Checksum field for integrity verification.
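One way to compute a digest "of all fields except Checksum" is to hash the JSON encoding of the snapshot with Checksum cleared. The sketch below does that on a trimmed stand-in struct. The exact bytes the real implementation hashes are not documented here, so treat the canonicalization, the `digest`, `seal`, and `verify` helpers, and the choice to skip verification for version 0 as assumptions.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// miniSnapshot is a trimmed stand-in for Snapshot with the same JSON tags.
type miniSnapshot struct {
	Version  int    `json:"version"`
	NextNode uint32 `json:"next_node"`
	NextNet  uint16 `json:"next_net"`
	Checksum string `json:"checksum,omitempty"`
}

// digest hashes the JSON form of s with Checksum cleared (assumed scheme).
func digest(s miniSnapshot) (string, error) {
	s.Checksum = "" // s is a copy, so the caller's value is untouched
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

// seal marks s as version 1 and stores its digest in Checksum.
func seal(s *miniSnapshot) error {
	s.Version = 1
	sum, err := digest(*s)
	if err != nil {
		return err
	}
	s.Checksum = sum
	return nil
}

// verify recomputes the digest. Version 0 snapshots carry no checksum, so
// this sketch accepts them unchecked (an assumption).
func verify(s miniSnapshot) (bool, error) {
	if s.Version == 0 {
		return true, nil
	}
	want, err := digest(s)
	if err != nil {
		return false, err
	}
	return want == s.Checksum, nil
}

func main() {
	s := miniSnapshot{NextNode: 5, NextNet: 2}
	if err := seal(&s); err != nil {
		panic(err)
	}
	ok, _ := verify(s)
	fmt.Println("intact:", ok)
	s.NextNode = 6 // simulate on-disk corruption
	ok, _ = verify(s)
	fmt.Println("after tampering:", ok)
}
```

The approach relies on `encoding/json` emitting struct fields in declaration order and map keys in sorted order, so the same value always marshals to the same bytes.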

type SnapshotNet

type SnapshotNet struct {
	ID           uint16                 `json:"id"`
	Name         string                 `json:"name"`
	JoinRule     string                 `json:"join_rule"`
	Token        string                 `json:"token,omitempty"`
	Members      []uint32               `json:"members"`
	MemberRoles  map[string]string      `json:"member_roles,omitempty"` // nodeID -> role
	MemberTags   map[string][]string    `json:"member_tags,omitempty"`  // nodeID -> admin-assigned tags
	AdminToken   string                 `json:"admin_token,omitempty"`  // per-network admin token
	Policy       *membpkg.NetworkPolicy `json:"policy,omitempty"`       // network policy
	Rules        *wire.NetworkRules     `json:"rules,omitempty"`        // managed network rules
	ExprPolicy   json.RawMessage        `json:"expr_policy,omitempty"`  // programmable policy engine document
	Enterprise   bool                   `json:"enterprise,omitempty"`   // enterprise network flag
	RequestCount int64                  `json:"request_count,omitempty"`
	Created      string                 `json:"created"`
}

SnapshotNet is the JSON-serializable form of a single registry network.

type SnapshotNode

type SnapshotNode struct {
	ID          uint32   `json:"id"`
	Owner       string   `json:"owner,omitempty"`
	PublicKey   string   `json:"public_key"`
	RealAddr    string   `json:"real_addr,omitempty"`
	Networks    []uint16 `json:"networks"`
	Public      bool     `json:"public,omitempty"`
	LastSeen    string   `json:"last_seen,omitempty"`
	Hostname    string   `json:"hostname,omitempty"`
	Tags        []string `json:"tags,omitempty"`
	TaskExec    bool     `json:"task_exec,omitempty"`
	LANAddrs    []string `json:"lan_addrs,omitempty"`
	KeyCreated  string   `json:"key_created,omitempty"`
	KeyRotated  string   `json:"key_rotated,omitempty"`
	KeyRotCount int      `json:"key_rot_count,omitempty"`
	KeyExpires  string   `json:"key_expires,omitempty"`
	ExternalID  string   `json:"external_id,omitempty"`
	Version     string   `json:"version,omitempty"`
	RelayOnly   bool     `json:"relay_only,omitempty"` // preserve flag across snapshots
}

SnapshotNode is the JSON-serializable form of a single registry node. All binary/time fields are encoded as strings (base64 / RFC3339) so the snapshot is human-readable and forward-compatible.
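As an illustration of the string encoding, the helpers below round-trip a key and a timestamp. They are hypothetical and not part of this package. In particular, the use of standard (padded) base64 rather than a URL-safe variant is an assumption.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"time"
)

// encodeKey and decodeKey use standard padded base64 (assumed variant).
func encodeKey(pub []byte) string        { return base64.StdEncoding.EncodeToString(pub) }
func decodeKey(s string) ([]byte, error) { return base64.StdEncoding.DecodeString(s) }

// encodeTime normalizes to UTC and formats as RFC3339 (second precision).
func encodeTime(t time.Time) string { return t.UTC().Format(time.RFC3339) }

// decodeTime parses an RFC3339 timestamp.
func decodeTime(s string) (time.Time, error) { return time.Parse(time.RFC3339, s) }

func main() {
	key := encodeKey([]byte{0x01, 0x02, 0xff})
	seen := encodeTime(time.Unix(1700000000, 0))
	fmt.Println(key, seen)

	raw, err := decodeKey(key)
	fmt.Println(len(raw), err)
}
```

Note that `time.RFC3339` drops sub-second precision, so a value that goes through this round trip is truncated to whole seconds.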

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store manages the WAL file, save channels, standby flag, and replication token on behalf of the registry Server. It runs saveLoop and replicaPushLoop as background goroutines started by Start.

func NewStore

func NewStore(walPath string, done <-chan struct{}, cbs StoreCallbacks) (*Store, error)

NewStore creates a Store. The WAL is opened at walPath (empty = no WAL). done is the server shutdown channel; callbacks wire the server-specific persistence and replication operations.

func (*Store) Close

func (st *Store) Close() error

Close closes the underlying WAL file. The caller must wait for saveLoop and replicaPushLoop to exit (via SaveDone / ReplicaPushDone) before calling Close.
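The ordering requirement can be sketched as a small shutdown helper. The `storeLike` interface lists only the Store methods involved, while `shutdown` and `fakeStore` are hypothetical. The sketch also assumes that closing the done channel passed to NewStore is what makes both loops exit, which the documentation implies but does not state outright.

```go
package main

import (
	"errors"
	"fmt"
)

// storeLike is the subset of *Store used during shutdown.
type storeLike interface {
	SaveDone() <-chan struct{}
	ReplicaPushDone() <-chan struct{}
	Close() error
}

// shutdown signals the loops to stop, waits for both to exit, then closes
// the store. Calling Close earlier could race with a loop still using the WAL.
func shutdown(done chan struct{}, st storeLike) error {
	close(done)
	<-st.SaveDone()
	<-st.ReplicaPushDone()
	return st.Close()
}

// fakeStore imitates the two background loops for demonstration purposes.
type fakeStore struct {
	saveDone chan struct{}
	pushDone chan struct{}
	closed   bool
}

func newFakeStore(done <-chan struct{}) *fakeStore {
	f := &fakeStore{saveDone: make(chan struct{}), pushDone: make(chan struct{})}
	go func() { <-done; close(f.saveDone) }()
	go func() { <-done; close(f.pushDone) }()
	return f
}

func (f *fakeStore) SaveDone() <-chan struct{}        { return f.saveDone }
func (f *fakeStore) ReplicaPushDone() <-chan struct{} { return f.pushDone }

// Close refuses to run while either loop is still alive.
func (f *fakeStore) Close() error {
	for _, ch := range []chan struct{}{f.saveDone, f.pushDone} {
		select {
		case <-ch:
		default:
			return errors.New("close called while a loop is still running")
		}
	}
	f.closed = true
	return nil
}

func main() {
	done := make(chan struct{})
	st := newFakeStore(done)
	fmt.Println("shutdown error:", shutdown(done, st))
}
```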

func (*Store) IsStandby

func (st *Store) IsStandby() bool

IsStandby returns true if standby mode is active.

func (*Store) ReplicaPushCh

func (st *Store) ReplicaPushCh() chan struct{}

ReplicaPushCh returns the replica-push signal channel.

func (*Store) ReplicaPushDone

func (st *Store) ReplicaPushDone() <-chan struct{}

ReplicaPushDone returns the channel closed when replicaPushLoop exits.

func (*Store) ReplicationToken

func (st *Store) ReplicationToken() string

ReplicationToken returns the current replication token.

func (*Store) SaveCh

func (st *Store) SaveCh() chan struct{}

SaveCh returns the save signal channel. Send on it to trigger a debounced save.

func (*Store) SaveDone

func (st *Store) SaveDone() <-chan struct{}

SaveDone returns the channel closed when saveLoop exits.

func (*Store) SetReplicationToken

func (st *Store) SetReplicationToken(token string)

SetReplicationToken sets the token required for subscribe_replication.

func (*Store) SetStandby

func (st *Store) SetStandby(v bool)

SetStandby configures standby mode. In standby mode the server rejects writes and instead receives snapshots from a primary.

func (*Store) Start

func (st *Store) Start()

Start launches saveLoop and replicaPushLoop as goroutines. It must be called exactly once after NewStore.

func (*Store) TriggerSave

func (st *Store) TriggerSave()

TriggerSave signals both the save loop and replica-push loop.
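A common way to signal a debounced loop without blocking the caller is a non-blocking send on a channel of capacity 1, so that bursts of triggers collapse into a single pending wake-up. Whether TriggerSave works this way is not stated here; the `trigger` helper below is a hypothetical illustration of the pattern only.

```go
package main

import "fmt"

// trigger attempts a non-blocking send and reports whether a new signal was
// queued. When one is already pending, the send is skipped, which coalesces
// bursts of triggers into a single wake-up.
func trigger(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	saveCh := make(chan struct{}, 1) // capacity 1: at most one pending save

	fmt.Println(trigger(saveCh)) // queues a signal
	fmt.Println(trigger(saveCh)) // a signal is already pending, so nothing is queued

	<-saveCh // the save loop consumes the signal

	fmt.Println(trigger(saveCh)) // the channel is empty again
}
```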

func (*Store) TriggerSnapshot

func (st *Store) TriggerSnapshot() error

TriggerSnapshot immediately runs a full snapshot flush. Returns nil if no storePath is configured (i.e. WAL is nil).

func (*Store) WAL

func (st *Store) WAL() *WAL

WAL returns the underlying WAL file. Nil if no persistence is configured.

type StoreCallbacks

type StoreCallbacks struct {
	// FlushSave persists the current server state to disk. Called by saveLoop.
	FlushSave func() error
	// SnapshotJSON builds the current state as JSON bytes for replication.
	// Returns nil on error (push is skipped).
	SnapshotJSON func() []byte
	// Push sends snapshot bytes to all replication subscribers.
	Push func([]byte)
	// IncSaveFailures increments the save-failure metric counter.
	IncSaveFailures func()
}

StoreCallbacks bundles the server-side callbacks that Store needs without creating a circular import between the wal and server packages.

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL implements an append-only write-ahead log for registry mutations. Instead of serializing the entire state on every mutation (O(N) per save), the WAL appends only the delta entry (O(1) per mutation). Full snapshots are written periodically (compaction) and the WAL is truncated.

On-disk format: sequential records of [4-byte little-endian length][delta entry JSON]. The WAL file path is derived from the snapshot path: "{storePath}.wal".
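The record framing can be sketched with `encoding/binary`. The code below is an illustration of the documented layout, not the package's implementation: `entry` is a local stand-in for DeltaEntry, and `writeRecord`, `readRecords`, and `roundTrip` are hypothetical helpers. It does not fsync, enforce a size cap, or tolerate a torn final record.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// entry is a local stand-in for DeltaEntry with the same JSON tags.
type entry struct {
	SeqNo  uint64          `json:"seq_no"`
	Type   uint8           `json:"type"`
	NodeID uint32          `json:"node_id,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
}

// writeRecord frames e as [4-byte little-endian length][JSON body].
func writeRecord(w io.Writer, e entry) error {
	body, err := json.Marshal(e)
	if err != nil {
		return err
	}
	var hdr [4]byte
	binary.LittleEndian.PutUint32(hdr[:], uint32(len(body)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err = w.Write(body)
	return err
}

// readRecords decodes records until a clean EOF at a record boundary.
// A partial header or body is reported as an error.
func readRecords(r io.Reader) ([]entry, error) {
	var out []entry
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(r, hdr[:]); err != nil {
			if err == io.EOF {
				return out, nil // no more records
			}
			return out, err
		}
		body := make([]byte, binary.LittleEndian.Uint32(hdr[:]))
		if _, err := io.ReadFull(r, body); err != nil {
			return out, err
		}
		var e entry
		if err := json.Unmarshal(body, &e); err != nil {
			return out, err
		}
		out = append(out, e)
	}
}

// roundTrip frames entries into an in-memory buffer and decodes them back.
func roundTrip(in []entry) ([]entry, error) {
	var buf bytes.Buffer
	for _, e := range in {
		if err := writeRecord(&buf, e); err != nil {
			return nil, err
		}
	}
	return readRecords(&buf)
}

func main() {
	out, err := roundTrip([]entry{
		{SeqNo: 1, Type: 1, NodeID: 7},
		{SeqNo: 2, Type: 2, NodeID: 7},
	})
	fmt.Println(len(out), err)
}
```

A production reader would also want to bound the length prefix before allocating, since a corrupt header could otherwise request up to 4 GiB.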

func NewWAL

func NewWAL(path string) (*WAL, error)

NewWAL opens or creates a WAL file at the given path. Returns nil if path is empty (no persistence configured).

func (*WAL) Append

func (w *WAL) Append(entry DeltaEntry) error

Append writes a delta entry to the WAL and fsyncs it before returning, so that an acknowledged entry is durable. It returns ErrWALFull if the append would push the WAL past MaxWALSize, and an error if the write fails.

func (*WAL) Close

func (w *WAL) Close() error

Close closes the WAL file.

func (*WAL) Path

func (w *WAL) Path() string

Path returns the underlying file path. Exposed for tests.

func (*WAL) Replay

func (w *WAL) Replay(fn func(DeltaEntry) error) (int, error)

Replay reads all entries from the WAL and calls fn for each. Used during startup to replay mutations that occurred after the last snapshot.

func (*WAL) SetSize

func (w *WAL) SetSize(v int64)

SetSize forces the in-memory size counter to v. Exposed for tests that need to simulate the size cap being reached without writing hundreds of MiB to disk.

func (*WAL) Size

func (w *WAL) Size() int64

Size returns the current WAL file size in bytes.

func (*WAL) Truncate

func (w *WAL) Truncate() error

Truncate clears the WAL file (called after a successful full snapshot). This is the "compaction" step — the snapshot supersedes all WAL entries.
