checkpoint

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 2, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package checkpoint runs a background goroutine that periodically folds the WAL tail into a fresh snapshot and truncates the WAL. Without this the WAL would grow unbounded during steady-state operation.

The checkpointer blocks writers only briefly: it takes the snapshot under the same store mutex used by store/txn.Tx, holds that mutex only for the moment needed to swap files, and then releases it for the next transaction.
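The fold-and-truncate idea can be illustrated with a small in-memory sketch. It is not the package's implementation: the store type, its fields, and the commit and checkpoint methods are hypothetical stand-ins, and the "WAL" here is just a slice. It only shows writers and the checkpointer serialising on one shared mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in: live state plus an in-memory
// "WAL" of keys written since the last checkpoint.
type store struct {
	mu       sync.Mutex     // shared by writers and the checkpointer
	state    map[string]int // live state
	wal      []string       // operations since the last checkpoint
	snapshot map[string]int // image produced by the last checkpoint
}

func newStore() *store { return &store{state: map[string]int{}} }

// commit applies one write under the shared mutex and logs it.
func (s *store) commit(key string, val int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state[key] = val
	s.wal = append(s.wal, key)
}

// checkpoint copies the state into a fresh snapshot and truncates the
// log while holding the same mutex. It returns how many log entries
// were dropped.
func (s *store) checkpoint() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	snap := make(map[string]int, len(s.state))
	for k, v := range s.state {
		snap[k] = v
	}
	dropped := len(s.wal)
	s.snapshot = snap
	s.wal = s.wal[:0]
	return dropped
}

func main() {
	s := newStore()
	s.commit("alice", 1)
	s.commit("bob", 2)
	fmt.Println("dropped:", s.checkpoint(), "wal len:", len(s.wal), "snapshot size:", len(s.snapshot))
	// Prints: dropped: 2 wal len: 0 snapshot size: 2
}
```

Because the snapshot already contains every logged write, dropping the log loses nothing; that is the property that lets a real WAL be truncated after a successful checkpoint.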

Types

type Checkpointer

type Checkpointer[N comparable, W any] struct {
	// contains filtered or unexported fields
}

Checkpointer holds the goroutine state.

Concurrency: Start, Stop, Trigger, and Stats are safe to call from any number of goroutines. Stop is idempotent (safe to call any number of times serially or concurrently).

Example

ExampleCheckpointer commits a transaction, folds the WAL tail into a self-sufficient on-disk snapshot with one forced checkpoint, then restores the whole graph from that snapshot alone. Because the checkpoint truncates the WAL after writing the snapshot, recovery reports zero replayed WAL ops: every byte of state came from the snapshot.

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/FlavioCFOliveira/GoGraph/graph/adjlist"
	"github.com/FlavioCFOliveira/GoGraph/graph/lpg"
	"github.com/FlavioCFOliveira/GoGraph/store/checkpoint"
	"github.com/FlavioCFOliveira/GoGraph/store/recovery"
	"github.com/FlavioCFOliveira/GoGraph/store/txn"
	"github.com/FlavioCFOliveira/GoGraph/store/wal"
)

func main() {
	dir, err := os.MkdirTemp("", "checkpoint-example")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.RemoveAll(dir) }()

	w, err := wal.Open(filepath.Join(dir, "wal"))
	if err != nil {
		panic(err)
	}

	g := lpg.New[string, int64](adjlist.Config{Directed: true})
	s := txn.NewStoreWithOptions[string, int64](g, w, txn.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})

	tx := s.Begin()
	if err := tx.AddEdge("alice", "bob", 7); err != nil {
		panic(err)
	}
	if err := tx.SetNodeLabel("alice", "Person"); err != nil {
		panic(err)
	}
	if err := tx.Commit(); err != nil {
		panic(err)
	}

	// The checkpointer takes a mutex so it can snapshot consistently.
	// No transaction is in flight at this point, so a local mutex is
	// enough for the example. Trigger forces one synchronous
	// checkpoint; Stop tears the goroutine down (no leaks).
	var mu sync.Mutex
	cp := checkpoint.New[string, int64](checkpoint.Config{Dir: dir}, g, w, &mu)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cp.Start(ctx)
	if err := cp.Trigger(); err != nil {
		panic(err)
	}
	fmt.Printf("checkpoints=%d\n", cp.Stats().Checkpoints)
	cp.Stop()
	if err := w.Close(); err != nil {
		panic(err)
	}

	// Restore from the directory: the snapshot is self-sufficient, so
	// WALOps is zero and the committed state is fully present.
	res, err := recovery.Open[string, int64](dir, recovery.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})
	if err != nil {
		panic(err)
	}
	rg := res.Graph
	fmt.Printf("snapshot hit=%t wal ops=%d\n", res.SnapshotHit, res.WALOps)
	fmt.Printf("edge=%t label=%t\n",
		rg.AdjList().HasEdge("alice", "bob"),
		rg.HasNodeLabel("alice", "Person"))

}
Output:
checkpoints=1
snapshot hit=true wal ops=0
edge=true label=true

func New

func New[N comparable, W any](
	cfg Config,
	g *lpg.Graph[N, W],
	wlog *wal.Writer,
	storeMu *sync.Mutex,
	opts ...Option[N, W],
) *Checkpointer[N, W]

New returns a Checkpointer; call Start to launch the goroutine.

storeMu must be the same mutex the transaction layer holds during commit; the checkpointer acquires it briefly to take a consistent snapshot of the graph state.

Pass WithMapperCodec to make non-string-keyed snapshots self-sufficient so the WAL can be truncated for every key type; see that option's documentation for the durability rationale.

func (*Checkpointer[N, W]) Start

func (c *Checkpointer[N, W]) Start(ctx context.Context)

Start launches the background goroutine. Cancelling ctx stops the goroutine. The goroutine is tagged with pprof labels (goroutine=checkpoint-loop, dir=<cfg.Dir>) so it appears named in pprof goroutine profiles rather than as an anonymous "go c.loop".

func (*Checkpointer[N, W]) Stats

func (c *Checkpointer[N, W]) Stats() Stats

Stats returns the current lifetime counters.

func (*Checkpointer[N, W]) Stop

func (c *Checkpointer[N, W]) Stop()

Stop signals the goroutine to exit and blocks until it does. Stop is idempotent: subsequent calls are no-ops once the goroutine has exited.
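One common way to get this kind of idempotent, concurrency-safe stop is a sync.Once guarding the close of a stop channel, plus a done channel that every caller waits on. The sketch below assumes that design; the worker type and its fields are hypothetical and are not taken from the package source.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a type that owns one goroutine.
type worker struct {
	stopOnce sync.Once
	stopCh   chan struct{} // closed exactly once to request exit
	doneCh   chan struct{} // closed by the goroutine when it exits
}

func startWorker() *worker {
	w := &worker{stopCh: make(chan struct{}), doneCh: make(chan struct{})}
	go func() {
		defer close(w.doneCh)
		<-w.stopCh // a real loop would also do periodic work here
	}()
	return w
}

// Stop signals the goroutine and blocks until it has exited. The Once
// makes the close safe under concurrent calls; receiving from the
// closed doneCh returns immediately on every later call.
func (w *worker) Stop() {
	w.stopOnce.Do(func() { close(w.stopCh) })
	<-w.doneCh
}

func main() {
	w := startWorker()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.Stop() // concurrent calls
		}()
	}
	wg.Wait()
	w.Stop() // serial repeat is a no-op
	fmt.Println("stopped without panic or deadlock")
}
```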

func (*Checkpointer[N, W]) Trigger

func (c *Checkpointer[N, W]) Trigger() error

Trigger requests a checkpoint and blocks until it completes, returning its error. Equivalent to TriggerCtx(context.Background()).

On a saturated trigger buffer (rare, but possible if many Trigger calls race ahead of the loop) Trigger can block indefinitely; prefer TriggerCtx in production code to bound the latency.

func (*Checkpointer[N, W]) TriggerCtx

func (c *Checkpointer[N, W]) TriggerCtx(ctx context.Context) error

TriggerCtx requests a checkpoint and blocks until it completes, honouring ctx cancellation on every wait edge: queue-submit, in-flight, and stop-signal. It returns ctx.Err(), wrapped, on cancellation or deadline expiry.

type Config

type Config struct {
	// Dir is the snapshot directory and the location of the WAL.
	Dir string
	// MaxAge fires a checkpoint when more than this duration has
	// elapsed since the previous one. Zero disables age-based
	// triggering.
	MaxAge time.Duration
	// Interval is the polling interval; if MaxAge fires before the
	// next tick the checkpointer still waits Interval to re-poll.
	// Defaults to MaxAge/4 when zero.
	Interval time.Duration
}

Config controls when the checkpointer fires.

type Option

type Option[N comparable, W any] func(*Checkpointer[N, W])

Option customises a Checkpointer at construction. Options are applied in order by New.

func WithMapperCodec

func WithMapperCodec[N comparable, W any](codec txn.Codec[N]) Option[N, W]

WithMapperCodec supplies the node-identifier codec the checkpointer uses to persist the NodeID->key interning table (mapper.bin) for ANY key type N. Pass the owning store's codec (txn.Store.Codec).

Without this option the checkpointer persists the mapper only for string-keyed graphs; non-string snapshots are then not self-sufficient and the WAL is retained (never truncated) to avoid data loss, at the cost of unbounded WAL growth. With this option the snapshot is self-sufficient for every key type, so the checkpointer can truncate the WAL after each successful checkpoint and keep the log bounded (audit gap F3).

A nil codec is ignored (the checkpointer keeps the string-only fallback).
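The option mechanics and the nil-codec rule can be sketched with generic functional options. Everything here is hypothetical (keyCodec, checkpointer, withMapperCodec, canTruncate, intCodec) and only mirrors the documented behaviour: string keys are self-sufficient without a codec, other key types need one, and a nil codec leaves the fallback in place.

```go
package main

import (
	"fmt"
	"strconv"
)

// keyCodec is a hypothetical stand-in for a node-identifier codec.
type keyCodec[N comparable] interface {
	Encode(N) ([]byte, error)
}

type checkpointer[N comparable, W any] struct {
	codec keyCodec[N]
}

type option[N comparable, W any] func(*checkpointer[N, W])

// withMapperCodec installs the codec; a nil codec is ignored.
func withMapperCodec[N comparable, W any](c keyCodec[N]) option[N, W] {
	return func(cp *checkpointer[N, W]) {
		if c != nil {
			cp.codec = c
		}
	}
}

// newCheckpointer applies the options in order.
func newCheckpointer[N comparable, W any](opts ...option[N, W]) *checkpointer[N, W] {
	cp := &checkpointer[N, W]{}
	for _, o := range opts {
		o(cp)
	}
	return cp
}

// canTruncate models the documented rule: the snapshot is
// self-sufficient with a codec, or without one when keys are strings.
func (cp *checkpointer[N, W]) canTruncate() bool {
	if cp.codec != nil {
		return true
	}
	var zero N
	_, isString := any(zero).(string)
	return isString
}

// intCodec is an illustrative codec for int keys.
type intCodec struct{}

func (intCodec) Encode(n int) ([]byte, error) { return []byte(strconv.Itoa(n)), nil }

func main() {
	plain := newCheckpointer[int, float64]()
	coded := newCheckpointer[int, float64](withMapperCodec[int, float64](intCodec{}))
	strKeyed := newCheckpointer[string, float64]()
	nilCodec := newCheckpointer[int, float64](withMapperCodec[int, float64](nil))
	fmt.Println(plain.canTruncate(), coded.canTruncate(), strKeyed.canTruncate(), nilCodec.canTruncate())
	// Prints: false true true false
}
```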

type Stats

type Stats struct {
	Checkpoints    uint64
	WALTruncBytes  uint64
	LastDurationNS uint64
	LastError      string
}

Stats is a monotonic snapshot of the checkpointer's lifetime counters.
