ocf

package
v1.7.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package ocf implements Avro Object Container Files (OCF).

An OCF is a self-describing binary file format: it embeds the Avro schema in the file header so readers do not need out-of-band schema information. Data is stored in compressed blocks separated by sync markers, making files splittable for parallel processing. OCF is the standard format for storing Avro data on disk; for sending individual values over the wire, see avro.AppendSingleObject instead.

See the Avro specification for the full format definition.
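As an illustration of what "self-describing" means, here is a minimal sketch of decoding an OCF header by hand using only the standard library. It assumes the layout given in the Avro specification: the magic bytes `Obj` plus 0x01, a metadata map with string keys and bytes values, then the 16-byte sync marker. The names `header`, `parseHeader`, and `sampleHeader` are hypothetical helpers for this sketch and are not part of package ocf, whose own parsing may differ.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log"
)

// ocfMagic is the four-byte prefix of every OCF: "Obj" followed by 1.
var ocfMagic = [4]byte{'O', 'b', 'j', 1}

// header holds the parts of an OCF header that follow the magic bytes.
type header struct {
	Meta map[string][]byte // "avro.schema", "avro.codec", user keys
	Sync [16]byte          // marker repeated after every data block
}

// readBytes reads one Avro bytes/string value: a zig-zag varint length
// followed by that many bytes.
func readBytes(r *bufio.Reader) ([]byte, error) {
	n, err := binary.ReadVarint(r)
	if err != nil {
		return nil, err
	}
	if n < 0 || n > 1<<20 { // sanity bound chosen for this sketch only
		return nil, errors.New("implausible length")
	}
	b := make([]byte, n)
	_, err = io.ReadFull(r, b)
	return b, err
}

// parseHeader decodes magic, metadata map, and sync marker from data.
func parseHeader(data []byte) (*header, error) {
	r := bufio.NewReader(bytes.NewReader(data))
	var magic [4]byte
	if _, err := io.ReadFull(r, magic[:]); err != nil {
		return nil, err
	}
	if magic != ocfMagic {
		return nil, errors.New("bad magic: not an OCF")
	}
	h := &header{Meta: make(map[string][]byte)}
	for {
		count, err := binary.ReadVarint(r) // entries in this map block
		if err != nil {
			return nil, err
		}
		if count == 0 {
			break // a zero count ends the map
		}
		if count < 0 { // negative count: a byte size follows, unused here
			count = -count
			if _, err := binary.ReadVarint(r); err != nil {
				return nil, err
			}
		}
		for i := int64(0); i < count; i++ {
			k, err := readBytes(r)
			if err != nil {
				return nil, err
			}
			v, err := readBytes(r)
			if err != nil {
				return nil, err
			}
			h.Meta[string(k)] = v
		}
	}
	if _, err := io.ReadFull(r, h.Sync[:]); err != nil {
		return nil, err
	}
	return h, nil
}

// sampleHeader hand-builds a tiny header so the sketch needs no real file.
func sampleHeader() []byte {
	var buf bytes.Buffer
	put := func(s string) {
		buf.Write(binary.AppendVarint(nil, int64(len(s))))
		buf.WriteString(s)
	}
	buf.Write(ocfMagic[:])
	buf.Write(binary.AppendVarint(nil, 2)) // one map block, two entries
	put("avro.schema")
	put(`"string"`)
	put("avro.codec")
	put("null")
	buf.Write(binary.AppendVarint(nil, 0)) // end of map
	buf.Write(bytes.Repeat([]byte{0xAB}, 16))
	return buf.Bytes()
}

func main() {
	h, err := parseHeader(sampleHeader())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("codec=%s schema=%s\n", h.Meta["avro.codec"], h.Meta["avro.schema"])
}
```

In practice NewReader does this work, and Reader.Schema and Reader.Metadata expose the result.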

Writing

schema := avro.MustParse(`{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}`)

f, err := os.Create("users.avro")
if err != nil { ... }
w, err := ocf.NewWriter(f, schema, ocf.WithCodec(ocf.SnappyCodec()))
if err != nil { ... }
for _, u := range users {
    if err := w.Encode(&u); err != nil { ... }
}
if err := w.Close(); err != nil { ... }

Reading

f, err := os.Open("users.avro")
if err != nil { ... }
defer f.Close()
r, err := ocf.NewReader(f)
if err != nil { ... }
defer r.Close()
for {
    var u User
    if err := r.Decode(&u); err != nil {
        if errors.Is(err, io.EOF) { break }
        ...
    }
    fmt.Println(u)
}

Appending

Use NewAppendWriter to add records to an existing file without rewriting it.

Codecs

Null, deflate, snappy, and zstandard are built in. Custom codecs can be provided via WithCodec.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Codec

type Codec interface {
	// Name returns the codec identifier for the "avro.codec" metadata key
	// (e.g. "null", "deflate", "snappy", "zstandard").
	Name() string

	// Compress encodes a raw data block for storage.
	Compress(src []byte) ([]byte, error)

	// Decompress decodes a stored data block back to raw bytes.
	Decompress(src []byte) ([]byte, error)

	// Close releases any resources held by the codec. Codecs that hold no
	// resources may return nil.
	Close() error
}

Codec compresses and decompresses OCF data blocks.
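A custom codec only has to satisfy the four methods above. The sketch below implements them with the standard library's gzip. The `Codec` interface is repeated locally only so the snippet compiles without the module, and the name "gzip" is a hypothetical identifier: it is not one of the codecs named in this package, so files written with it would presumably be readable only by readers given the same codec via WithCodec.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

// Codec mirrors the ocf.Codec interface documented above; it is repeated
// here only to keep the sketch self-contained.
type Codec interface {
	Name() string
	Compress(src []byte) ([]byte, error)
	Decompress(src []byte) ([]byte, error)
	Close() error
}

// gzipCodec is a hypothetical custom codec backed by compress/gzip.
type gzipCodec struct{}

// Name is the value that would be stored under "avro.codec".
func (gzipCodec) Name() string { return "gzip" }

// Compress gzips one raw data block.
func (gzipCodec) Compress(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(src); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}

// Decompress restores the raw bytes of one stored block.
func (gzipCodec) Decompress(src []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

// Close holds no resources, so it returns nil.
func (gzipCodec) Close() error { return nil }

// roundTrip compresses and then decompresses raw with c.
func roundTrip(c Codec, raw []byte) ([]byte, error) {
	packed, err := c.Compress(raw)
	if err != nil {
		return nil, err
	}
	return c.Decompress(packed)
}

func main() {
	var c Codec = gzipCodec{}
	out, err := roundTrip(c, []byte("hello hello hello"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s via %s\n", out, c.Name())
}
```

With the real package, a value of such a type would be passed as `ocf.WithCodec(gzipCodec{})` to both the writer and the reader.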

func DeflateCodec

func DeflateCodec(level int) Codec

DeflateCodec returns a Codec using raw DEFLATE compression at the given level (e.g. flate.DefaultCompression).
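"Raw DEFLATE" means the bare compressed stream, without the zlib or gzip wrapper. A minimal standard-library sketch of that round trip follows; `deflate` and `inflate` are hypothetical helper names, and the package's own implementation may be structured differently.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
	"log"
)

// deflate compresses raw as a bare DEFLATE stream at the given level.
func deflate(raw []byte, level int) ([]byte, error) {
	var buf bytes.Buffer
	zw, err := flate.NewWriter(&buf, level)
	if err != nil {
		return nil, err // level out of range
	}
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// inflate reverses deflate.
func inflate(compressed []byte) ([]byte, error) {
	zr := flate.NewReader(bytes.NewReader(compressed))
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	raw := bytes.Repeat([]byte("avro "), 100)
	packed, err := deflate(raw, flate.DefaultCompression)
	if err != nil {
		log.Fatal(err)
	}
	back, err := inflate(packed)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(packed) < len(raw), bytes.Equal(raw, back))
}
```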

func MustZstdCodec

func MustZstdCodec(eopts []zstd.EOption, dopts []zstd.DOption) Codec

MustZstdCodec is like ZstdCodec but panics on error. This is useful for inline codec creation with static options:

w, err := ocf.NewWriter(f, schema, ocf.WithCodec(ocf.MustZstdCodec(nil, nil)))

func NopCloser

func NopCloser(c Codec) Codec

NopCloser returns a Codec that wraps c but has a no-op Close method. This is useful when sharing a single codec across multiple writers or readers so that individual Writer.Close or Reader.Close calls do not release shared resources. The caller is responsible for closing the underlying codec when it is no longer needed.
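The wrapping idea can be sketched with struct embedding: the wrapper forwards every method to the inner codec except Close. This is an illustration of the pattern under that assumption, not necessarily how NopCloser is implemented; `countingCodec` and `nopCloser` are hypothetical types, and the `Codec` interface is repeated locally so the snippet is self-contained.

```go
package main

import "fmt"

// Codec mirrors the ocf.Codec interface documented above.
type Codec interface {
	Name() string
	Compress(src []byte) ([]byte, error)
	Decompress(src []byte) ([]byte, error)
	Close() error
}

// countingCodec is a pass-through codec that counts Close calls so the
// effect of the wrapper is observable.
type countingCodec struct{ closes int }

func (c *countingCodec) Name() string                          { return "null" }
func (c *countingCodec) Compress(src []byte) ([]byte, error)   { return src, nil }
func (c *countingCodec) Decompress(src []byte) ([]byte, error) { return src, nil }
func (c *countingCodec) Close() error                          { c.closes++; return nil }

// nopCloser embeds a Codec and shadows its Close with a no-op.
type nopCloser struct{ Codec }

func (nopCloser) Close() error { return nil }

func main() {
	shared := &countingCodec{}

	// Two users of the same codec each call Close on their wrapper.
	a, b := nopCloser{shared}, nopCloser{shared}
	a.Close()
	b.Close()
	fmt.Println("closes after wrappers:", shared.closes)

	// The owner closes the underlying codec exactly once at the end.
	shared.Close()
	fmt.Println("closes after owner:", shared.closes)
}
```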

func SnappyCodec

func SnappyCodec() Codec

SnappyCodec returns a Codec using Snappy compression with a trailing CRC-32 checksum per block, as required by the Avro spec.
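The checksum framing can be sketched without a Snappy dependency. Per the Avro specification, each compressed block is followed by the 4-byte big-endian CRC-32 of the uncompressed data. In the sketch below the "compressed" payload is just a stand-in copy of the raw bytes, because Snappy is not in the standard library; `frame`, `unframe`, and `verify` are hypothetical helper names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"log"
)

// frame appends the big-endian CRC-32 (IEEE) of the uncompressed block
// to the compressed payload.
func frame(compressed, raw []byte) []byte {
	out := make([]byte, len(compressed)+4)
	copy(out, compressed)
	binary.BigEndian.PutUint32(out[len(compressed):], crc32.ChecksumIEEE(raw))
	return out
}

// unframe splits a stored block into payload and trailing checksum.
func unframe(block []byte) (compressed []byte, sum uint32, err error) {
	if len(block) < 4 {
		return nil, 0, errors.New("block too short for checksum")
	}
	n := len(block) - 4
	return block[:n], binary.BigEndian.Uint32(block[n:]), nil
}

// verify checks decompressed bytes against the stored checksum.
func verify(raw []byte, sum uint32) error {
	if crc32.ChecksumIEEE(raw) != sum {
		return errors.New("crc mismatch")
	}
	return nil
}

func main() {
	raw := []byte("123456789")
	compressed := raw // stand-in: a real codec would Snappy-compress here

	block := frame(compressed, raw)
	fmt.Printf("trailer: % x\n", block[len(block)-4:]) // trailer: cb f4 39 26

	payload, sum, err := unframe(block)
	if err != nil {
		log.Fatal(err)
	}
	// A real reader would decompress payload before verifying.
	fmt.Println("verify:", verify(payload, sum))
}
```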

func ZstdCodec

func ZstdCodec(eopts []zstd.EOption, dopts []zstd.DOption) (Codec, error)

ZstdCodec returns a Codec using Zstandard compression. Encoder options (eopts) and decoder options (dopts) are passed to zstd.NewWriter and zstd.NewReader respectively. Both may be nil for defaults.

zstd.WithEncoderConcurrency(1) and zstd.WithDecoderConcurrency(1) are prepended to the options; pass a different concurrency to override.

A single ZstdCodec is safe to share across multiple readers and writers via NopCloser.

type Opt

type Opt interface {
	WriterOpt
	ReaderOpt
}

Opt is an option that applies to both NewWriter and NewReader.

func WithCodec

func WithCodec(c Codec) Opt

WithCodec sets the compression codec. The default is null (no compression). WithCodec can be used as both a WriterOpt and a ReaderOpt. The four built-in codecs (null, deflate, snappy, zstandard) do not need to be registered for reading. A custom codec whose name matches a built-in overrides it.

The codec's Close method is called by Writer.Close and Reader.Close. Codecs that should not be closed by an individual writer or reader (e.g. a codec shared across multiple writers) should be wrapped with NopCloser or return nil from Close.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader decodes Avro values from an OCF.

func NewReader

func NewReader(r io.Reader, opts ...ReaderOpt) (*Reader, error)

NewReader creates a Reader that decodes an OCF from r. The header is read immediately. Use WithCodec if the file uses a non-built-in codec.

Example (Evolution)
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/twmb/avro"
	"github.com/twmb/avro/ocf"
)

func main() {
	// Write v1 data (name only).
	v1Schema := avro.MustParse(`{
		"type": "record", "name": "User",
		"fields": [{"name": "name", "type": "string"}]
	}`)

	var buf bytes.Buffer
	w, err := ocf.NewWriter(&buf, v1Schema)
	if err != nil {
		log.Fatal(err)
	}
	for _, name := range []string{"Alice", "Bob"} {
		if err := w.Encode(map[string]any{"name": name}); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read with a v2 schema that added an age field with a default.
	v2Schema := avro.MustParse(`{
		"type": "record", "name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "age",  "type": "int", "default": 0}
		]
	}`)

	type User struct {
		Name string `avro:"name"`
		Age  int32  `avro:"age"`
	}

	r, err := ocf.NewReader(bytes.NewReader(buf.Bytes()), ocf.WithReaderSchema(v2Schema))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	for {
		var u User
		if err := r.Decode(&u); err != nil {
			break
		}
		fmt.Printf("%s age=%d\n", u.Name, u.Age)
	}
}
Output:
Alice age=0
Bob age=0

func (*Reader) Close

func (rd *Reader) Close() error

Close closes the codec, releasing any resources it holds.

func (*Reader) Decode

func (rd *Reader) Decode(v any) error

Decode reads the next datum into v, returning io.EOF at end of file.

func (*Reader) Metadata

func (rd *Reader) Metadata() map[string][]byte

Metadata returns the raw metadata from the file header, including both "avro.*" and user-defined keys. The returned map must not be modified.

func (*Reader) Schema

func (rd *Reader) Schema() *avro.Schema

Schema returns the schema parsed from the file header.

type ReaderOpt

type ReaderOpt interface {
	// contains filtered or unexported methods
}

ReaderOpt is an option for NewReader.

func WithMaxBlockBytes

func WithMaxBlockBytes(n int64) ReaderOpt

WithMaxBlockBytes sets the maximum compressed block size in bytes that the reader will accept. The default is 64 MiB. This guards against malicious or corrupt files that declare very large blocks.
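The kind of check this option enables can be sketched as follows. In the Avro format each data block starts with two zig-zag varint longs, the object count and the payload size in bytes, so a reader can reject an oversized declaration before allocating anything. `blockPrefix` and `checkBlock` are hypothetical helpers for illustration; the Reader's actual validation may differ.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// blockPrefix encodes the two longs that start every OCF data block:
// the object count and the byte size of the (compressed) payload.
func blockPrefix(count, size int64) []byte {
	b := binary.AppendVarint(nil, count)
	return binary.AppendVarint(b, size)
}

// checkBlock reads the prefix and rejects sizes above maxBytes before
// any payload buffer would be allocated.
func checkBlock(data []byte, maxBytes int64) (count, size int64, err error) {
	r := bytes.NewReader(data)
	if count, err = binary.ReadVarint(r); err != nil {
		return 0, 0, err
	}
	if size, err = binary.ReadVarint(r); err != nil {
		return 0, 0, err
	}
	if count < 0 || size < 0 {
		return 0, 0, fmt.Errorf("invalid block prefix: count=%d size=%d", count, size)
	}
	if size > maxBytes {
		return 0, 0, fmt.Errorf("block of %d bytes exceeds limit of %d", size, maxBytes)
	}
	return count, size, nil
}

func main() {
	const limit = 64 << 20 // 64 MiB, the documented default

	// A corrupt or hostile file claims a 1 TiB block.
	_, _, err := checkBlock(blockPrefix(10, 1<<40), limit)
	fmt.Println("rejected:", err != nil)

	// A normal block passes.
	count, size, err := checkBlock(blockPrefix(10, 1024), limit)
	fmt.Println(count, size, err)
}
```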

func WithReaderSchema

func WithReaderSchema(s *avro.Schema) ReaderOpt

WithReaderSchema provides the reader schema to resolve the file's writer schema against via avro.Resolve. Subsequent Reader.Decode calls use the resolved schema. Fields added in the reader schema must have defaults; writer fields absent from the reader schema are skipped.

Use WithReaderSchemaFunc when the reader schema must be chosen based on the file's header (metadata or writer-schema shape).

At most one of WithReaderSchema and WithReaderSchemaFunc may be used.

func WithReaderSchemaFunc added in v1.6.0

func WithReaderSchemaFunc(fn func(rd *Reader) (*avro.Schema, error)) ReaderOpt

WithReaderSchemaFunc is the dynamic counterpart to WithReaderSchema. The callback is invoked by NewReader after the OCF header has been parsed, so it can inspect the file's writer schema and metadata via rd.Schema() and rd.Metadata() before deciding which reader schema to resolve against.

If the callback returns a non-nil schema, the writer schema is resolved against it via avro.Resolve and subsequent Reader.Decode calls use the resolved schema.

If the callback returns (nil, nil), no resolution is performed and records decode against the writer schema directly — equivalent to not passing any reader-schema option at all.

If the callback returns a non-nil error, NewReader returns that error.

The callback must not call rd.Decode or rd.Close; rd is only valid for read-only header inspection during the callback.

At most one of WithReaderSchema and WithReaderSchemaFunc may be used.

Example

ExampleWithReaderSchemaFunc demonstrates choosing the reader schema based on state that's only available after the OCF header is parsed — for example, a metadata key that distinguishes between old and new file variants, or a writer-schema shape that changed between versions of the producer. The callback runs after NewReader has read the header, so rd.Schema() and rd.Metadata() are populated; whatever schema it returns becomes the reader schema for resolution against the writer schema.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/twmb/avro"
	"github.com/twmb/avro/ocf"
)

func main() {
	// Producer v1 wrote records with a legacy field name:
	v1Schema := avro.MustParse(`{
		"type": "record", "name": "Event",
		"fields": [{"name": "legacy_ts", "type": "long"}]
	}`)

	var buf bytes.Buffer
	w, err := ocf.NewWriter(&buf, v1Schema,
		ocf.WithMetadata(map[string][]byte{"producer-version": []byte("1")}))
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Encode(map[string]any{"legacy_ts": int64(1700000000)}); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Our application reads with two reader schemas — one per producer
	// version — each using the spec-correct field name "ts" but declaring
	// the old name as an alias so records from either version decode into
	// the same struct without coalescing.
	v1Reader := avro.MustParse(`{
		"type": "record", "name": "Event",
		"fields": [{"name": "ts", "type": "long", "aliases": ["legacy_ts"]}]
	}`)
	v2Reader := avro.MustParse(`{
		"type": "record", "name": "Event",
		"fields": [{"name": "ts", "type": "long"}]
	}`)

	type Event struct {
		TS int64 `avro:"ts"`
	}

	r, err := ocf.NewReader(bytes.NewReader(buf.Bytes()),
		ocf.WithReaderSchemaFunc(func(rd *ocf.Reader) (*avro.Schema, error) {
			// Header has been parsed. Pick the reader schema based on
			// whichever producer wrote the file.
			if string(rd.Metadata()["producer-version"]) == "1" {
				return v1Reader, nil
			}
			return v2Reader, nil
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	var e Event
	if err := r.Decode(&e); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("ts=%d\n", e.TS)
}
Output:
ts=1700000000

func WithSchemaOpts added in v1.4.0

func WithSchemaOpts(opts ...avro.SchemaOpt) ReaderOpt

WithSchemaOpts passes avro.SchemaOpt values (such as avro.CustomType) to the avro.Parse call that parses the file header's embedded schema. This allows registering custom type conversions for the reader's schema.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer encodes Avro values into an OCF. Values are buffered into blocks that are compressed and flushed automatically. Close must be called to flush remaining items.

func NewAppendWriter

func NewAppendWriter(rws io.ReadWriteSeeker, opts ...WriterOpt) (*Writer, error)

NewAppendWriter opens an existing OCF for appending. It reads the header to recover the schema, codec, and sync marker, then seeks to the end.

WithBlockCount and WithBlockBytes are honored. WithCodec can provide a codec implementation for non-built-in codecs (matched by name against the header). Other options are ignored.

func NewWriter

func NewWriter(w io.Writer, s *avro.Schema, opts ...WriterOpt) (*Writer, error)

NewWriter creates a Writer that writes an OCF to w. The file header is written immediately.

Example
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/twmb/avro"
	"github.com/twmb/avro/ocf"
)

func main() {
	schema := avro.MustParse(`{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "age",  "type": "int"}
		]
	}`)

	type User struct {
		Name string `avro:"name"`
		Age  int32  `avro:"age"`
	}

	var buf bytes.Buffer
	w, err := ocf.NewWriter(&buf, schema)
	if err != nil {
		log.Fatal(err)
	}
	for _, u := range []User{
		{Name: "Alice", Age: 30},
		{Name: "Bob", Age: 25},
	} {
		if err := w.Encode(&u); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read back.
	r, err := ocf.NewReader(bytes.NewReader(buf.Bytes()))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	for {
		var u User
		if err := r.Decode(&u); err != nil {
			break
		}
		fmt.Printf("%s is %d\n", u.Name, u.Age)
	}
}
Output:
Alice is 30
Bob is 25

func (*Writer) Close

func (w *Writer) Close() error

Close flushes any remaining items and closes the codec.

func (*Writer) Encode

func (w *Writer) Encode(v any) error

Encode serializes v and appends it to the current block. The block is flushed automatically when it hits the count or byte limit.

After any error the Writer is poisoned: all subsequent calls return the same error.
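The "poisoned" behavior is the common sticky-error pattern: the first failure is stored and returned from every later call, even if the destination recovers. A minimal sketch of that pattern, with hypothetical types `stickyWriter` and `failAfter` that are not part of package ocf:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var errDisk = errors.New("disk full")

// failAfter is a hypothetical destination that accepts `left` writes
// and then fails.
type failAfter struct{ left int }

func (f *failAfter) Write(p []byte) (int, error) {
	if f.left == 0 {
		return 0, errDisk
	}
	f.left--
	return len(p), nil
}

// stickyWriter remembers the first error and returns it forever after.
type stickyWriter struct {
	dst io.Writer
	err error
}

func (w *stickyWriter) Write(p []byte) (int, error) {
	if w.err != nil {
		return 0, w.err // poisoned: do not touch dst again
	}
	n, err := w.dst.Write(p)
	if err != nil {
		w.err = err
	}
	return n, err
}

func main() {
	dst := &failAfter{left: 1}
	w := &stickyWriter{dst: dst}

	_, err := w.Write([]byte("a"))
	fmt.Println(err) // <nil>

	_, err = w.Write([]byte("b"))
	fmt.Println(err) // disk full

	dst.left = 10 // the destination recovers, but the writer stays poisoned
	_, err = w.Write([]byte("c"))
	fmt.Println(err) // disk full
}
```

Callers can therefore check the error once, for example on Close, without risking a partially written block being followed by further data.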

func (*Writer) Flush

func (w *Writer) Flush() error

Flush writes any buffered items as a block. The Writer remains usable.

func (*Writer) Reset

func (w *Writer) Reset(dst io.Writer) error

Reset flushes buffered items to the current destination, then starts a new OCF on dst reusing the original schema, codec, and options. If the Writer is in an error state the flush is skipped and the error is cleared.

func (*Writer) Schema

func (wr *Writer) Schema() *avro.Schema

Schema returns the schema used by this Writer.

func (*Writer) Write

func (w *Writer) Write(p []byte) (int, error)

Write appends pre-encoded Avro bytes as a single datum to the current block. The caller must ensure p is exactly one datum encoded with the writer's schema. Auto-flushing rules are the same as Writer.Encode.
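To make "exactly one datum" concrete, here is a sketch that hand-encodes a record for the User schema from the overview (a string `name` followed by an int `age`) using Avro's binary rules: record fields are concatenated in schema order, ints and longs are zig-zag varints, and strings are a length followed by UTF-8 bytes. `encodeUser` is a hypothetical helper; Go's `binary.AppendVarint` is used because it produces the same zig-zag varint encoding.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeUser returns one Avro-encoded User datum: the name as a
// length-prefixed string, then the age as a zig-zag varint.
func encodeUser(name string, age int32) []byte {
	b := binary.AppendVarint(nil, int64(len(name))) // string length
	b = append(b, name...)                          // UTF-8 bytes
	return binary.AppendVarint(b, int64(age))       // int field
}

func main() {
	datum := encodeUser("Alice", 30)
	fmt.Printf("% x\n", datum) // 0a 41 6c 69 63 65 3c
}
```

A slice like `datum` is the shape of input Write expects, one call per record.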

type WriterOpt

type WriterOpt interface {
	// contains filtered or unexported methods
}

WriterOpt is an option for NewWriter and NewAppendWriter.

func WithBlockBytes

func WithBlockBytes(n int) WriterOpt

WithBlockBytes sets the maximum uncompressed size of a block in bytes before it is flushed. The default is 64 KiB. If both WithBlockCount and WithBlockBytes are set, whichever limit is hit first triggers a flush.

func WithBlockCount

func WithBlockCount(n int) WriterOpt

WithBlockCount sets the maximum number of items per block. The default is 0 (unlimited). If both WithBlockCount and WithBlockBytes are set, whichever limit is hit first triggers a flush.
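The "whichever limit is hit first" rule can be sketched as a small accounting struct. This is an illustration only: `blockBuffer` is hypothetical, and details such as whether the real Writer flushes before or after adding the datum that crosses a limit, or uses a strict comparison, are assumptions.

```go
package main

import "fmt"

// blockBuffer tracks the size of the current block. A zero limit means
// "unlimited", matching the documented default for the item count.
type blockBuffer struct {
	maxCount int
	maxBytes int
	count    int
	bytes    int
	flushes  int
}

// add accounts for one encoded datum and reports whether the block was
// flushed because a limit was reached.
func (b *blockBuffer) add(datumLen int) bool {
	b.count++
	b.bytes += datumLen
	hitCount := b.maxCount > 0 && b.count >= b.maxCount
	hitBytes := b.maxBytes > 0 && b.bytes >= b.maxBytes
	if hitCount || hitBytes {
		b.count, b.bytes = 0, 0 // a real writer would compress and write here
		b.flushes++
		return true
	}
	return false
}

func main() {
	b := &blockBuffer{maxCount: 3, maxBytes: 100}

	// Three small items: the count limit triggers first.
	fmt.Println(b.add(10), b.add(10), b.add(10))

	// One large item: the byte limit triggers first.
	fmt.Println(b.add(150))
	fmt.Println("flushes:", b.flushes)
}
```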

func WithMetadata

func WithMetadata(m map[string][]byte) WriterOpt

WithMetadata adds custom metadata to the file header. Keys starting with "avro." are reserved by the spec. Multiple calls are cumulative.

func WithSchema

func WithSchema(schema string) WriterOpt

WithSchema overrides the schema JSON written to the file header. By default avro.Schema.Canonical is used, which strips non-essential properties like doc strings and aliases. Use this to preserve those properties or to write a custom schema string.

func WithSyncMarker

func WithSyncMarker(sync [16]byte) WriterOpt

WithSyncMarker sets the 16-byte sync marker written between blocks. By default a random marker is generated. This is primarily useful for deterministic test output.
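A short sketch of the two ways a 16-byte marker might be produced: random bytes for normal use, and a value derived from a fixed label for reproducible test fixtures. Both helpers are hypothetical; in particular, deriving the fixed marker with MD5 is just a convenient way to get 16 deterministic bytes and says nothing about how the package generates its default.

```go
package main

import (
	"crypto/md5"
	"crypto/rand"
	"fmt"
	"log"
)

// randomSync returns 16 random bytes, suitable as a per-file marker.
func randomSync() ([16]byte, error) {
	var s [16]byte
	_, err := rand.Read(s[:])
	return s, err
}

// fixedSync derives a stable marker from a label, so that two runs of a
// test produce byte-identical files.
func fixedSync(label string) [16]byte {
	return md5.Sum([]byte(label))
}

func main() {
	r, err := randomSync()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("random: %x\n", r)
	fmt.Println("stable:", fixedSync("golden-test") == fixedSync("golden-test"))
}
```

With the real package, the result of a helper like `fixedSync` would be passed as `ocf.WithSyncMarker(fixedSync("golden-test"))`.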
