Documentation ¶
Overview ¶
Package wal implements the write-ahead log (WAL) and persistence lifecycle for the registry server. It is extracted from pkg/registry/server as part of the registry monolith decomposition (R6.1).
The WAL closes the data-loss window between snapshot saves. flushSave runs on saveLoopInterval (5s); a crash between saves would otherwise drop every mutation in that window. Each low-frequency mutation records a delta to the WAL synchronously. On startup the snapshot is loaded, then any post-snapshot WAL entries are replayed on top.
Index ¶
- Constants
- Variables
- type DeltaEntry
- type DeltaType
- type DeregisterDelta
- type KeyRotationDelta
- type NetworkCreateDelta
- type NetworkDeleteDelta
- type NetworkMembershipDelta
- type RegisterDelta
- type Snapshot
- type SnapshotNet
- type SnapshotNode
- type Store
- func (st *Store) Close() error
- func (st *Store) IsStandby() bool
- func (st *Store) ReplicaPushCh() chan struct{}
- func (st *Store) ReplicaPushDone() <-chan struct{}
- func (st *Store) ReplicationToken() string
- func (st *Store) SaveCh() chan struct{}
- func (st *Store) SaveDone() <-chan struct{}
- func (st *Store) SetReplicationToken(token string)
- func (st *Store) SetStandby(v bool)
- func (st *Store) Start()
- func (st *Store) TriggerSave()
- func (st *Store) TriggerSnapshot() error
- func (st *Store) WAL() *WAL
- type StoreCallbacks
- type WAL
Constants ¶
const MaxWALSize = maxWALSize
MaxWALSize is the upper bound enforced by Append. Exposed for tests.
Variables ¶
var ErrWALFull = errors.New("WAL: at size cap, refusing append")
ErrWALFull is returned by Append when the WAL would exceed maxWALSize. Callers should log and drop the entry (best-effort durability): the in-memory mutation has already completed, and the next successful flushSave will persist it via the snapshot path and truncate the WAL.
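The log-and-drop handling described above might look roughly like the following sketch. It does not import the real package: errWALFull is a local stand-in for ErrWALFull, appendFunc is a hypothetical stand-in for a bound call to (*WAL).Append, and returning other errors to the caller is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errWALFull is a local stand-in for wal.ErrWALFull so the sketch compiles alone.
var errWALFull = errors.New("WAL: at size cap, refusing append")

// appendFunc is a hypothetical stand-in for (*WAL).Append bound to one entry.
type appendFunc func() error

// recordBestEffort reports whether the delta was made durable. A full WAL is
// logged and dropped; any other error is returned (an assumption of this sketch).
func recordBestEffort(appendDelta appendFunc) (bool, error) {
	err := appendDelta()
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, errWALFull):
		log.Printf("wal full, dropping delta: %v", err)
		return false, nil
	default:
		return false, fmt.Errorf("wal append: %w", err)
	}
}

func main() {
	durable, err := recordBestEffort(func() error {
		return fmt.Errorf("append: %w", errWALFull)
	})
	fmt.Println(durable, err)
}
```

errors.Is is used rather than == so that a wrapped ErrWALFull is still recognized.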
Functions ¶
This section is empty.
Types ¶
type DeltaEntry ¶
type DeltaEntry struct {
SeqNo uint64 `json:"seq_no"`
Type DeltaType `json:"type"`
NodeID uint32 `json:"node_id,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
DeltaEntry records a single state mutation for WAL durability.
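As an illustration, an entry could be assembled by marshaling a typed delta into Data. This sketch redeclares the types locally rather than importing the package. That Data carries the JSON of the delta struct matching Type is an assumption drawn from the field types, and newDeregisterEntry is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types so the sketch is self-contained.
type DeltaType uint8

const DeltaDeregister DeltaType = 2

type DeltaEntry struct {
	SeqNo  uint64          `json:"seq_no"`
	Type   DeltaType       `json:"type"`
	NodeID uint32          `json:"node_id,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
}

type DeregisterDelta struct {
	NodeID uint32 `json:"node_id"`
}

// newDeregisterEntry (hypothetical) wraps a DeregisterDelta payload in an entry.
func newDeregisterEntry(seq uint64, nodeID uint32) (DeltaEntry, error) {
	data, err := json.Marshal(DeregisterDelta{NodeID: nodeID})
	if err != nil {
		return DeltaEntry{}, err
	}
	return DeltaEntry{SeqNo: seq, Type: DeltaDeregister, NodeID: nodeID, Data: data}, nil
}

func main() {
	entry, err := newDeregisterEntry(7, 42)
	if err != nil {
		panic(err)
	}
	out, err := json.Marshal(entry)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```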
type DeltaType ¶
type DeltaType uint8
DeltaType identifies what kind of mutation a delta represents.
const (
DeltaRegister DeltaType = 1
DeltaDeregister DeltaType = 2
DeltaHeartbeat DeltaType = 3
DeltaTrustAdd DeltaType = 4
DeltaTrustRevoke DeltaType = 5
DeltaVisibility DeltaType = 6
DeltaHostname DeltaType = 7
DeltaTags DeltaType = 8
DeltaNetworkCreate DeltaType = 9
DeltaNetworkJoin DeltaType = 10
DeltaNetworkLeave DeltaType = 11
DeltaKeyRotation DeltaType = 12
DeltaTaskExec DeltaType = 13
// DeltaNetworkDelete added for WAL wiring. On older binaries that
// don't recognize this type, replay logs and skips (safe forward-compat).
DeltaNetworkDelete DeltaType = 14
)
type DeregisterDelta ¶
type DeregisterDelta struct {
NodeID uint32 `json:"node_id"`
}
DeregisterDelta marks a node as removed.
type KeyRotationDelta ¶
type KeyRotationDelta struct {
NodeID uint32 `json:"node_id"`
NewPublicKey string `json:"new_public_key"` // base64
RotatedAt string `json:"rotated_at"` // RFC3339
}
KeyRotationDelta records a key rotation so the new pubkey survives a crash.
type NetworkCreateDelta ¶
type NetworkCreateDelta struct {
NetworkID uint16 `json:"network_id"`
Name string `json:"name"`
JoinRule string `json:"join_rule"`
Token string `json:"token,omitempty"`
AdminToken string `json:"admin_token,omitempty"`
Enterprise bool `json:"enterprise,omitempty"`
CreatorNodeID uint32 `json:"creator_node_id,omitempty"`
CreatedAt string `json:"created_at"` // RFC3339
}
NetworkCreateDelta captures the parameters of a freshly-created network. Member set is empty at creation; the creator is added by a separate join delta if applicable.
type NetworkDeleteDelta ¶
type NetworkDeleteDelta struct {
NetworkID uint16 `json:"network_id"`
}
NetworkDeleteDelta marks a network as removed.
type NetworkMembershipDelta ¶
type NetworkMembershipDelta struct {
NodeID uint32 `json:"node_id"`
NetworkID uint16 `json:"network_id"`
}
NetworkMembershipDelta covers join AND leave; direction encoded in the DeltaType (DeltaNetworkJoin / DeltaNetworkLeave).
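Because the payload is identical for both directions, a replay handler has to branch on the DeltaType. A minimal sketch, with locally redeclared types and a hypothetical in-memory members map standing in for the registry state:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type DeltaType uint8

const (
	DeltaNetworkJoin  DeltaType = 10
	DeltaNetworkLeave DeltaType = 11
)

type NetworkMembershipDelta struct {
	NodeID    uint32 `json:"node_id"`
	NetworkID uint16 `json:"network_id"`
}

// members maps a network ID to its set of member node IDs (hypothetical state).
type members map[uint16]map[uint32]bool

// applyMembership decodes the shared payload and uses the DeltaType to decide
// between join and leave. It returns false for any other delta type.
func applyMembership(m members, t DeltaType, data []byte) (bool, error) {
	if t != DeltaNetworkJoin && t != DeltaNetworkLeave {
		return false, nil
	}
	var d NetworkMembershipDelta
	if err := json.Unmarshal(data, &d); err != nil {
		return false, err
	}
	if t == DeltaNetworkJoin {
		if m[d.NetworkID] == nil {
			m[d.NetworkID] = map[uint32]bool{}
		}
		m[d.NetworkID][d.NodeID] = true
	} else {
		delete(m[d.NetworkID], d.NodeID) // no-op if the network is unknown
	}
	return true, nil
}

func main() {
	m := members{}
	payload := []byte(`{"node_id":5,"network_id":1}`)
	if _, err := applyMembership(m, DeltaNetworkJoin, payload); err != nil {
		panic(err)
	}
	fmt.Println("after join:", m[1][5])
	if _, err := applyMembership(m, DeltaNetworkLeave, payload); err != nil {
		panic(err)
	}
	fmt.Println("after leave:", m[1][5])
}
```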
type RegisterDelta ¶
type RegisterDelta struct {
NodeID uint32 `json:"node_id"`
Owner string `json:"owner,omitempty"`
PublicKey string `json:"public_key"` // base64
RealAddr string `json:"real_addr,omitempty"`
Hostname string `json:"hostname,omitempty"`
LANAddrs []string `json:"lan_addrs,omitempty"`
Version string `json:"version,omitempty"`
CreatedAt string `json:"created_at,omitempty"` // RFC3339
}
RegisterDelta carries the full state needed to recreate a new-node registration. Captured at the new-node assignment site (handleRegister) so a crash before flushSave doesn't reassign the same identity to a different node_id.
type Snapshot ¶
type Snapshot struct {
Version int `json:"version"`
NextNode uint32 `json:"next_node"`
NextNet uint16 `json:"next_net"`
Nodes map[string]*SnapshotNode `json:"nodes"`
Networks map[string]*SnapshotNet `json:"networks"`
TrustPairs []string `json:"trust_pairs,omitempty"`
PubKeyIdx map[string]uint32 `json:"pub_key_idx,omitempty"`
HandshakeInbox map[string][]*trustpkg.HandshakeRelayMsg `json:"handshake_inbox,omitempty"`
HandshakeResponses map[string][]*trustpkg.HandshakeResponseMsg `json:"handshake_responses,omitempty"`
InviteInbox map[string][]*membpkg.NetworkInvite `json:"invite_inbox,omitempty"`
// Dashboard stats persistence (explicit counters for validation)
TotalRequests int64 `json:"total_requests,omitempty"`
TotalNodes int `json:"total_nodes,omitempty"`
OnlineNodes int `json:"online_nodes,omitempty"`
TrustLinks int `json:"trust_links,omitempty"`
UniqueTags int `json:"unique_tags,omitempty"`
TaskExecutors int `json:"task_executors,omitempty"`
StartTime string `json:"start_time,omitempty"` // RFC3339 format
// Restart events: unix-millis of each process start after first. Lets the
// dashboard show brief redeploy "blips" while preserving cumulative uptime.
RestartEvents []int64 `json:"restart_events,omitempty"`
DowntimeIntervals [][2]int64 `json:"downtime_intervals,omitempty"`
LastHeartbeat int64 `json:"last_heartbeat,omitempty"`
ProbeStates map[string]*dashpkg.ProbeState `json:"probe_states,omitempty"`
// Time-series history for dashboard charts
HourlyHistory []dashpkg.StatsSample `json:"hourly_history,omitempty"`
DailyHistory []dashpkg.StatsSample `json:"daily_history,omitempty"`
NetHourlyHistory map[string][]dashpkg.NetworkSampleEntry `json:"net_hourly_history,omitempty"`
NetDailyHistory map[string][]dashpkg.NetworkSampleEntry `json:"net_daily_history,omitempty"`
// Audit log persistence (most recent entries, capped at maxAuditEntries)
AuditLog []auditpkg.Entry `json:"audit_log,omitempty"`
// Enterprise config persistence
IDPConfig *wire.BlueprintIdentityProvider `json:"idp_config,omitempty"`
AuditExportCfg *wire.BlueprintAuditExport `json:"audit_export_config,omitempty"`
RBACPreAssign map[string][]wire.BlueprintRole `json:"rbac_pre_assign,omitempty"` // networkID -> roles
// Integrity: SHA256 hex digest of all fields except Checksum
Checksum string `json:"checksum,omitempty"`
}
Snapshot is the JSON-serializable full registry state written by flushSave and read by load. Version 0 is the legacy (pre-checksum) format; version 1 includes the Checksum field for integrity verification.
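One way to compute a digest over "all fields except Checksum" is to clear the field, marshal, and hash the result. The sketch below does that on a reduced stand-in struct (miniSnapshot). The exact byte representation the real package hashes is not stated here, so the JSON-with-empty-Checksum scheme, and skipping verification for version 0, are assumptions of this example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// miniSnapshot is a reduced, hypothetical stand-in for Snapshot.
type miniSnapshot struct {
	Version  int    `json:"version"`
	NextNode uint32 `json:"next_node"`
	NextNet  uint16 `json:"next_net"`
	Checksum string `json:"checksum,omitempty"`
}

// checksumOf hashes the JSON encoding with Checksum cleared (assumed scheme).
// s is passed by value, so the caller's copy is not modified.
func checksumOf(s miniSnapshot) (string, error) {
	s.Checksum = ""
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

// seal marks the snapshot as version 1 and fills in its checksum.
func seal(s miniSnapshot) (miniSnapshot, error) {
	s.Version = 1
	sum, err := checksumOf(s)
	if err != nil {
		return s, err
	}
	s.Checksum = sum
	return s, nil
}

// verify accepts legacy version 0 snapshots unchecked (an assumption) and
// recomputes the digest for everything else.
func verify(s miniSnapshot) (bool, error) {
	if s.Version == 0 {
		return true, nil
	}
	want, err := checksumOf(s)
	if err != nil {
		return false, err
	}
	return want == s.Checksum, nil
}

func main() {
	s, err := seal(miniSnapshot{NextNode: 10, NextNet: 3})
	if err != nil {
		panic(err)
	}
	ok, _ := verify(s)
	fmt.Println("intact:", ok)
	s.NextNode++
	ok, _ = verify(s)
	fmt.Println("tampered:", ok)
}
```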
type SnapshotNet ¶
type SnapshotNet struct {
ID uint16 `json:"id"`
Name string `json:"name"`
JoinRule string `json:"join_rule"`
Token string `json:"token,omitempty"`
Members []uint32 `json:"members"`
MemberRoles map[string]string `json:"member_roles,omitempty"` // nodeID -> role
MemberTags map[string][]string `json:"member_tags,omitempty"` // nodeID -> admin-assigned tags
AdminToken string `json:"admin_token,omitempty"` // per-network admin token
Policy *membpkg.NetworkPolicy `json:"policy,omitempty"` // network policy
Rules *wire.NetworkRules `json:"rules,omitempty"` // managed network rules
ExprPolicy json.RawMessage `json:"expr_policy,omitempty"` // programmable policy engine document
Enterprise bool `json:"enterprise,omitempty"` // enterprise network flag
RequestCount int64 `json:"request_count,omitempty"`
Created string `json:"created"`
}
SnapshotNet is the JSON-serializable form of a single registry network.
type SnapshotNode ¶
type SnapshotNode struct {
ID uint32 `json:"id"`
Owner string `json:"owner,omitempty"`
PublicKey string `json:"public_key"`
RealAddr string `json:"real_addr,omitempty"`
Networks []uint16 `json:"networks"`
Public bool `json:"public,omitempty"`
LastSeen string `json:"last_seen,omitempty"`
Hostname string `json:"hostname,omitempty"`
Tags []string `json:"tags,omitempty"`
TaskExec bool `json:"task_exec,omitempty"`
LANAddrs []string `json:"lan_addrs,omitempty"`
KeyCreated string `json:"key_created,omitempty"`
KeyRotated string `json:"key_rotated,omitempty"`
KeyRotCount int `json:"key_rot_count,omitempty"`
KeyExpires string `json:"key_expires,omitempty"`
ExternalID string `json:"external_id,omitempty"`
Version string `json:"version,omitempty"`
RelayOnly bool `json:"relay_only,omitempty"` // preserve flag across snapshots
}
SnapshotNode is the JSON-serializable form of a single registry node. All binary/time fields are encoded as strings (base64 / RFC3339) so the snapshot is human-readable and forward-compatible.
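The string encodings mentioned above can be produced with the standard library. This is a small sketch with hypothetical helper names; the use of standard (padded) base64 and of UTC timestamps is assumed, not confirmed by the package.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"time"
)

// encodeKey renders raw key bytes as base64 (standard alphabet assumed).
func encodeKey(key []byte) string {
	return base64.StdEncoding.EncodeToString(key)
}

// decodeKey reverses encodeKey.
func decodeKey(s string) ([]byte, error) {
	return base64.StdEncoding.DecodeString(s)
}

// encodeUnix renders a Unix timestamp (seconds) as an RFC3339 string in UTC.
func encodeUnix(sec int64) string {
	return time.Unix(sec, 0).UTC().Format(time.RFC3339)
}

// decodeUnix parses an RFC3339 string back into Unix seconds.
func decodeUnix(s string) (int64, error) {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return 0, err
	}
	return t.Unix(), nil
}

func main() {
	fmt.Println(encodeKey([]byte{1, 2, 3}))
	fmt.Println(encodeUnix(0))
}
```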
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store manages the WAL file, save channels, standby flag, and replication token on behalf of the registry Server. It runs saveLoop and replicaPushLoop as background goroutines started by Start.
func NewStore ¶
func NewStore(walPath string, done <-chan struct{}, cbs StoreCallbacks) (*Store, error)
NewStore creates a Store. The WAL is opened at walPath; an empty walPath means no WAL is opened. done is the server shutdown channel; cbs wires in the server-specific persistence and replication operations.
func (*Store) Close ¶
func (st *Store) Close() error
Close closes the underlying WAL file. The caller must wait for saveLoop and replicaPushLoop to exit (via SaveDone / ReplicaPushDone) before calling Close.
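The required shutdown order (signal shutdown, wait for both loops, then Close) can be sketched against a small interface. Everything here except the three method names is hypothetical: lifecycle, fakeStore, and shutdown exist only for this example, and fakeStore merely imitates loops that exit when done is closed.

```go
package main

import (
	"errors"
	"fmt"
)

// lifecycle lists the Store methods this sketch relies on.
type lifecycle interface {
	SaveDone() <-chan struct{}
	ReplicaPushDone() <-chan struct{}
	Close() error
}

// shutdown closes the server's done channel, waits for both background loops
// to exit, and only then closes the store.
func shutdown(done chan struct{}, st lifecycle) error {
	close(done)
	<-st.SaveDone()
	<-st.ReplicaPushDone()
	return st.Close()
}

// fakeStore imitates a Store whose loops exit once done is closed.
type fakeStore struct {
	saveDone, pushDone chan struct{}
	closed             bool
}

func newFakeStore(done <-chan struct{}) *fakeStore {
	f := &fakeStore{saveDone: make(chan struct{}), pushDone: make(chan struct{})}
	go func() { <-done; close(f.saveDone) }()
	go func() { <-done; close(f.pushDone) }()
	return f
}

func (f *fakeStore) SaveDone() <-chan struct{}        { return f.saveDone }
func (f *fakeStore) ReplicaPushDone() <-chan struct{} { return f.pushDone }

// Close fails if either loop is still running, to make ordering bugs visible.
func (f *fakeStore) Close() error {
	select {
	case <-f.saveDone:
	default:
		return errors.New("Close called before saveLoop exited")
	}
	select {
	case <-f.pushDone:
	default:
		return errors.New("Close called before replicaPushLoop exited")
	}
	f.closed = true
	return nil
}

func main() {
	done := make(chan struct{})
	st := newFakeStore(done)
	fmt.Println("shutdown error:", shutdown(done, st))
}
```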
func (*Store) ReplicaPushCh ¶
func (st *Store) ReplicaPushCh() chan struct{}
ReplicaPushCh returns the replica-push signal channel.
func (*Store) ReplicaPushDone ¶
func (st *Store) ReplicaPushDone() <-chan struct{}
ReplicaPushDone returns the channel closed when replicaPushLoop exits.
func (*Store) ReplicationToken ¶
func (st *Store) ReplicationToken() string
ReplicationToken returns the current replication token.
func (*Store) SaveCh ¶
func (st *Store) SaveCh() chan struct{}
SaveCh returns the save signal channel (send on it to trigger a debounced save).
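A common way to send on a signal channel like this without blocking the caller is a select with a default case. The sketch below assumes the channel is buffered with capacity 1, which this page does not state; signal is a hypothetical helper.

```go
package main

import "fmt"

// signal attempts a non-blocking send. It returns false when a signal is
// already pending, so bursts of mutations coalesce into one wake-up.
func signal(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	saveCh := make(chan struct{}, 1) // capacity 1 is an assumption of this sketch
	fmt.Println(signal(saveCh))      // first signal is queued
	fmt.Println(signal(saveCh))      // second is dropped: one is already pending
	<-saveCh                         // the loop consumes the pending signal
	fmt.Println(signal(saveCh))      // the channel accepts signals again
}
```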
func (*Store) SaveDone ¶
func (st *Store) SaveDone() <-chan struct{}
SaveDone returns the channel closed when saveLoop exits.
func (*Store) SetReplicationToken ¶
func (st *Store) SetReplicationToken(token string)
SetReplicationToken sets the token required for subscribe_replication.
func (*Store) SetStandby ¶
func (st *Store) SetStandby(v bool)
SetStandby configures standby mode. In standby mode the server rejects writes and instead receives snapshots from a primary.
func (*Store) Start ¶
func (st *Store) Start()
Start launches saveLoop and replicaPushLoop as goroutines. It must be called exactly once after NewStore.
func (*Store) TriggerSave ¶
func (st *Store) TriggerSave()
TriggerSave signals both the save loop and replica-push loop.
func (*Store) TriggerSnapshot ¶
func (st *Store) TriggerSnapshot() error
TriggerSnapshot immediately runs a full snapshot flush. Returns nil if no storePath is configured (i.e. WAL is nil).
type StoreCallbacks ¶
type StoreCallbacks struct {
// FlushSave persists the current server state to disk. Called by saveLoop.
FlushSave func() error
// SnapshotJSON builds the current state as JSON bytes for replication.
// Returns nil on error (push is skipped).
SnapshotJSON func() []byte
// Push sends snapshot bytes to all replication subscribers.
Push func([]byte)
// IncSaveFailures increments the save-failure metric counter.
IncSaveFailures func()
}
StoreCallbacks bundles the server-side callbacks that Store needs without creating a circular import between the wal and server packages.
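The contract in the field comments can be sketched as two small steps: one save iteration and one push iteration. StoreCallbacks is redeclared locally; saveOnce, pushOnce, and errDisk are hypothetical names, and the real saveLoop and replicaPushLoop may be structured differently.

```go
package main

import (
	"errors"
	"fmt"
)

// StoreCallbacks is a local copy of the documented struct.
type StoreCallbacks struct {
	FlushSave       func() error
	SnapshotJSON    func() []byte
	Push            func([]byte)
	IncSaveFailures func()
}

// errDisk is a hypothetical failure used to exercise the error path.
var errDisk = errors.New("disk full")

// saveOnce runs one save and bumps the failure metric when it fails.
func saveOnce(cbs StoreCallbacks) error {
	err := cbs.FlushSave()
	if err != nil && cbs.IncSaveFailures != nil {
		cbs.IncSaveFailures()
	}
	return err
}

// pushOnce builds the snapshot and pushes it, skipping the push when
// SnapshotJSON returns nil. It reports whether a push happened.
func pushOnce(cbs StoreCallbacks) bool {
	b := cbs.SnapshotJSON()
	if b == nil {
		return false
	}
	cbs.Push(b)
	return true
}

func main() {
	failures := 0
	cbs := StoreCallbacks{
		FlushSave:       func() error { return errDisk },
		SnapshotJSON:    func() []byte { return []byte(`{"version":1}`) },
		Push:            func(b []byte) { fmt.Printf("pushed %d bytes\n", len(b)) },
		IncSaveFailures: func() { failures++ },
	}
	fmt.Println("save error:", saveOnce(cbs), "failures:", failures)
	fmt.Println("pushed:", pushOnce(cbs))
}
```

Passing plain function values keeps the wal package free of any import of the server package, which is the point of the callback bundle.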
type WAL ¶
type WAL struct {
// contains filtered or unexported fields
}
WAL implements an append-only write-ahead log for registry mutations. Instead of serializing the entire state on every mutation (O(N) per save), the WAL appends only the delta entry (O(1) per mutation). Full snapshots are written periodically (compaction) and the WAL is truncated.
On-disk format: sequential records of [4-byte little-endian length][delta entry JSON]. The WAL file path is derived from the snapshot path: "{storePath}.wal".
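The record framing described above can be written and read with encoding/binary. This is a sketch of the format only, not the package's code: writeRecord, readRecords, frame, and roundTrip are hypothetical helpers, and maxRecord is an invented sanity cap for this example.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxRecord is a hypothetical cap guarding against a corrupt length prefix.
const maxRecord = 1 << 20

// writeRecord emits [4-byte little-endian length][payload].
func writeRecord(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.LittleEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readRecords reads records until a clean EOF. A record cut short surfaces
// as io.ErrUnexpectedEOF alongside the records read so far.
func readRecords(r io.Reader) ([][]byte, error) {
	var out [][]byte
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(r, hdr[:]); err != nil {
			if errors.Is(err, io.EOF) {
				return out, nil
			}
			return out, err
		}
		n := binary.LittleEndian.Uint32(hdr[:])
		if n > maxRecord {
			return out, fmt.Errorf("record length %d exceeds cap", n)
		}
		buf := make([]byte, n)
		if _, err := io.ReadFull(r, buf); err != nil {
			return out, err
		}
		out = append(out, buf)
	}
}

// frame returns the on-disk bytes for a single payload.
func frame(payload []byte) []byte {
	var buf bytes.Buffer
	_ = writeRecord(&buf, payload) // bytes.Buffer writes do not fail
	return buf.Bytes()
}

// roundTrip writes the payloads to memory and reads them back.
func roundTrip(payloads ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, p := range payloads {
		if err := writeRecord(&buf, []byte(p)); err != nil {
			return nil, err
		}
	}
	recs, err := readRecords(&buf)
	out := make([]string, 0, len(recs))
	for _, r := range recs {
		out = append(out, string(r))
	}
	return out, err
}

func main() {
	fmt.Printf("% x\n", frame([]byte(`{"seq_no":1}`)))
	got, err := roundTrip(`{"seq_no":1}`, `{"seq_no":2}`)
	fmt.Println(got, err)
}
```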
func NewWAL ¶
NewWAL opens or creates a WAL file at the given path. Returns nil if path is empty (no persistence configured).
func (*WAL) Append ¶
func (w *WAL) Append(entry DeltaEntry) error
Append writes a delta entry to the WAL. The entry is fsync'd to ensure durability. Returns ErrWALFull if the append would exceed maxWALSize, or another error if the write fails.
func (*WAL) Replay ¶
func (w *WAL) Replay(fn func(DeltaEntry) error) (int, error)
Replay reads all entries from the WAL and calls fn for each. Used during startup to replay mutations that occurred after the last snapshot.
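The startup sequence (load the snapshot, then apply WAL entries on top) might be sketched as follows. Nothing here calls the real package: replay is an in-memory stand-in for (*WAL).Replay whose int result is assumed to be the number of entries passed to fn, and recoverNodes is a hypothetical helper that tracks only a set of node IDs. Unknown delta types are logged and skipped, matching the forward-compat note on DeltaNetworkDelete.

```go
package main

import (
	"fmt"
	"log"
)

type DeltaType uint8

const (
	DeltaRegister   DeltaType = 1
	DeltaDeregister DeltaType = 2
)

// DeltaEntry is reduced to the fields this sketch needs.
type DeltaEntry struct {
	SeqNo  uint64
	Type   DeltaType
	NodeID uint32
}

// replay is an in-memory stand-in for (*WAL).Replay. The returned count is
// assumed to be the number of entries handed to fn without error.
func replay(entries []DeltaEntry, fn func(DeltaEntry) error) (int, error) {
	n := 0
	for _, e := range entries {
		if err := fn(e); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}

// recoverNodes starts from the snapshot's node set and applies the WAL
// entries on top. It returns the recovered set and how many entries were skipped.
func recoverNodes(snapshot []uint32, entries []DeltaEntry) (map[uint32]bool, int, error) {
	nodes := make(map[uint32]bool, len(snapshot))
	for _, id := range snapshot {
		nodes[id] = true
	}
	skipped := 0
	_, err := replay(entries, func(e DeltaEntry) error {
		switch e.Type {
		case DeltaRegister:
			nodes[e.NodeID] = true
		case DeltaDeregister:
			delete(nodes, e.NodeID)
		default:
			log.Printf("wal replay: skipping unknown delta type %d (seq %d)", e.Type, e.SeqNo)
			skipped++
		}
		return nil
	})
	return nodes, skipped, err
}

func main() {
	nodes, skipped, err := recoverNodes(
		[]uint32{1, 2},
		[]DeltaEntry{
			{SeqNo: 1, Type: DeltaRegister, NodeID: 3},
			{SeqNo: 2, Type: DeltaDeregister, NodeID: 1},
			{SeqNo: 3, Type: 99},
		},
	)
	fmt.Println(len(nodes), skipped, err)
}
```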