Documentation ¶
Overview ¶
Package checkpoint provides state persistence for streaming analysis.
Index ¶
- Constants
- Variables
- func DefaultDir() string
- func LoadState(dir, basename string, codec Codec, state any) error
- func RepoHash(repoPath string) string
- func SaveState(dir, basename string, codec Codec, state any) error
- type AggregatorSpillEntry
- type Checkpointable
- type Codec
- type GobCodec
- type JSONCodec
- type Manager
- func (m *Manager) CheckpointDir() string
- func (m *Manager) Clear() error
- func (m *Manager) Exists() bool
- func (m *Manager) Load(checkpointables []Checkpointable) (*StreamingState, error)
- func (m *Manager) LoadMetadata() (*Metadata, error)
- func (m *Manager) MetadataPath() string
- func (m *Manager) Save(checkpointables []Checkpointable, state StreamingState, repoPath string, ...) error
- func (m *Manager) Validate(repoPath string, analyzerNames []string) error
- type Metadata
- type Persister
- type StreamingState
Constants ¶
const (
	DefaultMaxAge = 7 * 24 * time.Hour // 7 days.
	DefaultMaxSize = 1 << 30 // 1GB.
)
Default retention values.
const MetadataVersion = 2
MetadataVersion is the current checkpoint metadata format version. Bumped from 1 to 2 when aggregator spill state was added.
Variables ¶
var (
	ErrRepoPathMismatch = errors.New("repo path mismatch")
	ErrAnalyzerMismatch = errors.New("analyzer mismatch")
	ErrVersionMismatch = errors.New("checkpoint version mismatch")
)
Sentinel errors for checkpoint validation.
Functions ¶
func DefaultDir ¶
func DefaultDir() string
DefaultDir returns the default checkpoint directory (~/.codefang/checkpoints).
func LoadState ¶
func LoadState(dir, basename string, codec Codec, state any) error
LoadState loads state from a file in the specified directory. The filename is constructed from the basename and the codec's extension. The state parameter must be a pointer to the target struct.
Types ¶
type AggregatorSpillEntry ¶
type AggregatorSpillEntry struct {
// Dir is the directory containing gob-encoded spill files.
Dir string `json:"dir,omitempty"`
// Count is the number of spill files in Dir.
Count int `json:"count,omitempty"`
}
AggregatorSpillEntry records on-disk spill state for a single aggregator.
type Checkpointable ¶
type Checkpointable interface {
// SaveCheckpoint writes analyzer state to the given directory.
SaveCheckpoint(dir string) error
// LoadCheckpoint restores analyzer state from the given directory.
LoadCheckpoint(dir string) error
// CheckpointSize returns the estimated size of the checkpoint in bytes.
CheckpointSize() int64
}
Checkpointable is an optional interface for analyzers that support checkpointing.
type Codec ¶
type Codec interface {
// Encode writes the state to the writer.
Encode(w io.Writer, state any) error
// Decode reads the state from the reader.
Decode(r io.Reader, state any) error
// Extension returns the file extension for this codec (e.g., ".json", ".gob").
Extension() string
}
Codec defines how checkpoint state is serialized and deserialized.
type GobCodec ¶
type GobCodec struct{}
GobCodec implements Codec using gob encoding.
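A gob-backed codec that satisfies the Codec interface can be sketched as follows. The local `gobCodec` type is a stand-in written for this example and may differ from the package's GobCodec. The `tally` type and `roundTrip` helper are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
)

// gobCodec is a local stand-in that implements the three documented methods.
type gobCodec struct{}

func (gobCodec) Encode(w io.Writer, state any) error { return gob.NewEncoder(w).Encode(state) }
func (gobCodec) Decode(r io.Reader, state any) error { return gob.NewDecoder(r).Decode(state) }
func (gobCodec) Extension() string                   { return ".gob" }

// tally is a made-up state type; gob only encodes exported fields.
type tally struct {
	Commits int
	Authors map[string]int
}

// roundTrip encodes a tally into memory and decodes it back.
func roundTrip(in tally) (tally, error) {
	var (
		buf bytes.Buffer
		c   gobCodec
		out tally
	)
	if err := c.Encode(&buf, in); err != nil {
		return tally{}, err
	}
	if err := c.Decode(&buf, &out); err != nil {
		return tally{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(tally{Commits: 3, Authors: map[string]int{"alice": 2, "bob": 1}})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Commits, out.Authors["alice"], out.Authors["bob"])
}
```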
type JSONCodec ¶
type JSONCodec struct {
// Indent specifies the indentation string. Empty string means compact JSON.
Indent string
}
JSONCodec implements Codec using JSON encoding with optional indentation.
func NewCompactJSONCodec ¶
func NewCompactJSONCodec() *JSONCodec
NewCompactJSONCodec creates a JSON codec without indentation.
func NewJSONCodec ¶
func NewJSONCodec() *JSONCodec
NewJSONCodec creates a JSON codec with pretty-printing.
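The difference between the two constructors can be illustrated with a local stand-in. The sketch assumes that pretty-printing uses a two-space indent, which the source does not specify. `jsonCodec`, `newJSONCodec`, `newCompactJSONCodec`, and `encodeToString` are hypothetical local names.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// jsonCodec mirrors the documented JSONCodec struct.
type jsonCodec struct {
	// Indent is the indentation string; empty means compact JSON.
	Indent string
}

// newJSONCodec returns a pretty-printing codec (two-space indent is assumed).
func newJSONCodec() *jsonCodec { return &jsonCodec{Indent: "  "} }

// newCompactJSONCodec returns a codec that emits compact JSON.
func newCompactJSONCodec() *jsonCodec { return &jsonCodec{} }

func (c *jsonCodec) Encode(w io.Writer, state any) error {
	enc := json.NewEncoder(w)
	if c.Indent != "" {
		enc.SetIndent("", c.Indent)
	}
	return enc.Encode(state)
}

func (c *jsonCodec) Decode(r io.Reader, state any) error {
	return json.NewDecoder(r).Decode(state)
}

func (c *jsonCodec) Extension() string { return ".json" }

// encodeToString runs Encode against an in-memory buffer.
func encodeToString(c *jsonCodec, state any) (string, error) {
	var buf bytes.Buffer
	if err := c.Encode(&buf, state); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	state := map[string]int{"processed": 400}
	for _, c := range []*jsonCodec{newCompactJSONCodec(), newJSONCodec()} {
		out, err := encodeToString(c, state)
		if err != nil {
			panic(err)
		}
		fmt.Print(out)
	}
}
```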
type Manager ¶
Manager coordinates checkpoints across analyzers.
func NewManager ¶
NewManager creates a new checkpoint manager.
func (*Manager) CheckpointDir ¶
func (m *Manager) CheckpointDir() string
CheckpointDir returns the directory for this repository's checkpoint.
func (*Manager) Load ¶
func (m *Manager) Load(checkpointables []Checkpointable) (*StreamingState, error)
Load restores state for all checkpointable analyzers.
func (*Manager) LoadMetadata ¶
func (m *Manager) LoadMetadata() (*Metadata, error)
LoadMetadata loads the checkpoint metadata.
func (*Manager) MetadataPath ¶
func (m *Manager) MetadataPath() string
MetadataPath returns the path to the metadata file.
func (*Manager) Save ¶
func (m *Manager) Save(
	checkpointables []Checkpointable,
	state StreamingState,
	repoPath string,
	analyzerNames []string,
) error
Save creates a checkpoint for all checkpointable analyzers.
type Metadata ¶
type Metadata struct {
Version int `json:"version"`
RepoPath string `json:"repo_path"`
RepoHash string `json:"repo_hash"`
CreatedAt string `json:"created_at"`
Analyzers []string `json:"analyzers"`
StreamingState StreamingState `json:"streaming_state"`
Checksums map[string]string `json:"checksums"`
}
Metadata holds checkpoint metadata for validation and resume.
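To make the JSON layout implied by the struct tags concrete, the sketch below decodes a made-up metadata document into local mirrors of the documented types. All values in `sample` (paths, hashes, analyzer names, and the RFC 3339 timestamp format) are invented for illustration, and `parseMetadata` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Local mirrors of the documented types.
type aggregatorSpillEntry struct {
	Dir   string `json:"dir,omitempty"`
	Count int    `json:"count,omitempty"`
}

type streamingState struct {
	TotalCommits     int                    `json:"total_commits"`
	ProcessedCommits int                    `json:"processed_commits"`
	CurrentChunk     int                    `json:"current_chunk"`
	TotalChunks      int                    `json:"total_chunks"`
	LastCommitHash   string                 `json:"last_commit_hash"`
	LastTick         int                    `json:"last_tick"`
	AggregatorSpills []aggregatorSpillEntry `json:"aggregator_spills,omitempty"`
}

type metadata struct {
	Version        int               `json:"version"`
	RepoPath       string            `json:"repo_path"`
	RepoHash       string            `json:"repo_hash"`
	CreatedAt      string            `json:"created_at"`
	Analyzers      []string          `json:"analyzers"`
	StreamingState streamingState    `json:"streaming_state"`
	Checksums      map[string]string `json:"checksums"`
}

// sample is invented metadata content used only for illustration.
const sample = `{
  "version": 2,
  "repo_path": "/src/example",
  "repo_hash": "abc123",
  "created_at": "2024-01-10T12:00:00Z",
  "analyzers": ["burndown", "couples"],
  "streaming_state": {
    "total_commits": 1000,
    "processed_commits": 400,
    "current_chunk": 2,
    "total_chunks": 5,
    "last_commit_hash": "deadbeef",
    "last_tick": 17
  },
  "checksums": {"burndown.gob": "0f1e2d"}
}`

// parseMetadata decodes a metadata document from a string.
func parseMetadata(raw string) (*metadata, error) {
	var m metadata
	if err := json.NewDecoder(strings.NewReader(raw)).Decode(&m); err != nil {
		return nil, err
	}
	return &m, nil
}

func main() {
	m, err := parseMetadata(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("v%d %s: %d/%d commits, analyzers=%v\n",
		m.Version, m.RepoPath,
		m.StreamingState.ProcessedCommits, m.StreamingState.TotalCommits,
		m.Analyzers)
}
```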
type Persister ¶
type Persister[T any] struct {
	// contains filtered or unexported fields
}
Persister handles checkpoint I/O for a specific state type.
func NewPersister ¶
NewPersister creates a checkpoint persister with the given basename and codec.
type StreamingState ¶
type StreamingState struct {
TotalCommits int `json:"total_commits"`
ProcessedCommits int `json:"processed_commits"`
CurrentChunk int `json:"current_chunk"`
TotalChunks int `json:"total_chunks"`
LastCommitHash string `json:"last_commit_hash"`
LastTick int `json:"last_tick"`
// AggregatorSpills records the spill state of each aggregator at checkpoint time.
// Indexed by analyzer position in the Runner.Analyzers slice.
// Zero-value entries mean the analyzer has no aggregator (plumbing, file_history).
AggregatorSpills []AggregatorSpillEntry `json:"aggregator_spills,omitempty"`
}
StreamingState tracks chunk orchestrator progress.
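A resume step might read these fields as in the sketch below, which works on local mirrors of the documented types. The `remaining` and `spilledAnalyzers` helpers are hypothetical, as is the analyzer name `burndown`. Treating an entry with `Count == 0` as "no spill" is an assumption based on the field comments.

```go
package main

import "fmt"

// Local mirrors of the documented types.
type aggregatorSpillEntry struct {
	Dir   string `json:"dir,omitempty"`
	Count int    `json:"count,omitempty"`
}

type streamingState struct {
	TotalCommits     int                    `json:"total_commits"`
	ProcessedCommits int                    `json:"processed_commits"`
	CurrentChunk     int                    `json:"current_chunk"`
	TotalChunks      int                    `json:"total_chunks"`
	LastCommitHash   string                 `json:"last_commit_hash"`
	LastTick         int                    `json:"last_tick"`
	AggregatorSpills []aggregatorSpillEntry `json:"aggregator_spills,omitempty"`
}

// remaining returns the number of commits still to be processed.
func remaining(s streamingState) int {
	return s.TotalCommits - s.ProcessedCommits
}

// spilledAnalyzers maps spill entries back to analyzer names by position
// and returns the names that have at least one spill file.
func spilledAnalyzers(s streamingState, names []string) []string {
	var out []string
	for i, entry := range s.AggregatorSpills {
		if i < len(names) && entry.Count > 0 {
			out = append(out, names[i])
		}
	}
	return out
}

func main() {
	names := []string{"plumbing", "burndown", "file_history"}
	state := streamingState{
		TotalCommits:     1000,
		ProcessedCommits: 400,
		CurrentChunk:     2,
		TotalChunks:      5,
		AggregatorSpills: []aggregatorSpillEntry{
			{}, // plumbing: no aggregator
			{Dir: "/tmp/spill/burndown", Count: 3},
			{}, // file_history: no aggregator
		},
	}
	fmt.Println(remaining(state), spilledAnalyzers(state, names))
}
```

Because the spill slice is indexed by analyzer position, the names passed to `spilledAnalyzers` must be in the same order as when the checkpoint was written.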