Overview ¶
Package ocf implements Avro Object Container Files (OCF).
An OCF is a self-describing binary file format: it embeds the Avro schema in the file header so readers do not need out-of-band schema information. Data is stored in compressed blocks separated by sync markers, making files splittable for parallel processing. OCF is the standard format for storing Avro data on disk; for sending individual values over the wire, see avro.AppendSingleObject instead.
See the Avro specification for the full format definition.
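Every block ends with the same 16-byte sync marker that appears at the end of the header, which is what makes splitting possible. A reader handed an arbitrary byte offset, such as the start of a split, can scan forward for the marker and resume at the next block boundary. The stdlib-only sketch below shows that idea. nextBlockStart is a hypothetical helper, not part of package ocf. It assumes the bytes are already in memory and uses a simplified fake file in which each "block" is plain text; real blocks hold a count, a size, and encoded data before the marker.

```go
package main

import (
	"bytes"
	"fmt"
)

// nextBlockStart returns the offset just past the first sync marker found at
// or after position from, i.e. where the next block begins. It returns -1 if
// no marker follows from.
func nextBlockStart(data []byte, from int, sync [16]byte) int {
	if from < 0 {
		from = 0
	}
	if from > len(data) {
		return -1
	}
	i := bytes.Index(data[from:], sync[:])
	if i < 0 {
		return -1
	}
	return from + i + len(sync)
}

func main() {
	var sync [16]byte
	copy(sync[:], "0123456789abcdef")

	// Simplified file: header bytes, marker, block 1, marker, block 2, marker.
	var data []byte
	data = append(data, "HDR"...)
	data = append(data, sync[:]...)
	data = append(data, "block-one"...)
	data = append(data, sync[:]...)
	data = append(data, "block-two"...)
	data = append(data, sync[:]...)

	// A split that starts in the middle of block 1 resumes at block 2.
	start := nextBlockStart(data, 25, sync)
	fmt.Println(string(data[start : start+len("block-two")]))
}
```

With a randomly generated marker, the chance that encoded data happens to contain the same 16 bytes is negligible. This is why the marker is random by default.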
Writing ¶
schema := avro.MustParse(`{
	"type": "record",
	"name": "User",
	"fields": [
		{"name": "name", "type": "string"},
		{"name": "age", "type": "int"}
	]
}`)
f, err := os.Create("users.avro")
if err != nil {
	log.Fatal(err)
}
defer f.Close()
w, err := ocf.NewWriter(f, schema, ocf.WithCodec(ocf.SnappyCodec()))
if err != nil {
	log.Fatal(err)
}
for _, u := range users {
	if err := w.Encode(&u); err != nil {
		log.Fatal(err)
	}
}
// Close flushes any buffered items; the file is incomplete until it succeeds.
if err := w.Close(); err != nil {
	log.Fatal(err)
}
Reading ¶
f, err := os.Open("users.avro")
if err != nil {
	log.Fatal(err)
}
defer f.Close()
r, err := ocf.NewReader(f)
if err != nil {
	log.Fatal(err)
}
defer r.Close()
for {
	var u User
	if err := r.Decode(&u); err != nil {
		if errors.Is(err, io.EOF) {
			break // no more records
		}
		log.Fatal(err)
	}
	fmt.Println(u)
}
Appending ¶
Use NewAppendWriter to add records to an existing file without rewriting it.
Codecs ¶
Null, deflate, snappy, and zstandard are built in. Custom codecs can be provided via WithCodec.
Types ¶
type Codec ¶
type Codec interface {
// Name returns the codec identifier for the "avro.codec" metadata key
// (e.g. "null", "deflate", "snappy", "zstandard").
Name() string
// Compress encodes a raw data block for storage.
Compress(src []byte) ([]byte, error)
// Decompress decodes a stored data block back to raw bytes.
Decompress(src []byte) ([]byte, error)
// Close releases any resources held by the codec. Codecs that hold no
// resources may return nil.
Close() error
}
Codec compresses and decompresses OCF data blocks.
func DeflateCodec ¶
DeflateCodec returns a Codec using raw DEFLATE compression at the given level (e.g. flate.DefaultCompression).
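The Avro specification defines the deflate codec as raw DEFLATE data (RFC 1951) with no zlib or gzip header. As a rough illustration of how a type can satisfy the Codec interface, the sketch below implements a raw-DEFLATE codec with the standard library's compress/flate. It declares a local copy of the interface so that it runs without package ocf. The type name rawDeflate is hypothetical, and this is not necessarily how DeflateCodec is implemented.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// Codec mirrors the method set of ocf.Codec so this sketch is self-contained.
type Codec interface {
	Name() string
	Compress(src []byte) ([]byte, error)
	Decompress(src []byte) ([]byte, error)
	Close() error
}

// rawDeflate is a hypothetical Codec that stores blocks as raw DEFLATE
// (no zlib header), matching the Avro "deflate" codec name.
type rawDeflate struct{ level int }

func (rawDeflate) Name() string { return "deflate" }

func (c rawDeflate) Compress(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	w, err := flate.NewWriter(&buf, c.level)
	if err != nil {
		return nil, err // level outside [-2, 9]
	}
	if _, err := w.Write(src); err != nil {
		return nil, err
	}
	// Close flushes the final DEFLATE block into buf.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (rawDeflate) Decompress(src []byte) ([]byte, error) {
	r := flate.NewReader(bytes.NewReader(src))
	defer r.Close()
	return io.ReadAll(r)
}

// Close is a no-op: this codec holds no resources between calls.
func (rawDeflate) Close() error { return nil }

// Compile-time check that rawDeflate satisfies the interface.
var _ Codec = rawDeflate{}

func main() {
	c := rawDeflate{level: flate.DefaultCompression}
	in := bytes.Repeat([]byte("avro "), 100)
	packed, err := c.Compress(in)
	if err != nil {
		panic(err)
	}
	out, err := c.Decompress(packed)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(in), len(packed) < len(in), bytes.Equal(in, out))
}
```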
func MustZstdCodec ¶
MustZstdCodec is like ZstdCodec but panics on error. This is useful for inline codec creation with static options:
w, err := ocf.NewWriter(f, schema, ocf.WithCodec(ocf.MustZstdCodec(nil, nil)))
func NopCloser ¶
NopCloser returns a Codec that wraps c but has a no-op Close method. This is useful when sharing a single codec across multiple writers or readers so that individual Writer.Close or Reader.Close calls do not release shared resources. The caller is responsible for closing the underlying codec when it is no longer needed.
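One plausible way to build such a wrapper is to embed the wrapped codec and shadow only Close. The sketch below does that against a local copy of the Codec interface. nopCloser and countingCodec are hypothetical names, and the real NopCloser may be implemented differently.

```go
package main

import "fmt"

// Codec mirrors the method set of ocf.Codec so this sketch is self-contained.
type Codec interface {
	Name() string
	Compress(src []byte) ([]byte, error)
	Decompress(src []byte) ([]byte, error)
	Close() error
}

// nopCloser forwards every method to the embedded Codec except Close.
type nopCloser struct{ Codec }

func (nopCloser) Close() error { return nil }

// NopCloser wraps c so that closing the wrapper leaves c open.
func NopCloser(c Codec) Codec { return nopCloser{c} }

// countingCodec is a pass-through stand-in that counts Close calls.
type countingCodec struct{ closes int }

func (*countingCodec) Name() string                          { return "null" }
func (*countingCodec) Compress(src []byte) ([]byte, error)   { return src, nil }
func (*countingCodec) Decompress(src []byte) ([]byte, error) { return src, nil }

func (c *countingCodec) Close() error {
	c.closes++
	return nil
}

func main() {
	shared := &countingCodec{}
	// Each writer or reader gets its own wrapper around the shared codec.
	a, b := NopCloser(shared), NopCloser(shared)
	a.Close()
	b.Close()
	fmt.Println(a.Name(), shared.closes) // the shared codec has not been closed
	// The owner closes the real codec once, when nothing uses it anymore.
	shared.Close()
	fmt.Println(shared.closes)
}
```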
func SnappyCodec ¶
func SnappyCodec() Codec
SnappyCodec returns a Codec using Snappy compression with a trailing CRC-32 checksum per block, as required by the Avro spec.
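The Avro specification says each snappy-compressed block is followed by the 4-byte, big-endian CRC-32 of the uncompressed data. Snappy itself is not in the Go standard library, so the sketch below shows only the checksum framing, using hash/crc32 and encoding/binary. appendChecksum, splitChecksum, and verifyChecksum are hypothetical helpers. In real use, the compressed bytes would come from a Snappy encoder and the payload would be Snappy-decoded before verification.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// appendChecksum appends the big-endian CRC-32 (IEEE) of the uncompressed
// block to the compressed bytes.
func appendChecksum(compressed, uncompressed []byte) []byte {
	return binary.BigEndian.AppendUint32(compressed, crc32.ChecksumIEEE(uncompressed))
}

// splitChecksum separates a stored block into its compressed payload and
// the trailing 4-byte checksum.
func splitChecksum(stored []byte) ([]byte, uint32, error) {
	if len(stored) < 4 {
		return nil, 0, errors.New("block too short for CRC-32 trailer")
	}
	n := len(stored) - 4
	return stored[:n], binary.BigEndian.Uint32(stored[n:]), nil
}

// verifyChecksum checks the trailer against the decompressed block.
func verifyChecksum(decompressed []byte, want uint32) error {
	if got := crc32.ChecksumIEEE(decompressed); got != want {
		return fmt.Errorf("crc mismatch: got %08x, want %08x", got, want)
	}
	return nil
}

func main() {
	raw := []byte("uncompressed block contents")
	// Stand-in: a real writer would Snappy-encode raw here.
	compressed := append([]byte(nil), raw...)
	stored := appendChecksum(compressed, raw)

	payload, sum, err := splitChecksum(stored)
	if err != nil {
		panic(err)
	}
	// Stand-in: a real reader would Snappy-decode payload before verifying.
	fmt.Println(verifyChecksum(payload, sum))
}
```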
func ZstdCodec ¶
ZstdCodec returns a Codec using Zstandard compression. Encoder options (eopts) and decoder options (dopts) are passed to zstd.NewWriter and zstd.NewReader respectively. Both may be nil for defaults.
zstd.WithEncoderConcurrency(1) and zstd.WithDecoderConcurrency(1) are prepended to the options; pass a different concurrency to override.
A single ZstdCodec is safe to share across multiple readers and writers via NopCloser.
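The override described above works if, as is typical for Go functional options, options are applied in order so that a caller-supplied option placed after the defaults wins. The stdlib-only sketch below models that rule with a hypothetical encoderConfig type and withConcurrency option. It does not use the real zstd package and only illustrates the ordering.

```go
package main

import "fmt"

// encoderConfig and option are hypothetical stand-ins for zstd's options.
type encoderConfig struct{ concurrency int }

type option func(*encoderConfig)

func withConcurrency(n int) option {
	return func(c *encoderConfig) { c.concurrency = n }
}

// newConfig applies options in order, so later options overwrite earlier ones.
func newConfig(opts ...option) encoderConfig {
	c := encoderConfig{concurrency: 8} // pretend library default
	for _, o := range opts {
		o(&c)
	}
	return c
}

// withDefaults prepends concurrency 1, leaving caller options free to override it.
func withDefaults(user []option) []option {
	return append([]option{withConcurrency(1)}, user...)
}

func main() {
	fmt.Println(newConfig(withDefaults(nil)...).concurrency)
	fmt.Println(newConfig(withDefaults([]option{withConcurrency(4)})...).concurrency)
}
```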
type Opt ¶
Opt is an option that applies to both NewWriter and NewReader.
func WithCodec ¶
WithCodec sets the compression codec. The default is null (no compression). WithCodec can be used as both a WriterOpt and a ReaderOpt. The four built-in codecs (null, deflate, snappy, zstandard) do not need to be passed to NewReader; only non-built-in codecs must be supplied for reading. A custom codec whose name matches a built-in overrides it.
The codec's Close method is called by Writer.Close and Reader.Close. To share one codec across multiple writers or readers, wrap it with NopCloser so that those Close calls do not release it, and close the underlying codec yourself once it is no longer needed.
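The lookup rule described above can be modeled as matching the header's "avro.codec" value against codec names, with user-supplied codecs consulted before the built-ins. The sketch below uses a hypothetical pickCodec function and name-only stand-ins. It illustrates the documented precedence, not package ocf's internals. It also applies the Avro specification's rule that a missing avro.codec value means null.

```go
package main

import "fmt"

// namedCodec stands in for a Codec; only Name matters for selection.
type namedCodec struct{ name, impl string }

func (c namedCodec) Name() string { return c.name }

var builtins = []namedCodec{
	{"null", "builtin"}, {"deflate", "builtin"},
	{"snappy", "builtin"}, {"zstandard", "builtin"},
}

// pickCodec returns the codec for the header's avro.codec value. Custom codecs
// are checked first, so a custom codec named like a built-in overrides it.
func pickCodec(headerCodec string, custom []namedCodec) (namedCodec, error) {
	if headerCodec == "" {
		headerCodec = "null" // the spec treats a missing avro.codec as null
	}
	for _, set := range [][]namedCodec{custom, builtins} {
		for _, c := range set {
			if c.Name() == headerCodec {
				return c, nil
			}
		}
	}
	return namedCodec{}, fmt.Errorf("unknown codec %q: supply it with WithCodec", headerCodec)
}

func main() {
	c, _ := pickCodec("deflate", []namedCodec{{"deflate", "custom"}})
	fmt.Println(c.impl)
	_, err := pickCodec("bzip2", nil)
	fmt.Println(err)
}
```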
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader decodes Avro values from an OCF.
func NewReader ¶
NewReader creates a Reader that decodes an OCF from r. The header is read immediately. Use WithCodec if the file uses a non-built-in codec.
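Reading the header, per the Avro specification, means checking the 4-byte magic ("Obj" followed by the byte 1), decoding the file metadata (an Avro map of bytes that carries avro.schema and avro.codec), and then reading the 16-byte sync marker. The stdlib-only sketch below, built around a hypothetical parseHeader function, shows that layout. It relies on the fact that Avro longs use the same zig-zag varint encoding as Go's binary.Varint. It illustrates the spec and is not the package's parser.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// magic is the 4-byte OCF prefix: "Obj" followed by the version byte 1.
var magic = []byte{'O', 'b', 'j', 1}

// readBytes decodes an Avro bytes or string value: a zig-zag varint length
// followed by that many bytes.
func readBytes(r *bytes.Reader) ([]byte, error) {
	n, err := binary.ReadVarint(r)
	if err != nil {
		return nil, err
	}
	if n < 0 || n > int64(r.Len()) {
		return nil, errors.New("invalid length")
	}
	b := make([]byte, n)
	_, err = io.ReadFull(r, b)
	return b, err
}

// parseHeader decodes the magic, the metadata map, and the sync marker.
func parseHeader(h []byte) (map[string][]byte, [16]byte, error) {
	var sync [16]byte
	r := bytes.NewReader(h)
	got := make([]byte, len(magic))
	if _, err := io.ReadFull(r, got); err != nil || !bytes.Equal(got, magic) {
		return nil, sync, errors.New("not an OCF file: bad magic")
	}
	meta := make(map[string][]byte)
	for {
		count, err := binary.ReadVarint(r)
		if err != nil {
			return nil, sync, err
		}
		if count == 0 {
			break // a zero count ends the map
		}
		if count < 0 {
			// A negative count is followed by the block's byte size,
			// which this sketch reads and ignores.
			count = -count
			if _, err := binary.ReadVarint(r); err != nil {
				return nil, sync, err
			}
		}
		for i := int64(0); i < count; i++ {
			k, err := readBytes(r)
			if err != nil {
				return nil, sync, err
			}
			v, err := readBytes(r)
			if err != nil {
				return nil, sync, err
			}
			meta[string(k)] = v
		}
	}
	if _, err := io.ReadFull(r, sync[:]); err != nil {
		return nil, sync, err
	}
	return meta, sync, nil
}

// appendBytes encodes b as an Avro bytes value.
func appendBytes(dst, b []byte) []byte {
	dst = binary.AppendVarint(dst, int64(len(b)))
	return append(dst, b...)
}

func main() {
	h := append([]byte{}, magic...)
	h = binary.AppendVarint(h, 2) // one map block with two entries
	h = appendBytes(h, []byte("avro.codec"))
	h = appendBytes(h, []byte("null"))
	h = appendBytes(h, []byte("avro.schema"))
	h = appendBytes(h, []byte(`"string"`))
	h = binary.AppendVarint(h, 0) // end of map
	h = append(h, "0123456789abcdef"...)

	meta, sync, err := parseHeader(h)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(meta["avro.codec"]), string(meta["avro.schema"]), string(sync[:]))
}
```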
Example (Evolution) ¶
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"

	"github.com/twmb/avro"
	"github.com/twmb/avro/ocf"
)

func main() {
	// Write v1 data (name only).
	v1Schema := avro.MustParse(`{
		"type": "record", "name": "User",
		"fields": [{"name": "name", "type": "string"}]
	}`)
	var buf bytes.Buffer
	w, err := ocf.NewWriter(&buf, v1Schema)
	if err != nil {
		log.Fatal(err)
	}
	for _, name := range []string{"Alice", "Bob"} {
		if err := w.Encode(map[string]any{"name": name}); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read with a v2 schema that added an age field with a default.
	v2Schema := avro.MustParse(`{
		"type": "record", "name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "age", "type": "int", "default": 0}
		]
	}`)
	type User struct {
		Name string `avro:"name"`
		Age  int32  `avro:"age"`
	}
	r, err := ocf.NewReader(bytes.NewReader(buf.Bytes()), ocf.WithReaderSchema(v2Schema))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	for {
		var u User
		if err := r.Decode(&u); err != nil {
			if errors.Is(err, io.EOF) {
				break // no more records
			}
			log.Fatal(err)
		}
		fmt.Printf("%s age=%d\n", u.Name, u.Age)
	}
}
Output:

Alice age=0
Bob age=0
type ReaderOpt ¶
type ReaderOpt interface {
// contains filtered or unexported methods
}
ReaderOpt is an option for NewReader.
func WithMaxBlockBytes ¶
WithMaxBlockBytes sets the maximum compressed block size in bytes that the reader will accept. The default is 64 MiB. This guards against malicious or corrupt files that declare very large blocks.
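Each OCF data block starts with two Avro longs, the object count and the size of the serialized (compressed) data in bytes. This means a reader can reject an oversized block before allocating a buffer for it. The sketch below shows one way to apply such a cap, using hypothetical parseBlockHeader and encodeBlockHeader helpers and a caller-chosen limit. It is not package ocf's code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const defaultMaxBlockBytes = 64 << 20 // 64 MiB, the documented default

// parseBlockHeader decodes a block's object count and byte size (two Avro
// longs, i.e. zig-zag varints) from b and rejects negative values or sizes
// above limit. It also returns the number of header bytes consumed.
func parseBlockHeader(b []byte, limit int64) (int64, int64, int, error) {
	count, n1 := binary.Varint(b)
	if n1 <= 0 {
		return 0, 0, 0, errors.New("truncated or invalid object count")
	}
	size, n2 := binary.Varint(b[n1:])
	if n2 <= 0 {
		return 0, 0, 0, errors.New("truncated or invalid block size")
	}
	if count < 0 || size < 0 || size > limit {
		return 0, 0, 0, fmt.Errorf("rejecting block: count=%d size=%d limit=%d", count, size, limit)
	}
	return count, size, n1 + n2, nil
}

// encodeBlockHeader writes count and size as zig-zag varints.
func encodeBlockHeader(count, size int64) []byte {
	return binary.AppendVarint(binary.AppendVarint(nil, count), size)
}

func main() {
	_, size, _, err := parseBlockHeader(encodeBlockHeader(3, 4096), defaultMaxBlockBytes)
	fmt.Println(size, err)
	// A corrupt or hostile header claiming a 1 GiB block is refused up front.
	_, _, _, err = parseBlockHeader(encodeBlockHeader(3, 1<<30), defaultMaxBlockBytes)
	fmt.Println(err)
}
```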
func WithReaderSchema ¶
WithReaderSchema sets a reader schema. The file's writer schema is resolved against it with avro.Resolve, and subsequent Reader.Decode calls use the resolved schema. Fields added in the reader schema must have defaults; writer fields absent from the reader schema are skipped.
Use WithReaderSchemaFunc when the reader schema must be chosen based on the file's header (metadata or writer-schema shape).
At most one of WithReaderSchema and WithReaderSchemaFunc may be used.
func WithReaderSchemaFunc ¶ added in v1.6.0
WithReaderSchemaFunc is the dynamic counterpart to WithReaderSchema. The callback is invoked by NewReader after the OCF header has been parsed, so it can inspect the file's writer schema and metadata via rd.Schema() and rd.Metadata() before deciding which reader schema to resolve against.
If the callback returns a non-nil schema, the writer schema is resolved against it via avro.Resolve and subsequent Reader.Decode calls use the resolved schema.
If the callback returns (nil, nil), no resolution is performed and records decode against the writer schema directly — equivalent to not passing any reader-schema option at all.
If the callback returns a non-nil error, NewReader returns that error.
The callback must not call rd.Decode or rd.Close; rd is only valid for read-only header inspection during the callback.
At most one of WithReaderSchema and WithReaderSchemaFunc may be used.
Example ¶
ExampleWithReaderSchemaFunc demonstrates choosing the reader schema based on state that's only available after the OCF header is parsed — for example, a metadata key that distinguishes between old and new file variants, or a writer-schema shape that changed between versions of the producer. The callback runs after NewReader has read the header, so rd.Schema() and rd.Metadata() are populated; whatever schema it returns becomes the reader schema for resolution against the writer schema.
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/twmb/avro"
	"github.com/twmb/avro/ocf"
)

func main() {
	// Producer v1 wrote records with a legacy field name:
	v1Schema := avro.MustParse(`{
		"type": "record", "name": "Event",
		"fields": [{"name": "legacy_ts", "type": "long"}]
	}`)
	var buf bytes.Buffer
	w, err := ocf.NewWriter(&buf, v1Schema,
		ocf.WithMetadata(map[string][]byte{"producer-version": []byte("1")}))
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Encode(map[string]any{"legacy_ts": int64(1700000000)}); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// The application keeps one reader schema per producer version. Both
	// name the field "ts"; the v1 reader also declares "legacy_ts" as an
	// alias, so resolution maps the old writer field onto "ts". Records
	// from either version decode into the same Event struct.
	v1Reader := avro.MustParse(`{
		"type": "record", "name": "Event",
		"fields": [{"name": "ts", "type": "long", "aliases": ["legacy_ts"]}]
	}`)
	v2Reader := avro.MustParse(`{
		"type": "record", "name": "Event",
		"fields": [{"name": "ts", "type": "long"}]
	}`)
	type Event struct {
		TS int64 `avro:"ts"`
	}
	r, err := ocf.NewReader(bytes.NewReader(buf.Bytes()),
		ocf.WithReaderSchemaFunc(func(rd *ocf.Reader) (*avro.Schema, error) {
			// The header has been parsed. Pick the reader schema based on
			// which producer wrote the file.
			if string(rd.Metadata()["producer-version"]) == "1" {
				return v1Reader, nil
			}
			return v2Reader, nil
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	var e Event
	if err := r.Decode(&e); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("ts=%d\n", e.TS)
}
Output:

ts=1700000000
func WithSchemaOpts ¶ added in v1.4.0
WithSchemaOpts passes avro.SchemaOpt values (such as avro.CustomType) to the avro.Parse call that parses the file header's embedded schema. This allows registering custom type conversions for the reader's schema.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer encodes Avro values into an OCF. Values are buffered into blocks that are compressed and flushed automatically. Close must be called to flush remaining items.
func NewAppendWriter ¶
func NewAppendWriter(rws io.ReadWriteSeeker, opts ...WriterOpt) (*Writer, error)
NewAppendWriter opens an existing OCF for appending. It reads the header to recover the schema, codec, and sync marker, then seeks to the end.
WithBlockCount and WithBlockBytes are honored. WithCodec can provide a codec implementation for non-built-in codecs (matched by name against the header). Other options are ignored.
func NewWriter ¶
NewWriter creates a Writer that writes an OCF to w. The file header is written immediately.
Example ¶
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"

	"github.com/twmb/avro"
	"github.com/twmb/avro/ocf"
)

func main() {
	schema := avro.MustParse(`{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "age", "type": "int"}
		]
	}`)
	type User struct {
		Name string `avro:"name"`
		Age  int32  `avro:"age"`
	}
	var buf bytes.Buffer
	w, err := ocf.NewWriter(&buf, schema)
	if err != nil {
		log.Fatal(err)
	}
	for _, u := range []User{
		{Name: "Alice", Age: 30},
		{Name: "Bob", Age: 25},
	} {
		if err := w.Encode(&u); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read back.
	r, err := ocf.NewReader(bytes.NewReader(buf.Bytes()))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	for {
		var u User
		if err := r.Decode(&u); err != nil {
			if errors.Is(err, io.EOF) {
				break // no more records
			}
			log.Fatal(err)
		}
		fmt.Printf("%s is %d\n", u.Name, u.Age)
	}
}
Output:

Alice is 30
Bob is 25
func (*Writer) Encode ¶
Encode serializes v and appends it to the current block. The block is flushed automatically when it reaches the limit set by WithBlockCount or WithBlockBytes.
After any error the Writer is poisoned: all subsequent calls return the same error.
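This "poisoned" behavior is a common Go sticky-error pattern: the first failure is stored, and every later call returns it without doing more work. The sketch below shows the pattern with a hypothetical stickyWriter type. It is not package ocf's Writer.

```go
package main

import (
	"errors"
	"fmt"
)

// stickyWriter records the first error and refuses further work.
type stickyWriter struct {
	err   error
	items int
}

func (w *stickyWriter) Encode(v any) error {
	if w.err != nil {
		return w.err // poisoned: repeat the original failure
	}
	if v == nil {
		w.err = errors.New("cannot encode nil")
		return w.err
	}
	w.items++
	return nil
}

func main() {
	var w stickyWriter
	fmt.Println(w.Encode(1))
	fmt.Println(w.Encode(nil))
	fmt.Println(w.Encode(2)) // same error as above; 2 is not encoded
	fmt.Println(w.items)
}
```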
func (*Writer) Reset ¶
Reset flushes buffered items to the current destination, then starts a new OCF on dst reusing the original schema, codec, and options. If the Writer is in an error state, the flush is skipped and the error is cleared.
type WriterOpt ¶
type WriterOpt interface {
// contains filtered or unexported methods
}
WriterOpt is an option for NewWriter.
func WithBlockBytes ¶
WithBlockBytes sets the maximum uncompressed size of a block in bytes before it is flushed. The default is 64 KiB. If both WithBlockCount and WithBlockBytes are set, whichever limit is hit first triggers a flush.
func WithBlockCount ¶
WithBlockCount sets the maximum number of items per block. The default is 0 (unlimited). If both WithBlockCount and WithBlockBytes are set, whichever limit is hit first triggers a flush.
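Combining the two limits amounts to a simple check after each encoded item. The sketch below uses a hypothetical shouldFlush helper and assumes a block is flushed once either limit is reached; the exact comparison package ocf performs is an assumption here.

```go
package main

import "fmt"

const defaultBlockBytes = 64 << 10 // 64 KiB, the documented default

// shouldFlush reports whether a block with the given item count and
// uncompressed size has reached either limit. maxCount 0 means unlimited.
func shouldFlush(count, size, maxCount, maxBytes int) bool {
	if maxCount > 0 && count >= maxCount {
		return true
	}
	return size >= maxBytes
}

func main() {
	fmt.Println(shouldFlush(1000, 10_000, 0, defaultBlockBytes)) // false: no count limit, bytes under 64 KiB
	fmt.Println(shouldFlush(100, 10_000, 100, defaultBlockBytes)) // true: count limit reached first
	fmt.Println(shouldFlush(5, 70_000, 100, defaultBlockBytes))   // true: byte limit reached first
}
```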
func WithMetadata ¶
WithMetadata adds custom metadata to the file header. Keys starting with "avro." are reserved by the spec. Multiple calls are cumulative.
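Because metadata from several sources may end up in one header and the "avro." prefix is reserved, a caller might merge and check it before passing it to WithMetadata, as in the sketch below. mergeMetadata is a hypothetical helper, not part of package ocf; its choice that later keys win is its own, and the real WithMetadata may treat duplicate or reserved keys differently.

```go
package main

import (
	"fmt"
	"strings"
)

// mergeMetadata combines metadata maps in order (later keys win) and
// rejects keys in the spec-reserved "avro." namespace.
func mergeMetadata(maps ...map[string][]byte) (map[string][]byte, error) {
	out := make(map[string][]byte)
	for _, m := range maps {
		for k, v := range m {
			if strings.HasPrefix(k, "avro.") {
				return nil, fmt.Errorf("metadata key %q is reserved", k)
			}
			out[k] = v
		}
	}
	return out, nil
}

func main() {
	md, err := mergeMetadata(
		map[string][]byte{"producer-version": []byte("1")},
		map[string][]byte{"host": []byte("ingest-01")},
	)
	fmt.Println(len(md), err)
	_, err = mergeMetadata(map[string][]byte{"avro.codec": []byte("null")})
	fmt.Println(err)
}
```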
func WithSchema ¶
WithSchema overrides the schema JSON written to the file header. By default avro.Schema.Canonical is used, which strips non-essential properties like doc strings and aliases. Use this to preserve those properties or to write a custom schema string.
func WithSyncMarker ¶
WithSyncMarker sets the 16-byte sync marker written between blocks. By default a random marker is generated. This is primarily useful for deterministic test output.