bufarrowlib

v0.3.0 · Published: Apr 4, 2026 · License: Apache-2.0 · Imports: 32 · Imported by: 0

README

bufarrowLib 🦬


Protobuf → Apache Arrow. Raw bytes in, RecordBatches out. No codegen. No deserialization. No copies.

Give bufarrowlib a protobuf descriptor and a stream of wire-format bytes. Get back Arrow RecordBatches ready for DuckDB, Parquet, or any Arrow-native tool. Replace hundreds of lines of hand-written builder code with a YAML file or a path list. On production-shaped ad-tech traffic, AppendDenormRaw delivers ~296K msg/s: 39% less time per message than hand-written Arrow builders (about 1.6× their throughput), with less than half the allocations.

Python bindings (pybufarrow) give zero-copy access via the Arrow C Data Interface — all parsing runs in Go.


Use cases

| Scenario | Why bufarrowlib |
|---|---|
| Kafka / gRPC → columnar analytics | Parse raw wire bytes directly; zero deserialization cost |
| ETL to Parquet / DuckDB / ClickHouse | Declarative denormalization via YAML or path list |
| OpenRTB / ad-tech event flattening | Fan-out across nested repeated fields in one pass |
| Schema-driven services | Runtime .proto compilation; no protoc, no codegen |
| Multi-goroutine stream processors | Clone + shared HyperType: independent builders, aggregated PGO |

Install

go get -u github.com/loicalleyne/bufarrowlib@latest

Quick Start

Raw bytes → flat Arrow (fastest path)
```go
import (
    "log"

    ba "github.com/loicalleyne/bufarrowlib"
    "github.com/loicalleyne/bufarrowlib/proto/pbpath"
    "github.com/apache/arrow-go/v18/arrow/memory"
)

// md is the protoreflect.MessageDescriptor for your message (e.g. BidRequest);
// kafkaMessages is a [][]byte of wire-format payloads. Both are placeholders.

// Create once per message type; thread-safe and shared across goroutines.
ht := ba.NewHyperType(md, ba.WithAutoRecompile(10_000, 0.01))

tc, err := ba.New(md, memory.DefaultAllocator,
    ba.WithHyperType(ht),
    ba.WithDenormalizerPlan(
        pbpath.PlanPath("id",                 pbpath.Alias("request_id")),
        pbpath.PlanPath("imp[*].id",          pbpath.Alias("imp_id")),
        pbpath.PlanPath("imp[*].bidfloor",    pbpath.Alias("floor")),
        pbpath.PlanPath("device.geo.country", pbpath.Alias("country")),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

for _, raw := range kafkaMessages {
    if err := tc.AppendDenormRaw(raw); err != nil {
        log.Printf("skipping message: %v", err) // e.g. malformed wire bytes
    }
}
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()
```
YAML-driven config (no Go for the plan)
```yaml
# denorm.yaml
proto:
  file: schema/bidrequest.proto
  message: BidRequest
  import_paths: [./schema]

denormalizer:
  columns:
    - name: request_id
      path: id
    - name: imp_id
      path: imp[*].id
    - name: floor_price
      path: imp[*].bidfloor
    - name: imp_type
      expr:
        func: cond
        args:
          # Quoted: inside a flow collection, [ and ] would otherwise end the plain scalar.
          - expr: {func: has, args: [{path: "imp[*].video.id"}]}
          - literal: "video"
          - literal: "display"
```

```go
tc, err := ba.NewTranscoderFromConfigFile("denorm.yaml", memory.DefaultAllocator)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()
```
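To make the plan concrete, here is a plain-Go sketch of the rows the YAML above is meant to produce for one BidRequest. It does not use bufarrowlib: the `Video`, `Imp`, `BidRequest` and `Row` types are hypothetical stand-ins for the protobuf messages and output columns, and `has` is modelled as "the video sub-message is present and its id is non-empty", which is an assumption about pbpath's presence semantics. The empty-imp case (where the library emits one null row) is not modelled.

```go
package main

import "fmt"

// Hypothetical stand-ins for the BidRequest protobuf messages.
type Video struct{ ID string }

type Imp struct {
	ID       string
	BidFloor float64
	Video    *Video // nil when the impression has no video object
}

type BidRequest struct {
	ID  string
	Imp []Imp
}

// Row mirrors the four columns declared in denorm.yaml.
type Row struct {
	RequestID  string
	ImpID      string
	FloorPrice float64
	ImpType    string
}

// denormalize emits one row per imp[*] element (a single fan-out group) and
// evaluates cond(has(imp[*].video.id), "video", "display") for each row.
func denormalize(req BidRequest) []Row {
	rows := make([]Row, 0, len(req.Imp))
	for _, imp := range req.Imp {
		impType := "display"
		if imp.Video != nil && imp.Video.ID != "" { // assumed meaning of has()
			impType = "video"
		}
		rows = append(rows, Row{req.ID, imp.ID, imp.BidFloor, impType})
	}
	return rows
}

func main() {
	req := BidRequest{ID: "r1", Imp: []Imp{
		{ID: "i1", BidFloor: 0.5, Video: &Video{ID: "v1"}},
		{ID: "i2", BidFloor: 1.25},
	}}
	for _, r := range denormalize(req) {
		fmt.Printf("%s %s %.2f %s\n", r.RequestID, r.ImpID, r.FloorPrice, r.ImpType)
	}
}
```

The scalar `request_id` is repeated on every row, while the three `imp[*]` columns advance together because they share the same wildcard step.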
Full-fidelity proto → Arrow
```go
tc, _ := ba.New(md, memory.DefaultAllocator)
defer tc.Release()
tc.Append(msg)
rec := tc.NewRecordBatch()
defer rec.Release()
_ = tc.Schema()  // *arrow.Schema
_ = tc.Parquet() // *schema.Schema
```
Multi-goroutine pipeline
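```go
// Clone shares the immutable plan and HyperType but gets its own builders,
// and is roughly 1.7–1.8× cheaper to create than New.
// ch (chan []byte) and out (drained by a separate consumer) are placeholders.
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
    clone, err := tc.Clone(memory.NewGoAllocator())
    if err != nil {
        log.Fatal(err)
    }
    wg.Add(1)
    go func(w *ba.Transcoder) {
        defer wg.Done()
        defer w.Release()
        for raw := range ch { // workers drain a shared input channel
            if err := w.AppendDenormRaw(raw); err != nil {
                log.Printf("worker: %v", err)
            }
        }
        out <- w.NewDenormalizerRecordBatch() // consumer must Release() it
    }(clone)
}
wg.Wait() // returns once ch is closed and every worker has flushed
```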

Output modes

Two modes, one Transcoder. Use them individually or together.

| Mode | Append methods | Flush | Output |
|---|---|---|---|
| Full-fidelity | Append, AppendRaw, AppendWithCustom, AppendRawMerged | NewRecordBatch() | Nested Arrow schema mirroring the full protobuf structure |
| Denormalization | AppendDenorm, AppendDenormRaw, AppendDenormRawMerged | NewDenormalizerRecordBatch() | Flat Arrow schema from declared paths; fan-out across repeated fields |

Features

Ingestion API
| Method | Input | Speed | Notes |
|---|---|---|---|
| Append(msg) | proto.Message | baseline | Full-fidelity |
| AppendRaw(b) | []byte | 110–151 k/s | Speed assumes a HyperType |
| AppendRawMerged(base, custom) | []byte, []byte | 106 k/s | Field-safe wire merge |
| AppendDenorm(msg) | proto.Message | 73–535 k/s | Plan-based; fan-out dependent |
| AppendDenormRaw(b) | []byte | 121–296 k/s | Fastest; HyperType recommended |
| AppendDenormRawMerged(base, custom) | []byte, []byte | 204 k/s | Merge + denorm in one pass |

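Throughputs are single-threaded, measured on a realistic BidRequest corpus on an i7-13700H. Throughput grows with Clone workers, though sub-linearly (in the AppendRaw concurrency benchmark, about 2.9× at 4 workers and 5.1× at 16); see Performance.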

HyperType — compiled parser + online PGO

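HyperType wraps a hyperpb compiled message parser. Without it, AppendRaw* falls back to dynamicpb, which is 2.9–4.7× slower across the benchmark suite and about 6× slower for AppendDenormRaw on the realistic corpus (47 k vs 296 k msg/s).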

```go
// Create once per message type. Thread-safe. Share across all goroutines.
ht := ba.NewHyperType(md,
    ba.WithAutoRecompile(100_000, 0.01), // recompile every 100K msgs; 1% sampling
)

tc, _ := ba.New(md, mem, ba.WithHyperType(ht), ...)

// Manual recompile when traffic is representative:
ht.Recompile()         // synchronous
ht.RecompileAsync()    // background goroutine; returns <-chan struct{}
```

Recompilation is atomic — all Transcoders sharing the HyperType pick up the new parser on their next call, no restarts.

Denormalization + fan-out
```go
// 2 items × 3 tags → 6 output rows per message
tc, _ := ba.New(md, mem,
    ba.WithDenormalizerPlan(
        pbpath.PlanPath("order_id"),
        pbpath.PlanPath("items[*].id",    pbpath.Alias("item_id")),   // group A
        pbpath.PlanPath("items[*].price", pbpath.Alias("price")),
        pbpath.PlanPath("tags[*]",        pbpath.Alias("tag")),       // group B (cross-joined)
    ),
)
```

Path syntax:

| Syntax | Behaviour |
|---|---|
| field / a.b.c | scalar or nested message path |
| repeated[*] | wildcard fan-out: one row per element |
| repeated[N] | fixed index: scalar, no fan-out |
| repeated[1:3] | Python-style slice: fan-out over elements 1 and 2 |
| repeated[-2:] | negative indices supported (here, the last two elements) |
| repeated[::2] | step-only slice (every second element) |

Columns sharing the same wildcard steps are in the same fan-out group (lockstep). Different groups are cross-joined: totalRows = ∏ groupSizes. Empty groups emit one null row (left-join semantics).
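The row-count rule above can be written as a few lines of arithmetic. `denormRowCount` is a hypothetical helper for reasoning about output size, not a bufarrowlib function; it takes the element count of each fan-out group for one message.

```go
package main

import "fmt"

// denormRowCount returns how many output rows one message produces, given the
// element count of each fan-out group. Columns in the same group advance in
// lockstep, groups are cross-joined, and an empty group still yields one
// (null) row, so it contributes a factor of 1 rather than 0.
func denormRowCount(groupSizes []int) int {
	total := 1
	for _, n := range groupSizes {
		if n == 0 {
			n = 1 // left-join semantics: one null row
		}
		total *= n
	}
	return total
}

func main() {
	fmt.Println(denormRowCount([]int{2, 3})) // items × tags
	fmt.Println(denormRowCount([]int{2, 0})) // message with no tags
	fmt.Println(denormRowCount(nil))         // plan with no repeated paths
}
```

For the `order_id` plan above, two items and three tags give 6 rows, and a message with no tags still yields 2 rows instead of disappearing from the output.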

Expression engine

Computed columns, evaluated inline during plan traversal — no extra pass over the data.

```go
pbpath.PlanPath("buyer",
    pbpath.WithExpr(pbpath.FuncCoalesce(
        pbpath.PathRef("user.id"),
        pbpath.PathRef("device.ifa"),
    )),
    pbpath.Alias("buyer_id"),
),
```

Available expression functions:

| Category | Functions |
|---|---|
| Control flow | cond, coalesce, default |
| Predicates | has, eq, ne, lt, le, gt, ge, and, or, not |
| Arithmetic | add, sub, mul, div, mod, abs, ceil, floor, round, min, max |
| String | concat, upper, lower, trim, trim_prefix, trim_suffix, len |
| Cast | cast_int, cast_float, cast_string |
| Timestamp | strptime, try_strptime, age, epoch_to_date, extract_year … extract_second, date_part |
| ETL | hash, bucket, mask, coerce, enum_name, sum, distinct, list_concat |

Auxiliary YAML fields: sep, literal / literal2, param.

Schema merging

Add fields from a second .proto to the base schema. Fields are renumbered above the base's max field number so raw wire bytes from both messages can be safely concatenated.

```go
tc, _ := ba.New(baseMD, mem, ba.WithCustomMessage(customMD))
tc.AppendRawMerged(baseBytes, customBytes)
```
Parquet I/O
```go
var buf bytes.Buffer
tc.Append(msg)
tc.WriteParquet(&buf)

rec, _ := tc.ReadParquet(ctx, bytes.NewReader(buf.Bytes()), nil /* all cols */)
```

Above 100 k msg/s, Parquet I/O becomes the bottleneck, so buffer at least 1,000 rows before each write.

Arrow → Protobuf back-decode
```go
msgs := tc.Proto(rec, nil)         // all rows
some := tc.Proto(rec, []int{0, 2}) // rows 0 and 2 only
```
Protobuf Editions & runtime compilation

Works at the protoreflect level — compatible with proto2, proto3, and Edition 2023+.

```go
// No protoc required:
tc, _ := ba.NewFromFile("schema/event.proto", "EventMessage", []string{"./schema"}, mem)

// Or compile the descriptor yourself:
fd, _ := ba.CompileProtoToFileDescriptor("event.proto", []string{"./schema"})
md, _ := ba.GetMessageDescriptorByName(fd, "EventMessage")
```

Performance

Benchmarks use a 506-message realistic BidRequest corpus (75% 2-imp messages, all fields populated). Raw data: docs/. Reproduce: make bench.

Single-threaded (i7-13700H, realistic corpus)
| Method | msg/s | ns/msg | allocs/msg |
|---|---|---|---|
| Hand-written Arrow getters | 180 k | 5,544 | 131 |
| AppendDenorm (proto.Message) | 73 k | 13,662 | 230 |
| AppendRaw (HyperType) | 151 k | 6,606 | 98 |
| AppendDenormRaw (HyperType, realistic) | 296 k | 3,376 | 57 |
| AppendDenormRaw (no HyperType) | 47 k | 21,094 | 275 |
| AppendRawMerged (HyperType) | 106 k | 9,418 | 30 |
| AppendDenormRawMerged (HyperType) | 204 k | 4,899 | 66 |

AppendDenormRaw with a HyperType takes 39% less time per message than hand-written code (about 1.64× the throughput) and makes less than half as many allocations (57 vs 131 per message).
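Both views of that comparison can be derived from the single-threaded table: the 39% is the reduction in ns/msg, which corresponds to roughly 1.64× the throughput. `compare` below is a hypothetical helper used only for this arithmetic.

```go
package main

import "fmt"

// compare reports how a candidate implementation relates to a baseline: the
// fraction of per-message time saved, the throughput ratio, and the fraction
// of allocations saved.
func compare(baseNs, newNs, baseAllocs, newAllocs float64) (timeSaved, speedup, allocSaved float64) {
	return 1 - newNs/baseNs, baseNs / newNs, 1 - newAllocs/baseAllocs
}

func main() {
	// Hand-written Arrow getters vs AppendDenormRaw (HyperType), from the table.
	ts, sp, as := compare(5544, 3376, 131, 57)
	fmt.Printf("time per message: -%.1f%%\n", ts*100)
	fmt.Printf("throughput:       %.2fx\n", sp)
	fmt.Printf("allocations:      -%.1f%%\n", as*100)
}
```

A fractional time saving `s` always corresponds to a throughput ratio of `1/(1-s)`, which is why "39% faster" and "1.64× the throughput" describe the same measurement.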

Concurrent scaling (AppendRaw, i7-13700H, GOMAXPROCS=20)
| Workers | msg/s |
|---|---|
| 1 | 79 k |
| 4 | 227 k |
| 8 | 296 k |
| 16 | 406 k |
| 80 | 463 k |

AppendDenormRaw peaks at workers=16 → 481 k msg/s.
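One way to read the concurrency table is speedup over the single-worker run and per-worker efficiency. The sketch below feeds the AppendRaw numbers into a hypothetical `scaling` helper; efficiency falls from roughly 72% at 4 workers to about 32% at 16. The 80-worker row exceeds GOMAXPROCS=20, so it measures oversubscription rather than additional cores.

```go
package main

import "fmt"

// scaling returns the speedup of a multi-worker run over the 1-worker run and
// that speedup divided by the worker count (parallel efficiency).
func scaling(oneWorker, rate float64, workers int) (speedup, efficiency float64) {
	speedup = rate / oneWorker
	return speedup, speedup / float64(workers)
}

func main() {
	// AppendRaw concurrency figures (k msg/s) from the table above.
	const oneWorker = 79.0
	for _, row := range []struct {
		workers int
		rate    float64
	}{{4, 227}, {8, 296}, {16, 406}, {80, 463}} {
		sp, eff := scaling(oneWorker, row.rate, row.workers)
		fmt.Printf("%2d workers: %.2fx speedup, %.0f%% efficiency\n", row.workers, sp, eff*100)
	}
}
```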

Batch size impact (AppendRaw, BidRequest, single worker)
| Batch size | msg/s | allocs/msg |
|---|---|---|
| 1 | 6,774 | 2,446 |
| 100 | 102,577 | 47 |
| 1,000 | 121,315 | 26 |
| 122,880 | 129,689 | 24 |

Use batch size ≥ 100. Align to 122,880 (DuckDB row group size) for direct Parquet ingest.

Clone vs New

Clone is roughly 1.7–1.8× faster than New (see the table below). Never create a Transcoder inside a message loop.

| Schema | New() | Clone() |
|---|---|---|
| ScalarTypes | 57 µs | 34 µs |
| BidRequest + denorm | 497 µs | 272 µs |

Full benchmark tables and Python numbers: docs/benchmark-results.md.


Python Bindings

pybufarrow — zero-copy Go→Python via the Arrow C Data Interface.

```shell
pip install pybufarrow
```

```python
from pybufarrow import HyperType, Transcoder

ht = HyperType("events.proto", "UserEvent")

with Transcoder.from_proto_file("events.proto", "UserEvent", hyper_type=ht) as tc:
    for raw in kafka_consumer:
        tc.append(raw)
    batch = tc.flush()

df = batch.to_pandas()
```

For denormalization, streaming helpers, and the Pool class for multi-worker throughput, see python/README.md.

Use Pool, not ThreadPoolExecutor. At 4 workers, Pool is 16× faster due to GIL behaviour.


pbpath

proto/pbpath — standalone protobuf field-path engine, usable independently.

  • Dot-path parsing with wildcards, Python-style slices, ranges, negative indices
  • Trie-based Plan API: shared-prefix traversal for extracting many fields from the same message
  • Full expression engine (30+ functions)
pbpath-playground

Interactive web UI for testing paths and YAML denorm configs against real data before deployment:

go run ./cmd/pbpath-playground --proto path/to/schema.proto

Opens at localhost:4195. Two modes: Pipeline (live path evaluation against proto messages) and Denorm (live YAML config → Arrow table preview).

| Flag | Default | Description |
|---|---|---|
| --proto | required | .proto file(s), repeatable |
| --import-path | | proto import directories, repeatable |
| --corpus | | length-prefixed binary file for real-data testing |
| --port | 4195 | HTTP port |
| --seed | random | protorand seed for reproducible test messages |

Well-known type mapping

| Protobuf | Arrow |
|---|---|
| bool | Boolean |
| int32 / sint32 / sfixed32 / enum | Int32 |
| uint32 / fixed32 | Uint32 |
| int64 / sint64 / sfixed64 | Int64 |
| uint64 / fixed64 | Uint64 |
| float | Float32 |
| double | Float64 |
| string | Utf8 |
| bytes | Binary |
| google.protobuf.Timestamp | Timestamp(ms, UTC) |
| google.protobuf.Duration | Duration(ms) |
| google.protobuf.FieldMask | Utf8 |
| google.protobuf.*Value wrappers | unwrapped scalar |
| google.type.Date | Date32 |
| google.type.TimeOfDay | Time64(µs) |
| google.type.Money / LatLng / Color / PostalAddress / Interval | Utf8 (protojson) |
| OpenTelemetry AnyValue | Binary (proto-marshalled) |
| repeated field | List<T> |
| map field | Map<K,V> |
| embedded message | Struct{...} |
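The temporal rows of the table correspond to physical encodings fixed by the Arrow columnar format: Timestamp(ms) stores int64 milliseconds since the Unix epoch, Date32 stores int32 days since 1970-01-01, and Time64(µs) stores int64 microseconds since midnight. The sketch converts the protobuf fields into those values with the standard library. The helper names are hypothetical, and the document does not say how bufarrowlib treats sub-millisecond Timestamp precision; this sketch simply rounds down.

```go
package main

import (
	"fmt"
	"time"
)

// timestampMillis converts google.protobuf.Timestamp {seconds, nanos} to
// Arrow Timestamp(ms) storage. time.Time.UnixMilli rounds down, so
// sub-millisecond precision is dropped (an assumption, see above).
func timestampMillis(seconds int64, nanos int32) int64 {
	return time.Unix(seconds, int64(nanos)).UnixMilli()
}

// dateDays converts google.type.Date {year, month, day} to Arrow Date32
// storage: days since 1970-01-01. Midnight UTC is an exact multiple of 86400
// seconds, so the integer division is exact even before 1970.
func dateDays(year, month, day int) int32 {
	t := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
	return int32(t.Unix() / 86400)
}

// timeOfDayMicros converts google.type.TimeOfDay {hours, minutes, seconds,
// nanos} to Arrow Time64(µs) storage: microseconds since midnight.
func timeOfDayMicros(h, m, s, nanos int64) int64 {
	return ((h*60+m)*60+s)*1_000_000 + nanos/1_000
}

func main() {
	fmt.Println(timestampMillis(1, 1_500_000))   // 1001
	fmt.Println(dateDays(2024, 1, 1))            // 19723
	fmt.Println(timeOfDayMicros(1, 2, 3, 4_000)) // 3723000004
}
```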

Development

Make targets
| Target | Description |
|---|---|
| make test | Run Go + Python tests |
| make test-go | Go tests only (go test -timeout 180s ./...) |
| make test-python | Python tests (uv-managed venv, pytest) |
| make bench | Run Go + Python benchmarks |
| make bench-go | Go benchmarks; filter with BENCH_FILTER=BenchmarkFoo |
| make bench-python | Python benchmarks (pytest-benchmark, outputs JSON) |
| make bench-throughput | Concurrent max-throughput benchmarks only |
| make bench-compare | Rotate previous results → .old, run, diff with benchstat |
| make libbufarrow | Build C shared library (cbinding/libbufarrow.so) |
| make python | Build pybufarrow wheel (requires libbufarrow) |
| make python-dev | Editable Python install for development |
| make venv-sync | Create/update uv-managed venv in python/ |

Benchmark variables:

```shell
make bench-go BENCH_FILTER=BenchmarkAppendRaw BENCH_TIME=10s BENCH_COUNT=3
make bench-go BENCH_FILTER=BenchmarkMaxThroughput_ConcurrentAppendDenormRaw
```

| Variable | Default | Description |
|---|---|---|
| BENCH_FILTER | . (all) | -bench regex filter |
| BENCH_TIME | 3s | -benchtime per benchmark |
| BENCH_COUNT | 1 | -count repetitions |
| BENCH_OUT | docs/<cpu>-benchmark-results.txt | Go output file |
| BENCH_OUT_PYTHON | docs/<cpu>-benchmark-results-python.json | Python output file |

bench-compare automatically detects the CPU model and writes per-machine result files. Run it twice to get a benchstat delta.

Reference

Licence

bufarrowLib is released under the Apache 2.0 license. See LICENCE.txt.

Documentation

Overview

Package bufarrowlib converts protobuf messages to Apache Arrow record batches (and back), providing high-throughput ingestion pipelines and optional denormalization for analytics workloads.

Core types

A Transcoder is the central type. It holds the compiled protobuf schema, an Arrow record builder for the full message, and an optional denormalizer that projects selected scalar paths into a flat Arrow record. Construct one with New (from a protoreflect.MessageDescriptor) or NewFromFile (from a .proto source file).

A HyperType wraps a compiled buf.build/go/hyperpb.MessageType and enables online profile-guided optimization (PGO). All Transcoder instances sharing a HyperType contribute profiling data, and a recompile atomically upgrades the parser for all of them.

Concurrency

Transcoder methods are NOT safe for concurrent use. Call Transcoder.Clone to obtain independent copies for parallel goroutines. HyperType IS safe for concurrent use.

Hot-path guidance

Denormalization

The denormalizer (configured via WithDenormalizerPlan) evaluates a set of protobuf field paths, potentially through repeated fields, and appends one row to a flat Arrow record for each cross-joined fan-out combination. Configure it from a YAML file with NewTranscoderFromConfigFile, or with ParseDenormConfigFile followed by NewTranscoderFromConfig.

Example (DenormToParquet)

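Example_denormToParquet shows the workflow of denormalizing nested protobuf messages into a flat Arrow record as the first step towards Parquet. The Parquet write itself is only described in a comment; the example prints the denormalized rows for inspection.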

orderMD, itemMD := buildExampleDescriptors()

tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Append an order with 2 items × 2 tags → 4 denormalized rows.
msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"A", 1.50}, {"B", 2.75}},
	[]string{"rush", "fragile"}, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

// The denormalized record builder produces a flat schema.
fmt.Printf("schema: %v\n", tc.DenormalizerSchema().Field(0).Name)

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("denorm rows: %d (2 items × 2 tags)\n", rec.NumRows())

// Write the flat denormalized record to Parquet via a new Transcoder
// built from the denorm schema, or write directly using pqarrow.
// Here we show the denorm data for inspection:
for i := 0; i < int(rec.NumRows()); i++ {
	name := rec.Column(0).(*array.String).Value(i)
	id := rec.Column(1).(*array.String).Value(i)
	price := rec.Column(2).(*array.Float64).Value(i)
	tag := rec.Column(3).(*array.String).Value(i)
	fmt.Printf("  %s | %s | %.2f | %s\n", name, id, price, tag)
}
Output:
schema: order_name
denorm rows: 4 (2 items × 2 tags)
  order-1 | A | 1.50 | rush
  order-1 | A | 1.50 | fragile
  order-1 | B | 2.75 | rush
  order-1 | B | 2.75 | fragile
Example (ParquetRoundTrip)

Example_parquetRoundTrip shows a full round-trip: proto → Arrow → Parquet → Arrow → proto. This demonstrates that data survives serialisation intact.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Encode two products.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

// Proto → Arrow → Parquet (in-memory buffer).
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Parquet → Arrow.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

// Arrow → Proto.
msgs := tc.Proto(rec, nil)

fmt.Printf("round-tripped %d messages:\n", len(msgs))
for _, m := range msgs {
	js, _ := protojson.Marshal(m)
	var c bytes.Buffer
	json.Compact(&c, js)
	fmt.Printf("  %s\n", c.String())
}
Output:
round-tripped 2 messages:
  {"name":"Widget","price":9.99,"qty":5}
  {"name":"Gadget","price":24.5,"qty":2}
Example (ParquetSelectColumns)

Example_parquetSelectColumns shows reading only specific columns from a Parquet file — useful for analytics queries that touch a subset of fields.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Read only column 0 (name) and column 2 (qty).
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, []int{0, 2})
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

fmt.Printf("cols: %d, rows: %d\n", rec.NumCols(), rec.NumRows())
names := rec.Column(0).(*array.String)
qtys := rec.Column(1).(*array.Int32)
for i := 0; i < int(rec.NumRows()); i++ {
	fmt.Printf("  %s: %d\n", names.Value(i), qtys.Value(i))
}
Output:
cols: 2, rows: 2
  Widget: 5
  Gadget: 2
Example (ProtoFileToParquet)

Example_protoFileToParquet shows the end-to-end workflow when working with .proto files on disk: compile the schema, create a transcoder, populate messages, and write Parquet.

// Write an inline .proto to a temp file.
_, dir := writeProtoFile("event.proto", `syntax = "proto3";
message Event {
  string id      = 1;
  string action  = 2;
  int64  user_id = 3;
}
`)
defer os.RemoveAll(dir)

// Compile the .proto and look up the message descriptor.
fd, err := bufarrowlib.CompileProtoToFileDescriptor("event.proto", []string{dir})
if err != nil {
	log.Fatal(err)
}
md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Event")
if err != nil {
	log.Fatal(err)
}

// Create a Transcoder from the compiled descriptor.
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Build and append messages using dynamicpb.
for _, e := range []struct {
	id, action string
	uid        int64
}{
	{"e1", "click", 100},
	{"e2", "view", 200},
} {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("id"), protoreflect.ValueOfString(e.id))
	msg.Set(md.Fields().ByName("action"), protoreflect.ValueOfString(e.action))
	msg.Set(md.Fields().ByName("user_id"), protoreflect.ValueOfInt64(e.uid))
	tc.Append(msg)
}

var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

fmt.Printf("fields: %v\n", tc.FieldNames())
fmt.Printf("wrote parquet: %d bytes\n", buf.Len())
Output:
fields: [id action user_id]
wrote parquet: 688 bytes
Example (ProtoToParquetBatched)

Example_protoToParquetBatched shows writing protobuf messages to Parquet in batches — useful when processing a large stream and flushing periodically.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Batch 1: first two messages.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
batch1 := tc.NewRecordBatch()
defer batch1.Release()

// Batch 2: one more message.
tc.Append(newProduct(md, "Gizmo", 7.25, 12))
batch2 := tc.NewRecordBatch()
defer batch2.Release()

// Write both batches to a single Parquet file.
var buf bytes.Buffer
if err := tc.WriteParquetRecords(&buf, batch1, batch2); err != nil {
	log.Fatal(err)
}

// Read back and verify total row count.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

fmt.Printf("batches: 2, total rows read back: %d\n", rec.NumRows())
Output:
batches: 2, total rows read back: 3
Example (ProtoToParquetFile)

Example_protoToParquetFile shows the complete workflow of consuming protobuf messages and writing them to a Parquet file on disk.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Simulate consuming a stream of protobuf messages.
products := []struct {
	name  string
	price float64
	qty   int32
}{
	{"Widget", 9.99, 5},
	{"Gadget", 24.50, 2},
	{"Gizmo", 7.25, 12},
}
for _, p := range products {
	tc.Append(newProduct(md, p.name, p.price, p.qty))
}

// Write to a Parquet file.
f, err := os.CreateTemp("", "example-*.parquet")
if err != nil {
	log.Fatal(err)
}
name := f.Name()
defer os.Remove(name)

if err := tc.WriteParquet(f); err != nil {
	log.Fatal(err)
}
f.Close()

info, _ := os.Stat(name)
fmt.Printf("wrote %d messages to parquet (%d bytes)\n", len(products), info.Size())
Output:
wrote 3 messages to parquet (738 bytes)


Constants

This section is empty.

Variables

var (
	// ErrMxDepth is returned when the protobuf message nesting exceeds maxDepth.
	ErrMxDepth = errors.New("max depth reached, either the message is deeply nested or a circular dependency was introduced")
	// ErrPathNotFound is returned by node.getPath when a field name is not
	// found in the node's hash map.
	ErrPathNotFound = errors.New("path not found")
)

Sentinel errors returned during schema construction and path lookup.

Functions

func ColumnsToPlanSpecs added in v0.3.0

func ColumnsToPlanSpecs(columns []ColumnDef) ([]pbpath.PlanPathSpec, error)

ColumnsToPlanSpecs converts a slice of ColumnDef into []pbpath.PlanPathSpec ready for WithDenormalizerPlan.

func CompileProtoToFileDescriptor

func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)

CompileProtoToFileDescriptor compiles a .proto file at runtime using protocompile and returns the resulting FileDescriptor. importPaths are the directories searched for transitive imports.

Example

ExampleCompileProtoToFileDescriptor demonstrates compiling a .proto file from disk at runtime and inspecting its messages.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{dir})
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < fd.Messages().Len(); i++ {
		m := fd.Messages().Get(i)
		fmt.Printf("%s (%d fields)\n", m.Name(), m.Fields().Len())
	}
}
Output:
Item (2 fields)

func ExprKindToAppendFunc added in v0.2.0

func ExprKindToAppendFunc(kind protoreflect.Kind, b array.Builder) protoAppendFunc

ExprKindToAppendFunc returns a closure that appends a protoreflect.Value of the given kind to the Arrow array builder b. The builder must match the type returned by ExprKindToArrowType for the same kind.

This is the Expr-output counterpart of ProtoKindToAppendFunc; it handles only the primitive scalar kinds that Expr functions can produce. Returns nil for unsupported kinds.

func ExprKindToArrowType added in v0.2.0

func ExprKindToArrowType(kind protoreflect.Kind) arrow.DataType

ExprKindToArrowType returns the Arrow data type corresponding to a protoreflect.Kind. This is used for denormalizer columns whose type is determined by an [Expr] function's output kind rather than by a leaf field descriptor.

Only the primitive scalar kinds that Expr functions can produce are handled:

BoolKind   → Boolean
Int32Kind  → Int32     Int64Kind  → Int64
Uint32Kind → Uint32    Uint64Kind → Uint64
FloatKind  → Float32   DoubleKind → Float64
StringKind → String    BytesKind  → Binary
EnumKind   → Int32

Returns nil for message, group, or unrecognised kinds.

func GetMessageDescriptorByName

func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)

GetMessageDescriptorByName looks up a top-level message by name in the given FileDescriptor. Returns an error if the message is not found.

Example

ExampleGetMessageDescriptorByName demonstrates looking up a specific message by name from a compiled FileDescriptor.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{dir})
	if err != nil {
		log.Fatal(err)
	}

	md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Item")
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < md.Fields().Len(); i++ {
		f := md.Fields().Get(i)
		fmt.Printf("%s %s\n", f.Name(), f.Kind())
	}
}
Output:
name string
price double

func MergeMessageDescriptors

func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)

MergeMessageDescriptors merges two message descriptors into a new one with the specified name. It appends the fields from the second descriptor to the first, avoiding name conflicts. Field numbers from b are auto-renumbered starting after a's max field number to prevent wire-format collisions. Nested message types and enum types from b are also carried over. The resulting message descriptor is wrapped in a synthetic file descriptor to ensure it can be used independently.

Example

ExampleMergeMessageDescriptors demonstrates merging two message descriptors into one with combined fields.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	merged, err := bufarrowlib.MergeMessageDescriptors(baseMD, customMD, "Merged")
	if err != nil {
		log.Fatal(err)
	}

	fields := merged.Fields()
	for i := 0; i < fields.Len(); i++ {
		fmt.Println(fields.Get(i).Name())
	}
}
Output:
name
price
qty
region
batch_id

func ProtoKindToAppendFunc

func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc

ProtoKindToAppendFunc returns a closure that appends a protoreflect.Value of the appropriate kind to the given Arrow array builder. The builder must match the Arrow data type returned by ProtoKindToArrowType for the same field descriptor.

Returns nil if the field's kind is not a recognized scalar mapping.

Example

ExampleProtoKindToAppendFunc demonstrates obtaining a typed append closure for a protobuf field and using it to populate an Arrow builder.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	nameFD := md.Fields().ByName("name")

	dt := bufarrowlib.ProtoKindToArrowType(nameFD)
	builder := array.NewBuilder(memory.DefaultAllocator, dt)
	defer builder.Release()

	appendFn := bufarrowlib.ProtoKindToAppendFunc(nameFD, builder)
	appendFn(protoreflect.ValueOfString("hello"))
	appendFn(protoreflect.ValueOfString("world"))

	arr := builder.NewArray()
	defer arr.Release()

	fmt.Printf("len: %d\n", arr.Len())
	fmt.Println(arr.(*array.String).Value(0))
	fmt.Println(arr.(*array.String).Value(1))
}
Output:
len: 2
hello
world

func ProtoKindToArrowType

func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType

ProtoKindToArrowType returns the Arrow data type corresponding to a protobuf field descriptor's scalar kind. In addition to primitive kinds (bool, int32, string, etc.), the following well-known and common message types are recognised and mapped to flat Arrow scalars:

  • google.protobuf.Timestamp → Timestamp(ms, UTC)
  • google.protobuf.Duration → Duration(ms)
  • google.protobuf.FieldMask → String (comma-joined paths)
  • google.protobuf.*Value → unwrapped scalar (BoolValue→Boolean, etc.)
  • google.type.Date → Date32
  • google.type.TimeOfDay → Time64(µs)
  • google.type.Money → String (protojson)
  • google.type.LatLng → String (protojson)
  • google.type.Color → String (protojson)
  • google.type.PostalAddress → String (protojson)
  • google.type.Interval → String (protojson)
  • opentelemetry AnyValue → Binary (proto-marshalled)

Returns nil if the field's kind or message type is not a recognized scalar mapping (e.g. a generic message, map, or group).

TODO: add a WithTimestampUnit(arrow.TimeUnit) option to allow callers to override the default Millisecond precision.
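The standard-library sketch below illustrates the temporal mappings in the list above. It converts Timestamp-, Date- and TimeOfDay-shaped values into the integers Arrow stores for Timestamp(ms), Date32 and Time64(µs). The helper names are hypothetical, not bufarrowlib API. The sketch assumes sub-unit precision is truncated, and the library's exact rounding is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// timestampToMillis converts google.protobuf.Timestamp-style (seconds, nanos)
// into milliseconds since the Unix epoch, truncating sub-millisecond precision.
func timestampToMillis(seconds int64, nanos int32) int64 {
	return seconds*1000 + int64(nanos)/1_000_000
}

// dateToDate32 converts google.type.Date-style (year, month, day) into
// days since 1970-01-01, the physical representation of Arrow Date32.
func dateToDate32(year, month, day int) int32 {
	t := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
	return int32(t.Unix() / 86400)
}

// timeOfDayToMicros converts google.type.TimeOfDay-style fields into
// microseconds since midnight, the physical representation of Time64(µs).
func timeOfDayToMicros(hours, minutes, seconds, nanos int) int64 {
	return int64(hours)*3_600_000_000 + int64(minutes)*60_000_000 +
		int64(seconds)*1_000_000 + int64(nanos)/1_000
}

func main() {
	fmt.Println(timestampToMillis(1_700_000_000, 123_456_789)) // 1700000000123
	fmt.Println(dateToDate32(2024, 1, 1))                      // 19723
	fmt.Println(timeOfDayToMicros(1, 2, 3, 4_000))             // 3723000004
}
```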

Example

ExampleProtoKindToArrowType demonstrates mapping protobuf field kinds to Arrow data types.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()

	for i := 0; i < md.Fields().Len(); i++ {
		fd := md.Fields().Get(i)
		dt := bufarrowlib.ProtoKindToArrowType(fd)
		fmt.Printf("%-8s → %s\n", fd.Name(), dt)
	}
}
Output:
name     → utf8
price    → float64
qty      → int32

Types

type ArgDef added in v0.3.0

type ArgDef struct {
	Path    string   `yaml:"path,omitempty"`
	Literal any      `yaml:"literal,omitempty"`
	Expr    *ExprDef `yaml:"expr,omitempty"`
}

ArgDef is one argument in an ExprDef. Exactly one field should be set:

  • Path: a protobuf field path (leaf pbpath.PathRef).
  • Literal: a scalar constant (string, int, float64, or bool).
  • Expr: a nested expression sub-tree.
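The exactly-one rule can be checked mechanically. The sketch below uses hypothetical mirror types (argDef, exprDef) rather than the real bufarrowlib structs. It recursively rejects any argument that sets zero fields, or more than one, of Path, Literal and Expr. It assumes a non-nil Literal counts as set, so a YAML `false` or `0` is still a valid literal.

```go
package main

import "fmt"

// argDef mirrors the shape of bufarrowlib.ArgDef for illustration only.
type argDef struct {
	Path    string
	Literal any
	Expr    *exprDef
}

// exprDef is a minimal, hypothetical stand-in for bufarrowlib.ExprDef.
type exprDef struct {
	Func string
	Args []argDef
}

// validateArg returns an error unless exactly one of Path, Literal or Expr
// is set, recursing into nested expressions.
func validateArg(a argDef) error {
	set := 0
	if a.Path != "" {
		set++
	}
	if a.Literal != nil {
		set++
	}
	if a.Expr != nil {
		set++
	}
	if set != 1 {
		return fmt.Errorf("arg must set exactly one of path, literal, expr (got %d)", set)
	}
	if a.Expr != nil {
		for _, child := range a.Expr.Args {
			if err := validateArg(child); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	ok := argDef{Expr: &exprDef{Func: "gt", Args: []argDef{{Path: "seq"}, {Literal: int64(100)}}}}
	bad := argDef{Path: "seq", Literal: "x"}
	fmt.Println(validateArg(ok))  // <nil>
	fmt.Println(validateArg(bad)) // arg must set exactly one of path, literal, expr (got 2)
}
```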

type ColumnDef added in v0.3.0

type ColumnDef struct {
	// Name is the output Arrow column name.
	Name string `yaml:"name"`
	// Path is a pbpath path string used when no Expr is needed.
	Path string `yaml:"path,omitempty"`
	// Strict makes out-of-bounds range/index access return an error instead
	// of being silently clamped. Only meaningful for path-based columns.
	Strict bool `yaml:"strict,omitempty"`
	// Expr defines a computed value from one or more source paths.
	Expr *ExprDef `yaml:"expr,omitempty"`
}

ColumnDef defines one output Arrow column in the denormalizer.

Exactly one of Path or Expr must be set:

  • Path: a raw protobuf path string (e.g. "imp[*].bidfloor"). The column name in the output schema is taken from Name.
  • Expr: a computed expression tree. All source paths come from the expression's leaf ArgDef entries; the Name field becomes the output column alias.

Strict only applies to path-based columns; it is ignored for expr columns.
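The following sketch makes the Path/Expr split concrete. It enforces the exactly-one rule and walks an expression tree to collect the leaf paths a computed column reads from. The columnDef, exprDef and argDef types are hypothetical mirrors, not the library's code.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical mirrors of bufarrowlib.ArgDef, ExprDef and ColumnDef.
type argDef struct {
	Path    string
	Literal any
	Expr    *exprDef
}

type exprDef struct {
	Func string
	Args []argDef
}

type columnDef struct {
	Name   string
	Path   string
	Strict bool
	Expr   *exprDef
}

// sourcePaths returns the protobuf paths a column reads: its Path for a
// path-based column, or every leaf Path in the expression tree otherwise.
func sourcePaths(c columnDef) ([]string, error) {
	if (c.Path == "") == (c.Expr == nil) {
		return nil, errors.New("column " + c.Name + ": exactly one of path or expr must be set")
	}
	if c.Path != "" {
		return []string{c.Path}, nil
	}
	var out []string
	var walk func(e *exprDef)
	walk = func(e *exprDef) {
		for _, a := range e.Args {
			switch {
			case a.Path != "":
				out = append(out, a.Path)
			case a.Expr != nil:
				walk(a.Expr) // literals contribute no source path
			}
		}
	}
	walk(c.Expr)
	return out, nil
}

func main() {
	col := columnDef{
		Name: "full_imp_id",
		Expr: &exprDef{Func: "concat", Args: []argDef{
			{Path: "imp[*].id"},
			{Path: "imp[*].banner.id"},
		}},
	}
	paths, err := sourcePaths(col)
	fmt.Println(paths, err) // [imp[*].id imp[*].banner.id] <nil>
}
```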

type CustomMsgConfig added in v0.3.0

type CustomMsgConfig struct {
	File        string   `yaml:"file"`
	Message     string   `yaml:"message"`
	ImportPaths []string `yaml:"import_paths,omitempty"`
}

CustomMsgConfig optionally merges fields from a second .proto message into the base schema, enabling Transcoder.AppendWithCustom.

type DenormConfig added in v0.3.0

type DenormConfig struct {
	Proto         ProtoConfig      `yaml:"proto"`
	CustomMessage *CustomMsgConfig `yaml:"custom_message,omitempty"`
	Denormalizer  DenormPlanConfig `yaml:"denormalizer"`
}

DenormConfig is the top-level declarative configuration for building a Transcoder with a denormalizer plan from a YAML source.

Example YAML:

proto:
  file: path/to/schema.proto
  message: BidRequestEvent
  import_paths:
    - ./proto

# optional: merge an additional message's fields into the schema
custom_message:
  file: path/to/extensions.proto
  message: BidRequestExtension

denormalizer:
  columns:
    - name: auction_id
      path: auction_id

    - name: imp_id
      path: imp[*].id

    - name: floor_price
      path: imp[*].bidfloor
      strict: true

    - name: has_video
      expr:
        func: has
        args:
          - path: imp[*].video.id

    - name: full_imp_id
      expr:
        func: concat
        sep: "-"
        args:
          - path: imp[*].id
          - path: imp[*].banner.id

    - name: region
      expr:
        func: default
        args:
          - path: user.geo.region
        literal: "unknown"

func ParseDenormConfig added in v0.3.0

func ParseDenormConfig(r io.Reader) (*DenormConfig, error)

ParseDenormConfig decodes a YAML DenormConfig from r. Unknown fields are rejected to surface typos early.

Example

ExampleParseDenormConfig demonstrates parsing a YAML denormalizer configuration from a strings.Reader and inspecting the result.

package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/loicalleyne/bufarrowlib"
)

func main() {
	src := `
proto:
  file: schema.proto
  message: Order
denormalizer:
  columns:
    - name: order_name
      path: name
    - name: item_id
      path: items[*].id
    - name: item_price
      path: items[*].price
    - name: has_tags
      expr:
        func: has
        args:
          - path: tags[*]
`
	cfg, err := bufarrowlib.ParseDenormConfig(strings.NewReader(src))
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("proto:", cfg.Proto.Message)
	fmt.Println("columns:", len(cfg.Denormalizer.Columns))
	for _, col := range cfg.Denormalizer.Columns {
		if col.Expr != nil {
			fmt.Printf("  %-18s expr:%s\n", col.Name, col.Expr.Func)
		} else {
			fmt.Printf("  %-18s path:%s\n", col.Name, col.Path)
		}
	}
}
Output:
proto: Order
columns: 4
  order_name         path:name
  item_id            path:items[*].id
  item_price         path:items[*].price
  has_tags           expr:has

Example (NestedExpr)

ExampleParseDenormConfig_nestedExpr shows a cond expression whose predicate is itself a nested expression (recursive bufarrowlib.ExprDef trees).

package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/loicalleyne/bufarrowlib"
)

func main() {
	src := `
proto:
  file: schema.proto
  message: Order
denormalizer:
  columns:
    - name: category
      expr:
        func: cond
        args:
          - expr:
              func: gt
              args:
                - path: seq
                - literal: 100
          - literal: "premium"
          - literal: "standard"
`
	cfg, err := bufarrowlib.ParseDenormConfig(strings.NewReader(src))
	if err != nil {
		log.Fatal(err)
	}

	col := cfg.Denormalizer.Columns[0]
	fmt.Println("func:", col.Expr.Func)
	fmt.Println("predicate:", col.Expr.Args[0].Expr.Func)
	fmt.Println("then:", col.Expr.Args[1].Literal)
	fmt.Println("else:", col.Expr.Args[2].Literal)
}
Output:
func: cond
predicate: gt
then: premium
else: standard

func ParseDenormConfigFile added in v0.3.0

func ParseDenormConfigFile(path string) (*DenormConfig, error)

ParseDenormConfigFile reads and parses a YAML DenormConfig from a file.

type DenormPlanConfig added in v0.3.0

type DenormPlanConfig struct {
	Columns []ColumnDef `yaml:"columns"`
}

DenormPlanConfig holds the ordered list of output columns for the denormalizer plan.

type ExprDef added in v0.3.0

type ExprDef struct {
	Func     string   `yaml:"func"`
	Args     []ArgDef `yaml:"args,omitempty"`
	Sep      string   `yaml:"sep,omitempty"`
	Literal  any      `yaml:"literal,omitempty"`
	Literal2 any      `yaml:"literal2,omitempty"`
	Param    int      `yaml:"param,omitempty"`
}

ExprDef describes a single node in a composable expression tree.

Supported func names

┌──────────────┬──────────────────────────────────────────────────────────────────────┐
│ Category     │ func values                                                          │
├──────────────┼──────────────────────────────────────────────────────────────────────┤
│ Aggregation  │ coalesce, default                                                    │
│ Control flow │ cond                                                                 │
│ Predicates   │ has, eq, ne, lt, le, gt, ge                                          │
│ Arithmetic   │ add, sub, mul, div, mod, abs, ceil, floor, round, min, max           │
│ String       │ concat, upper, lower, trim, trim_prefix, trim_suffix, len            │
│ Cast         │ cast_int, cast_float, cast_string                                    │
│ Timestamp    │ age, strptime, try_strptime, extract_year, extract_month,            │
│              │ extract_day, extract_hour, extract_minute, extract_second            │
│ ETL          │ hash, epoch_to_date, date_part, bucket, mask, coerce, enum_name,     │
│              │ sum, distinct, list_concat                                           │
│ Logic        │ and, or, not                                                         │
└──────────────┴──────────────────────────────────────────────────────────────────────┘
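Here is a toy evaluator that shows how these func names compose into trees. It works over a flattened record (a map[string]any keyed by path) and implements only has, gt, cond, default and concat. It uses hypothetical mirror types and makes no claim about how bufarrowlib evaluates expressions internally. The library's null handling and type-coercion rules in particular may differ.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

type argDef struct {
	Path    string
	Literal any
	Expr    *exprDef
}

type exprDef struct {
	Func    string
	Args    []argDef
	Sep     string
	Literal any
}

// resolve evaluates one argument against a flattened record.
func resolve(a argDef, rec map[string]any) any {
	switch {
	case a.Expr != nil:
		return eval(a.Expr, rec)
	case a.Path != "":
		return rec[a.Path] // nil when the path is absent
	default:
		return a.Literal
	}
}

// toFloat widens the numeric types this toy evaluator supports.
func toFloat(v any) (float64, bool) {
	switch n := v.(type) {
	case int64:
		return float64(n), true
	case float64:
		return n, true
	}
	return 0, false
}

// eval implements a handful of the func names from the table above.
func eval(e *exprDef, rec map[string]any) any {
	switch e.Func {
	case "has":
		return resolve(e.Args[0], rec) != nil
	case "gt":
		l, ok1 := toFloat(resolve(e.Args[0], rec))
		r, ok2 := toFloat(resolve(e.Args[1], rec))
		return ok1 && ok2 && l > r
	case "cond":
		if b, _ := resolve(e.Args[0], rec).(bool); b {
			return resolve(e.Args[1], rec)
		}
		return resolve(e.Args[2], rec)
	case "default":
		// Fall back when the child is null or the zero value of its type.
		if v := resolve(e.Args[0], rec); v != nil && !reflect.ValueOf(v).IsZero() {
			return v
		}
		return e.Literal
	case "concat":
		parts := make([]string, 0, len(e.Args))
		for _, a := range e.Args {
			parts = append(parts, fmt.Sprint(resolve(a, rec)))
		}
		return strings.Join(parts, e.Sep)
	}
	return nil
}

func main() {
	category := &exprDef{Func: "cond", Args: []argDef{
		{Expr: &exprDef{Func: "gt", Args: []argDef{{Path: "seq"}, {Literal: int64(100)}}}},
		{Literal: "premium"},
		{Literal: "standard"},
	}}
	fmt.Println(eval(category, map[string]any{"seq": int64(150)})) // premium
	fmt.Println(eval(category, map[string]any{"seq": int64(7)}))   // standard
}
```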

Auxiliary fields

  • Args: ordered list of child ArgDef values.
  • Sep: string parameter whose interpretation depends on func:
      ◦ concat: separator string (e.g. "-", ",").
      ◦ trim_prefix, trim_suffix: the affix string to remove.
      ◦ strptime, try_strptime: the Go time-format string (e.g. "2006-01-02T15:04:05Z").
      ◦ date_part: part name ("year", "month", "day", "hour", "minute", "second", "epoch").
      ◦ list_concat: separator between collected values.
      ◦ mask: the replacement character (single rune, e.g. "*").
  • Literal: first scalar constant. Its YAML type determines its Go type: string → string, integer → int64, float → float64, bool → bool. Used by:
      ◦ default: the fallback value when the child is null/zero.
      ◦ coerce: the ifTrue replacement value.
  • Literal2: second scalar constant. Used by:
      ◦ coerce: the ifFalse replacement value (same type rules as Literal).
  • Param: integer parameter. Used by:
      ◦ bucket: bucket size (the child value is divided into buckets of this width).
      ◦ mask: number of leading characters to keep unmasked (keepFirst). The number of trailing characters kept is always 0 (use a cond+mask tree for both).
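Here is a minimal plain-Go sketch of the mask and bucket parameters. It assumes mask operates on runes rather than bytes, and that bucket returns the lower bound of the bucket, floor(v/width)*width. The documentation above does not say whether bucket yields a lower bound or a bucket index, so treat that part as an assumption. Both helper names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// maskKeepFirst keeps the first keepFirst runes of s and replaces every
// remaining rune with repl (Param = keepFirst, Sep = replacement rune,
// no trailing runes kept).
func maskKeepFirst(s string, keepFirst int, repl rune) string {
	if keepFirst < 0 {
		keepFirst = 0
	}
	runes := []rune(s)
	for i := keepFirst; i < len(runes); i++ {
		runes[i] = repl
	}
	return string(runes)
}

// bucketLowerBound assumes bucket maps v to the lower edge of its
// width-sized bucket.
func bucketLowerBound(v, width float64) float64 {
	return math.Floor(v/width) * width
}

func main() {
	fmt.Println(maskKeepFirst("4111111111111111", 4, '*')) // 4111************
	fmt.Println(bucketLowerBound(37, 10))                   // 30
}
```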

type HyperType added in v0.2.0

type HyperType struct {
	// contains filtered or unexported fields
}

HyperType is a shared coordinator for a compiled hyperpb.MessageType that supports lock-free sharing across multiple Transcoder instances (including clones running in separate goroutines). It provides online profile-guided optimization (PGO) via hyperpb.Profile: all Transcoders using the same HyperType contribute profiling data, and a recompile atomically upgrades the parser for all of them.

A HyperType is safe for concurrent use. The underlying hyperpb.MessageType is immutable after compilation; HyperType.Recompile atomically swaps it.
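The immutable-then-swap pattern described above can be sketched with sync/atomic. Here, compiledType and coordinator are hypothetical stand-ins for hyperpb.MessageType and HyperType. Readers load the pointer once per batch, and a recompile publishes a brand-new value via CompareAndSwap, so no reader ever observes a partially built parser.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// compiledType stands in for an immutable, compiled parser.
type compiledType struct {
	generation int
}

// coordinator shares one compiled type across goroutines. Readers Load the
// current pointer; a recompile publishes a new value and never mutates the old one.
type coordinator struct {
	current atomic.Pointer[compiledType]
}

func newCoordinator() *coordinator {
	c := &coordinator{}
	c.current.Store(&compiledType{generation: 1})
	return c
}

// Type returns the currently published compiled type.
func (c *coordinator) Type() *compiledType { return c.current.Load() }

// recompile builds a replacement and swaps it in, retrying if another
// goroutine published a newer value in the meantime.
func (c *coordinator) recompile() {
	for {
		old := c.current.Load()
		next := &compiledType{generation: old.generation + 1}
		if c.current.CompareAndSwap(old, next) {
			return
		}
	}
}

func main() {
	c := newCoordinator()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := c.Type() // load once per batch, then use t for the whole batch
			_ = t.generation
		}()
	}
	wg.Wait()
	c.recompile()
	fmt.Println(c.Type().generation) // 2
}
```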

func NewHyperType added in v0.2.0

func NewHyperType(md protoreflect.MessageDescriptor, opts ...HyperTypeOption) *HyperType

NewHyperType compiles a hyperpb.MessageType from md and returns a shared coordinator ready for use by one or more Transcoder instances. The compiled type can later be recompiled with profiling data for improved parse performance.

Example

ExampleNewHyperType demonstrates creating a HyperType coordinator for high-performance raw-bytes ingestion. HyperType compiles a hyperpb parser from a message descriptor and can be shared across multiple Transcoders.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func main() {
	outerMD, _ := buildHyperExampleDescriptors()

	// Create a HyperType — this compiles the hyperpb parser once.
	ht := bufarrowlib.NewHyperType(outerMD)

	// The compiled type is accessible and can be inspected.
	fmt.Printf("type: %v\n", ht.Type() != nil)
	fmt.Printf("sample rate: %.2f\n", ht.SampleRate())
}
Output:
type: true
sample rate: 0.01

Example (WithAutoRecompile)

ExampleNewHyperType_withAutoRecompile demonstrates enabling automatic profile-guided recompilation. After the threshold number of messages, the parser is recompiled using collected profiling data.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func main() {
	outerMD, _ := buildHyperExampleDescriptors()

	// Recompile every 10,000 messages, sampling 5% of them for profiling.
	ht := bufarrowlib.NewHyperType(outerMD,
		bufarrowlib.WithAutoRecompile(10_000, 0.05),
	)

	fmt.Printf("type: %v\n", ht.Type() != nil)
	fmt.Printf("sample rate: %.2f\n", ht.SampleRate())
}
Output:
type: true
sample rate: 0.05

func (*HyperType) Profile added in v0.2.0

func (h *HyperType) Profile() *hyperpb.Profile

Profile returns the current hyperpb.Profile for recording parse statistics. Returns nil if profiling has not been initialized.

func (*HyperType) Recompile added in v0.2.0

func (h *HyperType) Recompile() error

Recompile recompiles the underlying hyperpb.MessageType using the collected profiling data. It atomically swaps the old profile for a fresh one (preventing double-recompile races), recompiles, and stores the new type. All Transcoder instances sharing this HyperType will pick up the new type on their next Transcoder.AppendRaw or Transcoder.AppendDenormRaw call.

Recompile is safe for concurrent use but is intentionally synchronous: the caller blocks until compilation finishes. For a non-blocking recompile, wrap the call in a goroutine or use HyperType.RecompileAsync.

Returns an error if no profile data has been collected.
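The profile swap that prevents double recompiles can be modelled with atomic.Pointer.Swap. Whichever caller receives the populated profile compiles, and every concurrent caller receives the fresh, empty one and returns an error. This is an illustrative sketch with hypothetical types (profile, recompiler), not bufarrowlib's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// profile stands in for collected profiling data.
type profile struct{ samples int }

var errNoProfile = errors.New("no profile data collected")

type recompiler struct {
	prof         atomic.Pointer[profile]
	compilations atomic.Int64
}

// recompile swaps the current profile for a fresh one. Only the caller that
// receives a populated profile compiles; concurrent callers get the empty
// replacement and return errNoProfile instead of compiling a second time.
func (r *recompiler) recompile() error {
	old := r.prof.Swap(&profile{})
	if old == nil || old.samples == 0 {
		return errNoProfile
	}
	r.compilations.Add(1) // a real implementation would compile using old here
	return nil
}

func main() {
	r := &recompiler{}
	r.prof.Store(&profile{samples: 100})
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.recompile()
		}()
	}
	wg.Wait()
	fmt.Println(r.compilations.Load()) // 1
}
```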

Example

ExampleHyperType_Recompile demonstrates manual profile-guided recompilation. After processing a batch of messages, the profiling data is used to recompile the parser for better performance on subsequent batches.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// Manual recompile mode: threshold=0 disables auto-recompile,
	// rate=1.0 profiles 100% of messages for maximum accuracy.
	ht := bufarrowlib.NewHyperType(outerMD,
		bufarrowlib.WithAutoRecompile(0, 1.0),
	)

	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Phase 1: Profile a representative batch.
	for i := 0; i < 100; i++ {
		msg := newOuterMessage(outerMD, innerMD,
			fmt.Sprintf("product-%d", i), "",
			[]struct {
				id    string
				price float64
			}{
				{fmt.Sprintf("item-%d", i), float64(i) + 0.99},
			}, int64(i),
		)
		raw, _ := proto.Marshal(msg)
		tc.AppendDenormRaw(raw)
	}
	rec := tc.NewDenormalizerRecordBatch()
	rec.Release()

	// Phase 2: Recompile with the collected profile.
	if err := ht.Recompile(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("recompiled successfully")

	// Phase 3: Process more data with the optimised parser.
	msg := newOuterMessage(outerMD, innerMD, "final", "", []struct {
		id    string
		price float64
	}{{"Z", 99.99}}, 1)
	raw, _ := proto.Marshal(msg)
	tc.AppendDenormRaw(raw)

	rec2 := tc.NewDenormalizerRecordBatch()
	defer rec2.Release()

	fmt.Printf("rows after recompile: %d\n", rec2.NumRows())
}
Output:
recompiled successfully
rows after recompile: 1

func (*HyperType) RecompileAsync added in v0.2.0

func (h *HyperType) RecompileAsync() <-chan struct{}

RecompileAsync spawns a goroutine to recompile asynchronously. The returned channel is closed when recompilation completes (or is skipped because another recompile is in progress). Errors are silently discarded; call HyperType.Recompile directly if you need error handling.
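Here is one way to get the behaviour described for RecompileAsync, sketched with hypothetical names. A CompareAndSwap guard skips the call when a recompile is already running. The returned channel is closed immediately when the call is skipped, or once the goroutine finishes. The error from the synchronous step is discarded, as the documentation states.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type asyncRecompiler struct {
	inProgress atomic.Bool
	runs       atomic.Int64
}

// recompile is the synchronous step; it would normally return compile errors.
func (a *asyncRecompiler) recompile() error {
	a.runs.Add(1)
	return nil
}

// recompileAsync runs recompile in a goroutine and returns a channel that is
// closed when it finishes, or immediately if a recompile is already running.
func (a *asyncRecompiler) recompileAsync() <-chan struct{} {
	done := make(chan struct{})
	if !a.inProgress.CompareAndSwap(false, true) {
		close(done) // skipped: another recompile is in progress
		return done
	}
	go func() {
		defer close(done)               // runs last: signal completion
		defer a.inProgress.Store(false) // runs first: release the guard
		_ = a.recompile()               // error intentionally discarded
	}()
	return done
}

func main() {
	a := &asyncRecompiler{}
	<-a.recompileAsync() // block until the background recompile finishes
	fmt.Println(a.runs.Load()) // 1
}
```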

func (*HyperType) RecordMessage added in v0.2.0

func (h *HyperType) RecordMessage() bool

RecordMessage increments the message counter and returns true if the auto-recompile threshold has been reached and the caller should trigger HyperType.Recompile. Returns false if auto-recompile is disabled (threshold == 0) or the threshold has not been reached.
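Here is a sketch of threshold counting with sync/atomic. It assumes RecordMessage reports true each time the count reaches a multiple of the threshold. The text above does not specify whether bufarrowlib fires once per multiple or resets its counter after each recompile, so the modulo rule is an assumption, as are the counter type and method names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type counter struct {
	threshold int64
	n         atomic.Int64
}

// recordMessage increments the count and reports true each time the count
// reaches a multiple of threshold. A threshold of 0 disables the trigger.
func (c *counter) recordMessage() bool {
	if c.threshold == 0 {
		return false
	}
	return c.n.Add(1)%c.threshold == 0
}

func main() {
	c := &counter{threshold: 3}
	for i := 1; i <= 6; i++ {
		if c.recordMessage() {
			fmt.Println("recompile after message", i) // fires at 3 and 6
		}
	}
}
```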

func (*HyperType) SampleRate added in v0.2.0

func (h *HyperType) SampleRate() float64

SampleRate returns the profiling sample rate.

func (*HyperType) Type added in v0.2.0

func (h *HyperType) Type() *hyperpb.MessageType

Type returns the current compiled hyperpb.MessageType. The returned pointer is safe to use until the next HyperType.Recompile call; callers should load it once per batch rather than caching it long-term.

type HyperTypeOption added in v0.2.0

type HyperTypeOption func(*hyperTypeConfig)

HyperTypeOption configures a HyperType during construction.

func WithAutoRecompile added in v0.2.0

func WithAutoRecompile(threshold int64, rate float64) HyperTypeOption

WithAutoRecompile enables automatic recompilation after threshold messages have been profiled. rate is the sampling fraction passed to hyperpb.WithRecordProfile (e.g. 0.01 for 1%). A threshold of 0 disables auto-recompile (the default); use HyperType.Recompile manually instead.

type Opt

type Opt struct {
	// contains filtered or unexported fields
}

Opt holds the option values collected from Option functions and passed to New or NewFromFile. Fields are unexported; use the With* helpers.

type Option

type Option func(config)

Option is a functional option applied to New and NewFromFile to configure schema merging, denormalization, or other behaviours.
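The sketch below shows the functional-option pattern behind Option, including the mutual-exclusion check between WithCustomMessage and WithCustomMessageFile. The config struct, option type and constructor are hypothetical stand-ins, since the real config type is unexported. The check runs at construction time here, which is one plausible place to enforce it.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for the unexported options struct.
type config struct {
	customMessage string // set by withCustomMessage
	customFile    string // set by withCustomMessageFile
}

type option func(*config)

func withCustomMessage(name string) option {
	return func(c *config) { c.customMessage = name }
}

func withCustomMessageFile(path string) option {
	return func(c *config) { c.customFile = path }
}

// newConfig applies options in order, then rejects mutually exclusive ones.
func newConfig(opts ...option) (*config, error) {
	c := &config{}
	for _, o := range opts {
		o(c)
	}
	if c.customMessage != "" && c.customFile != "" {
		return nil, errors.New("custom message and custom message file are mutually exclusive")
	}
	return c, nil
}

func main() {
	_, err := newConfig(withCustomMessage("Extra"), withCustomMessageFile("extra.proto"))
	fmt.Println(err)
}
```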

func WithCustomMessage

func WithCustomMessage(msgDesc protoreflect.MessageDescriptor) Option

WithCustomMessage provides a protoreflect.MessageDescriptor whose fields will be merged into the base message schema. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessageFile.

Example

ExampleWithCustomMessage demonstrates merging the fields of a second message descriptor into the base schema with the WithCustomMessage option.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price qty region batch_id]

func WithCustomMessageFile

func WithCustomMessageFile(protoFilePath, messageName string, importPaths []string) Option

WithCustomMessageFile specifies a .proto file and message name whose fields will be merged into the base message schema. The .proto file is compiled at schema creation time using protocompile. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessage.

Example

ExampleWithCustomMessageFile demonstrates augmenting a base schema with custom fields loaded from a .proto file on disk.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

const exampleCustomProto = `syntax = "proto3";
message Extra {
  string region = 1;
}
`

func main() {
	basePath, baseDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(baseDir)

	customPath, customDir := writeProtoFile("extra.proto", exampleCustomProto)
	defer os.RemoveAll(customDir)

	tc, err := bufarrowlib.NewFromFile(
		filepath.Base(basePath), "Item", []string{baseDir},
		memory.DefaultAllocator,
		bufarrowlib.WithCustomMessageFile(filepath.Base(customPath), "Extra", []string{customDir}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price region]

func WithDenormalizerPlan

func WithDenormalizerPlan(paths ...pbpath.PlanPathSpec) Option

WithDenormalizerPlan configures one or more protobuf field paths to project into a flat (denormalized) Arrow record. Each path is specified as a pbpath.PlanPathSpec created via pbpath.PlanPath, which supports per-path options such as pbpath.Alias and pbpath.StrictPath.

Paths that traverse repeated fields with a wildcard [*] or range [start:end] produce fan-out rows. Multiple independent fan-out groups are cross-joined; empty fan-out groups produce a single null row (left-join semantics).

Each leaf path must terminate at a scalar protobuf field or a recognized well-known message type (google.protobuf.Timestamp, otel AnyValue). Message-typed terminal nodes are rejected at schema creation time.
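The cross-join and left-join rules above can be illustrated with a small, stdlib-only Go sketch. This is not bufarrowlib's implementation. `crossJoin` is a hypothetical helper that assumes each fan-out group has already been reduced to the `[]string` values one wildcard path produced for a single message. A nil cell stands in for the null row that an empty group contributes.

```go
package main

import "fmt"

// crossJoin materializes fan-out rows for a set of independent groups.
// Each group holds the values one wildcard path produced for a message.
// An empty group contributes a single nil ("null") cell, mirroring the
// left-join behavior described for WithDenormalizerPlan.
// Hypothetical helper for illustration only.
func crossJoin(groups [][]string) [][]*string {
	rows := [][]*string{{}} // start from one empty row
	for _, g := range groups {
		cells := make([]*string, 0, len(g))
		for i := range g {
			cells = append(cells, &g[i])
		}
		if len(cells) == 0 {
			cells = append(cells, nil) // empty group: one null cell
		}
		// Every existing row is combined with every cell of this group.
		next := make([][]*string, 0, len(rows)*len(cells))
		for _, r := range rows {
			for _, c := range cells {
				row := append(append([]*string(nil), r...), c)
				next = append(next, row)
			}
		}
		rows = next
	}
	return rows
}

func main() {
	items := []string{"A", "B"}
	tags := []string{} // an empty repeated field
	for _, row := range crossJoin([][]string{items, tags}) {
		for _, cell := range row {
			if cell == nil {
				fmt.Print("NULL ")
			} else {
				fmt.Print(*cell, " ")
			}
		}
		fmt.Println()
	}
}
```

With two items and no tags, the sketch yields two rows, each with a null in the tags column, rather than dropping the message.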

Example

ExampleWithDenormalizerPlan demonstrates configuring a denormalization plan.

orderMD, itemMD := buildExampleDescriptors()

tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"X", 1.0}, {"Y", 2.0}},
	nil, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("rows: %d\n", rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
	name := rec.Column(0).(*array.String).Value(i)
	id := rec.Column(1).(*array.String).Value(i)
	fmt.Printf("  %s | %s\n", name, id)
}
Output:
rows: 2
  order-1 | X
  order-1 | Y

func WithHyperType added in v0.2.0

func WithHyperType(ht *HyperType) Option

WithHyperType provides a shared HyperType coordinator for PGO-enabled raw-bytes ingestion via Transcoder.AppendRaw and Transcoder.AppendDenormRaw. The HyperType's compiled hyperpb.MessageType is used instead of compiling a new one, and all Transcoders sharing the same HyperType contribute profiling data for online recompilation.

Create a HyperType with NewHyperType and pass it to multiple New/Clone calls. Call HyperType.Recompile to upgrade the parser with collected profile data.
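The sharing model can be pictured with a stdlib-only Go sketch of the general pattern: many workers read one atomically swappable parser and feed a shared profile counter, while a recompile publishes a replacement without stopping them. `sharedType`, `parser`, `observe` and `Recompile` here are hypothetical stand-ins; the sketch does not reproduce how HyperType or hyperpb actually collect profiles or recompile.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// parser is a hypothetical stand-in for a compiled message parser;
// generation counts how many times it has been replaced.
type parser struct{ generation int }

// sharedType is a hypothetical coordinator shared by all workers.
type sharedType struct {
	current atomic.Pointer[parser] // parser used by every worker
	samples atomic.Int64           // shared "profile" counter
}

func newSharedType() *sharedType {
	s := &sharedType{}
	s.current.Store(&parser{generation: 0})
	return s
}

// observe records one parsed message and reports which parser was used.
func (s *sharedType) observe() int {
	s.samples.Add(1)
	return s.current.Load().generation
}

// Recompile publishes a new parser; readers pick it up on their next Load.
// A real system would use the collected samples to build the new parser.
func (s *sharedType) Recompile() {
	for {
		old := s.current.Load()
		if s.current.CompareAndSwap(old, &parser{generation: old.generation + 1}) {
			return
		}
	}
}

func main() {
	st := newSharedType()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				st.observe()
			}
		}()
	}
	wg.Wait()
	st.Recompile()
	fmt.Println(st.samples.Load(), st.current.Load().generation) // 4000 1
}
```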

Example

ExampleWithHyperType demonstrates the WithHyperType option, which connects a Transcoder to a shared HyperType coordinator for raw-bytes ingestion.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	ht := bufarrowlib.NewHyperType(outerMD)

	// WithHyperType enables AppendRaw and AppendDenormRaw on the transcoder.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOuterMessage(outerMD, innerMD, "Test", "", nil, 42)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// AppendRaw is now available because WithHyperType was provided.
	if err := tc.AppendRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 1

type ProtoConfig added in v0.3.0

type ProtoConfig struct {
	// File is the path to the .proto file.
	File string `yaml:"file"`
	// Message is the top-level message name to use as the Transcoder root.
	Message string `yaml:"message"`
	// ImportPaths are additional directories searched when resolving imports
	// within the .proto file.
	ImportPaths []string `yaml:"import_paths,omitempty"`
}

ProtoConfig identifies the .proto source file and the root message type.

type Transcoder

type Transcoder struct {
	// contains filtered or unexported fields
}

Transcoder converts protobuf messages to Apache Arrow record batches and back. It holds the compiled message schema, an Arrow record builder for the full message ("stencil"), and optionally a separate denormalizer that projects selected paths into a flat Arrow record suitable for analytics.

A Transcoder is not safe for concurrent use; call Transcoder.Clone to create independent copies for parallel goroutines.
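The clone-per-goroutine rule can be sketched in plain Go. `rowBuilder` below is a hypothetical, non-thread-safe stand-in for a Transcoder (it is not bufarrowlib's type); the point is that each worker receives its own clone before it starts, so appends need no locking and each worker's batch is collected after `Wait`.

```go
package main

import (
	"fmt"
	"sync"
)

// rowBuilder is a hypothetical, non-thread-safe accumulator standing in
// for a Transcoder: only one goroutine may use a given instance.
type rowBuilder struct{ rows []string }

func (b *rowBuilder) Append(v string) { b.rows = append(b.rows, v) }

// Clone returns an independent builder that shares no mutable state.
func (b *rowBuilder) Clone() *rowBuilder { return &rowBuilder{} }

// process hands every worker its own clone, so appending needs no locks.
func process(base *rowBuilder, shards [][]string) [][]string {
	out := make([][]string, len(shards))
	var wg sync.WaitGroup
	for i, shard := range shards {
		wg.Add(1)
		go func(i int, shard []string, b *rowBuilder) {
			defer wg.Done()
			for _, v := range shard {
				b.Append(v)
			}
			out[i] = b.rows // each goroutine writes a distinct index
		}(i, shard, base.Clone())
	}
	wg.Wait()
	return out
}

func main() {
	batches := process(&rowBuilder{}, [][]string{{"a", "b"}, {"c"}})
	fmt.Println(batches) // [[a b] [c]]
}
```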

func New

func New(msgDesc protoreflect.MessageDescriptor, mem memory.Allocator, opts ...Option) (tc *Transcoder, err error)

New returns a new Transcoder from a pre-resolved message descriptor. Options include WithDenormalizerPlan, WithCustomMessage, WithCustomMessageFile, and WithHyperType. WithDenormalizerPlan creates a separate flat Arrow record for analytics, whilst WithCustomMessage/WithCustomMessageFile expand the schema derived from the input protoreflect.MessageDescriptor.

Example

ExampleNew demonstrates creating a Transcoder, appending protobuf messages, and retrieving the result as an Arrow record batch.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
rows: 2, cols: 3
fields: [name price qty]
Example (WithCustomMessage)

ExampleNew_withCustomMessage demonstrates augmenting a protobuf schema with custom fields using WithCustomMessage, and populating both the base and custom fields via AppendWithCustom.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Printf("fields: %v\n", tc.FieldNames())

	base := newProduct(baseMD, "Widget", 9.99, 5)
	custom := dynamicpb.NewMessage(customMD)
	custom.Set(customMD.Fields().ByName("region"), protoreflect.ValueOfString("US-EAST"))
	custom.Set(customMD.Fields().ByName("batch_id"), protoreflect.ValueOfInt64(42))

	if err := tc.AppendWithCustom(base, custom); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
fields: [name price qty region batch_id]
rows: 1, cols: 5

func NewFromFile

func NewFromFile(protoFilePath, messageName string, importPaths []string, mem memory.Allocator, opts ...Option) (*Transcoder, error)

NewFromFile returns a new Transcoder by compiling a .proto file at runtime. protoFilePath is the path to the .proto file, messageName is the top-level message to use, and importPaths are the directories to search for imports. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile.

Example

ExampleNewFromFile demonstrates creating a Transcoder directly from a .proto file on disk, without pre-compiling the descriptor yourself.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	path, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	tc, err := bufarrowlib.NewFromFile(filepath.Base(path), "Item", []string{dir}, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Inspect the first field of the Arrow schema built from the .proto file.
	f := tc.Schema().Field(0)
	fmt.Printf("field 0: %s\n", f.Name)
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
field 0: name
fields: [name price]

func NewTranscoderFromConfig added in v0.3.0

func NewTranscoderFromConfig(cfg *DenormConfig, mem memory.Allocator) (*Transcoder, error)

NewTranscoderFromConfig builds a Transcoder from a parsed DenormConfig. mem is the Arrow memory allocator to use; pass nil for the default allocator.

Example

ExampleNewTranscoderFromConfig demonstrates building a bufarrowlib.Transcoder from a bufarrowlib.DenormConfig and inspecting the resulting denormalizer schema. It uses the same Order/Item schema as ExampleTranscoder_AppendDenorm.

// writeProtoFile (defined in example_transcoder_test.go) creates a
// temporary .proto file and returns its path and the temp directory.
protoFile, dir := writeProtoFile("example.proto", `
syntax = "proto3";
package example;
message Item {
  string id    = 1;
  double price = 2;
}
message Order {
  string          name  = 1;
  repeated Item   items = 2;
  repeated string tags  = 3;
  int64           seq   = 4;
}
`)
defer os.RemoveAll(dir)

src := `
proto:
  file: ` + protoFile + `
  message: Order
denormalizer:
  columns:
    - name: order_name
      path: name
    - name: item_id
      path: items[*].id
    - name: item_price
      path: items[*].price
    - name: has_tags
      expr:
        func: has
        args:
          - path: tags[*]
`
cfg, err := bufarrowlib.ParseDenormConfig(strings.NewReader(src))
if err != nil {
	log.Fatal(err)
}

tc, err := bufarrowlib.NewTranscoderFromConfig(cfg, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Inspect the compiled denormalizer schema.
schema := tc.DenormalizerSchema()
fmt.Printf("columns: %d\n", schema.NumFields())
for i := 0; i < schema.NumFields(); i++ {
	f := schema.Field(i)
	fmt.Printf("  %-15s %s\n", f.Name, f.Type)
}
Output:
columns: 4
  order_name      utf8
  item_id         utf8
  item_price      float64
  has_tags        bool

func NewTranscoderFromConfigFile added in v0.3.0

func NewTranscoderFromConfigFile(configPath string, mem memory.Allocator) (*Transcoder, error)

NewTranscoderFromConfigFile reads and parses the YAML config file at configPath, then builds the Transcoder with NewTranscoderFromConfig.

func (*Transcoder) Append

func (s *Transcoder) Append(value proto.Message)

Append appends a protobuf message to the transcoder's Arrow record builder. This method is not safe for concurrent use.

Example

ExampleTranscoder_Append shows the basic append-and-flush cycle.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Alpha", 1.0, 10))
	tc.Append(newProduct(md, "Bravo", 2.0, 20))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 2

func (*Transcoder) AppendDenorm

func (s *Transcoder) AppendDenorm(msg proto.Message) error

AppendDenorm evaluates the denormalizer plan against msg and appends the resulting denormalized rows to the denormalizer's Arrow record builder.

Fan-out groups are cross-joined: each group contributes max(len(branches), 1) rows, and the total row count is the product of all group counts. Empty fan-out groups (no branches) contribute 1 null row (left-join semantics).

This method is not safe for concurrent use.
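The fan-out row-count rule is plain arithmetic. For instance, an order with two items and no tags, using one path under items[*] and one under tags[*], yields max(2, 1) * max(0, 1) = 2 rows. Paths sharing the same wildcard, such as items[*].id and items[*].price in the example below, evidently form a single group, since that example produces 2 rows rather than 4. `denormRowCount` is a hypothetical helper that takes the branch count of each group; it is not part of bufarrowlib.

```go
package main

import "fmt"

// denormRowCount applies the documented rule: each fan-out group
// contributes max(len(branches), 1) rows, and the groups multiply.
// groupSizes holds the number of branches each group produced.
func denormRowCount(groupSizes []int) int {
	total := 1
	for _, n := range groupSizes {
		if n < 1 {
			n = 1 // an empty group still yields one null row
		}
		total *= n
	}
	return total
}

func main() {
	fmt.Println(denormRowCount([]int{2, 0})) // 2
	fmt.Println(denormRowCount([]int{2, 3})) // 6
	fmt.Println(denormRowCount(nil))         // 1: no fan-out, one row
}
```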

Example

ExampleTranscoder_AppendDenorm demonstrates basic denormalization: projecting scalar and fan-out fields from a protobuf message into a flat Arrow record.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("order_name")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOrder(orderMD, itemMD, "order-1",
		[]struct {
			id    string
			price float64
		}{{"A", 1.50}, {"B", 2.75}},
		nil, 1,
	)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
	for i := 0; i < int(rec.NumRows()); i++ {
		name := rec.Column(0).(*array.String).Value(i)
		id := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %s | %.2f\n", name, id, price)
	}
}
Output:
rows: 2, cols: 3
  order-1 | A | 1.50
  order-1 | B | 2.75
Example (Coalesce)

ExampleTranscoder_AppendDenorm_coalesce demonstrates FuncCoalesce in a denormalization plan. Coalesce returns the first non-zero value from multiple paths — useful when a field can come from different sources.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Coalesce: use customer name, fall back to email if customer is empty.
			pbpath.PlanPath("buyer",
				pbpath.WithExpr(pbpath.FuncCoalesce(
					pbpath.PathRef("customer"),
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("buyer"),
			),
			pbpath.PlanPath("region", pbpath.Alias("region")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Message 1: customer is set → "Alice" is used.
	msg1 := newSale(saleMD, lineItemMD, "Alice", "alice@example.com", "US", nil, 0)
	if err := tc.AppendDenorm(msg1); err != nil {
		log.Fatal(err)
	}

	// Message 2: customer is empty → email is used as fallback.
	msg2 := newSale(saleMD, lineItemMD, "", "bob@example.com", "EU", nil, 0)
	if err := tc.AppendDenorm(msg2); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		buyer := rec.Column(0).(*array.String).Value(i)
		region := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  buyer=%-18s region=%s\n", buyer, region)
	}
}
Output:
  buyer=Alice              region=US
  buyer=bob@example.com    region=EU
Example (ComposedExpr)

ExampleTranscoder_AppendDenorm_composedExpr demonstrates nesting Expr functions. It builds a "display" column that joins the customer name (falling back to the email address when the name is empty) with the upper-cased region.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Composed expression:
			//   Concat(": ", Coalesce(customer, email), Upper(region))
			// This reads as: join the first non-empty identifier with the
			// uppercased region, separated by ": ".
			pbpath.PlanPath("display",
				pbpath.WithExpr(pbpath.FuncConcat(": ",
					pbpath.FuncCoalesce(
						pbpath.PathRef("customer"),
						pbpath.PathRef("email"),
					),
					pbpath.FuncUpper(
						pbpath.PathRef("region"),
					),
				)),
				pbpath.Alias("display"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for _, m := range []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "alice@ex.com", "us", nil, 0),
		newSale(saleMD, lineItemMD, "", "bob@ex.com", "eu", nil, 0),
	} {
		if err := tc.AppendDenorm(m); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(0).(*array.String).Value(i))
	}
}
Output:
Alice: US
bob@ex.com: EU
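To make the nesting order concrete, here is a plain-Go model of `Concat(": ", Coalesce(customer, email), Upper(region))` operating on ordinary strings. This is a sketch of the semantics shown in the output above, not bufarrowlib's implementation; the helper names `coalesce` and `display` are hypothetical, and treating an empty string as "missing" is an assumption drawn from the Coalesce example's behaviour with an unset proto3 `customer` field.

```go
package main

import (
	"fmt"
	"strings"
)

// coalesce (hypothetical helper) returns the first non-empty string,
// or "" if every argument is empty.
func coalesce(vals ...string) string {
	for _, v := range vals {
		if v != "" {
			return v
		}
	}
	return ""
}

// display (hypothetical helper) models
// Concat(": ", Coalesce(customer, email), Upper(region)):
// inner functions are evaluated first, then joined with the separator.
func display(customer, email, region string) string {
	return strings.Join([]string{coalesce(customer, email), strings.ToUpper(region)}, ": ")
}

func main() {
	fmt.Println(display("Alice", "alice@ex.com", "us")) // Alice: US
	fmt.Println(display("", "bob@ex.com", "eu"))        // bob@ex.com: EU
}
```

Reading the expression inside-out this way matches the pbpath call tree: `FuncCoalesce` and `FuncUpper` are arguments to `FuncConcat`, so they resolve before the join.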
Example (Concat)

ExampleTranscoder_AppendDenorm_concat demonstrates FuncConcat, which joins the string representations of multiple path values with a separator.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Concat: join customer and region with " / ".
			pbpath.PlanPath("label",
				pbpath.WithExpr(pbpath.FuncConcat(" / ",
					pbpath.PathRef("customer"),
					pbpath.PathRef("region"),
				)),
				pbpath.Alias("label"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for _, m := range []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "", "US", nil, 0),
		newSale(saleMD, lineItemMD, "Bob", "", "EU", nil, 0),
	} {
		if err := tc.AppendDenorm(m); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(0).(*array.String).Value(i))
	}
}
Output:
Alice / US
Bob / EU
Example (CrossJoin)

ExampleTranscoder_AppendDenorm_crossJoin demonstrates cross-join behaviour when two independent repeated fields are denormalized together: every item is paired with every tag, so 2 items and 3 tags yield 2 × 3 = 6 rows.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOrder(orderMD, itemMD, "order-1",
		[]struct {
			id    string
			price float64
		}{{"A", 1.0}, {"B", 2.0}},
		[]string{"x", "y", "z"}, 1,
	)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d (2 items × 3 tags)\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		id := rec.Column(0).(*array.String).Value(i)
		tag := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  %s | %s\n", id, tag)
	}
}
Output:
rows: 6 (2 items × 3 tags)
  A | x
  A | y
  A | z
  B | x
  B | y
  B | z
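The row order above is a Cartesian product in which the first fan-out group varies slowest and the last varies fastest. The following plain-Go sketch models that ordering for string-valued groups; `crossJoin` is a hypothetical helper used only for illustration and is not part of bufarrowlib's API.

```go
package main

import "fmt"

// crossJoin (hypothetical helper) returns the Cartesian product of groups,
// varying the last group fastest, which matches the CrossJoin output order.
// Note: an empty group yields zero rows here; bufarrowlib instead applies
// left-join semantics and emits a null row (see the LeftJoin example).
func crossJoin(groups ...[]string) [][]string {
	rows := [][]string{{}}
	for _, g := range groups {
		var next [][]string
		for _, prefix := range rows {
			for _, v := range g {
				// Copy the prefix so rows never share a backing array.
				row := append(append([]string(nil), prefix...), v)
				next = append(next, row)
			}
		}
		rows = next
	}
	return rows
}

func main() {
	rows := crossJoin([]string{"A", "B"}, []string{"x", "y", "z"})
	fmt.Printf("rows: %d\n", len(rows))
	for _, r := range rows {
		fmt.Printf("  %s | %s\n", r[0], r[1])
	}
}
```

Because the row count multiplies across independent repeated fields, adding a third wildcard path with n elements would multiply the output by n again; keeping unrelated repeated fields out of the same plan avoids that growth.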
Example (Default)

ExampleTranscoder_AppendDenorm_default demonstrates FuncDefault, which substitutes a literal fallback value when a field is zero or empty.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("customer", pbpath.Alias("customer")),
			// Default: if region is empty, use "UNKNOWN".
			pbpath.PlanPath("region",
				pbpath.WithExpr(pbpath.FuncDefault(
					pbpath.PathRef("region"),
					pbpath.ScalarString("UNKNOWN"),
				)),
				pbpath.Alias("region"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for _, m := range []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "", "US", nil, 0),
		newSale(saleMD, lineItemMD, "Bob", "", "", nil, 0), // no region
	} {
		if err := tc.AppendDenorm(m); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		customer := rec.Column(0).(*array.String).Value(i)
		region := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  %s: %s\n", customer, region)
	}
}
Output:
  Alice: US
  Bob: UNKNOWN
Example (Has)

ExampleTranscoder_AppendDenorm_has demonstrates FuncHas, which returns a boolean indicating whether a field has a non-zero value. This is useful for creating presence flag columns.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("customer", pbpath.Alias("customer")),
			// Has: does the email field have a non-empty value?
			pbpath.PlanPath("has_email",
				pbpath.WithExpr(pbpath.FuncHas(
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("has_email"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for _, m := range []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "alice@example.com", "", nil, 0),
		newSale(saleMD, lineItemMD, "Bob", "", "", nil, 0), // no email
	} {
		if err := tc.AppendDenorm(m); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		customer := rec.Column(0).(*array.String).Value(i)
		hasEmail := rec.Column(1).(*array.Boolean).Value(i)
		fmt.Printf("  %s: has_email=%v\n", customer, hasEmail)
	}
}
Output:
  Alice: has_email=true
  Bob: has_email=false
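FuncHas and FuncDefault both hinge on the same "non-zero value" test. In proto3, a plain `string` field declared without the `optional` keyword has no separate presence bit: an empty string is not serialized, so "unset" and "empty" cannot be told apart. The sketch below models both functions for string fields under that assumption; `has` and `withDefault` are hypothetical helpers for illustration, not bufarrowlib functions.

```go
package main

import "fmt"

// has (hypothetical helper) models FuncHas for a proto3 string field:
// true only when the value is non-empty.
func has(v string) bool { return v != "" }

// withDefault (hypothetical helper) models FuncDefault: the fallback
// replaces a zero (empty) value and leaves non-empty values untouched.
func withDefault(v, fallback string) string {
	if has(v) {
		return v
	}
	return fallback
}

func main() {
	fmt.Println(has("alice@example.com"), has(""))                       // true false
	fmt.Println(withDefault("US", "UNKNOWN"), withDefault("", "UNKNOWN")) // US UNKNOWN
}
```

One consequence of this model is that a message which explicitly sets `email` to `""` would still report `has_email=false`, since the wire format carries no trace of the assignment.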
Example (LeftJoin)

ExampleTranscoder_AppendDenorm_leftJoin demonstrates left-join semantics when a repeated field is empty: a single null row is produced for that group while the other group fans out normally.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Zero items, 2 tags → items group produces 1 null row → 1 × 2 = 2 rows
	msg := newOrder(orderMD, itemMD, "order-1", nil, []string{"x", "y"}, 1)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		idNull := rec.Column(0).IsNull(i)
		tag := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  item_id=null:%v | tag=%s\n", idNull, tag)
	}
}
Output:
rows: 2
  item_id=null:true | tag=x
  item_id=null:true | tag=y
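Combining the CrossJoin and LeftJoin outputs, the number of rows one message produces can be modelled as the product of each fan-out group's size, where an empty group counts as 1 (its single null row). The sketch below captures only that arithmetic for the two documented cases; `denormRowCount` is a hypothetical helper, and it is an assumption that the same rule holds for plans with more than two groups.

```go
package main

import "fmt"

// denormRowCount (hypothetical helper) models the rows emitted for one
// message: each repeated-field group contributes max(len, 1) rows, because
// an empty group yields one null row instead of dropping the message.
func denormRowCount(groupSizes ...int) int {
	n := 1
	for _, size := range groupSizes {
		if size == 0 {
			size = 1 // empty group → single null row (left-join)
		}
		n *= size
	}
	return n
}

func main() {
	fmt.Println(denormRowCount(0, 2)) // LeftJoin example: 0 items, 2 tags → 2
	fmt.Println(denormRowCount(2, 3)) // CrossJoin example: 2 items, 3 tags → 6
}
```

The max(len, 1) rule is what keeps a message with an empty repeated field visible in the output rather than silently disappearing, as it would under an inner-join product.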
Example (Upper)

ExampleTranscoder_AppendDenorm_upper demonstrates FuncUpper, which converts a string field to upper case — useful for normalizing data during ingestion.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("region_upper",
				pbpath.WithExpr(pbpath.FuncUpper(
					pbpath.PathRef("region"),
				)),
				pbpath.Alias("region_upper"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.AppendDenorm(newSale(saleMD, lineItemMD, "Alice", "", "us-east", nil, 0))

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Println(rec.Column(0).(*array.String).Value(0))
}
Output:
US-EAST
Example (WithExprAndFanout)

ExampleTranscoder_AppendDenorm_withExprAndFanout demonstrates combining Expr-based computed columns with fan-out (wildcard) paths. The expression column is evaluated once per message and broadcast as a scalar to every row, while the repeated field fans out into one row per element.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Computed column: coalesce customer/email (scalar, broadcasts)
			pbpath.PlanPath("buyer",
				pbpath.WithExpr(pbpath.FuncCoalesce(
					pbpath.PathRef("customer"),
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("buyer"),
			),
			// Fan-out: one row per line item
			pbpath.PlanPath("items[*].sku", pbpath.Alias("sku")),
			pbpath.PlanPath("items[*].unit_price", pbpath.Alias("price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newSale(saleMD, lineItemMD, "Alice", "", "US", []struct {
		sku   string
		price float64
		qty   int64
	}{
		{"WIDGET-001", 9.99, 2},
		{"GADGET-002", 24.50, 1},
	}, 0)
	tc.AppendDenorm(msg)

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		buyer := rec.Column(0).(*array.String).Value(i)
		sku := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %-12s | %.2f\n", buyer, sku, price)
	}
}
Output:
rows: 2
  Alice | WIDGET-001   | 9.99
  Alice | GADGET-002   | 24.50
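The example above produces one scalar `buyer` value that is repeated on every fanned-out line-item row. That row shape can be modelled in plain Go. This is a minimal sketch of the semantics only, not bufarrowlib's implementation. `lineItem`, `row`, `coalesce` and `denorm` are hypothetical names. Treating an empty string as missing in `coalesce` is an assumption that matches the example's output, not documented FuncCoalesce behaviour.

```go
package main

import "fmt"

// lineItem stands in for one element of the repeated `items` field.
type lineItem struct {
	SKU   string
	Price float64
}

// row is one output row of the flat (denormalized) table.
type row struct {
	Buyer string
	SKU   string
	Price float64
}

// coalesce returns the first non-empty string, or "" if all are empty.
// Treating "" as "missing" is an assumption about proto3 string fields.
func coalesce(vals ...string) string {
	for _, v := range vals {
		if v != "" {
			return v
		}
	}
	return ""
}

// denorm evaluates the scalar column once per message, then emits one row
// per line item with that scalar broadcast onto every row.
func denorm(customer, email string, items []lineItem) []row {
	buyer := coalesce(customer, email)
	rows := make([]row, 0, len(items))
	for _, it := range items {
		rows = append(rows, row{Buyer: buyer, SKU: it.SKU, Price: it.Price})
	}
	return rows
}

func main() {
	rows := denorm("Alice", "", []lineItem{
		{"WIDGET-001", 9.99},
		{"GADGET-002", 24.50},
	})
	fmt.Printf("rows: %d\n", len(rows))
	for _, r := range rows {
		fmt.Printf("  %s | %-12s | %.2f\n", r.Buyer, r.SKU, r.Price)
	}
}
```

This sketch prints the same three lines as the example above. It emits no rows for a message with an empty `items` list. The example does not show how bufarrowlib handles that case.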

func (*Transcoder) AppendDenormRaw added in v0.2.0

func (s *Transcoder) AppendDenormRaw(data []byte) error

AppendDenormRaw unmarshals raw protobuf bytes and appends the denormalized result to the transcoder's denormalizer Arrow record builder.

When a HyperType is configured (via WithHyperType), it uses the compiled parser with hyperpb.Shared for memory reuse and optional PGO profiling. Otherwise, it falls back to proto.Unmarshal with a dynamicpb stencil.

A denormalizer plan (via WithDenormalizerPlan) must be configured.

This method is not safe for concurrent use.

Example

ExampleTranscoder_AppendDenormRaw demonstrates high-performance raw-bytes denormalization. This combines hyperpb decoding with the Plan-based denormalizer to go directly from raw protobuf bytes to a flat Arrow record.

This is the fastest path for streaming protobuf data into analytics-ready flat tables.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// 1. Create a shared HyperType.
	ht := bufarrowlib.NewHyperType(outerMD)

	// 2. Create a Transcoder with HyperType + denormalization plan.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// 3. Marshal messages to raw bytes (simulating Kafka/gRPC input).
	msg := newOuterMessage(outerMD, innerMD, "Gadgets", "", []struct {
		id    string
		price float64
	}{{"X", 1.50}, {"Y", 2.75}}, 10)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// 4. Feed raw bytes directly into the denormalizer.
	if err := tc.AppendDenormRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		name := rec.Column(0).(*array.String).Value(i)
		id := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %s | %.2f\n", name, id, price)
	}
}
Output:
rows: 2
  Gadgets | X | 1.50
  Gadgets | Y | 2.75
Example (Batch)

ExampleTranscoder_AppendDenormRaw_batch demonstrates a typical batch processing pattern: feed many raw messages, then flush to a single Arrow record batch.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()
	ht := bufarrowlib.NewHyperType(outerMD)

	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Simulate a batch of raw messages from a Kafka consumer.
	messages := []struct {
		name  string
		items []struct {
			id    string
			price float64
		}
	}{
		{"Order-1", []struct {
			id    string
			price float64
		}{{"A", 1.0}, {"B", 2.0}}},
		{"Order-2", []struct {
			id    string
			price float64
		}{{"C", 3.0}}},
		{"Order-3", []struct {
			id    string
			price float64
		}{{"D", 4.0}, {"E", 5.0}, {"F", 6.0}}},
	}

	for _, m := range messages {
		msg := newOuterMessage(outerMD, innerMD, m.name, "", m.items, 0)
		raw, err := proto.Marshal(msg)
		if err != nil {
			log.Fatal(err)
		}
		if err := tc.AppendDenormRaw(raw); err != nil {
			log.Fatal(err)
		}
	}

	// Flush all accumulated rows into one Arrow record batch.
	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("messages: %d, denorm rows: %d\n", len(messages), rec.NumRows())
}
Output:
messages: 3, denorm rows: 6

func (*Transcoder) AppendDenormRawMerged added in v0.3.0

func (s *Transcoder) AppendDenormRawMerged(baseBytes, customBytes []byte) error

AppendDenormRawMerged concatenates base and custom serialized protobuf byte slices and appends the denormalized result to the transcoder's denormalizer Arrow record builder.

This follows the same byte-concatenation strategy as AppendRawMerged, but routes the merged message through the denormalization engine instead of the full-record builder.

Requires both a custom message (via WithCustomMessage or WithCustomMessageFile) and a denormalizer plan (via WithDenormalizerPlan).

This method is not safe for concurrent use.
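Both merged methods rely on the custom fields having been renumbered above the base message's highest field number. With that renumbering, the two byte slices can never share a field tag. The sketch below illustrates the invariant with a deliberately simple, hypothetical scheme: custom fields become `baseMax+1`, `baseMax+2`, and so on, in declaration order. The numbering MergeMessageDescriptors actually chooses is not documented here and may differ. `renumber` and `disjoint` are hypothetical helpers.

```go
package main

import "fmt"

// renumber maps each original custom field number to a new number above
// baseMax, preserving declaration order. Hypothetical scheme for illustration.
func renumber(baseMax int32, custom []int32) map[int32]int32 {
	out := make(map[int32]int32, len(custom))
	for i, n := range custom {
		out[n] = baseMax + int32(i) + 1
	}
	return out
}

// disjoint reports whether no renumbered custom field collides with a base field.
func disjoint(base []int32, custom map[int32]int32) bool {
	seen := make(map[int32]bool, len(base))
	for _, n := range base {
		seen[n] = true
	}
	for _, n := range custom {
		if seen[n] {
			return false
		}
	}
	return true
}

func main() {
	// Product: name = 1, price = 2, qty = 3.
	base := []int32{1, 2, 3}
	// CustomFields: region = 1, batch_id = 2.
	m := renumber(3, []int32{1, 2})
	fmt.Println(m[1], m[2], disjoint(base, m))
}
```

This prints `4 5 true`. A production version would also need to skip 19000 to 19999, a range protobuf reserves for its own use. It must also stay at or below the maximum field number, 536,870,911 (2^29 - 1).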

func (*Transcoder) AppendRaw added in v0.2.0

func (s *Transcoder) AppendRaw(data []byte) error

AppendRaw unmarshals raw protobuf bytes using the HyperType's compiled parser and appends the result to the transcoder's Arrow record builder.

This method requires a HyperType configured via WithHyperType. It uses hyperpb.Shared for memory reuse and optionally records profiling data for online PGO. When the auto-recompile threshold is reached, the parser is recompiled inline.

This method is not safe for concurrent use.

Example

ExampleTranscoder_AppendRaw demonstrates high-performance raw-bytes ingestion using AppendRaw. AppendRaw accepts raw protobuf wire bytes (e.g. from Kafka, gRPC, or a file) and decodes them with hyperpb's compiled parser, which can be 2–3× faster than proto.Unmarshal with generated code, depending on the message shape.

AppendRaw populates the full Arrow record (like Append), while AppendDenormRaw populates the denormalized flat record (like AppendDenorm).

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// 1. Create a shared HyperType (compile the parser once).
	ht := bufarrowlib.NewHyperType(outerMD)

	// 2. Create a Transcoder with HyperType.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// 3. Marshal a message to raw bytes (simulating receiving from Kafka).
	msg := newOuterMessage(outerMD, innerMD, "Widget", "", []struct {
		id    string
		price float64
	}{{"A", 9.99}}, 5)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// 4. Feed raw bytes — no proto.Unmarshal needed.
	if err := tc.AppendRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 4

func (*Transcoder) AppendRawMerged added in v0.3.0

func (s *Transcoder) AppendRawMerged(baseBytes, customBytes []byte) error

AppendRawMerged concatenates base and custom serialized protobuf byte slices and appends the merged result to the transcoder's Arrow record builder.

This works because MergeMessageDescriptors renumbers all custom fields above the base message's maximum field number, so the two wire-format byte slices have strictly disjoint field tags. Protobuf wire-format concatenation (base || custom) produces a valid merged message.

When a HyperType is configured (fast path), the concatenated bytes are parsed with hyperpb.Unmarshal. Otherwise (fallback), they are decoded with proto.Unmarshal into a clone of the transcoder's merged custom-message stencil (the internal stencilCustom field).

Returns an error if no custom message was configured via WithCustomMessage or WithCustomMessageFile.

This method is not safe for concurrent use.

func (*Transcoder) AppendWithCustom

func (s *Transcoder) AppendWithCustom(value proto.Message, custom proto.Message) error

AppendWithCustom appends a protobuf value merged with custom field values to the transcoder's builder. The custom proto.Message must conform to the message descriptor provided via WithCustomMessage or WithCustomMessageFile. Both messages are marshalled to bytes and unmarshalled into the merged stencil for appending.

This method is not safe for concurrent use.

Example

ExampleTranscoder_AppendWithCustom shows appending a message that includes both base and custom fields.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	base := newProduct(baseMD, "Widget", 9.99, 5)
	custom := dynamicpb.NewMessage(customMD)
	custom.Set(customMD.Fields().ByName("region"), protoreflect.ValueOfString("EU"))
	custom.Set(customMD.Fields().ByName("batch_id"), protoreflect.ValueOfInt64(7))

	if err := tc.AppendWithCustom(base, custom); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 5

func (*Transcoder) Clone

func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)

Clone returns a new Transcoder with the same configuration as s. Because Transcoder methods are not safe for concurrent use, give each goroutine its own clone.

The compiled denormalizer [Plan] is shared (it is immutable), but the Arrow builders, scratch buffers, and leaf scratch are freshly allocated so each clone can operate independently.
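The split described above is the usual way to spread work across goroutines without locks: the plan is shared and immutable, while the buffers are private and mutable. Below is a stdlib-only sketch. The hypothetical `plan` and `worker` types stand in for the compiled Plan and a Transcoder. Each clone reuses the plan pointer but gets its own row buffer, so no goroutine touches another's mutable state.

```go
package main

import (
	"fmt"
	"sync"
)

// plan stands in for the immutable compiled denormalizer plan.
type plan struct{ columns []string }

// worker stands in for a Transcoder: a shared plan plus private buffers.
type worker struct {
	plan *plan
	rows [][]string
}

// clone shares the read-only plan and starts with an empty row buffer.
func (w *worker) clone() *worker {
	return &worker{plan: w.plan}
}

// add appends one row to this worker's private buffer.
func (w *worker) add(vals ...string) {
	w.rows = append(w.rows, vals)
}

func main() {
	orig := &worker{plan: &plan{columns: []string{"product", "item_id"}}}

	const n = 4
	clones := make([]*worker, n)
	var wg sync.WaitGroup
	for i := range clones {
		clones[i] = orig.clone() // clone before starting the goroutine
		wg.Add(1)
		go func(w *worker, id int) {
			defer wg.Done()
			// Goroutine id appends id+1 rows to its own clone only.
			for j := 0; j <= id; j++ {
				w.add(fmt.Sprintf("order-%d", id), fmt.Sprintf("item-%d", j))
			}
		}(clones[i], i)
	}
	wg.Wait()

	total := 0
	for _, c := range clones {
		total += len(c.rows)
	}
	fmt.Println("original rows:", len(orig.rows), "clone rows:", total)
}
```

This prints `original rows: 0 clone rows: 10`. In bufarrowlib terms, each goroutine would own one Clone, build its own record batches, and call Release when done.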

Example

ExampleTranscoder_Clone demonstrates cloning a Transcoder for use in a separate goroutine. The clone has independent builders but shares the same schema configuration.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	clone, err := tc.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.Release()

	clone.Append(newProduct(md, "Gizmo", 5.00, 10))

	rec := clone.NewRecordBatch()
	defer rec.Release()
	fmt.Printf("clone rows: %d\n", rec.NumRows())

	origRec := tc.NewRecordBatch()
	defer origRec.Release()
	fmt.Printf("original rows: %d\n", origRec.NumRows())
}
Output:
clone rows: 1
original rows: 0
Example (WithHyperType)

ExampleTranscoder_Clone_withHyperType demonstrates cloning a Transcoder that uses HyperType. The clone shares the same HyperType (so profiling data is aggregated) but has independent Arrow builders. This is the recommended pattern for multi-goroutine pipelines.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// Shared HyperType — both transcoders contribute profiling data.
	ht := bufarrowlib.NewHyperType(outerMD)

	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Clone for a second goroutine — shares HyperType + immutable Plan,
	// fresh Arrow builders and scratch buffers.
	clone, err := tc.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.Release()

	// Feed different data to each transcoder.
	msg1 := newOuterMessage(outerMD, innerMD, "Alpha", "", []struct {
		id    string
		price float64
	}{{"A1", 1.0}}, 0)
	raw1, _ := proto.Marshal(msg1)
	tc.AppendDenormRaw(raw1)

	msg2 := newOuterMessage(outerMD, innerMD, "Bravo", "", []struct {
		id    string
		price float64
	}{{"B1", 2.0}, {"B2", 3.0}}, 0)
	raw2, _ := proto.Marshal(msg2)
	clone.AppendDenormRaw(raw2)

	// Each transcoder flushes independently.
	rec1 := tc.NewDenormalizerRecordBatch()
	defer rec1.Release()
	rec2 := clone.NewDenormalizerRecordBatch()
	defer rec2.Release()

	fmt.Printf("original: %d rows\n", rec1.NumRows())
	fmt.Printf("clone:    %d rows\n", rec2.NumRows())
}
Output:
original: 1 rows
clone:    2 rows
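The example above drives both transcoders from a single goroutine. Its ownership rule is: one shared, thread-safe HyperType, plus one Clone per goroutine with its own builders. The stdlib-only sketch below models that rule. `sharedProfile` and `worker` are hypothetical stand-ins, not bufarrowlib types. In real code each worker would come from `tc.Clone(...)`, and the shared part would be the HyperType passed via `WithHyperType`.

The shared counter is updated atomically. Each worker's row buffer is touched by exactly one goroutine, so it needs no lock.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedProfile is a hypothetical stand-in for a HyperType: one instance is
// shared by every worker, so its state must be safe for concurrent use.
type sharedProfile struct {
	messagesSeen atomic.Int64
}

// worker is a hypothetical stand-in for a cloned Transcoder: it points at the
// shared profile and owns a private buffer that only its goroutine touches.
type worker struct {
	profile *sharedProfile
	rows    []string
}

func (w *worker) append(msg string) {
	w.profile.messagesSeen.Add(1) // shared state: atomic update
	w.rows = append(w.rows, msg)  // private state: no lock required
}

// flush returns the buffered rows and resets the buffer for reuse.
func (w *worker) flush() []string {
	out := w.rows
	w.rows = nil
	return out
}

// run starts one worker per input batch, each on its own goroutine, and
// returns how many rows each worker flushed plus the shared total.
func run(inputs [][]string) ([]int, int64) {
	profile := &sharedProfile{}
	counts := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, batch := range inputs {
		wg.Add(1)
		go func(i int, batch []string) {
			defer wg.Done()
			w := &worker{profile: profile} // one "clone" per goroutine
			for _, m := range batch {
				w.append(m)
			}
			counts[i] = len(w.flush()) // each goroutine writes only its own slot
		}(i, batch)
	}
	wg.Wait()
	return counts, profile.messagesSeen.Load()
}

func main() {
	counts, total := run([][]string{{"Alpha"}, {"Bravo", "Charlie"}})
	fmt.Println(counts, total) // [1 2] 3
}
```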

func (*Transcoder) DenormalizerBuilder

func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder

DenormalizerBuilder returns the denormalizer's Arrow array.RecordBuilder. It is exposed for callers who need custom denormalization logic beyond what Transcoder.AppendDenorm provides. In most cases, prefer AppendDenorm, which handles fan-out and cross-joins automatically. Returns nil if no denormalizer plan was configured.
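The sketch below is a mental model of the fan-out that AppendDenorm performs, written with the standard library only. It expands one message into rows. Scalar columns repeat on every row, and each repeated path multiplies the row count (a cross join). This illustrates assumed semantics and is not bufarrowlib's implementation. In particular, it emits zero rows when any list is empty, whereas the library may pad with nulls instead, so check the library's behavior before relying on that case. With a single repeated path it reproduces the row counts in the Clone example above: one item yields one row, and two items yield two.

```go
package main

import "fmt"

// crossJoin expands per-column value lists into rows. A scalar column is a
// one-element list; a repeated column contributes one value per element.
// The row count is the product of the list lengths (0 if any list is empty).
func crossJoin(cols [][]string) [][]string {
	rows := [][]string{{}} // start from a single empty row
	for _, vals := range cols {
		var next [][]string
		for _, r := range rows {
			for _, v := range vals {
				row := append(append([]string(nil), r...), v) // copy r, then extend
				next = append(next, row)
			}
		}
		rows = next
	}
	return rows
}

func main() {
	// Models an Order: scalar name, plus independent repeated paths
	// items[*].id and tags[*].
	rows := crossJoin([][]string{
		{"order-1"},
		{"A", "B"},
		{"x", "y", "z"},
	})
	fmt.Println(len(rows)) // 6 (1 * 2 * 3)
	for _, r := range rows {
		fmt.Println(r) // [order-1 A x], [order-1 A y], ... [order-1 B z]
	}
}
```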

Example

ExampleTranscoder_DenormalizerBuilder shows accessing the underlying RecordBuilder for custom denormalization logic.

orderMD, _ := buildExampleDescriptors()
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

builder := tc.DenormalizerBuilder()
fmt.Printf("builder fields: %d\n", builder.Schema().NumFields())
fmt.Printf("field 0 type: %s\n", builder.Schema().Field(0).Type)
Output:
builder fields: 1
field 0 type: utf8

func (*Transcoder) DenormalizerSchema

func (s *Transcoder) DenormalizerSchema() *arrow.Schema

DenormalizerSchema returns the Arrow schema of the denormalized record. Returns nil if no denormalizer plan was configured.

Example

ExampleTranscoder_DenormalizerSchema demonstrates inspecting the Arrow schema of the denormalized record, showing column names, types, and nullability.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func main() {
	orderMD, _ := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("order_name")),
			pbpath.PlanPath("seq", pbpath.Alias("order_seq")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	schema := tc.DenormalizerSchema()
	for i, f := range schema.Fields() {
		fmt.Printf("  %d: %-12s %-10s nullable=%v\n", i, f.Name, f.Type, f.Nullable)
	}
}
Output:
  0: order_name   utf8       nullable=true
  1: order_seq    int64      nullable=true
  2: item_price   float64    nullable=true

func (*Transcoder) FieldNames

func (s *Transcoder) FieldNames() []string

FieldNames returns the top-level Arrow field names of the message schema.
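FieldNames returns names in schema order. As the Schema and NewRecordBatch examples show for Product, each top-level field becomes one record column. Assuming that one-to-one mapping, a name-to-position map built from FieldNames can locate a column in a RecordBatch produced by the same Transcoder. `columnIndex` below is a hypothetical helper, not part of bufarrowlib.

```go
package main

import "fmt"

// columnIndex maps each top-level field name to its column position.
// It relies on names being unique, which holds for protobuf field names
// within a single message.
func columnIndex(names []string) map[string]int {
	idx := make(map[string]int, len(names))
	for i, n := range names {
		idx[n] = i
	}
	return idx
}

func main() {
	// Stand-in for tc.FieldNames() on the Product message.
	idx := columnIndex([]string{"name", "price", "qty"})
	fmt.Println(idx["price"]) // 1
}
```

With a real record, `rec.Column(idx["price"])` would then select the price column.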

Example

ExampleTranscoder_FieldNames shows retrieving top-level field names.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price qty]

func (*Transcoder) NewDenormalizerRecordBatch

func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch

NewDenormalizerRecordBatch returns the buffered denormalizer builder contents as an arrow.RecordBatch. The builder is reset and can be reused. Returns nil if no denormalizer plan was configured.

Example

ExampleTranscoder_NewDenormalizerRecordBatch shows flushing the denormalizer's builder into a record batch.

orderMD, itemMD := buildExampleDescriptors()
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"A", 1.0}, {"B", 2.0}},
	nil, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("rows: %d\n", rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
	fmt.Println(rec.Column(0).(*array.String).Value(i))
}
Output:
rows: 2
A
B

func (*Transcoder) NewRecordBatch

func (s *Transcoder) NewRecordBatch() arrow.RecordBatch

NewRecordBatch returns the buffered builder contents as an arrow.RecordBatch. The builder is reset and can be reused.
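Because each call resets the builder, a long-running consumer can append continuously and cut a batch every N messages, reusing one Transcoder for the whole stream. The stdlib sketch below shows that loop shape with a hypothetical `flushEvery` helper. In real code the `flush` callback would call `tc.NewRecordBatch()` and pass the result downstream, releasing it when done. This sketch does not model that part.

```go
package main

import "fmt"

// flushEvery appends messages to a reusable buffer and calls flush each time
// the buffer reaches batchSize, plus once more for any remainder. The buffer
// is truncated (not reallocated) after each flush, mirroring a builder reset,
// so flush must copy anything it wants to keep.
func flushEvery(msgs []string, batchSize int, flush func(batch []string)) {
	if batchSize < 1 {
		batchSize = 1 // guard against a zero or negative size
	}
	buf := make([]string, 0, batchSize)
	for _, m := range msgs {
		buf = append(buf, m)
		if len(buf) == batchSize {
			flush(buf)
			buf = buf[:0] // reset; the capacity is reused
		}
	}
	if len(buf) > 0 {
		flush(buf) // final partial batch
	}
}

func main() {
	var sizes []int
	flushEvery([]string{"a", "b", "c", "d", "e"}, 2, func(b []string) {
		sizes = append(sizes, len(b))
	})
	fmt.Println(sizes) // [2 2 1]
}
```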

Example

ExampleTranscoder_NewRecordBatch shows building an Arrow record batch from appended messages.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 3

func (*Transcoder) Parquet

func (s *Transcoder) Parquet() *schema.Schema

Parquet returns the Parquet schema.Schema for the message.

Example

ExampleTranscoder_Parquet demonstrates inspecting the Parquet schema.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	pqSchema := tc.Parquet()
	fmt.Printf("parquet columns: %d\n", pqSchema.NumColumns())
	for i := 0; i < pqSchema.NumColumns(); i++ {
		fmt.Printf("  %s\n", pqSchema.Column(i).Name())
	}
}
Output:
parquet columns: 3
  name
  price
  qty

func (*Transcoder) Proto

func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message

Proto decodes selected rows from an Arrow RecordBatch back into protobuf messages. Pass nil for rows to decode all rows.
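The `rows` argument follows a common Go convention in which a nil slice means "everything". The sketch below shows one way to normalize such an argument before use. `resolveRows` is a hypothetical helper, not part of bufarrowlib. It treats an empty non-nil slice as "no rows", which is this sketch's own choice and not documented library behavior.

```go
package main

import "fmt"

// resolveRows turns an optional row selection into explicit indices.
// A nil selection means "all rows"; a non-nil selection is validated
// against numRows and returned unchanged.
func resolveRows(numRows int, rows []int) ([]int, error) {
	if rows == nil {
		all := make([]int, numRows)
		for i := range all {
			all[i] = i
		}
		return all, nil
	}
	for _, r := range rows {
		if r < 0 || r >= numRows {
			return nil, fmt.Errorf("row %d out of range [0, %d)", r, numRows)
		}
	}
	return rows, nil
}

func main() {
	all, _ := resolveRows(2, nil)
	fmt.Println(all) // [0 1]
	one, _ := resolveRows(2, []int{1})
	fmt.Println(one) // [1]
	_, err := resolveRows(2, []int{5})
	fmt.Println(err) // row 5 out of range [0, 2)
}
```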

Example

ExampleTranscoder_Proto demonstrates round-tripping: appending protobuf messages, building an Arrow record, and reconstructing them back as protobuf messages.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	msgs := tc.Proto(rec, nil) // nil = all rows
	fmt.Printf("recovered %d messages\n", len(msgs))
	for _, m := range msgs {
		js, _ := protojson.Marshal(m)
		var c bytes.Buffer
		json.Compact(&c, js)
		fmt.Println(c.String())
	}
}
Output:
recovered 2 messages
{"name":"Widget","price":9.99,"qty":5}
{"name":"Gadget","price":24.5,"qty":2}

func (*Transcoder) ReadParquet

func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)

ReadParquet reads the specified columns from Parquet source r and returns an Arrow RecordBatch. The returned RecordBatch must be released by the caller.
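Arrow Go objects are reference counted. Retain adds a reference, Release drops one, and the underlying buffers return to the allocator when the count reaches zero. This is why every example here pairs a RecordBatch with `defer rec.Release()`. The stdlib-only model below illustrates the bookkeeping. `refCounted` is a hypothetical type, not an arrow-go API. For catching real leaks in tests, arrow-go's `memory.NewCheckedAllocator` can wrap the allocator passed to `bufarrowlib.New`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted models the Retain/Release contract of Arrow Go objects.
// It is a hypothetical illustration, not an arrow-go type.
type refCounted struct {
	refs atomic.Int64
}

func newRefCounted() *refCounted {
	r := &refCounted{}
	r.refs.Store(1) // the creator owns the first reference
	return r
}

// Retain adds a reference, e.g. when handing the object to another owner.
func (r *refCounted) Retain() { r.refs.Add(1) }

// Release drops one reference and reports whether it was the last one,
// i.e. whether the memory would now go back to the allocator.
func (r *refCounted) Release() bool {
	n := r.refs.Add(-1)
	if n < 0 {
		panic("Release called more times than Retain")
	}
	return n == 0
}

func main() {
	rec := newRefCounted()
	rec.Retain()              // a second consumer takes a reference
	fmt.Println(rec.Release()) // false: one reference remains
	fmt.Println(rec.Release()) // true: last reference dropped
}
```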

Example

ExampleTranscoder_ReadParquet demonstrates a full Parquet round-trip: write messages to Parquet, then read them back into an Arrow record.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var buf bytes.Buffer
	if err := tc.WriteParquet(&buf); err != nil {
		log.Fatal(err)
	}

	reader := bytes.NewReader(buf.Bytes())
	rec, err := tc.ReadParquet(context.Background(), reader, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer rec.Release()

	fmt.Printf("read back: %d rows, %d cols\n", rec.NumRows(), rec.NumCols())
}
Output:
read back: 2 rows, 3 cols

func (*Transcoder) Release

func (s *Transcoder) Release()

Release releases the Transcoder's reference to the underlying Arrow record builder.

Example

ExampleTranscoder_Release shows the standard cleanup pattern.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	// Release should always be deferred after construction.
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 1

func (*Transcoder) Schema

func (s *Transcoder) Schema() *arrow.Schema

Schema returns the Arrow arrow.Schema for the message.

Example

ExampleTranscoder_Schema demonstrates inspecting the Arrow schema derived from a protobuf message descriptor.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for i, f := range tc.Schema().Fields() {
		fmt.Printf("  %d: %-8s %s\n", i, f.Name, f.Type)
	}
}
Output:
  0: name     utf8
  1: price    float64
  2: qty      int32

func (*Transcoder) WriteParquet

func (s *Transcoder) WriteParquet(w io.Writer) error

WriteParquet writes the currently buffered record to w in Parquet format.

Example

ExampleTranscoder_WriteParquet demonstrates writing appended messages to Parquet and reading them back.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var buf bytes.Buffer
	if err := tc.WriteParquet(&buf); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("parquet bytes: %d\n", buf.Len())
	fmt.Println("wrote parquet successfully")
}
Output:
parquet bytes: 714
wrote parquet successfully

func (*Transcoder) WriteParquetRecords

func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error

WriteParquetRecords writes one or more Arrow records to w in Parquet format.

Example

ExampleTranscoder_WriteParquetRecords demonstrates writing multiple Arrow record batches to a single Parquet file.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec1 := tc.NewRecordBatch()
	defer rec1.Release()

	tc.Append(newProduct(md, "Gadget", 24.50, 2))
	rec2 := tc.NewRecordBatch()
	defer rec2.Release()

	var buf bytes.Buffer
	if err := tc.WriteParquetRecords(&buf, rec1, rec2); err != nil {
		log.Fatal(err)
	}

	fmt.Println("wrote 2 record batches to parquet")
}
Output:
wrote 2 record batches to parquet
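The example above produces one record batch per message. For larger inputs, you can instead append several messages before each NewRecordBatch call, so that WriteParquetRecords receives fewer, larger batches. The sketch below shows only the splitting step. It assumes nothing about bufarrowlib beyond the Append/NewRecordBatch pattern shown in the example. `chunk` is a hypothetical generic helper, and the batch size of 2 is arbitrary.

```go
package main

import "fmt"

// chunk is a hypothetical helper (not part of bufarrowlib) that splits items
// into consecutive slices of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		panic("chunk: size must be positive")
	}
	var out [][]T
	for len(items) > 0 {
		n := size
		if n > len(items) {
			n = len(items)
		}
		// Full slice expression caps each chunk so appending to it
		// cannot overwrite elements of the next chunk.
		out = append(out, items[:n:n])
		items = items[n:]
	}
	return out
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e"}
	for i, c := range chunk(msgs, 2) {
		// With bufarrowlib, each iteration would Append every message in c,
		// then call tc.NewRecordBatch() and collect the batch for
		// WriteParquetRecords.
		fmt.Println(i, c) // prints "0 [a b]", then "1 [c d]", then "2 [e]"
	}
}
```

The batch size trades memory per batch against the number of batches handed to the writer. Pick it based on message size and available memory.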

Directories

Path Synopsis
Package main provides C shared-library exports for bufarrowlib.
cmd
pbpath-playground command
pbpq is a local web playground for testing pbpath Pipeline queries against protobuf messages.
gen
proto
pbpath
Package pbpath provides functionality for representing a sequence of protobuf reflection operations on a message, including parsing human-readable path strings and traversing messages along a path to collect values.
