Documentation
¶
Index ¶
- Constants
- type Binary
- type Marshaller
- type Proto
- type ProtoingFast
- type StoreData
- type StreamMarshaller
- type VTproto
- func (p *VTproto) Marshal(data *StoreData) ([]byte, error)
- func (p *VTproto) MarshalStream(data *StoreData, estimatedSize int64) io.ReadCloser
- func (p *VTproto) Unmarshal(in []byte) (*StoreData, uint64, error)
- func (p *VTproto) UnmarshalStream(reader io.Reader, estimatedSize int64) (*StoreData, uint64, error)
- func (p *VTproto) UnmarshalStreamFast(reader io.Reader, estimatedSize int64) (*StoreData, uint64, error)
Examples ¶
Constants ¶
const DeletePrefixEntryProtoTag = 0x12
const KVEntryKeyProtoTag = 0x0a
const KVEntryProtoTag = 0x0a
const KVEntryValueProtoTag = 0x12
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Marshaller ¶
type Marshaller interface {
Unmarshal(in []byte) (*StoreData, uint64, error)
Marshal(data *StoreData) ([]byte, error)
}
func Default ¶
func Default() Marshaller
type ProtoingFast ¶
type ProtoingFast struct{}
ProtoingFast is a custom proto marshaller that marshals and unmarshals the StoreData into a predefined proto struct (see below). The motivation is that we want to write a proto message, so that it is readable by other tools that have the appropriate message definition, while still gaining the marshal performance of a custom binary library.
message StoreData {
map<string, bytes> kv = 1;
repeated string delete_prefixes = 2;
}
type StreamMarshaller ¶ added in v1.17.2
type VTproto ¶
type VTproto struct{}
Example (Performance_comparison) ¶
// Demonstrate the performance benefits of the streaming approach
// Create a large dataset similar to blockchain state
testData := &StoreData{
Kv: make(map[string][]byte),
DeletePrefixes: []string{"temp:", "cache:", "pending:"},
}
// Add 50k entries (realistic blockchain state size)
for i := range 50000 {
key := fmt.Sprintf("account:%08d", i)
value := fmt.Sprintf(`{"balance":%d,"nonce":%d}`, i*1000, i)
testData.Kv[key] = []byte(value)
}
marshaller := &VTproto{}
serialized, _ := marshaller.Marshal(testData)
// Write to temporary file
tmpFile, _ := os.CreateTemp("", "performance_demo_*.bin")
defer os.Remove(tmpFile.Name())
tmpFile.Write(serialized)
tmpFile.Close()
fileSizeMB := float64(len(serialized)) / (1024 * 1024)
fmt.Printf("Test file size: %.1f MB\n", fileSizeMB)
// Method 1: Traditional ReadAll + Unmarshal
file1, _ := os.Open(tmpFile.Name())
data, _ := io.ReadAll(file1)
file1.Close()
result1, _, _ := marshaller.Unmarshal(data)
fmt.Printf("ReadAll method: Loaded %.1f MB + parsed data = ~%.1f MB peak memory\n",
fileSizeMB, fileSizeMB*2.5)
// Method 2: Optimized streaming
file2, _ := os.Open(tmpFile.Name())
stat, _ := file2.Stat()
result2, _, _ := marshaller.UnmarshalStreamFast(file2, stat.Size())
file2.Close()
fmt.Printf("Streaming method: Only parsed data = ~%.1f MB peak memory\n", fileSizeMB*0.8)
// Verify results are identical
fmt.Printf("Results identical: %v\n", len(result1.Kv) == len(result2.Kv))
fmt.Printf("Memory savings: ~%.1fx less peak memory usage\n", 2.5/0.8)
Output:
Test file size: 2.6 MB
ReadAll method: Loaded 2.6 MB + parsed data = ~6.6 MB peak memory
Streaming method: Only parsed data = ~2.1 MB peak memory
Results identical: true
Memory savings: ~3.1x less peak memory usage
func (*VTproto) MarshalStream ¶ added in v1.17.2
func (p *VTproto) MarshalStream(data *StoreData, estimatedSize int64) io.ReadCloser
MarshalStream returns an io.ReadCloser that streams the marshaled data. The data is assumed not to change until the returned ReadCloser is closed. estimatedSize is used for buffer optimization; use 0 for auto-sizing. IMPORTANT: The caller MUST call Close() on the returned ReadCloser to prevent resource leaks.
Example ¶
ExampleVTproto_MarshalStream demonstrates how to use streaming marshaling
// Create test data
testData := &StoreData{
Kv: map[string][]byte{
"account:001": []byte(`{"balance":1000,"nonce":1}`),
"account:002": []byte(`{"balance":2000,"nonce":5}`),
"account:003": []byte(`{"balance":500,"nonce":2}`),
},
DeletePrefixes: []string{"temp:", "cache:"},
}
marshaller := &VTproto{}
// Stream marshal - generates data on-the-fly without loading everything in memory
reader := marshaller.MarshalStream(testData, 0) // 0 for auto-sizing
defer reader.Close()
// Read data in chunks (simulating writing to file or network)
buffer := make([]byte, 1024)
totalBytes := 0
for {
n, err := reader.Read(buffer)
if n > 0 {
totalBytes += n
// Process chunk (e.g., write to file, send over network)
fmt.Printf("Read chunk of %d bytes\n", n)
}
if err == io.EOF {
break
}
if err != nil {
panic(fmt.Sprintf("Read error: %v", err))
}
}
fmt.Printf("Total bytes streamed: %d\n", totalBytes)
// Note: Actual output will vary based on data structure and marshaling
Example (FileWrite) ¶
ExampleVTproto_MarshalStream_fileWrite demonstrates streaming to a file
testData := &StoreData{
Kv: map[string][]byte{
"large_data_1": []byte(strings.Repeat("data", 1000)),
"large_data_2": []byte(strings.Repeat("more", 1000)),
},
DeletePrefixes: []string{"cleanup:"},
}
marshaller := &VTproto{}
// Create temporary file
tmpFile, err := os.CreateTemp("", "stream_marshal_*.bin")
if err != nil {
panic(err)
}
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
// Stream directly to file without loading all data in memory
reader := marshaller.MarshalStream(testData, 0)
defer reader.Close()
// Copy from stream to file efficiently
bytesWritten, err := io.Copy(tmpFile, reader)
if err != nil {
panic(err)
}
fmt.Printf("Streamed %d bytes to file\n", bytesWritten)
// Verify by reading back and unmarshaling
tmpFile.Seek(0, 0) // Reset to beginning
result, _, err := marshaller.UnmarshalStream(tmpFile, bytesWritten)
if err != nil {
panic(err)
}
fmt.Printf("Successfully round-tripped %d KV pairs\n", len(result.Kv))
// Note: Actual byte count and file paths will vary
func (*VTproto) UnmarshalStream ¶ added in v1.17.2
func (p *VTproto) UnmarshalStream(reader io.Reader, estimatedSize int64) (*StoreData, uint64, error)
UnmarshalStream reads and unmarshals data from a stream. The caller is responsible for closing the reader if it implements io.Closer. All internal pooled resources are automatically cleaned up on error or success.
Example ¶
// Create test data
testData := &StoreData{
Kv: map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
},
DeletePrefixes: []string{"prefix1", "prefix2"},
}
// Marshal the data first
marshaller := &VTproto{}
serialized, err := marshaller.Marshal(testData)
if err != nil {
panic(fmt.Sprintf("failed to marshal: %v", err))
}
// Create a reader from the serialized data
reader := bytes.NewReader(serialized)
// Unmarshal using the streaming function
result, dataSize, err := marshaller.UnmarshalStream(reader, 0)
if err != nil {
panic(fmt.Sprintf("failed to unmarshal stream: %v", err))
}
fmt.Printf("Data size: %d\n", dataSize)
fmt.Printf("KV pairs: %d\n", len(result.Kv))
fmt.Printf("Delete prefixes: %d\n", len(result.DeletePrefixes))
Output:
Data size: 20
KV pairs: 2
Delete prefixes: 2