Documentation
Overview ¶
Package serializer provides efficient serialization and deserialization of FlashFS data structures.
This package implements serialization using FlatBuffers, a memory-efficient serialization library that allows for zero-copy deserialization. The package provides both standard and streaming serialization options for snapshots and diffs.
Basic Usage ¶
The basic serialization functions are:
// Serialize a snapshot
data, err := serializer.SerializeSnapshotFB(entries, nil)

// Deserialize a snapshot
entries, err := serializer.DeserializeSnapshotFB(data)

// Serialize a diff
data, err := serializer.SerializeDiffFB(diffEntries, nil)

// Deserialize a diff
diffEntries, err := serializer.DeserializeDiffFB(data)
Memory Optimization ¶
To optimize memory usage, you can reuse a FlatBuffers builder:
// Create a builder once
builder := flatbuffers.NewBuilder(0)
// Reuse it for multiple serializations
for _, snapshot := range snapshots {
	data, err := serializer.SerializeSnapshotFB(snapshot, builder)
	// Process data...
}
The serializer also provides a builder pool that automatically manages builders when you pass nil as the builder parameter:
// Passing nil gets a builder from the pool and returns it when done
data, err := serializer.SerializeSnapshotFB(entries, nil)
Streaming Serialization ¶
For very large snapshots or diffs, the package provides streaming serialization:
// Create a streaming serializer
s := serializer.NewStreamingSerializer(serializer.DefaultStreamingOptions())
// Serialize to a writer (file, network, etc.)
err := s.SerializeToWriter(entries, writer)
// Deserialize from a reader with a callback for each chunk
err = serializer.DeserializeFromReader(reader, func(chunk serializer.SnapshotChunk) error {
	// Process chunk.Entries
	// Track progress with chunk.Index and chunk.Total
	return nil
})
Streaming serialization breaks the data into chunks, which is useful for:

- Processing very large snapshots that don't fit in memory
- Showing progress during serialization/deserialization
- Parallel processing of chunks
- Network streaming
Performance Considerations ¶
1. **Builder Reuse**: Always reuse builders for multiple serializations to reduce allocations.
2. **Chunk Size**: For streaming serialization, adjust the chunk size based on your memory constraints and processing needs. Larger chunks are more efficient but use more memory (see the tuning sketch after this list).
3. **Buffer Size**: The buffer size affects I/O performance. Larger buffers generally improve performance but use more memory.
4. **Parallel Processing**: For large datasets, consider processing chunks in parallel during deserialization.
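For example, here is a minimal tuning sketch for a memory-constrained environment; the specific values are illustrative assumptions, not recommendations, and entries and writer are assumed to exist:

options := serializer.DefaultStreamingOptions()
options.ChunkSize = 500        // fewer entries per chunk lowers peak memory
options.BufferSize = 32 * 1024 // smaller 32KB I/O buffer
s := serializer.NewStreamingSerializer(options)
err := s.SerializeToWriter(entries, writer)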
Examples ¶
## Basic Serialization with Builder Reuse
// Create a builder to reuse
builder := flatbuffers.NewBuilder(0)
// Serialize multiple snapshots
for _, entries := range snapshots {
	data, err := serializer.SerializeSnapshotFB(entries, builder)
	if err != nil {
		return err
	}
	// Store or process data...
}
## Streaming Serialization to a File
// Open a file for writing
file, err := os.Create("snapshot.dat")
if err != nil {
	return err
}
defer file.Close()
// Create a streaming serializer
options := serializer.StreamingOptions{
	ChunkSize:  5000,       // 5000 entries per chunk
	BufferSize: 128 * 1024, // 128KB buffer
}
s := serializer.NewStreamingSerializer(options)
// Serialize the snapshot
err = s.SerializeToWriter(entries, file)
if err != nil {
	return err
}
## Streaming Deserialization with Progress Reporting
// Open a file for reading
file, err := os.Open("snapshot.dat")
if err != nil {
return err
}
defer file.Close()
// Track progress
var totalEntries int
var processedChunks int
// Deserialize with a callback
err = serializer.DeserializeFromReader(file, func(chunk SnapshotChunk) error {
// Process entries
processEntries(chunk.Entries)
// Update progress
totalEntries += len(chunk.Entries)
processedChunks++
// Report progress
progress := float64(processedChunks) / float64(chunk.Total) * 100
fmt.Printf("Progress: %.2f%% (%d/%d chunks, %d entries)\n",
progress, processedChunks, chunk.Total, totalEntries)
return nil
})
if err != nil {
return err
}
## Parallel Processing of Chunks
// Open a file for reading
file, err := os.Open("snapshot.dat")
if err != nil {
return err
}
defer file.Close()
// Create a worker pool
var wg sync.WaitGroup
chunkChan := make(chan SnapshotChunk, 10)
errChan := make(chan error, 1)
// Start worker goroutines
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for chunk := range chunkChan {
// Process chunk in parallel
if err := processChunk(chunk); err != nil {
select {
case errChan <- err:
default:
}
return
}
}
}()
}
// Deserialize and send chunks to workers
go func() {
err := serializer.DeserializeFromReader(file, func(chunk SnapshotChunk) error {
select {
case err := <-errChan:
return err
default:
chunkChan <- chunk
return nil
}
})
close(chunkChan)
if err != nil {
select {
case errChan <- err:
default:
}
}
}()
// Wait for all workers to finish
wg.Wait()
// Check for errors
select {
case err := <-errChan:
return err
default:
return nil
}
Index ¶
- Constants
- func DeserializeDiffFromReader(reader io.Reader, callback func(DiffChunk) error) error
- func DeserializeFromReader(reader io.Reader, callback func(SnapshotChunk) error) error
- func DeserializeSnapshotFB(data []byte) ([]walker.SnapshotEntry, error)
- func SerializeDiffFB(entries []DiffEntry, builder *flatbuffers.Builder) ([]byte, error)
- func SerializeSnapshot(entries []walker.SnapshotEntry) ([]byte, error)
- func SerializeSnapshotFB(entries []walker.SnapshotEntry, builder *flatbuffers.Builder) ([]byte, error)
- type DiffChunk
- type DiffEntry
- type DiffType
- type SnapshotChunk
- type StreamingDiffSerializer
- type StreamingOptions
- type StreamingSerializer
Constants ¶
const (
	// DefaultChunkSize is the default size of chunks for streaming serialization
	DefaultChunkSize = 1000
)
Variables ¶
This section is empty.
Functions ¶
func DeserializeDiffFromReader ¶ added in v0.2.0
func DeserializeDiffFromReader(reader io.Reader, callback func(DiffChunk) error) error
DeserializeDiffFromReader deserializes a diff from a reader in chunks
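A minimal usage sketch; the filename is illustrative, and the DiffChunk fields are not documented here, so the callback body is left schematic:

file, err := os.Open("changes.diff")
if err != nil {
	return err
}
defer file.Close()
err = serializer.DeserializeDiffFromReader(file, func(chunk serializer.DiffChunk) error {
	// Handle one chunk of diff entries here
	return nil
})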
func DeserializeFromReader ¶ added in v0.2.0
func DeserializeFromReader(reader io.Reader, callback func(SnapshotChunk) error) error
DeserializeFromReader deserializes a snapshot from a reader in chunks
func DeserializeSnapshotFB ¶ added in v0.2.0
func DeserializeSnapshotFB(data []byte) ([]walker.SnapshotEntry, error)
DeserializeSnapshotFB deserializes a snapshot using FlatBuffers
func SerializeDiffFB ¶ added in v0.2.0
func SerializeDiffFB(entries []DiffEntry, builder *flatbuffers.Builder) ([]byte, error)
SerializeDiffFB serializes a diff using FlatBuffers. If a builder is provided, it will be reused; otherwise, a new one will be created.
func SerializeSnapshot ¶
func SerializeSnapshot(entries []walker.SnapshotEntry) ([]byte, error)
SerializeSnapshot serializes snapshot entries to FlatBuffers.
func SerializeSnapshotFB ¶ added in v0.2.0
func SerializeSnapshotFB(entries []walker.SnapshotEntry, builder *flatbuffers.Builder) ([]byte, error)
SerializeSnapshotFB serializes a snapshot using FlatBuffers. If a builder is provided, it will be reused; otherwise, a new one will be created.
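A minimal round-trip sketch, assuming entries is a []walker.SnapshotEntry built elsewhere:

builder := flatbuffers.NewBuilder(0)
data, err := serializer.SerializeSnapshotFB(entries, builder)
if err != nil {
	return err
}
decoded, err := serializer.DeserializeSnapshotFB(data)
if err != nil {
	return err
}
fmt.Printf("round-tripped %d entries\n", len(decoded))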
Types ¶
type DiffEntry ¶ added in v0.2.0
type DiffEntry struct {
	Path           string
	Type           DiffType
	OldSize        int64
	NewSize        int64
	OldModTime     time.Time
	NewModTime     time.Time
	OldPermissions uint32
	NewPermissions uint32
	OldHash        string
	NewHash        string
	IsDir          bool // Defaults to false since it is not accessible from the generated code
}
DiffEntry represents a difference between two snapshots
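A construction sketch; the path, sizes, and times below are illustrative, and because the DiffType constant names are not listed in this documentation, the Type field is left at its zero value:

entry := serializer.DiffEntry{
	Path:       "/etc/hosts",
	Type:       0, // substitute the appropriate DiffType constant
	OldSize:    512,
	NewSize:    640,
	OldModTime: time.Now().Add(-time.Hour),
	NewModTime: time.Now(),
}
data, err := serializer.SerializeDiffFB([]serializer.DiffEntry{entry}, nil)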
func DeserializeDiffFB ¶ added in v0.2.0
func DeserializeDiffFB(data []byte) ([]DiffEntry, error)
DeserializeDiffFB deserializes a diff using FlatBuffers
type DiffType ¶ added in v0.2.0
type DiffType int8
DiffType represents the type of change in a diff entry
type SnapshotChunk ¶ added in v0.2.0
type SnapshotChunk struct {
	Entries []walker.SnapshotEntry
	Index   int
	Total   int
}
SnapshotChunk represents a chunk of a snapshot
type StreamingDiffSerializer ¶ added in v0.2.0
type StreamingDiffSerializer struct {
	// contains filtered or unexported fields
}
StreamingDiffSerializer handles streaming serialization of diffs
func NewStreamingDiffSerializer ¶ added in v0.2.0
func NewStreamingDiffSerializer(options StreamingOptions) *StreamingDiffSerializer
NewStreamingDiffSerializer creates a new streaming diff serializer
func (*StreamingDiffSerializer) SerializeDiffToWriter ¶ added in v0.2.0
func (s *StreamingDiffSerializer) SerializeDiffToWriter(entries []DiffEntry, writer io.Writer) error
SerializeDiffToWriter serializes a diff to a writer in chunks
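A minimal sketch streaming a diff to a file with the default options; the filename and the diffEntries slice are illustrative assumptions:

file, err := os.Create("changes.diff")
if err != nil {
	return err
}
defer file.Close()
ds := serializer.NewStreamingDiffSerializer(serializer.DefaultStreamingOptions())
if err := ds.SerializeDiffToWriter(diffEntries, file); err != nil {
	return err
}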
type StreamingOptions ¶ added in v0.2.0
type StreamingOptions struct {
	// ChunkSize is the number of entries per chunk
	ChunkSize int
	// BufferSize is the size of the buffer used for reading/writing
	BufferSize int
}
StreamingOptions contains options for streaming serialization/deserialization
func DefaultStreamingOptions ¶ added in v0.2.0
func DefaultStreamingOptions() StreamingOptions
DefaultStreamingOptions returns the default options for streaming
type StreamingSerializer ¶ added in v0.2.0
type StreamingSerializer struct {
	// contains filtered or unexported fields
}
StreamingSerializer handles streaming serialization of snapshots
func NewStreamingSerializer ¶ added in v0.2.0
func NewStreamingSerializer(options StreamingOptions) *StreamingSerializer
NewStreamingSerializer creates a new streaming serializer
func (*StreamingSerializer) SerializeToWriter ¶ added in v0.2.0
func (s *StreamingSerializer) SerializeToWriter(entries []walker.SnapshotEntry, writer io.Writer) error
SerializeToWriter serializes a snapshot to a writer in chunks