storage

package
v0.0.12 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 12, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package storage provides a filesystem-based storage layer for the tony system API.

Storage manages

  • mappings of virtual document paths to and from the filesystem

  • indexing of virtual document nodes

  • storage of diffs associated with virtual document nodes

  • transactions of multi-participant diffs

  • compaction of diffs

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExampleStreamingIndexer added in v0.0.10

func ExampleStreamingIndexer() error

ExampleStreamingIndexer is an example usage function.

Types

type PatchAtSegment added in v0.0.10

type PatchAtSegment struct {
	Segment *index.LogSegment
	Patch   *ir.Node // Extracted patch at segment.KindedPath
	Parent  *ir.Node // Reconstructed parent patch (if segment.KindedPath is deep)
}

PatchAtSegment represents a patch extracted from a log segment.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage provides filesystem-based storage for logd.

func Open

func Open(root string, umask int, logger *slog.Logger) (*Storage, error)

Open opens or creates a Storage instance rooted at the given directory. The root directory is created if it doesn't exist. umask is applied to directory permissions (e.g., with a umask of 022, directories end up as 0755). If logger is nil, slog.Default() is used. The doc comment also describes an optional compactorConfig (if nil, a default config with divisor 2 and NeverRemove is used), but no such parameter appears in the signature above.
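The umask arithmetic can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: dirMode is a hypothetical helper, and the 0o777 base mode is an assumption.

```go
package main

import (
	"fmt"
	"os"
)

// dirMode applies a umask to a base directory mode by clearing the masked bits.
// Hypothetical helper: the 0o777 base is an assumption, not taken from the package.
func dirMode(umask int) os.FileMode {
	return os.FileMode(0o777) &^ os.FileMode(umask)
}

func main() {
	// 0o22 (also written 022) is an octal literal; the decimal literal 22
	// would describe a different mask.
	fmt.Printf("%04o\n", uint32(dirMode(0o22))) // prints "0755"
	fmt.Printf("%04o\n", uint32(dirMode(0o77))) // prints "0700"
}
```

When calling Open, passing the umask as an octal literal (022 or 0o22) avoids the common mistake of writing it in decimal.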

func (*Storage) Close added in v0.0.10

func (s *Storage) Close() error

func (*Storage) GetCurrentCommit added in v0.0.10

func (s *Storage) GetCurrentCommit() (int64, error)

GetCurrentCommit returns the current commit number. This is a snapshot - if commits happen after this call, they won't be reflected.

func (*Storage) GetTx added in v0.0.10

func (s *Storage) GetTx(txID int64) (tx.Tx, error)

GetTx gets an existing transaction by transaction ID. This is the primary way participants coordinate - they all receive the same transaction ID and get the same transaction.

Example:

// Multiple parallel HTTP handlers all receive the same txID
tx, err := storage.GetTx(txID)
if err != nil {
    // handle error
}

// Each participant gets their own patcher handle
patcher := tx.NewPatcher(kp, m, p)
result := patcher.Commit()

func (*Storage) NewTx added in v0.0.10

func (s *Storage) NewTx(participantCount int, meta *api.PatchMeta) (tx.Tx, error)

NewTx creates a new transaction with the specified number of participants. Other participants can retrieve the returned transaction via GetTx, and each participant obtains its own patcher via NewPatcher().

Example usage (typical pattern for parallel HTTP handlers):

// Create transaction
tx, err := storage.NewTx(participantCount, meta)
if err != nil {
    // handle error
}

// Each participant gets their own patcher handle
patcher := tx.NewPatcher(kp, m, p)
result := patcher.WaitForCompletion()
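The coordination pattern described for NewTx and GetTx (one transaction ID shared by several parallel participants, with completion once all have contributed) can be sketched with a toy in-memory model. Everything here (toyTx, registry, submit, runHandlers) is hypothetical and uses only the standard library; it does not reproduce the package's tx.Tx or patcher types.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// toyTx is a hypothetical stand-in for a multi-participant transaction:
// it completes only once every expected participant has submitted a patch.
type toyTx struct {
	mu      sync.Mutex
	want    int
	patches []string
	done    chan struct{}
}

// submit records one participant's patch and blocks until all have arrived.
// It assumes exactly `want` participants call it.
func (t *toyTx) submit(patch string) []string {
	t.mu.Lock()
	t.patches = append(t.patches, patch)
	if len(t.patches) == t.want {
		sort.Strings(t.patches) // deterministic order for the combined result
		close(t.done)           // the last participant releases everyone
	}
	t.mu.Unlock()
	<-t.done
	return t.patches
}

// registry maps transaction IDs to transactions, mirroring NewTx/GetTx in spirit.
type registry struct {
	mu   sync.Mutex
	next int64
	txs  map[int64]*toyTx
}

func newRegistry() *registry { return &registry{txs: map[int64]*toyTx{}} }

func (r *registry) newTx(participants int) int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	r.txs[r.next] = &toyTx{want: participants, done: make(chan struct{})}
	return r.next
}

func (r *registry) getTx(id int64) (*toyTx, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	t, ok := r.txs[id]
	if !ok {
		return nil, fmt.Errorf("unknown transaction %d", id)
	}
	return t, nil
}

// runHandlers simulates parallel handlers that all receive the same txID.
func runHandlers(patches []string) []string {
	if len(patches) == 0 {
		return nil
	}
	reg := newRegistry()
	id := reg.newTx(len(patches))
	results := make([][]string, len(patches))
	var wg sync.WaitGroup
	for i, p := range patches {
		wg.Add(1)
		go func(i int, p string) {
			defer wg.Done()
			t, err := reg.getTx(id)
			if err != nil {
				return
			}
			results[i] = t.submit(p)
		}(i, p)
	}
	wg.Wait()
	return results[0]
}

func main() {
	fmt.Println(runHandlers([]string{"b", "a", "c"})) // prints "[a b c]"
}
```

The point of the sketch is that no participant sees a result until the declared participant count is reached, which is why participantCount must match the number of handlers that will actually join.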

func (*Storage) ReadCurrentState added in v0.0.10

func (s *Storage) ReadCurrentState(kPath string) (*ir.Node, error)

ReadCurrentState reads the current committed state for a given virtual path. This is equivalent to calling GetCurrentCommit() then ReadStateAt() with that commit. If commits happen between getting the commit and reading, they are ignored (point-in-time read).

func (*Storage) ReadPatchesAt added in v0.0.10

func (s *Storage) ReadPatchesAt(kpath string, commit int64) ([]PatchAtSegment, error)

ReadPatchesAt reads all patches affecting the given kpath at the given commit. Returns patches extracted from log entries, including sub-trees and reconstructed parents. This is a testing/development helper - it doesn't apply or merge patches.

func (*Storage) ReadStateAt added in v0.0.10

func (s *Storage) ReadStateAt(kPath string, commit int64) (*ir.Node, error)

ReadStateAt reads the state for a given kpath at a specific commit. It uses compaction and caching when available to minimize the number of patches applied. If commit is 0, it reads the latest state (all segments).
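The read semantics described for ReadStateAt and ReadCurrentState can be illustrated with a toy patch log. This is a sketch under simplifying assumptions: patch, toyLog and their methods are hypothetical, a patch just sets one key, and compaction and caching are not modelled.

```go
package main

import "fmt"

// patch is a hypothetical, simplified diff: it sets one key at one commit.
type patch struct {
	commit int64
	key    string
	value  string
}

// toyLog holds patches in ascending commit order.
type toyLog struct{ patches []patch }

// currentCommit returns the highest commit recorded so far (0 if empty).
func (l *toyLog) currentCommit() int64 {
	if len(l.patches) == 0 {
		return 0
	}
	return l.patches[len(l.patches)-1].commit
}

// stateAt replays every patch whose commit is <= the requested commit.
// A commit of 0 means "latest", as documented for ReadStateAt.
func (l *toyLog) stateAt(commit int64) map[string]string {
	state := map[string]string{}
	for _, p := range l.patches {
		if commit != 0 && p.commit > commit {
			break
		}
		state[p.key] = p.value
	}
	return state
}

func main() {
	l := &toyLog{patches: []patch{{1, "a", "1"}, {2, "a", "2"}, {3, "b", "x"}}}

	// Point-in-time read: pin the commit first, then read at that commit.
	pinned := l.currentCommit()

	// A commit that lands after pinning is ignored by the pinned read.
	l.patches = append(l.patches, patch{4, "a", "9"})

	fmt.Println(l.stateAt(1)["a"])      // prints "1"
	fmt.Println(l.stateAt(pinned)["a"]) // prints "2"
	fmt.Println(l.stateAt(0)["a"])      // prints "9"
}
```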

type StreamingIndexer added in v0.0.10

type StreamingIndexer struct {
	// contains filtered or unexported fields
}

StreamingIndexer reads a document from disk using streaming tokenization, populates the index.Index structure, and writes tokens to another file. It never stores the entire document in memory, only maintaining the index.

func NewStreamingIndexer added in v0.0.10

func NewStreamingIndexer(sourcePath, destPath string, commit, tx int64) (*StreamingIndexer, error)

NewStreamingIndexer creates a new streaming indexer. It opens the source file for reading and creates/opens the dest file for writing.

func (*StreamingIndexer) Close added in v0.0.10

func (si *StreamingIndexer) Close() error

Close closes the source and destination files.

func (*StreamingIndexer) GetIndex added in v0.0.10

func (si *StreamingIndexer) GetIndex() *index.Index

GetIndex returns the built index.

func (*StreamingIndexer) Process added in v0.0.10

func (si *StreamingIndexer) Process() error

Process streams tokens from source to destination, building the index as it goes. It never loads the entire document into memory; tokens are processed incrementally. The TokenSink callback automatically updates the index when nodes start.
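The general shape of streaming indexing (copy records from source to destination one at a time while recording where each one starts) can be sketched as follows. This is not the package's tokenizer: newline-terminated "key=value" records stand in for tokens, and streamAndIndex and indexString are hypothetical names.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// streamAndIndex copies "key=value" lines from src to dst one record at a
// time, recording the byte offset in dst at which each record starts.
// Only the current line and the index are held in memory.
func streamAndIndex(src io.Reader, dst io.Writer) (map[string]int64, error) {
	index := map[string]int64{}
	r := bufio.NewReader(src)
	var offset int64
	for {
		line, err := r.ReadString('\n')
		if len(line) > 0 {
			if key, _, ok := strings.Cut(line, "="); ok {
				index[key] = offset // indexed as the record starts
			}
			n, werr := io.WriteString(dst, line)
			offset += int64(n)
			if werr != nil {
				return nil, werr
			}
		}
		if err == io.EOF {
			return index, nil
		}
		if err != nil {
			return nil, err
		}
	}
}

// indexString is a convenience wrapper for in-memory input.
func indexString(doc string) (map[string]int64, string, error) {
	var dst bytes.Buffer
	idx, err := streamAndIndex(strings.NewReader(doc), &dst)
	return idx, dst.String(), err
}

func main() {
	idx, out, err := indexString("a=1\nbb=22\nc=3\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(idx["bb"], idx["c"]) // prints "4 10"
	fmt.Print(out[idx["c"]:])        // prints "c=3"
}
```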

func (*StreamingIndexer) ReadPath added in v0.0.10

func (si *StreamingIndexer) ReadPath(kpath string) (*ir.Node, error)

ReadPath reads a specific path from the destination file using the index. It looks up the path in the index, reads from the LogPosition offset, parses the node, and extracts the specific path.

The implementation reads directly from the file starting at the offset, allowing parsing to read as much as necessary to find a complete node, even if it exceeds any initial size estimate. This ensures we can parse nodes of any size without artificial limits.

Both bracketed structures ({...} or [...]) and simple values (strings, numbers, booleans, null) are supported. The implementation detects the token type and uses the appropriate parsing method:

  • Simple values: parsed directly from tokens

  • Bracketed structures: parsed using NodeParser

The kpath parameter should be in kpath format (e.g., "a.b.c", "[0].key", "foo{4}.bar").
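The offset-based read described above (start at an indexed offset and let the parser consume as much as it needs to complete one node) can be sketched with encoding/json as a stand-in parser. The package's own token format and NodeParser are not used here; readValueAt and readFromString are hypothetical helpers, and JSON is swapped in only because its decoder has the same "read one complete value, then stop" behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// readValueAt decodes exactly one value starting at byte offset off.
// The decoder reads as far as needed to complete the value, whether it is a
// bracketed structure or a simple value, and ignores what follows.
func readValueAt(r io.ReaderAt, size, off int64) (any, error) {
	section := io.NewSectionReader(r, off, size-off)
	var v any
	if err := json.NewDecoder(section).Decode(&v); err != nil {
		return nil, err
	}
	return v, nil
}

// readFromString is a convenience wrapper for in-memory input.
func readFromString(doc string, off int64) (any, error) {
	rd := strings.NewReader(doc)
	return readValueAt(rd, rd.Size(), off)
}

func main() {
	doc := `{"a":[1,2,3],"b":"x"}`

	// Offset 5 is the '[' that opens the array stored under "a".
	arr, err := readFromString(doc, 5)
	if err != nil {
		panic(err)
	}
	fmt.Println(arr) // prints "[1 2 3]"

	// Offset 17 is the opening quote of the string stored under "b".
	s, err := readFromString(doc, 17)
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints "x"
}
```

No size estimate is passed in: the section reader only bounds the read at the end of the input, so a node of any length can be parsed from its starting offset.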

func (*StreamingIndexer) ReadRandomPaths added in v0.0.10

func (si *StreamingIndexer) ReadRandomPaths(paths []string) (map[string]*ir.Node, error)

ReadRandomPaths reads multiple random paths from the index and extracts their nodes. It demonstrates using the index to find and read paths.

Directories

Path Synopsis
internal
seq
