Documentation
Overview
Package checkpoint runs a background goroutine that periodically folds the WAL tail into a fresh snapshot and truncates the WAL. Without it, the WAL would grow without bound during steady-state operation.
The checkpointer takes the snapshot and truncates the WAL under the store's commit serialisation, so no transaction can be mid-apply or mid-commit during that window. The snapshot is therefore a consistent image taken at a transaction boundary, and the truncation never drops a frame committed after the snapshot was taken. When the store is driven by the Cypher engine, wire the checkpointer with WithCommitSerialiser(txn.Store.RunUnderCommitLock): the engine's commit mutex is private, so it can never be the storeMu an external checkpointer is constructed with (see docs/acid-audit.md F3.5). The serialisation is held for the whole snapshot write, so the checkpointer blocks writers for the duration of the disk I/O; a non-blocking, watermark-bounded truncate is the documented future optimisation (docs/isolation-design.md, "Checkpoint and recovery").
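A minimal sketch of why the copy and the truncate must share one serialised window, assuming a toy in-memory store whose commit mutex is private and reachable only through a method shaped like RunUnderCommitLock (func(func() error) error). The names toyStore, Commit, and checkpoint are hypothetical; the real package snapshots an lpg.Graph to disk rather than copying a map.

```go
package main

import (
	"fmt"
	"sync"
)

// toyStore is a hypothetical store whose commit mutex is private: outside
// code can only run work under it via RunUnderCommitLock.
type toyStore struct {
	mu   sync.Mutex     // private commit mutex
	data map[string]int // in-memory state
	wal  []string       // committed WAL frames, in commit order
}

// Commit applies a two-key transaction and appends its WAL frame while
// holding the commit mutex, so nothing else serialised on the same mutex
// can observe only one of the two writes.
func (s *toyStore) Commit(a, b string, v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[a] = v
	s.data[b] = v
	s.wal = append(s.wal, fmt.Sprintf("%s,%s=%d", a, b, v))
}

// RunUnderCommitLock runs fn while no transaction can be mid-commit.
func (s *toyStore) RunUnderCommitLock(fn func() error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fn()
}

// checkpoint copies the state and truncates the WAL inside one serialised
// call: the copy is a transaction-boundary image, and every frame the
// truncate drops is already covered by that copy.
func checkpoint(s *toyStore, serialise func(func() error) error) (map[string]int, error) {
	var snap map[string]int
	err := serialise(func() error {
		snap = make(map[string]int, len(s.data))
		for k, v := range s.data {
			snap[k] = v
		}
		s.wal = s.wal[:0] // safe: no commit can land between copy and truncate
		return nil
	})
	return snap, err
}

func main() {
	s := &toyStore{data: map[string]int{}}
	s.Commit("alice", "bob", 7)
	snap, err := checkpoint(s, s.RunUnderCommitLock)
	if err != nil {
		panic(err)
	}
	fmt.Println(snap["alice"], snap["bob"], len(s.wal)) // 7 7 0
}
```

Locking a different mutex in checkpoint (as an external storeMu would be) would let Commit run between the copy and the truncate, which is exactly the lost-frame window the paragraph above describes.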
Constants
This section is empty.
Variables
var ErrCheckpointerStopped = errors.New("checkpoint: checkpointer stopped")
ErrCheckpointerStopped is returned by Trigger and TriggerCtx when the checkpointer has stopped, that is, when its background loop has exited because Stop was called or because the context passed to Start was cancelled. It is a clean terminal signal, not a failure. A checkpoint cannot run once the loop is gone, so the call returns promptly with this sentinel instead of blocking forever on a result that will never arrive.
Functions
This section is empty.
Types
type Checkpointer
type Checkpointer[N comparable, W any] struct { // contains filtered or unexported fields }
Checkpointer owns the background checkpoint goroutine and its state.
Concurrency: Start, Stop, Trigger, TriggerCtx, and Stats are safe to call from any number of goroutines. Stop is idempotent: it is safe to call any number of times, serially or concurrently.
Example
ExampleCheckpointer commits a transaction, folds the WAL tail into a self-sufficient on-disk snapshot with one forced checkpoint, then restores the whole graph from that snapshot alone. Because the checkpoint truncates the WAL after writing the snapshot, recovery reports zero replayed WAL ops: every byte of state came from the snapshot.
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/FlavioCFOliveira/GoGraph/graph/adjlist"
	"github.com/FlavioCFOliveira/GoGraph/graph/lpg"
	"github.com/FlavioCFOliveira/GoGraph/store/checkpoint"
	"github.com/FlavioCFOliveira/GoGraph/store/recovery"
	"github.com/FlavioCFOliveira/GoGraph/store/txn"
	"github.com/FlavioCFOliveira/GoGraph/store/wal"
)

func main() {
	dir, err := os.MkdirTemp("", "checkpoint-example")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.RemoveAll(dir) }()

	w, err := wal.Open(filepath.Join(dir, "wal"))
	if err != nil {
		panic(err)
	}
	g := lpg.New[string, int64](adjlist.Config{Directed: true})
	s := txn.NewStoreWithOptions[string, int64](g, w, txn.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})

	tx := s.Begin()
	if err := tx.AddEdge("alice", "bob", 7); err != nil {
		panic(err)
	}
	if err := tx.SetNodeLabel("alice", "Person"); err != nil {
		panic(err)
	}
	if err := tx.Commit(); err != nil {
		panic(err)
	}

	// mu is NOT the store's private commit mutex. That is safe here only
	// because every write above has committed before the checkpointer
	// starts; a store that keeps writing should instead pass
	// checkpoint.WithCommitSerialiser(s.RunUnderCommitLock).
	// Trigger forces one synchronous checkpoint; Stop tears the goroutine
	// down (no leaks).
	var mu sync.Mutex
	cp := checkpoint.New[string, int64](checkpoint.Config{Dir: dir}, g, w, &mu)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cp.Start(ctx)
	if err := cp.Trigger(); err != nil {
		panic(err)
	}
	fmt.Printf("checkpoints=%d\n", cp.Stats().Checkpoints)
	cp.Stop()
	if err := w.Close(); err != nil {
		panic(err)
	}

	// Restore from the directory: the snapshot is self-sufficient, so
	// WALOps is zero and the committed state is fully present.
	res, err := recovery.Open[string, int64](dir, recovery.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})
	if err != nil {
		panic(err)
	}
	rg := res.Graph
	fmt.Printf("snapshot hit=%t wal ops=%d\n", res.SnapshotHit, res.WALOps)
	fmt.Printf("edge=%t label=%t\n",
		rg.AdjList().HasEdge("alice", "bob"),
		rg.HasNodeLabel("alice", "Person"))
}
Output:
checkpoints=1
snapshot hit=true wal ops=0
edge=true label=true
func New
func New[N comparable, W any](
	cfg Config,
	g *lpg.Graph[N, W],
	wlog *wal.Writer,
	storeMu *sync.Mutex,
	opts ...Option[N, W],
) *Checkpointer[N, W]
New returns a Checkpointer; call Start to launch the goroutine.
storeMu is the mutex the checkpointer holds across the snapshot+truncate window when no WithCommitSerialiser option is supplied. It is correct only when the caller performs every write while holding this same mutex. For a txn.Store driven by the Cypher engine the commit mutex is private and unreachable, so storeMu can never be that mutex; such callers MUST pass WithCommitSerialiser(txn.Store.RunUnderCommitLock), which supersedes storeMu and excludes the engine's eager write+commit window (docs/acid-audit.md F3.5). When a serialiser is supplied, storeMu is unused and may be any mutex (a throwaway is fine).
Pass WithMapperCodec to make non-string-keyed snapshots self-sufficient so the WAL can be truncated for every key type; see that option's documentation for the durability rationale.
func (*Checkpointer[N, W]) Start
func (c *Checkpointer[N, W]) Start(ctx context.Context)
Start launches the background goroutine. ctx cancellation stops the goroutine. The goroutine is tagged with pprof labels (goroutine=checkpoint-loop, dir=<cfg.Dir>) so it appears named in pprof goroutine profiles rather than as anonymous "go c.loop".
func (*Checkpointer[N, W]) Stats
func (c *Checkpointer[N, W]) Stats() Stats
Stats returns the current lifetime counters.
func (*Checkpointer[N, W]) Stop
func (c *Checkpointer[N, W]) Stop()
Stop signals the goroutine to exit and blocks until it does. Stop is idempotent: subsequent calls are no-ops once the goroutine has exited.
func (*Checkpointer[N, W]) Trigger
func (c *Checkpointer[N, W]) Trigger() error
Trigger requests a checkpoint and blocks until it completes, returning its error. Equivalent to TriggerCtx(context.Background()).
Once the checkpointer has stopped (Stop called, or the Start context cancelled) Trigger returns ErrCheckpointerStopped promptly instead of blocking: a checkpoint can no longer run, and the loop that would answer the request is gone. While the loop is running, a saturated trigger buffer (rare, but possible if many Trigger calls race ahead of the loop) can still delay Trigger until the loop drains it; prefer TriggerCtx in production code to bound that latency with a deadline.
func (*Checkpointer[N, W]) TriggerCtx
func (c *Checkpointer[N, W]) TriggerCtx(ctx context.Context) error
TriggerCtx requests a checkpoint and blocks until it completes. It honours ctx cancellation on every wait edge (queue submit and result wait) and returns ErrCheckpointerStopped promptly if the checkpointer has stopped. On cancellation or deadline expiry it returns an error wrapping ctx.Err().
Three independent guards make a permanent block impossible:
1. A non-blocking fast-path check of stoppedCh: once the loop has exited, the request is never buffered, so it cannot be orphaned.
2. A stoppedCh arm on the buffered-submit select: a submit racing the loop's exit either lands in the buffer (then guard 3 applies) or observes the stop and returns the sentinel.
3. A stoppedCh arm on the result-wait select: even a request buffered at the exact instant the loop departs is woken. The loop's teardown closes stoppedCh after it stops reading triggerCh, so a buffered request that the teardown's drain does not reach in time still completes here instead of waiting forever on a result the departed loop will never send.
type Config
type Config struct {
	// Dir is the snapshot directory and the location of the WAL.
	Dir string

	// MaxAge fires a checkpoint when more than this duration has
	// elapsed since the previous one. Zero disables age-based
	// triggering.
	MaxAge time.Duration

	// Interval is the polling interval; if MaxAge fires before the
	// next tick, the checkpointer still waits Interval to re-poll.
	// Defaults to MaxAge/4 when zero.
	Interval time.Duration
}
Config controls when the checkpointer fires.
type Option
type Option[N comparable, W any] func(*Checkpointer[N, W])
Option customises a Checkpointer at construction. Options are applied in order by New.
func WithCommitSerialiser added in v0.2.0
func WithCommitSerialiser[N comparable, W any](serialise func(func() error) error) Option[N, W]
WithCommitSerialiser makes the checkpointer run its entire snapshot-capture + WAL-truncate critical section under serialise instead of locking the raw storeMu passed to New. Pass the owning store's txn.Store.RunUnderCommitLock:
	cp := checkpoint.New(cfg, store.Graph(), wlog, &unusedMu,
		checkpoint.WithCommitSerialiser[string, float64](store.RunUnderCommitLock),
		checkpoint.WithMapperCodec[string, float64](store.Codec()),
	)
Why this matters: the engine write path (cypher.Engine over a txn.Store) applies its in-memory mutations and appends its WAL frames while holding the store's PRIVATE commit mutex, from txn.Store.Begin until the transaction commits in cypher Result.Close. That mutex is not the storeMu an external checkpointer is constructed with, so without this option the checkpointer never excludes the commit window. It can then build the snapshot from a half-applied transaction, and it can truncate a WAL frame committed after the snapshot was taken; both violate the ACID guarantees (docs/acid-audit.md F3.5). Running the critical section under RunUnderCommitLock closes both windows, because no transaction can be between Begin and commit while serialise holds the commit mutex.
The snapshot is taken inside lpg.Graph.View regardless of this option, so the captured adjacency is always barrier-consistent. The serialiser is what additionally makes the truncate safe and aligns the snapshot with a transaction boundary in the engine wiring.
A nil serialiser is ignored (the checkpointer keeps locking storeMu).
func WithMapperCodec
func WithMapperCodec[N comparable, W any](codec txn.Codec[N]) Option[N, W]
WithMapperCodec supplies the node-identifier codec the checkpointer uses to persist the NodeID->key interning table (mapper.bin) for ANY key type N. Pass the owning store's codec (txn.Store.Codec).
Without this option the checkpointer persists the mapper only for string-keyed graphs; non-string snapshots are then not self-sufficient and the WAL is retained (never truncated) to avoid data loss, at the cost of unbounded WAL growth. With this option the snapshot is self-sufficient for every key type, so the checkpointer can truncate the WAL after each successful checkpoint and keep the log bounded (audit gap F3).
A nil codec is ignored (the checkpointer keeps the string-only fallback).
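The truncation rule described above reduces to one predicate: the WAL may be truncated only when the snapshot is self-sufficient, which holds when a codec was supplied or when the key type is string. This is a sketch under assumptions: keyCodec stands in for txn.Codec[N] (only its presence matters here), canTruncate and int64Codec are hypothetical, and the real package may detect string keys differently (a named type such as `type ID string` does not match the type assertion used here).

```go
package main

import (
	"fmt"
	"strconv"
)

// keyCodec is a hypothetical stand-in for txn.Codec[N].
type keyCodec[N any] interface {
	Encode(N) ([]byte, error)
}

// canTruncate reports whether a snapshot for key type N would be
// self-sufficient, so the WAL could be truncated after it: either a codec
// was supplied, or N is string (the documented string-only fallback).
func canTruncate[N comparable](codec keyCodec[N]) bool {
	if codec != nil {
		return true
	}
	var zero N
	_, isString := any(zero).(string) // plain string only, not named string types
	return isString
}

// int64Codec is a toy codec for int64 keys.
type int64Codec struct{}

func (int64Codec) Encode(v int64) ([]byte, error) {
	return []byte(strconv.FormatInt(v, 10)), nil
}

func main() {
	fmt.Println(canTruncate[string](nil))         // true: string fallback
	fmt.Println(canTruncate[int64](nil))          // false: WAL retained
	fmt.Println(canTruncate[int64](int64Codec{})) // true: codec supplied
}
```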