serializer

package
v0.2.0
Published: Mar 9, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package serializer provides efficient serialization and deserialization of FlashFS data structures.

This package implements serialization using FlatBuffers, a memory-efficient serialization library that allows for zero-copy deserialization. The package provides both standard and streaming serialization options for snapshots and diffs.

Basic Usage

The basic serialization functions are:

// Serialize a snapshot
data, err := serializer.SerializeSnapshotFB(entries, nil)

// Deserialize a snapshot
entries, err := serializer.DeserializeSnapshotFB(data)

// Serialize a diff
data, err := serializer.SerializeDiffFB(diffEntries, nil)

// Deserialize a diff
diffEntries, err := serializer.DeserializeDiffFB(data)

Memory Optimization

To optimize memory usage, you can reuse a FlatBuffers builder:

// Create a builder once
builder := flatbuffers.NewBuilder(0)

// Reuse it for multiple serializations
for _, snapshot := range snapshots {
    data, err := serializer.SerializeSnapshotFB(snapshot, builder)
    // Process data...
}

The serializer also provides a builder pool that automatically manages builders when you pass nil as the builder parameter:

// This will get a builder from the pool and return it when done
data, err := serializer.SerializeSnapshotFB(entries, nil)

Streaming Serialization

For very large snapshots or diffs, the package provides streaming serialization:

// Create a streaming serializer
ss := serializer.NewStreamingSerializer(serializer.DefaultStreamingOptions())

// Serialize to a writer (file, network, etc.)
err := ss.SerializeToWriter(entries, writer)

// Deserialize from a reader with a callback for each chunk
err = serializer.DeserializeFromReader(reader, func(chunk serializer.SnapshotChunk) error {
    // Process chunk.Entries
    // Track progress with chunk.Index and chunk.Total
    return nil
})

Streaming serialization breaks the data into chunks, which is useful for:

  - Processing very large snapshots that don't fit in memory
  - Showing progress during serialization/deserialization
  - Parallel processing of chunks
  - Network streaming

Performance Considerations

1. **Builder Reuse**: Always reuse builders for multiple serializations to reduce allocations.

2. **Chunk Size**: For streaming serialization, adjust the chunk size based on your memory constraints and processing needs. Larger chunks are more efficient but use more memory.

3. **Buffer Size**: The buffer size affects I/O performance. Larger buffers generally improve performance but use more memory.

4. **Parallel Processing**: For large datasets, consider processing chunks in parallel during deserialization (see the example below).

Examples

## Basic Serialization with Builder Reuse

// Create a builder to reuse
builder := flatbuffers.NewBuilder(0)

// Serialize multiple snapshots
for _, entries := range snapshots {
    data, err := serializer.SerializeSnapshotFB(entries, builder)
    if err != nil {
        return err
    }
    // Store or process data...
}

## Streaming Serialization to a File

// Open a file for writing
file, err := os.Create("snapshot.dat")
if err != nil {
    return err
}
defer file.Close()

// Create a streaming serializer
options := serializer.StreamingOptions{
    ChunkSize:  5000,    // 5000 entries per chunk
    BufferSize: 128 * 1024, // 128 KB buffer
}
ss := serializer.NewStreamingSerializer(options)

// Serialize the snapshot
err = ss.SerializeToWriter(entries, file)
if err != nil {
    return err
}

## Streaming Deserialization with Progress Reporting

// Open a file for reading
file, err := os.Open("snapshot.dat")
if err != nil {
    return err
}
defer file.Close()

// Track progress
var totalEntries int
var processedChunks int

// Deserialize with a callback
err = serializer.DeserializeFromReader(file, func(chunk serializer.SnapshotChunk) error {
    // Process entries
    processEntries(chunk.Entries)

    // Update progress
    totalEntries += len(chunk.Entries)
    processedChunks++

    // Report progress
    progress := float64(processedChunks) / float64(chunk.Total) * 100
    fmt.Printf("Progress: %.2f%% (%d/%d chunks, %d entries)\n",
        progress, processedChunks, chunk.Total, totalEntries)

    return nil
})
if err != nil {
    return err
}

## Parallel Processing of Chunks

// Open a file for reading
file, err := os.Open("snapshot.dat")
if err != nil {
    return err
}
defer file.Close()

// Create a worker pool
var wg sync.WaitGroup
chunkChan := make(chan serializer.SnapshotChunk, 10)
errChan := make(chan error, 1)

// Start worker goroutines
for i := 0; i < runtime.NumCPU(); i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for chunk := range chunkChan {
            // Process chunk in parallel
            if err := processChunk(chunk); err != nil {
                select {
                case errChan <- err:
                default:
                }
                return
            }
        }
    }()
}

// Deserialize and send chunks to workers
go func() {
    err := serializer.DeserializeFromReader(file, func(chunk serializer.SnapshotChunk) error {
        select {
        case err := <-errChan:
            return err
        default:
            chunkChan <- chunk
            return nil
        }
    })

    close(chunkChan)
    if err != nil {
        select {
        case errChan <- err:
        default:
        }
    }
}()

// Wait for all workers to finish
wg.Wait()

// Check for errors
select {
case err := <-errChan:
    return err
default:
    return nil
}

Index

Constants

const (
	// DefaultChunkSize is the default size of chunks for streaming serialization
	DefaultChunkSize = 1000
)

Variables

This section is empty.

Functions

func DeserializeDiffFromReader added in v0.2.0

func DeserializeDiffFromReader(reader io.Reader, callback func(DiffChunk) error) error

DeserializeDiffFromReader deserializes a diff from a reader in chunks
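For example, a diff previously written with a StreamingDiffSerializer can be read back chunk by chunk. A minimal sketch (the file name is illustrative):

// Open the diff file (illustrative name)
file, err := os.Open("changes.diff")
if err != nil {
    return err
}
defer file.Close()

// Read the diff back one chunk at a time
err = serializer.DeserializeDiffFromReader(file, func(chunk serializer.DiffChunk) error {
    for _, entry := range chunk.Entries {
        fmt.Println(entry.Path) // each DiffEntry in this chunk
    }
    return nil
})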

func DeserializeFromReader added in v0.2.0

func DeserializeFromReader(reader io.Reader, callback func(SnapshotChunk) error) error

DeserializeFromReader deserializes a snapshot from a reader in chunks

func DeserializeSnapshotFB added in v0.2.0

func DeserializeSnapshotFB(data []byte) ([]walker.SnapshotEntry, error)

DeserializeSnapshotFB deserializes a snapshot using FlatBuffers
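A minimal round trip through the FlatBuffers format (sketch; entries stands in for a []walker.SnapshotEntry built elsewhere):

// Encode the snapshot (a nil builder uses the internal pool)
data, err := serializer.SerializeSnapshotFB(entries, nil)
if err != nil {
    return err
}

// Decode the bytes back into snapshot entries
decoded, err := serializer.DeserializeSnapshotFB(data)
if err != nil {
    return err
}
fmt.Printf("decoded %d entries\n", len(decoded))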

func SerializeDiffFB added in v0.2.0

func SerializeDiffFB(entries []DiffEntry, builder *flatbuffers.Builder) ([]byte, error)

SerializeDiffFB serializes a diff using FlatBuffers. If a builder is provided, it will be reused; otherwise, a new one will be created.

func SerializeSnapshot

func SerializeSnapshot(entries []walker.SnapshotEntry) ([]byte, error)

SerializeSnapshot serializes snapshot entries to FlatBuffers.

func SerializeSnapshotFB added in v0.2.0

func SerializeSnapshotFB(entries []walker.SnapshotEntry, builder *flatbuffers.Builder) ([]byte, error)

SerializeSnapshotFB serializes a snapshot using FlatBuffers. If a builder is provided, it will be reused; otherwise, a new one will be created.

Types

type DiffChunk added in v0.2.0

type DiffChunk struct {
	Entries []DiffEntry
	Index   int
	Total   int
}

DiffChunk represents a chunk of a diff

type DiffEntry added in v0.2.0

type DiffEntry struct {
	Path           string
	Type           DiffType
	OldSize        int64
	NewSize        int64
	OldModTime     time.Time
	NewModTime     time.Time
	OldPermissions uint32
	NewPermissions uint32
	OldHash        string
	NewHash        string
	IsDir          bool // Defaults to false; the generated FlatBuffers code does not expose this field
}

DiffEntry represents a difference between two snapshots

func DeserializeDiffFB added in v0.2.0

func DeserializeDiffFB(data []byte) ([]DiffEntry, error)

DeserializeDiffFB deserializes a diff using FlatBuffers
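As a sketch, a hand-built diff entry can be round-tripped through SerializeDiffFB and DeserializeDiffFB (field values are illustrative):

// Build a single modified-file entry (values illustrative)
entry := serializer.DiffEntry{
    Path:    "docs/readme.md",
    Type:    serializer.DiffModified,
    OldSize: 1024,
    NewSize: 2048,
}

data, err := serializer.SerializeDiffFB([]serializer.DiffEntry{entry}, nil)
if err != nil {
    return err
}

decoded, err := serializer.DeserializeDiffFB(data)
if err != nil {
    return err
}
fmt.Println(len(decoded)) // 1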

type DiffType added in v0.2.0

type DiffType int8

DiffType represents the type of change in a diff entry

const (
	// DiffAdded represents a file that was added
	DiffAdded DiffType = iota
	// DiffModified represents a file that was modified
	DiffModified
	// DiffDeleted represents a file that was deleted
	DiffDeleted
)
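Diff entries are typically dispatched on their type; a sketch, where entry is a DiffEntry obtained elsewhere:

switch entry.Type {
case serializer.DiffAdded:
    fmt.Println("added:", entry.Path)
case serializer.DiffModified:
    fmt.Println("modified:", entry.Path)
case serializer.DiffDeleted:
    fmt.Println("deleted:", entry.Path)
}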

type SnapshotChunk added in v0.2.0

type SnapshotChunk struct {
	Entries []walker.SnapshotEntry
	Index   int
	Total   int
}

SnapshotChunk represents a chunk of a snapshot

type StreamingDiffSerializer added in v0.2.0

type StreamingDiffSerializer struct {
	// contains filtered or unexported fields
}

StreamingDiffSerializer handles streaming serialization of diffs

func NewStreamingDiffSerializer added in v0.2.0

func NewStreamingDiffSerializer(options StreamingOptions) *StreamingDiffSerializer

NewStreamingDiffSerializer creates a new streaming diff serializer

func (*StreamingDiffSerializer) SerializeDiffToWriter added in v0.2.0

func (s *StreamingDiffSerializer) SerializeDiffToWriter(entries []DiffEntry, writer io.Writer) error

SerializeDiffToWriter serializes a diff to a writer in chunks
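For example, writing a diff to a file with the default chunking (sketch; diffEntries is a []DiffEntry built elsewhere, and the file name is illustrative):

file, err := os.Create("changes.diff")
if err != nil {
    return err
}
defer file.Close()

// Stream the diff to the file in chunks
ds := serializer.NewStreamingDiffSerializer(serializer.DefaultStreamingOptions())
if err := ds.SerializeDiffToWriter(diffEntries, file); err != nil {
    return err
}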

type StreamingOptions added in v0.2.0

type StreamingOptions struct {
	// ChunkSize is the number of entries per chunk
	ChunkSize int
	// BufferSize is the size of the buffer used for reading/writing
	BufferSize int
}

StreamingOptions contains options for streaming serialization/deserialization

func DefaultStreamingOptions added in v0.2.0

func DefaultStreamingOptions() StreamingOptions

DefaultStreamingOptions returns the default options for streaming
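The defaults can be used as a starting point and overridden selectively (the ChunkSize value here is illustrative):

options := serializer.DefaultStreamingOptions()
options.ChunkSize = 2000 // override the default (DefaultChunkSize = 1000)
ss := serializer.NewStreamingSerializer(options)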

type StreamingSerializer added in v0.2.0

type StreamingSerializer struct {
	// contains filtered or unexported fields
}

StreamingSerializer handles streaming serialization of snapshots

func NewStreamingSerializer added in v0.2.0

func NewStreamingSerializer(options StreamingOptions) *StreamingSerializer

NewStreamingSerializer creates a new streaming serializer

func (*StreamingSerializer) SerializeToWriter added in v0.2.0

func (s *StreamingSerializer) SerializeToWriter(entries []walker.SnapshotEntry, writer io.Writer) error

SerializeToWriter serializes a snapshot to a writer in chunks
