ocf

package
v2.31.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2025 License: MIT Imports: 14 Imported by: 65

Documentation

Overview

Package ocf implements encoding and decoding of Avro Object Container Files as defined by the Avro specification.

See the Avro specification for an understanding of Avro: http://avro.apache.org/docs/current/

Index

Examples

Constants

This section is empty.

Variables

View Source
var (

	// HeaderSchema is the Avro schema of a container file header.
	//
	// It is a record named "org.apache.avro.file.Header" with three fields:
	// "magic" (a 4-byte fixed), "meta" (a map with bytes values) and
	// "sync" (a 16-byte fixed).
	HeaderSchema = avro.MustParse(`{
	"type": "record",
	"name": "org.apache.avro.file.Header",
	"fields": [
		{"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
		{"name": "meta", "type": {"type": "map", "values": "bytes"}},
		{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
	]
}`)

	// DefaultSchemaMarshaler calls the schema's String() method, to produce
	// a "canonical" schema. Note that the canonical form does not preserve
	// a type's extra properties; use FullSchemaMarshaler when those matter.
	DefaultSchemaMarshaler = defaultMarshalSchema
	// FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
	// a schema with all details preserved. The "canonical" schema returned by
	// the default marshaler does not preserve a type's extra properties.
	FullSchemaMarshaler = fullMarshalSchema
)

Functions

This section is empty.

Types

type Codec

// Codec represents a compression codec.
//
// Decode can report a failure through its error result, whereas Encode
// has no error result and only returns the encoded bytes.
type Codec interface {
	// Decode decodes the given bytes and returns the decoded bytes,
	// or an error if decoding fails.
	Decode([]byte) ([]byte, error)
	// Encode encodes the given bytes and returns the encoded bytes.
	Encode([]byte) []byte
}

Codec represents a compression codec.

type CodecName

type CodecName string

CodecName represents a compression codec name.

// Supported compression codecs, identified by their CodecName string value.
const (
	Null      CodecName = "null"      // Null is the "null" codec name.
	Deflate   CodecName = "deflate"   // Deflate is the "deflate" codec name.
	Snappy    CodecName = "snappy"    // Snappy is the "snappy" codec name.
	ZStandard CodecName = "zstandard" // ZStandard is the "zstandard" codec name.
)

Supported compression codecs.

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder reads and decodes Avro values from a container file.

func NewDecoder

func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error)

NewDecoder returns a new decoder that reads from reader r.

Example
package main

import (
	"log"
	"os"

	"github.com/hamba/avro/v2/ocf"
)

// main demonstrates reading every record of an Avro Object Container File
// with ocf.Decoder.
func main() {
	// SimpleRecord is the Go shape of one decoded record; the avro struct
	// tags map its fields to the Avro field names "a" and "b".
	type SimpleRecord struct {
		A int64  `avro:"a"`
		B string `avro:"b"`
	}

	f, err := os.Open("/your/avro/file.avro")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	dec, err := ocf.NewDecoder(f)
	if err != nil {
		log.Fatal(err)
	}
	// Release the decoder's codec resources once reading is finished.
	defer func() {
		if err := dec.Close(); err != nil {
			log.Print(err)
		}
	}()

	for dec.HasNext() {
		var record SimpleRecord
		if err := dec.Decode(&record); err != nil {
			log.Fatal(err)
		}

		// Do something with the data
	}

	// The loop ends when HasNext reports false; check whether the decoder
	// recorded a reader error.
	if err := dec.Error(); err != nil {
		log.Fatal(err)
	}
}

func (*Decoder) Close added in v2.31.0

func (d *Decoder) Close() error

Close releases codec resources.

func (*Decoder) Decode

func (d *Decoder) Decode(v any) error

Decode reads the next Avro encoded value from its input and stores it in the value pointed to by v.

func (*Decoder) Error

func (d *Decoder) Error() error

Error returns the last reader error.

func (*Decoder) HasNext

func (d *Decoder) HasNext() bool

HasNext determines if there is another value to read.

func (*Decoder) Metadata

func (d *Decoder) Metadata() map[string][]byte

Metadata returns the header metadata.

func (*Decoder) Schema added in v2.27.0

func (d *Decoder) Schema() avro.Schema

Schema returns the schema that was parsed from the file's metadata and that is used to interpret the file's contents.

type DecoderFunc added in v2.27.0

type DecoderFunc func(cfg *decoderConfig)

DecoderFunc represents a configuration function for Decoder.

func WithDecoderConfig added in v2.27.0

func WithDecoderConfig(wCfg avro.API) DecoderFunc

WithDecoderConfig sets the value decoder config on the OCF decoder.

func WithDecoderSchemaCache added in v2.27.0

func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc

WithDecoderSchemaCache sets the schema cache for the decoder. If not specified, defaults to avro.DefaultSchemaCache.

func WithZStandardDecoder added in v2.31.0

func WithZStandardDecoder(dec *zstd.Decoder) DecoderFunc

WithZStandardDecoder sets a pre-created ZStandard decoder to be reused. This allows sharing a single decoder across multiple OCF decoders for efficiency. The caller is responsible for closing the decoder after all OCF decoders are done.

func WithZStandardDecoderOptions added in v2.28.0

func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc

WithZStandardDecoderOptions sets the options for the ZStandard decoder.

type DeflateCodec

type DeflateCodec struct {
	// contains filtered or unexported fields
}

DeflateCodec is a flate compression codec.

func (*DeflateCodec) Decode

func (c *DeflateCodec) Decode(b []byte) ([]byte, error)

Decode decodes the given bytes.

func (*DeflateCodec) Encode

func (c *DeflateCodec) Encode(b []byte) []byte

Encode encodes the given bytes.

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder writes an Avro container file to an output stream.

func NewEncoder

func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error)

NewEncoder returns a new encoder that writes to w using schema s.

If the writer is an existing ocf file, it will append data using the existing schema.

Example
package main

import (
	"log"
	"os"

	"github.com/hamba/avro/v2/ocf"
)

// main demonstrates writing a record to an Avro Object Container File
// with ocf.Encoder.
func main() {
	schema := `{
	    "type": "record",
	    "name": "simple",
	    "namespace": "org.hamba.avro",
	    "fields" : [
	        {"name": "a", "type": "long"},
	        {"name": "b", "type": "string"}
	    ]
	}`

	// SimpleRecord mirrors the schema above; the avro struct tags map its
	// fields to the Avro field names "a" and "b".
	type SimpleRecord struct {
		A int64  `avro:"a"`
		B string `avro:"b"`
	}

	// The file must be writable: os.Open would return a read-only handle and
	// every write by the encoder would fail. Read access is kept as well
	// because NewEncoder appends to an existing ocf file using its schema.
	f, err := os.OpenFile("/your/avro/file.avro", os.O_RDWR|os.O_CREATE, 0o644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	enc, err := ocf.NewEncoder(schema, f)
	if err != nil {
		log.Fatal(err)
	}

	var record SimpleRecord
	if err := enc.Encode(record); err != nil {
		log.Fatal(err)
	}

	// Flush the encoder to the file before syncing the file itself.
	if err := enc.Flush(); err != nil {
		log.Fatal(err)
	}

	if err := f.Sync(); err != nil {
		log.Fatal(err)
	}
}

func NewEncoderWithSchema added in v2.27.0

func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error)

NewEncoderWithSchema returns a new encoder that writes to w using the given schema.

If the writer is an existing ocf file, it will append data using the existing schema.

func (*Encoder) Close

func (e *Encoder) Close() error

Close closes the encoder, flushing the writer and releasing codec resources.

func (*Encoder) Encode

func (e *Encoder) Encode(v any) error

Encode writes the Avro encoding of v to the stream.

func (*Encoder) Flush

func (e *Encoder) Flush() error

Flush flushes the underlying writer.

func (*Encoder) Reset added in v2.31.0

func (e *Encoder) Reset(w io.Writer) error

Reset flushes any pending data, resets the encoder to write to a new io.Writer, and writes a fresh header with a new sync marker. The schema, codec, and other settings are preserved from the original encoder. This allows reusing the encoder for multiple files without reallocating buffers.

func (*Encoder) Write

func (e *Encoder) Write(p []byte) (n int, err error)

Write writes p to the internal buffer. This method skips the internal encoder and therefore the caller is responsible for encoding the bytes. No error will be returned if the bytes do not conform to the schema given to NewEncoder, but the final ocf data will be corrupted.

type EncoderFunc

type EncoderFunc func(cfg *encoderConfig)

EncoderFunc represents a configuration function for Encoder.

func WithBlockLength

func WithBlockLength(length int) EncoderFunc

WithBlockLength sets the block length on the encoder.

func WithBlockSize added in v2.29.0

func WithBlockSize(size int) EncoderFunc

WithBlockSize sets the maximum uncompressed size of a buffered block before it is written and flushed to the underlying io.Writer (after compression).

func WithCodec

func WithCodec(codec CodecName) EncoderFunc

WithCodec sets the compression codec on the encoder.

func WithCompressionLevel

func WithCompressionLevel(compLvl int) EncoderFunc

WithCompressionLevel sets the compression codec to deflate and the compression level on the encoder.

func WithEncoderSchemaCache added in v2.27.0

func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc

WithEncoderSchemaCache sets the schema cache for the encoder. If not specified, defaults to avro.DefaultSchemaCache.

func WithEncodingConfig added in v2.11.0

func WithEncodingConfig(wCfg avro.API) EncoderFunc

WithEncodingConfig sets the value encoder config on the OCF encoder.

func WithMetadata

func WithMetadata(meta map[string][]byte) EncoderFunc

WithMetadata sets the metadata on the encoder header.

func WithMetadataKeyVal added in v2.29.0

func WithMetadataKeyVal(key string, val []byte) EncoderFunc

WithMetadataKeyVal sets a single key-value pair for the metadata on the encoder header.

func WithSchemaMarshaler added in v2.27.0

func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc

WithSchemaMarshaler sets the schema marshaler for the encoder. If not specified, defaults to DefaultSchemaMarshaler.

func WithSyncBlock added in v2.2.0

func WithSyncBlock(sync [16]byte) EncoderFunc

WithSyncBlock sets the sync block.

func WithZStandardEncoder added in v2.31.0

func WithZStandardEncoder(enc *zstd.Encoder) EncoderFunc

WithZStandardEncoder sets a pre-created ZStandard encoder to be reused. This allows sharing a single encoder across multiple OCF encoders for efficiency. The caller is responsible for closing the encoder after all OCF encoders are done.

func WithZStandardEncoderOptions added in v2.28.0

func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc

WithZStandardEncoderOptions sets the options for the ZStandard encoder.

// Header represents an Avro container file header.
//
// Its fields carry avro struct tags naming the "magic", "meta" and "sync"
// header fields.
type Header struct {
	// Magic is the 4-byte value stored in the "magic" field.
	Magic [4]byte           `avro:"magic"`
	// Meta holds the header metadata as a map from string keys to raw bytes.
	Meta  map[string][]byte `avro:"meta"`
	// Sync is the 16-byte value stored in the "sync" field.
	Sync  [16]byte          `avro:"sync"`
}

Header represents an Avro container file header.

type NullCodec

type NullCodec struct{}

NullCodec is a no-op codec.

func (*NullCodec) Decode

func (*NullCodec) Decode(b []byte) ([]byte, error)

Decode decodes the given bytes.

func (*NullCodec) Encode

func (*NullCodec) Encode(b []byte) []byte

Encode encodes the given bytes.

type SnappyCodec

type SnappyCodec struct{}

SnappyCodec is a snappy compression codec.

func (*SnappyCodec) Decode

func (*SnappyCodec) Decode(b []byte) ([]byte, error)

Decode decodes the given bytes.

func (*SnappyCodec) Encode

func (*SnappyCodec) Encode(b []byte) []byte

Encode encodes the given bytes.

type ZStandardCodec added in v2.19.0

type ZStandardCodec struct {
	// contains filtered or unexported fields
}

ZStandardCodec is a zstandard compression codec.

func (*ZStandardCodec) Close added in v2.31.0

func (zstdCodec *ZStandardCodec) Close() error

Close closes the zstandard encoder and decoder, releasing resources. Shared instances (provided via WithZStandardEncoder/WithZStandardDecoder) are not closed.

func (*ZStandardCodec) Decode added in v2.19.0

func (zstdCodec *ZStandardCodec) Decode(b []byte) ([]byte, error)

Decode decodes the given bytes.

func (*ZStandardCodec) Encode added in v2.19.0

func (zstdCodec *ZStandardCodec) Encode(b []byte) []byte

Encode encodes the given bytes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL