bufarrowlib

package module
v0.2.1
Warning

This package is not in the latest version of its module.

Published: Mar 26, 2026 License: Apache-2.0 Imports: 29 Imported by: 0

README

bufarrowLib 🦬

Go library to build Apache Arrow records from Protocol Buffers — with high-performance raw-bytes ingestion, automatic denormalization, Parquet I/O, and an embedded expression engine.

Features

Schema generation
  • Derive Arrow and Parquet schemas automatically from any protobuf message descriptor, including deeply-nested and recursive messages.
  • Construct a Transcoder from compiled Go types (protoreflect.MessageDescriptor) or from .proto files at runtime via NewFromFile.
  • Merge extra "custom" fields into the base schema with WithCustomMessage / WithCustomMessageFile — useful for enriching data with sidecar metadata.
Full-fidelity transcoding
  • Proto → Arrow: Append / AppendWithCustom populate an Arrow RecordBuilder that mirrors the full protobuf structure (structs, lists, maps, oneofs).
  • Arrow → Proto: Proto reconstructs typed protobuf messages from Arrow record batches.
  • Proto → Parquet → Arrow: WriteParquet / WriteParquetRecords write Arrow records to Parquet; ReadParquet reads them back.
High-performance raw-bytes ingestion

AppendRaw and AppendDenormRaw accept raw protobuf wire bytes and decode them using hyperpb, a table-driven (TDP) parser that is 2–3× faster than generated code. Combined with hyperpb.Shared arena reuse and optional profile-guided optimization (PGO), this is the fastest path for streaming protobuf data into Arrow.

| Method | What it does | When to use |
|---|---|---|
| AppendRaw([]byte) | Unmarshal → full Arrow record | You have raw bytes from Kafka/gRPC and want the full protobuf structure in Arrow |
| AppendDenormRaw([]byte) | Unmarshal → flat denormalized record | You have raw bytes and want a flat analytics table (selected columns, fan-out) |

Both methods require a HyperType coordinator (see below).

HyperType — shared compiled parser with online PGO

HyperType wraps a compiled hyperpb.MessageType in a thread-safe coordinator that can be shared across multiple Transcoder instances (including clones in separate goroutines). It supports optional online profile-guided optimization (PGO): all transcoders contribute profiling data, and a recompile atomically upgrades the parser for all of them.

// Create once, share across goroutines.
ht := bufarrowlib.NewHyperType(md,
    bufarrowlib.WithAutoRecompile(10_000, 0.01), // recompile every 10K messages, 1% sampling
)

// Each goroutine gets its own Transcoder, sharing the same HyperType.
tc, _ := bufarrowlib.New(md, mem, bufarrowlib.WithHyperType(ht), /* ... */)
clone, _ := tc.Clone(mem)

// Feed raw bytes — profiling happens automatically.
tc.AppendDenormRaw(rawBytes)

// Manual recompile (if not using auto-recompile):
ht.Recompile()      // synchronous — blocks until done
ht.RecompileAsync()  // non-blocking — returns a channel

Key HyperType concepts:

| Concept | Description |
|---|---|
| NewHyperType(md) | Compile a hyperpb.MessageType from a message descriptor |
| WithAutoRecompile(threshold, rate) | Automatically recompile after threshold messages; rate is the profiling sample fraction (e.g. 0.01 = 1%) |
| ht.Type() | Load the current compiled parser (atomic, lock-free) |
| ht.Recompile() | Recompile using collected profile data (CAS-guarded, safe to call concurrently) |
| ht.RecompileAsync() | Non-blocking recompile in a background goroutine |

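To make the coordinator pattern in this table concrete, here is a minimal, dependency-free sketch of three of its ideas: an atomically swapped parser pointer (the role of ht.Type()), a compare-and-swap guard so only one recompile runs at a time, and a message counter that triggers a recompile every threshold messages. The names coordinator, parser and Observe are hypothetical; the sketch omits profiling, the sample rate and the real hyperpb compilation step, and it does not reproduce bufarrowlib's internals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// parser is a hypothetical stand-in for a compiled hyperpb.MessageType; it only
// records which compilation generation produced it.
type parser struct {
	generation int64
}

// coordinator is a simplified, hypothetical analogue of HyperType.
type coordinator struct {
	current   atomic.Pointer[parser] // swapped atomically; readers never lock
	seen      atomic.Int64           // messages observed so far
	compiling atomic.Bool            // CAS guard: at most one recompile at a time
	threshold int64                  // recompile every threshold messages (0 = never)
}

func newCoordinator(threshold int64) *coordinator {
	c := &coordinator{threshold: threshold}
	c.current.Store(&parser{generation: 0})
	return c
}

// Type returns the current parser with a lock-free atomic load.
func (c *coordinator) Type() *parser { return c.current.Load() }

// Recompile publishes a new parser. In this sketch, a call that finds another
// recompile already in flight returns false instead of waiting.
func (c *coordinator) Recompile() bool {
	if !c.compiling.CompareAndSwap(false, true) {
		return false
	}
	defer c.compiling.Store(false)
	next := &parser{generation: c.current.Load().generation + 1}
	c.current.Store(next)
	return true
}

// Observe counts one message and triggers a recompile on every threshold-th one.
func (c *coordinator) Observe() {
	n := c.seen.Add(1)
	if c.threshold > 0 && n%c.threshold == 0 {
		c.Recompile()
	}
}

func main() {
	c := newCoordinator(10_000)
	for i := 0; i < 25_000; i++ {
		c.Observe()
	}
	fmt.Println(c.Type().generation) // 2 (recompiled at 10,000 and 20,000)
}
```

Goroutines that only call Type() keep working during a recompile and pick up the new pointer on their next load. Whether the real Recompile waits for or skips an in-flight compile is not stated here; this sketch simply skips.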
Well-known & common type support

The denormalizer automatically maps protobuf well-known types and common types to flat Arrow scalars — no manual conversion needed.

| Protobuf type | Arrow type |
|---|---|
| google.protobuf.Timestamp | Timestamp(ms, UTC) |
| google.protobuf.Duration | Duration(ms) |
| google.protobuf.FieldMask | String (comma-joined paths) |
| google.protobuf.*Value wrappers | Unwrapped scalar (BoolValue → Boolean, Int64Value → Int64, etc.) |
| google.type.Date | Date32 |
| google.type.TimeOfDay | Time64(µs) |
| google.type.Money | String (protojson) |
| google.type.LatLng | String (protojson) |
| google.type.Color | String (protojson) |
| google.type.PostalAddress | String (protojson) |
| google.type.Interval | String (protojson) |
| OpenTelemetry AnyValue | Binary (proto-marshalled) |

Denormalization

Use WithDenormalizerPlan to project selected protobuf field paths into a flat Arrow record — like SQL SELECT … FROM msg CROSS JOIN UNNEST(msg.items).

  • Paths are specified with the pbpath path language: "items[*].id", "tags[*]", "imp[0:3].banner.w", "name", etc.
  • Column aliases via pbpath.Alias("col_name").
  • Independent repeated-field fan-outs are cross-joined; empty groups emit a single null row (left-join semantics).
  • Fixed-index paths (items[0].id) broadcast as scalars; ranges and wildcards fan out.
  • Computed columns via the Expr engine: FuncCoalesce, FuncCond, FuncConcat, arithmetic, string ops, timestamp parsing, and more.
  • All type mapping and append closures are compiled once at construction time for minimal per-message overhead.
tc, _ := bufarrowlib.New(md, mem,
    bufarrowlib.WithDenormalizerPlan(
        pbpath.PlanPath("name",            pbpath.Alias("order_name")),
        pbpath.PlanPath("items[*].id",     pbpath.Alias("item_id")),
        pbpath.PlanPath("items[*].price",  pbpath.Alias("item_price")),
        pbpath.PlanPath("tags[*]",         pbpath.Alias("tag")),
    ),
)
tc.AppendDenorm(msg) // 2 items × 3 tags → 6 rows
rec := tc.NewDenormalizerRecordBatch()
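The fan-out rules above can be modelled without protobuf at all. The sketch below uses hypothetical names and a nested loop rather than the library's trie-based Plan; it broadcasts a scalar onto every row, cross-joins two independent repeated groups, and emits one null cell for an empty group. So 2 items × 3 tags yields 6 rows, and an order with no tags still yields one row per item.

```go
package main

import "fmt"

// row is one denormalized output row; a nil pointer marks a null cell.
type row struct {
	name string
	item *string
	tag  *string
}

// fanOut turns one repeated-field group into nullable cells.
// An empty group yields a single null cell (left-join semantics).
func fanOut(values []string) []*string {
	if len(values) == 0 {
		return []*string{nil}
	}
	cells := make([]*string, len(values))
	for i := range values {
		cells[i] = &values[i]
	}
	return cells
}

// denormalize broadcasts the scalar name onto every row and cross-joins
// the two independent groups.
func denormalize(name string, items, tags []string) []row {
	var rows []row
	for _, it := range fanOut(items) {
		for _, tg := range fanOut(tags) {
			rows = append(rows, row{name: name, item: it, tag: tg})
		}
	}
	return rows
}

// show renders a nullable cell.
func show(p *string) string {
	if p == nil {
		return "NULL"
	}
	return *p
}

func main() {
	rows := denormalize("order-1", []string{"A", "B"}, []string{"rush", "fragile", "gift"})
	fmt.Println(len(rows)) // 6
	for _, r := range denormalize("order-2", []string{"C"}, nil) {
		fmt.Println(r.name, show(r.item), show(r.tag)) // order-2 C NULL
	}
}
```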
Expression engine

The denormalizer's Plan API supports computed columns through a composable Expr tree. Expressions reference protobuf field paths via PathRef and apply functions to produce derived values — all evaluated inline during plan traversal with zero extra passes over the data.

// Coalesce: first non-zero value from multiple paths
pbpath.PlanPath("device_id",
    pbpath.WithExpr(pbpath.FuncCoalesce(
        pbpath.PathRef("user.id"),
        pbpath.PathRef("site.id"),
        pbpath.PathRef("device.ifa"),
    )),
    pbpath.Alias("device_id"),
)

// Conditional: banner dimensions if present, else video
pbpath.PlanPath("width",
    pbpath.WithExpr(pbpath.FuncCond(
        pbpath.FuncHas(pbpath.PathRef("imp[0].banner.w")),
        pbpath.PathRef("imp[0].banner.w"),
        pbpath.PathRef("imp[0].video.w"),
    )),
    pbpath.Alias("width"),
)
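The two examples above differ in how they treat a zero value. Assuming FuncCoalesce skips zero values (as its comment says) and FuncHas reports field presence, the sketch below shows what happens with a banner width that is explicitly set to 0: Coalesce falls through to the video width, while Cond + Has keeps the banner's 0. The value type and the coalesce/cond helpers are hypothetical stand-ins, not the pbpath API.

```go
package main

import "fmt"

// value is a hypothetical field read: the scalar plus whether the field is present.
type value struct {
	v       int64
	present bool
}

// coalesce returns the first non-zero argument, or 0 if all are zero.
func coalesce(vals ...int64) int64 {
	for _, v := range vals {
		if v != 0 {
			return v
		}
	}
	return 0
}

// cond returns then when pred holds, otherwise els.
func cond(pred bool, then, els int64) int64 {
	if pred {
		return then
	}
	return els
}

func main() {
	bannerW := value{v: 0, present: true} // explicitly set to 0
	videoW := value{v: 640, present: true}
	absent := value{} // no banner at all

	fmt.Println(coalesce(bannerW.v, videoW.v))               // 640
	fmt.Println(cond(bannerW.present, bannerW.v, videoW.v)) // 0
	fmt.Println(cond(absent.present, absent.v, videoW.v))   // 640
}
```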

Available expression functions:

| Category | Functions |
|---|---|
| Control flow | FuncCoalesce, FuncDefault, FuncCond |
| Predicates | FuncHas, FuncLen, FuncEq, FuncNe, FuncLt, FuncLe, FuncGt, FuncGe |
| Arithmetic | FuncAdd, FuncSub, FuncMul, FuncDiv, FuncMod |
| Math | FuncAbs, FuncCeil, FuncFloor, FuncRound, FuncMin, FuncMax |
| String | FuncUpper, FuncLower, FuncTrim, FuncTrimPrefix, FuncTrimSuffix, FuncConcat |
| Cast | FuncCastInt, FuncCastFloat, FuncCastString |
| Timestamp | FuncStrptime, FuncTryStrptime, FuncAge, FuncExtractYear/Month/Day/Hour/Minute/Second |
| ETL | FuncHash, FuncEpochToDate, FuncDatePart, FuncBucket, FuncMask, FuncCoerce, FuncEnumName |
| Aggregates | FuncSum, FuncDistinct, FuncListConcat |

Cloning

Transcoder.Clone creates an independent copy (separate builders and stencils) that can be used in another goroutine. All options — including denormalization plans and HyperType references — carry over. The immutable Plan is shared; only the mutable Arrow builders and scratch buffers are freshly allocated.

// Original transcoder
tc, _ := bufarrowlib.New(md, mem, bufarrowlib.WithHyperType(ht), /* ... */)

// Clone for another goroutine — shares HyperType + Plan, fresh builders
clone, _ := tc.Clone(mem)

go func() {
    defer clone.Release()
    for raw := range ch {
        clone.AppendDenormRaw(raw)
    }
    rec := clone.NewDenormalizerRecordBatch()
    // ... process rec
}()
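The sharing model described above (one immutable plan, fresh mutable state per clone) is a general Go pattern. In this sketch, transcoder, plan and clone are hypothetical types that only mimic the shape of Transcoder.Clone: clones point at the same plan but buffer rows independently, so workers never contend on a builder, and each one flushes its own batch.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// plan is immutable after construction, so every clone can share it.
type plan struct {
	columns []string
}

// transcoder holds per-goroutine mutable state (a row buffer standing in for
// Arrow builders) plus a pointer to the shared plan.
type transcoder struct {
	plan *plan
	rows [][]string
}

// clone shares the plan but starts with an empty, independent buffer.
func (t *transcoder) clone() *transcoder { return &transcoder{plan: t.plan} }

func (t *transcoder) appendRow(msg string) {
	t.rows = append(t.rows, strings.Split(msg, ","))
}

// flush returns the buffered rows and resets the buffer, like building a batch.
func (t *transcoder) flush() [][]string {
	out := t.rows
	t.rows = nil
	return out
}

// run fans inputs out to workers, each with its own clone, and returns the
// total number of rows flushed.
func run(workers int, inputs []string) int {
	base := &transcoder{plan: &plan{columns: []string{"id", "price"}}}
	ch := make(chan string)
	counts := make([]int, workers) // each worker writes only its own slot
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int, tc *transcoder) {
			defer wg.Done()
			for msg := range ch {
				tc.appendRow(msg)
			}
			counts[w] = len(tc.flush())
		}(w, base.clone())
	}
	for _, in := range inputs {
		ch <- in
	}
	close(ch)
	wg.Wait()
	total := 0
	for _, c := range counts {
		total += c
	}
	return total
}

func main() {
	fmt.Println(run(4, []string{"a,1", "b,2", "c,3", "d,4", "e,5"})) // 5
}
```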
Protobuf Editions support

bufarrowlib works at the protoreflect descriptor level, so it is inherently compatible with Protobuf Editions (Edition 2023+) as well as proto2 and proto3. Editions features such as features.field_presence are resolved by the protobuf runtime into the same descriptor properties (HasPresence, Kind, Cardinality, etc.) that bufarrowlib already uses — no special configuration is needed. The CompileProtoToFileDescriptor / NewFromFile path uses protocompile, which supports Edition 2023.

pbpath

The proto/pbpath subpackage is a standalone protobuf field-path engine that can be used independently of the rest of bufarrowlib.

  • Parse a dot-delimited path string against a message descriptor, with support for list wildcards ([*]), Python-style slices ([1:3], [-2:], [::2]), ranges, and negative indices.
  • Evaluate a compiled path against a live proto.Message to extract values, including fan-out across repeated fields.
  • Plan API — compile multiple paths into a trie-based execution plan that traverses shared prefixes only once, ideal for hot-path extraction of many fields from the same message type.
  • Expr engine — composable expression trees for computed columns: Coalesce, Cond, arithmetic, string ops, timestamp parsing, and 30+ built-in functions.
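The slice selectors listed above follow Python notation. Assuming pbpath resolves them the way Python does (negative indices count from the end, out-of-range bounds are clamped, omitted bounds cover the whole list), this sketch shows which indices [1:3], [-2:] and [::2] select on a 5-element list. sliceIndices is a hypothetical helper, and unlike Python it treats a non-positive step as 1 instead of supporting reverse steps.

```go
package main

import "fmt"

// sliceIndices resolves [start:stop:step] against a list of length n.
// A nil bound means "omitted". Non-positive steps fall back to 1 here.
func sliceIndices(n int, start, stop *int, step int) []int {
	if step <= 0 {
		step = 1
	}
	norm := func(p *int, def int) int {
		if p == nil {
			return def
		}
		i := *p
		if i < 0 {
			i += n // negative index counts from the end
		}
		if i < 0 {
			i = 0
		}
		if i > n {
			i = n
		}
		return i
	}
	lo, hi := norm(start, 0), norm(stop, n)
	var out []int
	for i := lo; i < hi; i += step {
		out = append(out, i)
	}
	return out
}

func ptr(i int) *int { return &i }

func main() {
	fmt.Println(sliceIndices(5, ptr(1), ptr(3), 1)) // [1 2]
	fmt.Println(sliceIndices(5, ptr(-2), nil, 1))   // [3 4]
	fmt.Println(sliceIndices(5, nil, nil, 2))       // [0 2 4]
}
```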

See the full pbpath README, Architecture Guide, and pkg.go.dev reference for details.

🚀 Install

go get -u github.com/loicalleyne/bufarrowlib@latest

💡 Usage

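import (
    "log"

    "github.com/apache/arrow-go/v18/arrow/memory"
    "github.com/loicalleyne/bufarrowlib"
    "github.com/loicalleyne/bufarrowlib/proto/pbpath"
)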
Quick start — full record
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

tc.Append(msg)
rec := tc.NewRecordBatch()
defer rec.Release()

// Arrow schema & Parquet schema are available:
_ = tc.Schema()   // *arrow.Schema
_ = tc.Parquet()  // *schema.Schema
Quick start — denormalized record
tc, err := bufarrowlib.New(md, memory.DefaultAllocator,
    bufarrowlib.WithDenormalizerPlan(
        pbpath.PlanPath("items[*].id",    pbpath.Alias("item_id")),
        pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

tc.AppendDenorm(msg)
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()
Quick start — high-performance raw-bytes ingestion
// 1. Create a shared HyperType (once per message type, thread-safe).
ht := bufarrowlib.NewHyperType(md)

// 2. Create a Transcoder with HyperType + denormalization plan.
tc, err := bufarrowlib.New(md, memory.DefaultAllocator,
    bufarrowlib.WithHyperType(ht),
    bufarrowlib.WithDenormalizerPlan(
        pbpath.PlanPath("id",           pbpath.Alias("request_id")),
        pbpath.PlanPath("imp[*].id",    pbpath.Alias("imp_id")),
        pbpath.PlanPath("device.geo.country", pbpath.Alias("country")),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

// 3. Feed raw protobuf bytes (e.g. from Kafka).
for _, raw := range messages {
    tc.AppendDenormRaw(raw)
}

// 4. Flush to Arrow.
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()
Quick start — Parquet round-trip
// Write
var buf bytes.Buffer
tc.Append(msg)
if err := tc.WriteParquet(&buf); err != nil {
    log.Fatal(err)
}

// Read
rec, err := tc.ReadParquet(ctx, bytes.NewReader(buf.Bytes()), nil)
if err != nil {
    log.Fatal(err)
}
defer rec.Release()
Quick start — .proto file at runtime
tc, err := bufarrowlib.NewFromFile(
    "path/to/schema.proto", "MyMessage",
    []string{"path/to/imports"},
    memory.DefaultAllocator,
)
if err != nil {
    log.Fatal(err)
}
Quick start — computed columns with expressions
tc, err := bufarrowlib.New(md, memory.DefaultAllocator,
    bufarrowlib.WithDenormalizerPlan(
        pbpath.PlanPath("id", pbpath.Alias("request_id")),

        // Coalesce: first non-empty ID
        pbpath.PlanPath("device_id",
            pbpath.WithExpr(pbpath.FuncCoalesce(
                pbpath.PathRef("user.id"),
                pbpath.PathRef("device.ifa"),
            )),
            pbpath.Alias("device_id"),
        ),

        // Fallback: banner width if non-zero, otherwise video width (via Coalesce)
        pbpath.PlanPath("width",
            pbpath.WithExpr(pbpath.FuncCoalesce(
                pbpath.PathRef("imp[0].banner.w"),
                pbpath.PathRef("imp[0].video.w"),
            )),
            pbpath.Alias("width"),
        ),

        // Fan-out across deals
        pbpath.PlanPath("imp[0].pmp.deals[*].id", pbpath.Alias("deal_id")),
    ),
)
if err != nil {
    log.Fatal(err)
}

📊 Benchmarks

Benchmarks use a realistic 506-message corpus shaped to match sampled production ad-tech traffic (OpenRTB BidRequest): 75% of requests have 2 imp objects, 61% have 1 deal object per impression, banner and video dimensions are mutually exclusive, and all top-level fields are populated.

BidRequest denormalization — method comparison
| Method | ns/msg | msg/s | B/msg | allocs/msg | Description |
|---|---:|---:|---:|---:|---|
| Custom (hand-written getters) | 5,544 | 180K | 5,562 | 131.2 | Manual Arrow builders with typed getters |
| AppendDenorm (proto.Message) | 8,921 | 112K | 7,022 | 184.6 | Plan-based denorm from deserialized proto |
| AppendDenormRaw (random data) | 4,939 | 202K | 3,951 | 62.4 | hyperpb + Shared arena, random corpus |
| AppendDenormRaw (realistic data) | 3,376 | 296K | 2,476 | 56.9 | hyperpb + Shared arena, production-shaped corpus |
| AppendDenormRaw + PGO | 7,046 | 142K | 2,508 | 66.7 | With profile-guided recompilation |

AppendDenormRaw on realistic data takes 39% less time per message than the hand-written baseline (3,376 vs 5,544 ns/msg, i.e. 296K vs 180K msg/s) and makes 57% fewer allocations per message (56.9 vs 131.2), while being fully declarative (no manual Arrow builders to maintain).
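The summary figures can be rederived from the comparison table: msg/s is 10⁹ divided by ns/msg, and the percentages are relative reductions against the hand-written baseline. A short check using only the numbers in the table:

```go
package main

import "fmt"

// throughput converts nanoseconds per message into messages per second.
func throughput(nsPerMsg float64) float64 { return 1e9 / nsPerMsg }

// reductionPct returns how much smaller after is than before, in percent.
func reductionPct(before, after float64) float64 { return (before - after) / before * 100 }

func main() {
	const customNs, rawNs = 5544.0, 3376.0
	const customAllocs, rawAllocs = 131.2, 56.9

	fmt.Printf("time per message: %.0f%% lower\n", reductionPct(customNs, rawNs))
	fmt.Printf("throughput: %.0fK vs %.0fK msg/s\n", throughput(rawNs)/1e3, throughput(customNs)/1e3)
	fmt.Printf("allocations: %.0f%% fewer\n", reductionPct(customAllocs, rawAllocs))
	// Prints:
	// time per message: 39% lower
	// throughput: 296K vs 180K msg/s
	// allocations: 57% fewer
}
```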

Other benchmarks
| Benchmark | Time | Note |
|---|---|---|
| New/BidRequest | ~220 µs | Schema construction from descriptor |
| NewFromFile/BidRequest | ~5.5 ms | Compile .proto + construct schema |
| Append/BidRequest (100 msgs) | ~2.8 ms | Full-fidelity proto → Arrow |
| Clone/BidRequest_with_denorm | ~110 µs | Clone transcoder + denorm plan |
| WriteParquet/BidRequest (100 rows) | ~1.5 ms | Arrow → Parquet |
| ReadParquet/BidRequest (100 rows) | ~760 µs | Parquet → Arrow |
| AppendDenorm/2x2 (100 msgs) | ~1.5 ms | Denorm with 2 items × 2 tags |
| AppendDenorm/10x10 (100 msgs) | ~16 ms | Denorm with 10 items × 10 tags |

Running benchmarks
# Run all benchmarks
go test -bench=. -benchmem -count=3

# Run only the BidRequest comparison benchmarks
go test -bench='BenchmarkAppendBidRequest' -benchmem -count=3

# Run with longer duration for more stable results
go test -bench='BenchmarkAppendBidRequest' -benchmem -benchtime=5s -count=5

# Run a specific sub-benchmark
go test -bench='BenchmarkAppendBidRequest_HyperpbRaw/Realistic' -benchmem

# Profile CPU
go test -bench='BenchmarkAppendBidRequest_HyperpbRaw/Realistic' -cpuprofile=cpu.prof
go tool pprof cpu.prof

# Profile memory allocations
go test -bench='BenchmarkAppendBidRequest_HyperpbRaw/Realistic' -memprofile=mem.prof
go tool pprof -alloc_objects mem.prof

Flags reference:

| Flag | Default | Description |
|---|---|---|
| -bench=<regex> | (none) | Run benchmarks matching the regex |
| -benchmem | off | Report allocations (B/op, allocs/op) |
| -benchtime=<d> | 1s | Target wall-clock time per benchmark; Go auto-adjusts b.N |
| -count=<n> | 1 | Repeat each benchmark n times for statistical comparison |
| -cpuprofile=<file> | (none) | Write CPU profile to file |
| -memprofile=<file> | (none) | Write memory profile to file |

Choosing the right ingestion method

| Scenario | Method | Why |
|---|---|---|
| You have proto.Message objects (from generated code or gRPC) | Append / AppendDenorm | No marshal/unmarshal overhead; pass the message directly |
| You have raw []byte from Kafka, a file, or a network stream | AppendRaw / AppendDenormRaw | Avoids double-unmarshal; hyperpb parser is faster than proto.Unmarshal |
| You need the full protobuf structure as nested Arrow | Append / AppendRaw | Produces a hierarchical Arrow record matching the proto schema |
| You need a flat analytics table with selected columns | AppendDenorm / AppendDenormRaw | Declarative path selection, automatic fan-out, computed columns via Exprs |
| Multi-goroutine pipeline | Clone with shared HyperType | Each goroutine gets independent builders; profiling data is aggregated |
| Evolving traffic shape | WithAutoRecompile | PGO adapts the parser as field-presence patterns change |

💫 Show your support

Give a ⭐️ if this project helped you! Feedback and PRs welcome.

Licence

bufarrowLib is released under the Apache 2.0 license. See LICENCE.txt

Documentation

Overview

Example (DenormToParquet)

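Example_denormToParquet shows how to denormalize nested protobuf messages into a flat Arrow record that is ready to be written to Parquet. The example prints the denormalized rows for inspection; writing them out is done via a Transcoder built from the denorm schema or directly with pqarrow.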

orderMD, itemMD := buildExampleDescriptors()

tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Append an order with 2 items × 2 tags → 4 denormalized rows.
msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"A", 1.50}, {"B", 2.75}},
	[]string{"rush", "fragile"}, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

// The denormalized record builder produces a flat schema.
fmt.Printf("schema: %v\n", tc.DenormalizerSchema().Field(0).Name)

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("denorm rows: %d (2 items × 2 tags)\n", rec.NumRows())

// Write the flat denormalized record to Parquet via a new Transcoder
// built from the denorm schema, or write directly using pqarrow.
// Here we show the denorm data for inspection:
for i := 0; i < int(rec.NumRows()); i++ {
	name := rec.Column(0).(*array.String).Value(i)
	id := rec.Column(1).(*array.String).Value(i)
	price := rec.Column(2).(*array.Float64).Value(i)
	tag := rec.Column(3).(*array.String).Value(i)
	fmt.Printf("  %s | %s | %.2f | %s\n", name, id, price, tag)
}
Output:
schema: order_name
denorm rows: 4 (2 items × 2 tags)
  order-1 | A | 1.50 | rush
  order-1 | A | 1.50 | fragile
  order-1 | B | 2.75 | rush
  order-1 | B | 2.75 | fragile
Example (ParquetRoundTrip)

Example_parquetRoundTrip shows a full round-trip: proto → Arrow → Parquet → Arrow → proto. This demonstrates that data survives serialisation intact.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Encode two products.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

// Proto → Arrow → Parquet (in-memory buffer).
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Parquet → Arrow.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

// Arrow → Proto.
msgs := tc.Proto(rec, nil)

fmt.Printf("round-tripped %d messages:\n", len(msgs))
for _, m := range msgs {
	js, _ := protojson.Marshal(m)
	var c bytes.Buffer
	json.Compact(&c, js)
	fmt.Printf("  %s\n", c.String())
}
Output:
round-tripped 2 messages:
  {"name":"Widget","price":9.99,"qty":5}
  {"name":"Gadget","price":24.5,"qty":2}
Example (ParquetSelectColumns)

Example_parquetSelectColumns shows reading only specific columns from a Parquet file — useful for analytics queries that touch a subset of fields.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Read only column 0 (name) and column 2 (qty).
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, []int{0, 2})
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

fmt.Printf("cols: %d, rows: %d\n", rec.NumCols(), rec.NumRows())
names := rec.Column(0).(*array.String)
qtys := rec.Column(1).(*array.Int32)
for i := 0; i < int(rec.NumRows()); i++ {
	fmt.Printf("  %s: %d\n", names.Value(i), qtys.Value(i))
}
Output:
cols: 2, rows: 2
  Widget: 5
  Gadget: 2
Example (ProtoFileToParquet)

Example_protoFileToParquet shows the end-to-end workflow when working with .proto files on disk: compile the schema, create a transcoder, populate messages, and write Parquet.

// Write an inline .proto to a temp file.
_, dir := writeProtoFile("event.proto", `syntax = "proto3";
message Event {
  string id      = 1;
  string action  = 2;
  int64  user_id = 3;
}
`)
defer os.RemoveAll(dir)

// Compile the .proto and look up the message descriptor.
fd, err := bufarrowlib.CompileProtoToFileDescriptor("event.proto", []string{dir})
if err != nil {
	log.Fatal(err)
}
md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Event")
if err != nil {
	log.Fatal(err)
}

// Create a Transcoder from the compiled descriptor.
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Build and append messages using dynamicpb.
for _, e := range []struct {
	id, action string
	uid        int64
}{
	{"e1", "click", 100},
	{"e2", "view", 200},
} {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("id"), protoreflect.ValueOfString(e.id))
	msg.Set(md.Fields().ByName("action"), protoreflect.ValueOfString(e.action))
	msg.Set(md.Fields().ByName("user_id"), protoreflect.ValueOfInt64(e.uid))
	tc.Append(msg)
}

var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

fmt.Printf("fields: %v\n", tc.FieldNames())
fmt.Printf("wrote parquet: %d bytes\n", buf.Len())
Output:
fields: [id action user_id]
wrote parquet: 688 bytes
Example (ProtoToParquetBatched)

Example_protoToParquetBatched shows writing protobuf messages to Parquet in batches — useful when processing a large stream and flushing periodically.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Batch 1: first two messages.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
batch1 := tc.NewRecordBatch()
defer batch1.Release()

// Batch 2: one more message.
tc.Append(newProduct(md, "Gizmo", 7.25, 12))
batch2 := tc.NewRecordBatch()
defer batch2.Release()

// Write both batches to a single Parquet file.
var buf bytes.Buffer
if err := tc.WriteParquetRecords(&buf, batch1, batch2); err != nil {
	log.Fatal(err)
}

// Read back and verify total row count.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

fmt.Printf("batches: 2, total rows read back: %d\n", rec.NumRows())
Output:
batches: 2, total rows read back: 3
Example (ProtoToParquetFile)

Example_protoToParquetFile shows the complete workflow of consuming protobuf messages and writing them to a Parquet file on disk.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Simulate consuming a stream of protobuf messages.
products := []struct {
	name  string
	price float64
	qty   int32
}{
	{"Widget", 9.99, 5},
	{"Gadget", 24.50, 2},
	{"Gizmo", 7.25, 12},
}
for _, p := range products {
	tc.Append(newProduct(md, p.name, p.price, p.qty))
}

// Write to a Parquet file.
f, err := os.CreateTemp("", "example-*.parquet")
if err != nil {
	log.Fatal(err)
}
name := f.Name()
defer os.Remove(name)

if err := tc.WriteParquet(f); err != nil {
	log.Fatal(err)
}
f.Close()

info, _ := os.Stat(name)
fmt.Printf("wrote %d messages to parquet (%d bytes)\n", len(products), info.Size())
Output:
wrote 3 messages to parquet (738 bytes)

Constants

This section is empty.

Variables

var (
	// ErrMxDepth is returned when the protobuf message nesting exceeds maxDepth.
	ErrMxDepth = errors.New("max depth reached, either the message is deeply nested or a circular dependency was introduced")
	// ErrPathNotFound is returned by node.getPath when a field name is not
	// found in the node's hash map.
	ErrPathNotFound = errors.New("path not found")
)

Sentinel errors returned during schema construction and path lookup.

Functions

func CompileProtoToFileDescriptor

func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)

CompileProtoToFileDescriptor compiles a .proto file at runtime using protocompile and returns the resulting FileDescriptor. importPaths are the directories searched for transitive imports.

Example

ExampleCompileProtoToFileDescriptor demonstrates compiling a .proto file from disk at runtime and inspecting its messages.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{dir})
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < fd.Messages().Len(); i++ {
		m := fd.Messages().Get(i)
		fmt.Printf("%s (%d fields)\n", m.Name(), m.Fields().Len())
	}
}
Output:
Item (2 fields)

func ExprKindToAppendFunc added in v0.2.0

func ExprKindToAppendFunc(kind protoreflect.Kind, b array.Builder) protoAppendFunc

ExprKindToAppendFunc returns a closure that appends a protoreflect.Value of the given kind to the Arrow array builder b. The builder must match the type returned by ExprKindToArrowType for the same kind.

This is the Expr-output counterpart of ProtoKindToAppendFunc; it handles only the primitive scalar kinds that Expr functions can produce. Returns nil for unsupported kinds.

func ExprKindToArrowType added in v0.2.0

func ExprKindToArrowType(kind protoreflect.Kind) arrow.DataType

ExprKindToArrowType returns the Arrow data type corresponding to a protoreflect.Kind. This is used for denormalizer columns whose type is determined by an [Expr] function's output kind rather than by a leaf field descriptor.

Only the primitive scalar kinds that Expr functions can produce are handled:

BoolKind   → Boolean
Int32Kind  → Int32     Int64Kind  → Int64
Uint32Kind → Uint32    Uint64Kind → Uint64
FloatKind  → Float32   DoubleKind → Float64
StringKind → String    BytesKind  → Binary
EnumKind   → Int32

Returns nil for message, group, or unrecognised kinds.

func GetMessageDescriptorByName

func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)

GetMessageDescriptorByName looks up a top-level message by name in the given FileDescriptor. Returns an error if the message is not found.

Example

ExampleGetMessageDescriptorByName demonstrates looking up a specific message by name from a compiled FileDescriptor.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{dir})
	if err != nil {
		log.Fatal(err)
	}

	md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Item")
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < md.Fields().Len(); i++ {
		f := md.Fields().Get(i)
		fmt.Printf("%s %s\n", f.Name(), f.Kind())
	}
}
Output:
name string
price double

func MergeMessageDescriptors

func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)

MergeMessageDescriptors merges two message descriptors into a new one with the specified name. It appends the fields from the second descriptor to the first, avoiding name conflicts. Field numbers from b are auto-renumbered starting after a's max field number to prevent wire-format collisions. Nested message types and enum types from b are also carried over. The resulting message descriptor is wrapped in a synthetic file descriptor to ensure it can be used independently.
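The renumbering rule can be illustrated on plain (name, number) pairs. In this sketch mergeFields is hypothetical: it places b's fields after a's and numbers them from a's maximum field number plus one, which reproduces the field order in the example below (region becomes 4, batch_id becomes 5). How the real function resolves name conflicts is not documented here; skipping duplicate names is an assumption of the sketch.

```go
package main

import "fmt"

// field is a hypothetical (name, number) pair standing in for a field descriptor.
type field struct {
	name   string
	number int32
}

// mergeFields appends b's fields after a's, renumbering them to start just past
// a's highest field number. Skipping names already in a is an assumption here.
func mergeFields(a, b []field) []field {
	var maxNum int32
	seen := make(map[string]bool, len(a))
	out := make([]field, 0, len(a)+len(b))
	for _, f := range a {
		out = append(out, f)
		seen[f.name] = true
		if f.number > maxNum {
			maxNum = f.number
		}
	}
	next := maxNum + 1
	for _, f := range b {
		if seen[f.name] {
			continue
		}
		out = append(out, field{name: f.name, number: next})
		seen[f.name] = true
		next++
	}
	return out
}

func main() {
	product := []field{{"name", 1}, {"price", 2}, {"qty", 3}}
	custom := []field{{"region", 1}, {"batch_id", 2}}
	for _, f := range mergeFields(product, custom) {
		fmt.Println(f.name, f.number)
	}
	// Prints: name 1, price 2, qty 3, region 4, batch_id 5 (one per line)
}
```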

Example

ExampleMergeMessageDescriptors demonstrates merging two message descriptors into one with combined fields.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	merged, err := bufarrowlib.MergeMessageDescriptors(baseMD, customMD, "Merged")
	if err != nil {
		log.Fatal(err)
	}

	fields := merged.Fields()
	for i := 0; i < fields.Len(); i++ {
		fmt.Println(fields.Get(i).Name())
	}
}
Output:
name
price
qty
region
batch_id

func ProtoKindToAppendFunc

func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc

ProtoKindToAppendFunc returns a closure that appends a protoreflect.Value of the appropriate kind to the given Arrow array builder. The builder must match the Arrow data type returned by ProtoKindToArrowType for the same field descriptor.

Returns nil if the field's kind is not a recognized scalar mapping.

Example

ExampleProtoKindToAppendFunc demonstrates obtaining a typed append closure for a protobuf field and using it to populate an Arrow builder.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	nameFD := md.Fields().ByName("name")

	dt := bufarrowlib.ProtoKindToArrowType(nameFD)
	builder := array.NewBuilder(memory.DefaultAllocator, dt)
	defer builder.Release()

	appendFn := bufarrowlib.ProtoKindToAppendFunc(nameFD, builder)
	appendFn(protoreflect.ValueOfString("hello"))
	appendFn(protoreflect.ValueOfString("world"))

	arr := builder.NewArray()
	defer arr.Release()

	fmt.Printf("len: %d\n", arr.Len())
	fmt.Println(arr.(*array.String).Value(0))
	fmt.Println(arr.(*array.String).Value(1))
}
Output:
len: 2
hello
world

func ProtoKindToArrowType

func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType

ProtoKindToArrowType returns the Arrow data type corresponding to a protobuf field descriptor's scalar kind. In addition to primitive kinds (bool, int32, string, etc.), the following well-known and common message types are recognised and mapped to flat Arrow scalars:

  • google.protobuf.Timestamp → Timestamp(ms, UTC)
  • google.protobuf.Duration → Duration(ms)
  • google.protobuf.FieldMask → String (comma-joined paths)
  • google.protobuf.*Value → unwrapped scalar (BoolValue→Boolean, etc.)
  • google.type.Date → Date32
  • google.type.TimeOfDay → Time64(µs)
  • google.type.Money → String (protojson)
  • google.type.LatLng → String (protojson)
  • google.type.Color → String (protojson)
  • google.type.PostalAddress → String (protojson)
  • google.type.Interval → String (protojson)
  • opentelemetry AnyValue → Binary (proto-marshalled)

Returns nil if the field's kind or message type is not a recognized scalar mapping (e.g. a generic message, map, or group).

TODO: add a WithTimestampUnit(arrow.TimeUnit) option to allow callers to override the default Millisecond precision.
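The mappings above imply a handful of scalar conversions. The following standard-library Go sketch shows how Timestamp, FieldMask, Date, and TimeOfDay values could be reduced to the listed Arrow units. The helper names are hypothetical. Truncating sub-unit precision is an assumption, and bufarrowlib's own conversion code may round differently.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// timestampToMillis reduces a google.protobuf.Timestamp (seconds, nanos) to
// milliseconds since the Unix epoch, the unit of Arrow Timestamp(ms).
// Assumption: sub-millisecond nanos are truncated. Timestamp nanos are always
// in [0, 999999999], so this is a floor even for pre-1970 instants.
func timestampToMillis(seconds int64, nanos int32) int64 {
	return seconds*1000 + int64(nanos)/1_000_000
}

// fieldMaskToString joins FieldMask paths with commas ("comma-joined paths").
func fieldMaskToString(paths []string) string {
	return strings.Join(paths, ",")
}

// dateToDate32 converts a google.type.Date into days since 1970-01-01,
// which is what Arrow Date32 stores.
func dateToDate32(year, month, day int) int32 {
	t := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
	return int32(t.Unix() / 86400)
}

// timeOfDayToMicros converts a google.type.TimeOfDay into microseconds since
// midnight, the unit of Arrow Time64(µs). Assumption: nanos are truncated.
func timeOfDayToMicros(hours, minutes, seconds, nanos int32) int64 {
	secs := int64(hours)*3600 + int64(minutes)*60 + int64(seconds)
	return secs*1_000_000 + int64(nanos)/1_000
}

func main() {
	fmt.Println(timestampToMillis(1, 500_000_000))               // 1500
	fmt.Println(fieldMaskToString([]string{"name", "items.id"})) // name,items.id
	fmt.Println(dateToDate32(2000, 1, 1))                        // 10957
	fmt.Println(timeOfDayToMicros(1, 0, 0, 0))                   // 3600000000
}
```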

Example

ExampleProtoKindToArrowType demonstrates mapping protobuf field kinds to Arrow data types.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()

	for i := 0; i < md.Fields().Len(); i++ {
		fd := md.Fields().Get(i)
		dt := bufarrowlib.ProtoKindToArrowType(fd)
		fmt.Printf("%-8s → %s\n", fd.Name(), dt)
	}
}
Output:
name     → utf8
price    → float64
qty      → int32

Types

type HyperType added in v0.2.0

type HyperType struct {
	// contains filtered or unexported fields
}

HyperType is a shared coordinator for a compiled hyperpb.MessageType that supports lock-free sharing across multiple Transcoder instances (including clones running in separate goroutines). It provides online profile-guided optimization (PGO) via hyperpb.Profile: all Transcoders using the same HyperType contribute profiling data, and a recompile atomically upgrades the parser for all of them.

A HyperType is safe for concurrent use. The underlying hyperpb.MessageType is immutable after compilation; HyperType.Recompile atomically swaps it.
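The sharing model described above uses an immutable compiled type that is swapped atomically on recompile. This is a common Go pattern. The following standard-library sketch illustrates it. `compiledParser` and `coordinator` are hypothetical stand-ins for hyperpb.MessageType and HyperType, not bufarrowlib internals. Each reader loads the pointer once per batch, following the advice on HyperType.Type, so a concurrent swap never changes the parser partway through a batch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// compiledParser is a hypothetical stand-in for an immutable compiled parser.
type compiledParser struct {
	generation int
}

// coordinator is a hypothetical stand-in for HyperType: goroutines read the
// current parser without locks, and a recompile publishes a new one atomically.
type coordinator struct {
	current atomic.Pointer[compiledParser]
}

func newCoordinator() *coordinator {
	c := &coordinator{}
	c.current.Store(&compiledParser{generation: 1})
	return c
}

// Type returns the parser in effect right now.
func (c *coordinator) Type() *compiledParser { return c.current.Load() }

// swap publishes a freshly compiled parser. Holders of the old pointer keep
// using it safely because it is never mutated.
func (c *coordinator) swap(p *compiledParser) { c.current.Store(p) }

// processBatch loads the parser once and uses that snapshot for every message
// in the batch, returning the generation each message was parsed with.
func processBatch(c *coordinator, n int) []int {
	p := c.Type()
	used := make([]int, n)
	for i := range used {
		used[i] = p.generation // a real parser would decode message i here
	}
	return used
}

func main() {
	c := newCoordinator()
	results := make([][]int, 4)
	var wg sync.WaitGroup
	for w := range results {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			results[w] = processBatch(c, 1000)
		}(w)
	}
	c.swap(&compiledParser{generation: 2}) // concurrent "recompile"
	wg.Wait()
	for w, r := range results {
		fmt.Printf("worker %d: batch parsed with generation %d\n", w, r[0])
	}
}
```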

func NewHyperType added in v0.2.0

func NewHyperType(md protoreflect.MessageDescriptor, opts ...HyperTypeOption) *HyperType

NewHyperType compiles a hyperpb.MessageType from md and returns a shared coordinator ready for use by one or more Transcoder instances. The compiled type can later be recompiled with profiling data for improved parse performance.

Example

ExampleNewHyperType demonstrates creating a HyperType coordinator for high-performance raw-bytes ingestion. HyperType compiles a hyperpb parser from a message descriptor and can be shared across multiple Transcoders.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func main() {
	outerMD, _ := buildHyperExampleDescriptors()

	// Create a HyperType — this compiles the hyperpb parser once.
	ht := bufarrowlib.NewHyperType(outerMD)

	// The compiled type is accessible and can be inspected.
	fmt.Printf("type: %v\n", ht.Type() != nil)
	fmt.Printf("sample rate: %.2f\n", ht.SampleRate())
}
Output:
type: true
sample rate: 0.01
Example (WithAutoRecompile)

ExampleNewHyperType_withAutoRecompile demonstrates enabling automatic profile-guided recompilation. After the threshold number of messages, the parser is recompiled using collected profiling data.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func main() {
	outerMD, _ := buildHyperExampleDescriptors()

	// Recompile every 10,000 messages, sampling 5% of them for profiling.
	ht := bufarrowlib.NewHyperType(outerMD,
		bufarrowlib.WithAutoRecompile(10_000, 0.05),
	)

	fmt.Printf("type: %v\n", ht.Type() != nil)
	fmt.Printf("sample rate: %.2f\n", ht.SampleRate())
}
Output:
type: true
sample rate: 0.05

func (*HyperType) Profile added in v0.2.0

func (h *HyperType) Profile() *hyperpb.Profile

Profile returns the current hyperpb.Profile for recording parse statistics. Returns nil if profiling has not been initialized.

func (*HyperType) Recompile added in v0.2.0

func (h *HyperType) Recompile() error

Recompile recompiles the underlying hyperpb.MessageType using the collected profiling data. It atomically swaps the old profile for a fresh one (preventing double-recompile races), recompiles, and stores the new type. All Transcoder instances sharing this HyperType will pick up the new type on their next Transcoder.AppendRaw or Transcoder.AppendDenormRaw call.

Recompile is safe for concurrent use but is intentionally synchronous: the caller blocks until compilation finishes. For a non-blocking recompile, wrap the call in a goroutine or use HyperType.RecompileAsync, which discards the error.

Returns an error if no profile data has been collected.
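Swapping the profile is what prevents two concurrent callers from compiling the same data twice. Whichever caller swaps out the populated profile does the work. Any caller that arrives afterwards finds a fresh, empty profile and gets an error. The following sketch models that sequence with hypothetical `profile` and `recompiler` types. It assumes, for illustration only, that "no profile data" means zero recorded samples, which may not match hyperpb's actual check.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// profile is a hypothetical stand-in for collected parse statistics.
type profile struct {
	samples atomic.Int64
}

// recompiler models the Recompile steps: swap profile, compile, store type.
type recompiler struct {
	prof       atomic.Pointer[profile]
	generation atomic.Int64 // stands in for the stored compiled type
}

var errNoProfile = errors.New("no profile data collected")

func newRecompiler() *recompiler {
	r := &recompiler{}
	r.prof.Store(&profile{})
	return r
}

// record adds one profiled message to the current profile.
func (r *recompiler) record() { r.prof.Load().samples.Add(1) }

// Recompile takes ownership of the current profile by swapping in a fresh one.
// A caller that arrives after the swap sees the empty replacement and returns
// errNoProfile instead of recompiling a second time.
func (r *recompiler) Recompile() error {
	old := r.prof.Swap(&profile{})
	if old.samples.Load() == 0 {
		return errNoProfile
	}
	// A real implementation would compile a new parser from old here.
	r.generation.Add(1)
	return nil
}

func main() {
	r := newRecompiler()
	fmt.Println(r.Recompile()) // no profile data collected
	for i := 0; i < 100; i++ {
		r.record()
	}
	fmt.Println(r.Recompile())                      // <nil>
	fmt.Println(r.Recompile())                      // no profile data collected
	fmt.Println("generation:", r.generation.Load()) // generation: 1
}
```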

Example

ExampleHyperType_Recompile demonstrates manual profile-guided recompilation. After processing a batch of messages, the profiling data is used to recompile the parser for better performance on subsequent batches.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// Manual recompile mode: threshold=0 disables auto-recompile,
	// rate=1.0 profiles 100% of messages for maximum accuracy.
	ht := bufarrowlib.NewHyperType(outerMD,
		bufarrowlib.WithAutoRecompile(0, 1.0),
	)

	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Phase 1: Profile a representative batch.
	for i := 0; i < 100; i++ {
		msg := newOuterMessage(outerMD, innerMD,
			fmt.Sprintf("product-%d", i), "",
			[]struct {
				id    string
				price float64
			}{
				{fmt.Sprintf("item-%d", i), float64(i) + 0.99},
			}, int64(i),
		)
		raw, err := proto.Marshal(msg)
		if err != nil {
			log.Fatal(err)
		}
		if err := tc.AppendDenormRaw(raw); err != nil {
			log.Fatal(err)
		}
	}
	rec := tc.NewDenormalizerRecordBatch()
	rec.Release()

	// Phase 2: Recompile with the collected profile.
	if err := ht.Recompile(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("recompiled successfully")

	// Phase 3: Process more data with the optimised parser.
	msg := newOuterMessage(outerMD, innerMD, "final", "", []struct {
		id    string
		price float64
	}{{"Z", 99.99}}, 1)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenormRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec2 := tc.NewDenormalizerRecordBatch()
	defer rec2.Release()

	fmt.Printf("rows after recompile: %d\n", rec2.NumRows())
}
Output:
recompiled successfully
rows after recompile: 1

func (*HyperType) RecompileAsync added in v0.2.0

func (h *HyperType) RecompileAsync() <-chan struct{}

RecompileAsync spawns a goroutine to recompile asynchronously. The returned channel is closed when recompilation completes (or is skipped because another recompile is in progress). Errors are silently discarded; use HyperType.Recompile directly if error handling is needed.
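RecompileAsync's contract (a channel that is closed on completion, with errors dropped) is the usual Go wrapper around a synchronous call. The following sketch shows that wrapper next to a variant that keeps the error for callers who need it. `recompileAsync` and `recompileWithErr` are hypothetical helpers that take any `func() error`; they are not bufarrowlib functions. The sketch does not model the "skipped because another recompile is in progress" case.

```go
package main

import (
	"errors"
	"fmt"
)

// errExample is a hypothetical error returned by a failing recompile.
var errExample = errors.New("no profile data")

// recompileAsync runs fn in a goroutine and closes the returned channel when
// fn returns. The error is discarded, mirroring the documented behaviour.
func recompileAsync(fn func() error) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = fn()
	}()
	return done
}

// recompileWithErr runs fn in a goroutine and delivers its error on a
// buffered channel, so the goroutine never blocks even if nobody reads.
func recompileWithErr(fn func() error) <-chan error {
	errc := make(chan error, 1)
	go func() { errc <- fn() }()
	return errc
}

func main() {
	failing := func() error { return errExample }

	<-recompileAsync(failing) // waits for completion; the error is lost
	fmt.Println("async recompile finished")

	if err := <-recompileWithErr(failing); err != nil {
		fmt.Println("recompile failed:", err)
	}
}
```

With a real HyperType, the method value `ht.Recompile` already has type `func() error` and could be passed as `fn`.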

func (*HyperType) RecordMessage added in v0.2.0

func (h *HyperType) RecordMessage() bool

RecordMessage increments the message counter and returns true if the auto-recompile threshold has been reached and the caller should trigger HyperType.Recompile. Returns false if auto-recompile is disabled (threshold == 0) or the threshold has not been reached.
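RecordMessage is the per-message hook that drives auto-recompile. The following sketch shows one plausible lock-free shape for it, using a hypothetical `msgCounter` type. It assumes the trigger fires each time the count reaches a multiple of the threshold. bufarrowlib may instead fire only once or reset its counter, so treat the exact cadence as an assumption.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// msgCounter is a hypothetical model of HyperType's message counter.
type msgCounter struct {
	threshold int64 // 0 disables auto-recompile
	count     atomic.Int64
}

// RecordMessage increments the counter and reports whether the caller should
// trigger a recompile. Assumption: it fires on every multiple of threshold.
// Because Add returns a unique value per call, exactly one concurrent caller
// observes each multiple.
func (c *msgCounter) RecordMessage() bool {
	n := c.count.Add(1)
	if c.threshold == 0 {
		return false
	}
	return n%c.threshold == 0
}

func main() {
	c := &msgCounter{threshold: 3}
	for i := 1; i <= 7; i++ {
		if c.RecordMessage() {
			fmt.Println("recompile after message", i)
		}
	}
	// recompile after message 3
	// recompile after message 6
}
```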

func (*HyperType) SampleRate added in v0.2.0

func (h *HyperType) SampleRate() float64

SampleRate returns the profiling sample rate.

func (*HyperType) Type added in v0.2.0

func (h *HyperType) Type() *hyperpb.MessageType

Type returns the current compiled hyperpb.MessageType. The returned pointer is safe to use until the next HyperType.Recompile call; callers should load it once per batch rather than caching it long-term.

type HyperTypeOption added in v0.2.0

type HyperTypeOption func(*hyperTypeConfig)

HyperTypeOption configures a HyperType during construction.

func WithAutoRecompile added in v0.2.0

func WithAutoRecompile(threshold int64, rate float64) HyperTypeOption

WithAutoRecompile enables automatic recompilation after threshold messages have been profiled. rate is the sampling fraction passed to hyperpb.WithRecordProfile (e.g. 0.01 for 1%). A threshold of 0 disables auto-recompile (the default); use HyperType.Recompile manually instead. rate still applies when threshold is 0, so WithAutoRecompile(0, rate) sets the sampling rate for manual recompilation.
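HyperTypeOption follows Go's functional-options pattern. The following sketch mirrors the documented defaults: threshold 0 (manual recompile) and a sample rate of 0.01, as printed by the NewHyperType example. The names `config` and `withAutoRecompile` are hypothetical. bufarrowlib's real hyperTypeConfig is unexported and may hold more fields.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the unexported hyperTypeConfig.
type config struct {
	threshold int64   // 0 = auto-recompile disabled
	rate      float64 // fraction of messages profiled
}

// option mirrors the shape of HyperTypeOption.
type option func(*config)

// withAutoRecompile sets both values. rate still applies when threshold is 0,
// which is how the manual-recompile example profiles 100% of messages.
func withAutoRecompile(threshold int64, rate float64) option {
	return func(c *config) {
		c.threshold = threshold
		c.rate = rate
	}
}

// newConfig applies the defaults first, then each option in order.
func newConfig(opts ...option) config {
	c := config{threshold: 0, rate: 0.01}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())                                // {threshold:0 rate:0.01}
	fmt.Printf("%+v\n", newConfig(withAutoRecompile(10_000, 0.05))) // {threshold:10000 rate:0.05}
}
```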

type Opt

type Opt struct {
	// contains filtered or unexported fields
}

Opt holds the option values collected from Option functions and passed to New or NewFromFile. Fields are unexported; use the With* helpers.

type Option

type Option func(config)

Option is a functional option applied to New and NewFromFile to configure schema merging, denormalization, or other behaviours.

func WithCustomMessage

func WithCustomMessage(msgDesc protoreflect.MessageDescriptor) Option

WithCustomMessage provides a protoreflect.MessageDescriptor whose fields will be merged into the base message schema. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessageFile.

Example

ExampleWithCustomMessage demonstrates the WithCustomMessage option.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price qty region batch_id]

func WithCustomMessageFile

func WithCustomMessageFile(protoFilePath, messageName string, importPaths []string) Option

WithCustomMessageFile specifies a .proto file and message name whose fields will be merged into the base message schema. The .proto file is compiled at schema creation time using protocompile. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessage.

Example

ExampleWithCustomMessageFile demonstrates augmenting a base schema with custom fields loaded from a .proto file on disk.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

const exampleCustomProto = `syntax = "proto3";
message Extra {
  string region = 1;
}
`

func main() {
	basePath, baseDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(baseDir)

	customPath, customDir := writeProtoFile("extra.proto", exampleCustomProto)
	defer os.RemoveAll(customDir)

	tc, err := bufarrowlib.NewFromFile(
		filepath.Base(basePath), "Item", []string{baseDir},
		memory.DefaultAllocator,
		bufarrowlib.WithCustomMessageFile(filepath.Base(customPath), "Extra", []string{customDir}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price region]

func WithDenormalizerPlan

func WithDenormalizerPlan(paths ...pbpath.PlanPathSpec) Option

WithDenormalizerPlan configures one or more protobuf field paths to project into a flat (denormalized) Arrow record. Each path is specified as a pbpath.PlanPathSpec created via pbpath.PlanPath, which supports per-path options such as pbpath.Alias and pbpath.StrictPath.

Paths that traverse repeated fields with a wildcard [*] or range [start:end] produce fan-out rows. Multiple independent fan-out groups are cross-joined; empty fan-out groups produce a single null row (left-join semantics).

Each leaf path must terminate at a scalar protobuf field or a recognized well-known message type (google.protobuf.Timestamp, otel AnyValue). Message-typed terminal nodes are rejected at schema creation time.
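The fan-out rules above determine how many rows a denormalized record has for each message. Independent groups are cross-joined, and an empty group contributes a single null. The row count is therefore the product of the group lengths, with each empty group counted as 1. The following sketch reproduces only that row-expansion step over plain string slices. `crossJoin` is a hypothetical helper, and a nil `*string` stands in for an Arrow null. Paths that share a repeated parent (for example, two fields under the same items[*]) are presumably aligned within one group rather than cross-joined; that grouping step, path parsing, and column typing are not modeled.

```go
package main

import "fmt"

// crossJoin expands independent fan-out groups into rows. Each group holds
// the values one repeated path produced for a single message. Groups are
// cross-joined; an empty group contributes one null (left-join semantics).
func crossJoin(groups [][]string) [][]*string {
	rows := [][]*string{{}} // start with one empty row
	for _, g := range groups {
		vals := make([]*string, 0, len(g))
		for i := range g {
			vals = append(vals, &g[i])
		}
		if len(vals) == 0 {
			vals = append(vals, nil) // empty group -> single null cell
		}
		next := make([][]*string, 0, len(rows)*len(vals))
		for _, r := range rows {
			for _, v := range vals {
				next = append(next, append(append([]*string{}, r...), v))
			}
		}
		rows = next
	}
	return rows
}

// show prints rows as "a | b", rendering nil cells as "null".
func show(rows [][]*string) {
	for _, r := range rows {
		for j, c := range r {
			if j > 0 {
				fmt.Print(" | ")
			}
			if c == nil {
				fmt.Print("null")
			} else {
				fmt.Print(*c)
			}
		}
		fmt.Println()
	}
}

func main() {
	// A scalar "name" acts as a group of one; "items[*].id" fans out to two.
	show(crossJoin([][]string{{"order-1"}, {"X", "Y"}}))
	// order-1 | X
	// order-1 | Y

	// An order with no items still yields one row, with a null item id.
	show(crossJoin([][]string{{"order-2"}, {}}))
	// order-2 | null
}
```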

Example

ExampleWithDenormalizerPlan demonstrates configuring a denormalization plan.

orderMD, itemMD := buildExampleDescriptors()

tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"X", 1.0}, {"Y", 2.0}},
	nil, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("rows: %d\n", rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
	name := rec.Column(0).(*array.String).Value(i)
	id := rec.Column(1).(*array.String).Value(i)
	fmt.Printf("  %s | %s\n", name, id)
}
Output:
rows: 2
  order-1 | X
  order-1 | Y

func WithHyperType added in v0.2.0

func WithHyperType(ht *HyperType) Option

WithHyperType provides a shared HyperType coordinator for PGO-enabled raw-bytes ingestion via Transcoder.AppendRaw and Transcoder.AppendDenormRaw. The HyperType's compiled hyperpb.MessageType is used instead of compiling a new one, and all Transcoders sharing the same HyperType contribute profiling data for online recompilation.

Create a HyperType with NewHyperType and pass it to multiple New/Clone calls. Call HyperType.Recompile to upgrade the parser with collected profile data.

Example

ExampleWithHyperType demonstrates the WithHyperType option, which connects a Transcoder to a shared HyperType coordinator for raw-bytes ingestion.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	ht := bufarrowlib.NewHyperType(outerMD)

	// WithHyperType enables AppendRaw and AppendDenormRaw on the transcoder.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOuterMessage(outerMD, innerMD, "Test", "", nil, 42)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// AppendRaw is now available because WithHyperType was provided.
	if err := tc.AppendRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 1

type Transcoder

type Transcoder struct {
	// contains filtered or unexported fields
}

Transcoder converts protobuf messages to Apache Arrow record batches and back. It holds the compiled message schema, an Arrow record builder for the full message ("stencil"), and optionally a separate denormalizer that projects selected paths into a flat Arrow record suitable for analytics.

A Transcoder is not safe for concurrent use; call Transcoder.Clone to create independent copies for parallel goroutines.
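The one-copy-per-goroutine rule can be sketched with the standard library alone. The rowBuilder type, its Clone/Append/Rows methods, and the appendInParallel helper below are hypothetical stand-ins, not bufarrowlib's API. The sketch only shows the ownership pattern: every worker appends to its own clone, so the append path needs no locking.

```go
package main

import (
	"fmt"
	"sync"
)

// rowBuilder is a hypothetical stand-in for a builder that is not safe for
// concurrent use, such as a Transcoder. It is NOT bufarrowlib's type.
type rowBuilder struct {
	schema []string // shared, read-only configuration
	rows   [][]any  // mutable per-instance state
}

// Clone returns an independent builder with the same configuration and
// empty mutable state.
func (b *rowBuilder) Clone() *rowBuilder {
	return &rowBuilder{schema: b.schema}
}

// Append adds one row; it must not be called concurrently on one instance.
func (b *rowBuilder) Append(row []any) { b.rows = append(b.rows, row) }

// Rows reports how many rows have been appended to this instance.
func (b *rowBuilder) Rows() int { return len(b.rows) }

// appendInParallel gives each batch to its own goroutine, each goroutine
// owning a private clone of tmpl, and returns the per-worker row counts.
func appendInParallel(tmpl *rowBuilder, batches [][][]any) []int {
	counts := make([]int, len(batches))
	var wg sync.WaitGroup
	for i, batch := range batches {
		wg.Add(1)
		go func(i int, batch [][]any) {
			defer wg.Done()
			local := tmpl.Clone() // one clone per goroutine: no shared mutable state
			for _, row := range batch {
				local.Append(row)
			}
			counts[i] = local.Rows() // each goroutine writes a distinct index
		}(i, batch)
	}
	wg.Wait()
	return counts
}

func main() {
	base := &rowBuilder{schema: []string{"name", "price", "qty"}}
	counts := appendInParallel(base, [][][]any{
		{{"Widget", 9.99, 5}, {"Gadget", 24.50, 2}},
		{{"Alpha", 1.0, 10}},
	})
	fmt.Println(counts) // [2 1]
}
```

Because the clones share only read-only configuration, each worker's output has to be combined afterwards, for example by collecting the record batches every worker flushes.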

func New

func New(msgDesc protoreflect.MessageDescriptor, mem memory.Allocator, opts ...Option) (tc *Transcoder, err error)

New returns a new Transcoder from a pre-resolved message descriptor. Options include WithDenormalizerPlan, WithCustomMessage, WithCustomMessageFile, and WithHyperType. WithDenormalizerPlan adds a separate flat Arrow record for analytics, WithCustomMessage/WithCustomMessageFile extend the schema derived from the input protoreflect.MessageDescriptor with extra fields, and WithHyperType enables AppendRaw and AppendDenormRaw.

Example

ExampleNew demonstrates creating a Transcoder, appending protobuf messages, and retrieving the result as an Arrow record batch.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
rows: 2, cols: 3
fields: [name price qty]
Example (WithCustomMessage)

ExampleNew_withCustomMessage demonstrates augmenting a protobuf schema with custom fields using WithCustomMessage, and populating both the base and custom fields via AppendWithCustom.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Printf("fields: %v\n", tc.FieldNames())

	base := newProduct(baseMD, "Widget", 9.99, 5)
	custom := dynamicpb.NewMessage(customMD)
	custom.Set(customMD.Fields().ByName("region"), protoreflect.ValueOfString("US-EAST"))
	custom.Set(customMD.Fields().ByName("batch_id"), protoreflect.ValueOfInt64(42))

	if err := tc.AppendWithCustom(base, custom); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
fields: [name price qty region batch_id]
rows: 1, cols: 5

func NewFromFile

func NewFromFile(protoFilePath, messageName string, importPaths []string, mem memory.Allocator, opts ...Option) (*Transcoder, error)

NewFromFile returns a new Transcoder by compiling a .proto file at runtime. protoFilePath is the path to the .proto file, messageName is the top-level message to use, and importPaths are the directories to search for imports. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile.

Example

ExampleNewFromFile demonstrates creating a Transcoder directly from a .proto file on disk, without pre-compiling the descriptor yourself.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	path, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	tc, err := bufarrowlib.NewFromFile(filepath.Base(path), "Item", []string{dir}, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Inspect the generated Arrow schema to confirm it was built.
	f := tc.Schema().Field(0)
	fmt.Printf("field 0: %s\n", f.Name)
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
field 0: name
fields: [name price]

func (*Transcoder) Append

func (s *Transcoder) Append(value proto.Message)

Append appends a protobuf message to the transcoder's Arrow record builder. This method is not safe for concurrent use.

Example

ExampleTranscoder_Append shows the basic append-and-flush cycle.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Alpha", 1.0, 10))
	tc.Append(newProduct(md, "Bravo", 2.0, 20))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 2

func (*Transcoder) AppendDenorm

func (s *Transcoder) AppendDenorm(msg proto.Message) error

AppendDenorm evaluates the denormalizer plan against msg and appends the resulting denormalized rows to the denormalizer's Arrow record builder.

Fan-out groups are cross-joined: each group contributes max(len(branches), 1) rows, and the total row count is the product of all group counts. Empty fan-out groups (no branches) contribute 1 null row (left-join semantics).
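The counting rule above is a Cartesian product in which an empty group is padded with one null. The stdlib-only sketch below checks it two ways: rowCount applies the formula directly, and crossJoin materializes the rows. Both helpers, and modeling each group as a list of strings, are illustrative assumptions rather than bufarrowlib internals. Groups of sizes 2, 0 and 3 yield 2 × 1 × 3 = 6 rows, each carrying a null in the empty group's column.

```go
package main

import "fmt"

// rowCount applies the documented formula: the product over fan-out groups
// of max(size, 1).
func rowCount(sizes []int) int {
	n := 1
	for _, s := range sizes {
		if s > 1 {
			n *= s
		}
	}
	return n
}

// crossJoin materializes the rows for fan-out groups given as value lists.
// An empty group contributes a single nil (null) so that the other groups'
// rows are kept, mirroring the left-join semantics described above.
func crossJoin(groups [][]string) [][]*string {
	rows := [][]*string{{}} // one empty row: the identity of the product
	for _, g := range groups {
		opts := []*string{}
		for i := range g {
			opts = append(opts, &g[i])
		}
		if len(opts) == 0 {
			opts = append(opts, nil) // empty group -> one null
		}
		next := make([][]*string, 0, len(rows)*len(opts))
		for _, r := range rows {
			for _, o := range opts {
				row := append(append([]*string(nil), r...), o) // copy r, then extend
				next = append(next, row)
			}
		}
		rows = next
	}
	return rows
}

func main() {
	groups := [][]string{{"A", "B"}, {}, {"US", "EU", "APAC"}}
	rows := crossJoin(groups)
	fmt.Println(len(rows), rowCount([]int{2, 0, 3})) // 6 6
	for _, r := range rows {
		for j, v := range r {
			if j > 0 {
				fmt.Print(" | ")
			}
			if v == nil {
				fmt.Print("null")
			} else {
				fmt.Print(*v)
			}
		}
		fmt.Println()
	}
}
```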

This method is not safe for concurrent use.

Example

ExampleTranscoder_AppendDenorm demonstrates basic denormalization: projecting scalar and fan-out fields from a protobuf message into a flat Arrow record.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("order_name")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOrder(orderMD, itemMD, "order-1",
		[]struct {
			id    string
			price float64
		}{{"A", 1.50}, {"B", 2.75}},
		nil, 1,
	)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
	for i := 0; i < int(rec.NumRows()); i++ {
		name := rec.Column(0).(*array.String).Value(i)
		id := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %s | %.2f\n", name, id, price)
	}
}
Output:
rows: 2, cols: 3
  order-1 | A | 1.50
  order-1 | B | 2.75
Example (Coalesce)

ExampleTranscoder_AppendDenorm_coalesce demonstrates FuncCoalesce in a denormalization plan. Coalesce returns the first non-zero value from multiple paths — useful when a field can come from different sources.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Coalesce: use customer name, fall back to email if customer is empty.
			pbpath.PlanPath("buyer",
				pbpath.WithExpr(pbpath.FuncCoalesce(
					pbpath.PathRef("customer"),
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("buyer"),
			),
			pbpath.PlanPath("region", pbpath.Alias("region")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Message 1: customer is set → "Alice" is used.
	msg1 := newSale(saleMD, lineItemMD, "Alice", "alice@example.com", "US", nil, 0)
	if err := tc.AppendDenorm(msg1); err != nil {
		log.Fatal(err)
	}

	// Message 2: customer is empty → email is used as fallback.
	msg2 := newSale(saleMD, lineItemMD, "", "bob@example.com", "EU", nil, 0)
	if err := tc.AppendDenorm(msg2); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		buyer := rec.Column(0).(*array.String).Value(i)
		region := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  buyer=%-18s region=%s\n", buyer, region)
	}
}
Output:
  buyer=Alice              region=US
  buyer=bob@example.com    region=EU
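Read as a plain function, "first non-zero value" means scanning the arguments in order and returning the first one that differs from its type's zero value, which is why the empty customer in message 2 falls through to the email. The generic sketch below uses a hypothetical coalesce helper, not pbpath.FuncCoalesce. What FuncCoalesce emits when every path is zero is not stated here, so the sketch's choice (the zero value plus ok == false) is an assumption.

```go
package main

import "fmt"

// coalesce returns the first argument that differs from T's zero value.
// ok is false when every argument is zero; the zero value is then returned.
// This mirrors the "first non-zero value" rule, not necessarily
// FuncCoalesce's exact null handling.
func coalesce[T comparable](vals ...T) (v T, ok bool) {
	var zero T
	for _, x := range vals {
		if x != zero {
			return x, true
		}
	}
	return zero, false
}

func main() {
	buyer, _ := coalesce("Alice", "alice@example.com")
	fmt.Println(buyer) // Alice
	buyer, _ = coalesce("", "bob@example.com")
	fmt.Println(buyer) // bob@example.com
	_, ok := coalesce(0, 0)
	fmt.Println(ok) // false
}
```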
Example (ComposedExpr)

ExampleTranscoder_AppendDenorm_composedExpr demonstrates composing multiple Expr functions. It builds a "display" column that joins the customer name (falling back to email when the name is empty) with the uppercased region, separated by ": ".

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Composed expression:
			//   Concat(": ", Coalesce(customer, email), Upper(region))
			// This reads as: join the first non-empty identifier with the
			// uppercased region, separated by ": ".
			pbpath.PlanPath("display",
				pbpath.WithExpr(pbpath.FuncConcat(": ",
					pbpath.FuncCoalesce(
						pbpath.PathRef("customer"),
						pbpath.PathRef("email"),
					),
					pbpath.FuncUpper(
						pbpath.PathRef("region"),
					),
				)),
				pbpath.Alias("display"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for _, m := range []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "alice@ex.com", "us", nil, 0),
		newSale(saleMD, lineItemMD, "", "bob@ex.com", "eu", nil, 0),
	} {
		if err := tc.AppendDenorm(m); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(0).(*array.String).Value(i))
	}
}
Output:
Alice: US
bob@ex.com: EU
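One way to picture the composition in the example above is as a small expression tree evaluated once per message: leaves read a field, inner nodes transform their children's results. The sketch below models a message as a map[string]string and uses hypothetical ref, coalesce, upper, and concat combinators; it is a stdlib-only illustration of the evaluation order, not pbpath's implementation. It reproduces the two output lines shown above.

```go
package main

import (
	"fmt"
	"strings"
)

// expr is a hypothetical expression node: it evaluates to a string for one
// message, modeled here as a flat map from field name to value.
type expr func(msg map[string]string) string

// ref reads one field; a missing field evaluates to "".
func ref(field string) expr {
	return func(msg map[string]string) string { return msg[field] }
}

// coalesce returns the first child result that is non-empty.
func coalesce(children ...expr) expr {
	return func(msg map[string]string) string {
		for _, c := range children {
			if v := c(msg); v != "" {
				return v
			}
		}
		return ""
	}
}

// upper upper-cases its child's result.
func upper(child expr) expr {
	return func(msg map[string]string) string { return strings.ToUpper(child(msg)) }
}

// concat evaluates every child and joins the results with sep.
func concat(sep string, children ...expr) expr {
	return func(msg map[string]string) string {
		parts := make([]string, len(children))
		for i, c := range children {
			parts[i] = c(msg)
		}
		return strings.Join(parts, sep)
	}
}

func main() {
	display := concat(": ", coalesce(ref("customer"), ref("email")), upper(ref("region")))
	fmt.Println(display(map[string]string{"customer": "Alice", "email": "alice@ex.com", "region": "us"}))
	fmt.Println(display(map[string]string{"email": "bob@ex.com", "region": "eu"}))
}
```

Building the tree once and evaluating it per message keeps the per-row cost to a walk over the nodes, which is the shape the pbpath.With Expr composition suggests, though how pbpath evaluates it internally is not documented here.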
Example (Concat)

ExampleTranscoder_AppendDenorm_concat demonstrates FuncConcat, which joins the string representations of multiple path values with a separator.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Concat: join customer and region with " / ".
			pbpath.PlanPath("label",
				pbpath.WithExpr(pbpath.FuncConcat(" / ",
					pbpath.PathRef("customer"),
					pbpath.PathRef("region"),
				)),
				pbpath.Alias("label"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for _, m := range []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "", "US", nil, 0),
		newSale(saleMD, lineItemMD, "Bob", "", "EU", nil, 0),
	} {
		if err := tc.AppendDenorm(m); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(0).(*array.String).Value(i))
	}
}
Output:
Alice / US
Bob / EU
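FuncConcat is described as joining the string representations of its inputs, so non-string fields such as timestamp (int64) or unit_price (double) can presumably take part as well. A minimal sketch of that idea follows, assuming Go's default fmt formatting; FuncConcat's exact number formatting is not documented here, and concatValues is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// concatValues formats each value with fmt.Sprint (Go's default %v
// formatting) and joins the results with sep.
func concatValues(sep string, vals ...any) string {
	parts := make([]string, len(vals))
	for i, v := range vals {
		parts[i] = fmt.Sprint(v)
	}
	return strings.Join(parts, sep)
}

func main() {
	fmt.Println(concatValues(" / ", "Alice", "US"))          // Alice / US
	fmt.Println(concatValues(" / ", "SKU-1", int64(3), 2.5)) // SKU-1 / 3 / 2.5
}
```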
Example (CrossJoin)

ExampleTranscoder_AppendDenorm_crossJoin demonstrates cross-join behaviour when two independent repeated fields are denormalized together.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOrder(orderMD, itemMD, "order-1",
		[]struct {
			id    string
			price float64
		}{{"A", 1.0}, {"B", 2.0}},
		[]string{"x", "y", "z"}, 1,
	)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d (2 items × 3 tags)\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		id := rec.Column(0).(*array.String).Value(i)
		tag := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  %s | %s\n", id, tag)
	}
}
Output:
rows: 6 (2 items × 3 tags)
  A | x
  A | y
  A | z
  B | x
  B | y
  B | z
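The six rows above are the Cartesian product of the two fan-out groups, with the first planned path (items) varying slowest. The stdlib-only sketch below reproduces that row count and ordering for two non-empty string groups. It illustrates the cross-join semantics described for this example; it is not bufarrowlib's implementation, and the `crossJoin` helper is hypothetical. It deliberately ignores empty groups, which the library handles with left-join semantics (see the LeftJoin example).

```go
package main

import "fmt"

// crossJoin pairs every element of left with every element of right.
// The first group is iterated in the outer loop, so rows come out as
// A|x, A|y, A|z, B|x, ... like the example output.
// Empty groups are not modelled here (they would yield zero rows).
func crossJoin(left, right []string) [][2]string {
	rows := make([][2]string, 0, len(left)*len(right))
	for _, l := range left {
		for _, r := range right {
			rows = append(rows, [2]string{l, r})
		}
	}
	return rows
}

func main() {
	rows := crossJoin([]string{"A", "B"}, []string{"x", "y", "z"})
	fmt.Printf("rows: %d (2 items × 3 tags)\n", len(rows))
	for _, row := range rows {
		fmt.Printf("  %s | %s\n", row[0], row[1])
	}
}
```

Because row count multiplies across independent repeated paths, planning several unrelated wildcard paths together can grow the output quickly.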
Example (Default)

ExampleTranscoder_AppendDenorm_default demonstrates FuncDefault, which provides a literal fallback value when a field is zero/empty.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("customer", pbpath.Alias("customer")),
			// Default: if region is empty, use "UNKNOWN".
			pbpath.PlanPath("region",
				pbpath.WithExpr(pbpath.FuncDefault(
					pbpath.PathRef("region"),
					protoreflect.ValueOfString("UNKNOWN"),
				)),
				pbpath.Alias("region"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Alice", "", "US", nil, 0)); err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Bob", "", "", nil, 0)); err != nil { // no region
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		customer := rec.Column(0).(*array.String).Value(i)
		region := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  %s: %s\n", customer, region)
	}
}
Output:
  Alice: US
  Bob: UNKNOWN
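For a string column, the FuncDefault rule above reduces to "use the fallback when the value is the proto3 zero value". The sketch below states that rule in plain Go, assuming "zero/empty" means the empty string for string fields; the `defaultString` helper is hypothetical and is not part of pbpath.

```go
package main

import "fmt"

// defaultString returns fallback when v is the proto3 zero value for a
// string field (""), and v unchanged otherwise. This mirrors the
// FuncDefault(PathRef("region"), "UNKNOWN") column in the example.
func defaultString(v, fallback string) string {
	if v == "" {
		return fallback
	}
	return v
}

func main() {
	sales := []struct{ customer, region string }{
		{"Alice", "US"},
		{"Bob", ""}, // no region set
	}
	for _, s := range sales {
		fmt.Printf("  %s: %s\n", s.customer, defaultString(s.region, "UNKNOWN"))
	}
}
```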
Example (Has)

ExampleTranscoder_AppendDenorm_has demonstrates FuncHas, which returns a boolean indicating whether a field has a non-zero value. This is useful for creating presence flag columns.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("customer", pbpath.Alias("customer")),
			// Has: does the email field have a non-empty value?
			pbpath.PlanPath("has_email",
				pbpath.WithExpr(pbpath.FuncHas(
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("has_email"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Alice", "alice@example.com", "", nil, 0)); err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Bob", "", "", nil, 0)); err != nil { // no email
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		customer := rec.Column(0).(*array.String).Value(i)
		hasEmail := rec.Column(1).(*array.Boolean).Value(i)
		fmt.Printf("  %s: has_email=%v\n", customer, hasEmail)
	}
}
Output:
  Alice: has_email=true
  Bob: has_email=false
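The presence test described for FuncHas ("has a non-zero value") can be written generically for any comparable scalar. The sketch below assumes proto3 implicit presence, where a scalar field counts as set only when it differs from its type's zero value; the generic `has` helper is hypothetical and requires Go 1.18+.

```go
package main

import "fmt"

// has reports whether v differs from its type's zero value, the rule
// used for proto3 scalar fields without explicit presence.
func has[T comparable](v T) bool {
	var zero T
	return v != zero
}

func main() {
	fmt.Println(has("alice@example.com")) // true
	fmt.Println(has(""))                  // false
	fmt.Println(has(int64(0)))            // false: under this rule a zero int64 counts as absent
}
```

One consequence of this rule: a field explicitly set to its zero value (for example an int64 of 0) is indistinguishable from an unset one.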
Example (LeftJoin)

ExampleTranscoder_AppendDenorm_leftJoin demonstrates left-join semantics when a repeated field is empty: a single null row is produced for that group while the other group fans out normally.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Zero items, 2 tags → items group produces 1 null row → 1 × 2 = 2 rows
	msg := newOrder(orderMD, itemMD, "order-1", nil, []string{"x", "y"}, 1)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		idNull := rec.Column(0).IsNull(i)
		tag := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  item_id=null:%v | tag=%s\n", idNull, tag)
	}
}
Output:
rows: 2
  item_id=null:true | tag=x
  item_id=null:true | tag=y
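The left-join behaviour above amounts to replacing an empty repeated group with a single null cell before taking the product, so the other group still fans out. The stdlib-only sketch below models that with nil pointers standing in for Arrow nulls. It is an illustration of the described semantics, not the library's code; `pad` and `leftJoinGroups` are hypothetical helpers, and the case where both groups are empty (one all-null row here) is an assumption.

```go
package main

import "fmt"

// pad turns a repeated group into cells. An empty group becomes a single
// nil cell (a null) instead of zero cells, so it cannot zero out the product.
func pad(vals []string) []*string {
	if len(vals) == 0 {
		return []*string{nil}
	}
	cells := make([]*string, len(vals))
	for i := range vals {
		cells[i] = &vals[i]
	}
	return cells
}

// leftJoinGroups fans out two repeated groups with left-join semantics.
func leftJoinGroups(items, tags []string) [][2]*string {
	itemCells, tagCells := pad(items), pad(tags)
	rows := make([][2]*string, 0, len(itemCells)*len(tagCells))
	for _, it := range itemCells {
		for _, tg := range tagCells {
			rows = append(rows, [2]*string{it, tg})
		}
	}
	return rows
}

func main() {
	// Zero items, 2 tags -> 1 null item cell x 2 tags = 2 rows.
	rows := leftJoinGroups(nil, []string{"x", "y"})
	fmt.Printf("rows: %d\n", len(rows))
	for _, r := range rows {
		fmt.Printf("  item_id=null:%v | tag=%s\n", r[0] == nil, *r[1])
	}
}
```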
Example (Upper)

ExampleTranscoder_AppendDenorm_upper demonstrates FuncUpper, which converts a string field to upper case — useful for normalizing data during ingestion.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("region_upper",
				pbpath.WithExpr(pbpath.FuncUpper(
					pbpath.PathRef("region"),
				)),
				pbpath.Alias("region_upper"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Alice", "", "us-east", nil, 0)); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Println(rec.Column(0).(*array.String).Value(0))
}
Output:
US-EAST
Example (WithExprAndFanout)

ExampleTranscoder_AppendDenorm_withExprAndFanout demonstrates combining Expr-based computed columns with fan-out (wildcard) paths. The expression columns broadcast as scalars while the repeated field fans out.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExprExampleDescriptors constructs a proto schema for demonstrating
// Expr-based denormalization:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("sku"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("unit_price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("quantity"), Number: proto.Int32(3), Type: int64Type, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("customer"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("email"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("region"), Number: proto.Int32(3), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(4), Type: messageType, TypeName: proto.String(".exprexample.LineItem"), Label: labelRep},
					{Name: proto.String("timestamp"), Number: proto.Int32(5), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Sale"), fd.Messages().ByName("LineItem")
}

func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	msg := dynamicpb.NewMessage(saleMD)
	if customer != "" {
		msg.Set(saleMD.Fields().ByName("customer"), protoreflect.ValueOfString(customer))
	}
	if email != "" {
		msg.Set(saleMD.Fields().ByName("email"), protoreflect.ValueOfString(email))
	}
	if region != "" {
		msg.Set(saleMD.Fields().ByName("region"), protoreflect.ValueOfString(region))
	}
	msg.Set(saleMD.Fields().ByName("timestamp"), protoreflect.ValueOfInt64(ts))
	list := msg.Mutable(saleMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(lineItemMD)
		item.Set(lineItemMD.Fields().ByName("sku"), protoreflect.ValueOfString(it.sku))
		item.Set(lineItemMD.Fields().ByName("unit_price"), protoreflect.ValueOfFloat64(it.price))
		item.Set(lineItemMD.Fields().ByName("quantity"), protoreflect.ValueOfInt64(it.qty))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()

	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Computed column: coalesce customer/email (scalar, broadcasts)
			pbpath.PlanPath("buyer",
				pbpath.WithExpr(pbpath.FuncCoalesce(
					pbpath.PathRef("customer"),
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("buyer"),
			),
			// Fan-out: one row per line item
			pbpath.PlanPath("items[*].sku", pbpath.Alias("sku")),
			pbpath.PlanPath("items[*].unit_price", pbpath.Alias("price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newSale(saleMD, lineItemMD, "Alice", "", "US", []struct {
		sku   string
		price float64
		qty   int64
	}{
		{"WIDGET-001", 9.99, 2},
		{"GADGET-002", 24.50, 1},
	}, 0)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		buyer := rec.Column(0).(*array.String).Value(i)
		sku := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %-12s | %.2f\n", buyer, sku, price)
	}
}
Output:
rows: 2
  Alice | WIDGET-001   | 9.99
  Alice | GADGET-002   | 24.50
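The broadcast behaviour above can be pictured as "evaluate scalar expressions once per message, then repeat the result on every fanned-out row". The sketch below models FuncCoalesce as "first non-empty string" and reproduces the two output rows. The `coalesce` and `flatten` helpers and the `lineItem` type are hypothetical, and the sketch omits the empty-items case, where the library would emit a single null row rather than none.

```go
package main

import "fmt"

type lineItem struct {
	sku   string
	price float64
}

// coalesce returns the first non-empty string, or "" if all are empty.
func coalesce(vals ...string) string {
	for _, v := range vals {
		if v != "" {
			return v
		}
	}
	return ""
}

// flatten emits one row per line item. The scalar "buyer" value is
// computed once per message and repeated (broadcast) on every row.
func flatten(customer, email string, items []lineItem) []string {
	buyer := coalesce(customer, email)
	rows := make([]string, 0, len(items))
	for _, it := range items {
		rows = append(rows, fmt.Sprintf("%s | %-12s | %.2f", buyer, it.sku, it.price))
	}
	return rows
}

func main() {
	items := []lineItem{{"WIDGET-001", 9.99}, {"GADGET-002", 24.50}}
	for _, row := range flatten("Alice", "", items) {
		fmt.Println("  " + row)
	}
}
```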

func (*Transcoder) AppendDenormRaw added in v0.2.0

func (s *Transcoder) AppendDenormRaw(data []byte) error

AppendDenormRaw unmarshals raw protobuf bytes using the HyperType's compiled parser and appends the denormalized result to the transcoder's denormalizer Arrow record builder.

This method requires both a HyperType (via WithHyperType) and a denormalizer plan (via WithDenormalizerPlan). It uses hyperpb.Shared for memory reuse and optionally records profiling data for online PGO.

This method is not safe for concurrent use.
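Because AppendDenormRaw is not safe for concurrent use, a common Go pattern for parallel ingestion is to give each worker goroutine its own builder and flush each one after the workers finish. The sketch below shows that ownership pattern with a hypothetical `rowBuilder` type standing in for a Transcoder; it does not call bufarrowlib. Whether one HyperType may be shared by several per-worker Transcoders is suggested by the "shared HyperType" wording in this package but is an assumption to confirm against the HyperType documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// rowBuilder is a hypothetical stand-in for a Transcoder. Like
// AppendDenormRaw, its Append method must not be called concurrently.
type rowBuilder struct{ rows [][]byte }

func (b *rowBuilder) Append(data []byte) error {
	b.rows = append(b.rows, data)
	return nil
}

// ingest distributes raw messages to n workers. Each worker owns a private
// builder, so no builder is ever used by two goroutines at once.
func ingest(msgs [][]byte, n int) (int, error) {
	if n < 1 {
		n = 1
	}
	builders := make([]*rowBuilder, n)
	errs := make([]error, n)
	work := make(chan []byte)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		builders[w] = &rowBuilder{}
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for data := range work {
				if err := builders[w].Append(data); err != nil && errs[w] == nil {
					errs[w] = err // keep the first error, but keep draining the channel
				}
			}
		}(w)
	}
	for _, m := range msgs {
		work <- m
	}
	close(work)
	wg.Wait()

	// After Wait the builders are no longer shared; a real pipeline would
	// flush one record batch per builder at this point.
	total := 0
	for w, b := range builders {
		if errs[w] != nil {
			return 0, errs[w]
		}
		total += len(b.rows)
	}
	return total, nil
}

func main() {
	total, err := ingest([][]byte{{0x0a}, {0x12}, {0x1a}, {0x20}}, 2)
	fmt.Println(total, err)
}
```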

Example

ExampleTranscoder_AppendDenormRaw demonstrates high-performance raw-bytes denormalization. This combines hyperpb decoding with the Plan-based denormalizer to go directly from raw protobuf bytes to a flat Arrow record.

This is the fastest path for streaming protobuf data into analytics-ready flat tables.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// 1. Create a shared HyperType.
	ht := bufarrowlib.NewHyperType(outerMD)

	// 2. Create a Transcoder with HyperType + denormalization plan.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// 3. Marshal messages to raw bytes (simulating Kafka/gRPC input).
	msg := newOuterMessage(outerMD, innerMD, "Gadgets", "", []struct {
		id    string
		price float64
	}{{"X", 1.50}, {"Y", 2.75}}, 10)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// 4. Feed raw bytes directly into the denormalizer.
	if err := tc.AppendDenormRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		name := rec.Column(0).(*array.String).Value(i)
		id := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %s | %.2f\n", name, id, price)
	}
}
Output:
rows: 2
  Gadgets | X | 1.50
  Gadgets | Y | 2.75
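The output above shows the fan-out: one `Outer` message with two `items` yields two rows, and the scalar `name` is repeated on each. The plain-Go sketch below models that row expansion for a single repeated path. It is a conceptual illustration, not bufarrowlib's implementation. The `item` and `row` types and `fanOut` are hypothetical, and the sketch does not model empty lists or null values, because this page does not describe how the library handles them.

```go
package main

import "fmt"

// item mirrors the Inner message (id, price) from the example above.
type item struct {
	id    string
	price float64
}

// row is one output row of the flat table:
// product (from "name"), itemID and price (from "items[*]").
type row struct {
	product string
	itemID  string
	price   float64
}

// fanOut emits one row per element of items and repeats the parent's
// scalar name on every row. Hypothetical model of a plan with a single
// repeated path; empty lists and nulls are not modelled.
func fanOut(name string, items []item) []row {
	rows := make([]row, 0, len(items))
	for _, it := range items {
		rows = append(rows, row{product: name, itemID: it.id, price: it.price})
	}
	return rows
}

func main() {
	rows := fanOut("Gadgets", []item{{"X", 1.50}, {"Y", 2.75}})
	fmt.Printf("rows: %d\n", len(rows))
	for _, r := range rows {
		fmt.Printf("  %s | %s | %.2f\n", r.product, r.itemID, r.price)
	}
}
```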
Example (Batch)

ExampleTranscoder_AppendDenormRaw_batch demonstrates a typical batch processing pattern: feed many raw messages, then flush to a single Arrow record batch.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()
	ht := bufarrowlib.NewHyperType(outerMD)

	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Simulate a batch of raw messages from a Kafka consumer.
	messages := []struct {
		name  string
		items []struct {
			id    string
			price float64
		}
	}{
		{"Order-1", []struct {
			id    string
			price float64
		}{{"A", 1.0}, {"B", 2.0}}},
		{"Order-2", []struct {
			id    string
			price float64
		}{{"C", 3.0}}},
		{"Order-3", []struct {
			id    string
			price float64
		}{{"D", 4.0}, {"E", 5.0}, {"F", 6.0}}},
	}

	for _, m := range messages {
		msg := newOuterMessage(outerMD, innerMD, m.name, "", m.items, 0)
		raw, err := proto.Marshal(msg)
		if err != nil {
			log.Fatal(err)
		}
		if err := tc.AppendDenormRaw(raw); err != nil {
			log.Fatal(err)
		}
	}

	// Flush all accumulated rows into one Arrow record batch.
	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("messages: %d, denorm rows: %d\n", len(messages), rec.NumRows())
}
Output:
messages: 3, denorm rows: 6
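The batch example flushes once, after all messages. A long-running consumer usually flushes periodically instead. The following is a hypothetical size-triggered batcher in plain Go. `batcher`, `add`, `flush`, and `maxRows` are illustrative names. In real code `flush` would roughly correspond to calling `NewDenormalizerRecordBatch`, but that wiring is an assumption and not a library feature.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for a Transcoder: it buffers
// denormalized rows and emits them in batches once maxRows is reached.
type batcher struct {
	maxRows int
	buf     []string
	flushed [][]string // batches emitted so far
}

// add buffers all rows produced by one message, then flushes if the
// buffer has reached maxRows. A message's rows are never split, so a
// batch can exceed maxRows when one message fans out widely.
func (b *batcher) add(rows ...string) {
	b.buf = append(b.buf, rows...)
	if len(b.buf) >= b.maxRows {
		b.flush()
	}
}

// flush emits the buffered rows, if any, and starts a fresh buffer.
func (b *batcher) flush() {
	if len(b.buf) == 0 {
		return
	}
	b.flushed = append(b.flushed, b.buf)
	b.buf = nil // new backing array, so the emitted batch is not overwritten
}

func main() {
	b := &batcher{maxRows: 4}
	b.add("A", "B")      // Order-1
	b.add("C")           // Order-2
	b.add("D", "E", "F") // Order-3: buffer reaches 6 >= 4, so it flushes
	b.flush()            // end of stream: nothing left, so this is a no-op
	for i, batch := range b.flushed {
		fmt.Printf("batch %d: %d rows\n", i, len(batch))
	}
}
```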

func (*Transcoder) AppendRaw added in v0.2.0

func (s *Transcoder) AppendRaw(data []byte) error

AppendRaw unmarshals raw protobuf bytes using the HyperType's compiled parser and appends the result to the transcoder's Arrow record builder.

This method requires a HyperType configured via WithHyperType. It uses hyperpb.Shared for memory reuse and optionally records profiling data for online PGO. When the auto-recompile threshold is reached, the parser is recompiled inline.

This method is not safe for concurrent use.
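The threshold-triggered recompilation can be modelled in plain Go. This sketch is hypothetical. It assumes that a sample counter resets after each recompile and that a threshold of 0 means "never recompile", and neither detail is documented here. `profiler`, `record`, and `version` are invented names and are not part of HyperType's API. The mutex reflects that one HyperType can be shared by several transcoders.

```go
package main

import (
	"fmt"
	"sync"
)

// profiler is a hypothetical model of online PGO: each parse records a
// sample, and the call that reaches the threshold "recompiles" before
// returning, on the caller's goroutine.
type profiler struct {
	mu        sync.Mutex
	samples   int
	threshold int // <= 0 disables recompiles (this sketch's convention)
	version   int // incremented on each simulated recompile
}

// record notes one parsed message and reports whether this call
// triggered a recompile.
func (p *profiler) record() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.samples++
	if p.threshold > 0 && p.samples >= p.threshold {
		p.version++   // stand-in for rebuilding the parser from the profile
		p.samples = 0 // start a new profiling window
		return true
	}
	return false
}

func main() {
	p := &profiler{threshold: 3}
	for i := 1; i <= 7; i++ {
		if p.record() {
			fmt.Printf("message %d triggered recompile -> v%d\n", i, p.version)
		}
	}
}
```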

Example

ExampleTranscoder_AppendRaw demonstrates high-performance raw-bytes ingestion using AppendRaw. This accepts raw protobuf wire bytes (e.g. from Kafka, gRPC, or a file) and decodes them using hyperpb's compiled parser — 2–3× faster than proto.Unmarshal with generated code.

AppendRaw populates the full Arrow record (like Append), while AppendDenormRaw populates the denormalized flat record (like AppendDenorm).

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// 1. Create a shared HyperType (compile the parser once).
	ht := bufarrowlib.NewHyperType(outerMD)

	// 2. Create a Transcoder with HyperType.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// 3. Marshal a message to raw bytes (simulating receiving from Kafka).
	msg := newOuterMessage(outerMD, innerMD, "Widget", "", []struct {
		id    string
		price float64
	}{{"A", 9.99}}, 5)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// 4. Feed raw bytes — no proto.Unmarshal needed.
	if err := tc.AppendRaw(raw); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 4

func (*Transcoder) AppendWithCustom

func (s *Transcoder) AppendWithCustom(value proto.Message, custom proto.Message) error

AppendWithCustom appends a protobuf value merged with custom field values to the transcoder's builder. The custom proto.Message must conform to the message descriptor provided via WithCustomMessage or WithCustomMessageFile. Both messages are marshalled to bytes and unmarshalled into the merged stencil for appending. This method is not safe for concurrent use.
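In the example that follows, the column count (3 from `Product` plus 2 from `CustomFields`) comes from combining the two field lists. Here is a hypothetical sketch of that merge. The base-then-custom ordering is an assumption. Rejecting duplicate names is this sketch's own choice, because this page does not say how the library handles a name clash. `mergedColumns` is an invented helper.

```go
package main

import "fmt"

// mergedColumns models the top-level columns of a Transcoder built with
// WithCustomMessage: base fields first, then custom fields (assumed order).
// Duplicate names are rejected here as a choice of this sketch only.
func mergedColumns(base, custom []string) ([]string, error) {
	seen := make(map[string]bool, len(base)+len(custom))
	out := make([]string, 0, len(base)+len(custom))
	for _, name := range append(append([]string{}, base...), custom...) {
		if seen[name] {
			return nil, fmt.Errorf("duplicate field %q", name)
		}
		seen[name] = true
		out = append(out, name)
	}
	return out, nil
}

func main() {
	cols, err := mergedColumns(
		[]string{"name", "price", "qty"}, // Product
		[]string{"region", "batch_id"},   // CustomFields
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("cols: %d %v\n", len(cols), cols)
}
```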

Example

ExampleTranscoder_AppendWithCustom shows appending a message that includes both base and custom fields.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	base := newProduct(baseMD, "Widget", 9.99, 5)
	custom := dynamicpb.NewMessage(customMD)
	custom.Set(customMD.Fields().ByName("region"), protoreflect.ValueOfString("EU"))
	custom.Set(customMD.Fields().ByName("batch_id"), protoreflect.ValueOfInt64(7))

	if err := tc.AppendWithCustom(base, custom); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 5

func (*Transcoder) Clone

func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)

Clone returns a new Transcoder with the same configuration as s. Transcoder methods are not safe for concurrent use, so give each goroutine its own clone.

The compiled denormalizer Plan is shared (it is immutable), but the Arrow builders, scratch buffers, and leaf scratch are freshly allocated, so each clone can operate independently.
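These sharing rules can be sketched with stand-in types: an immutable plan shared by pointer, and mutable buffers that each clone owns. `plan`, `transcoder`, `clone`, and `processConcurrently` are hypothetical. The sketch illustrates only the one-clone-per-goroutine ownership pattern and does not reflect bufarrowlib internals.

```go
package main

import (
	"fmt"
	"sync"
)

// plan is immutable after construction, so clones can share it safely.
type plan struct{ paths []string }

// transcoder is a hypothetical stand-in for bufarrowlib.Transcoder:
// a shared, read-only plan plus per-instance mutable buffers.
type transcoder struct {
	plan *plan
	rows []string // mutable: owned by exactly one goroutine
}

// clone shares the plan pointer but starts with empty buffers,
// mirroring what the Clone documentation describes.
func (t *transcoder) clone() *transcoder {
	return &transcoder{plan: t.plan}
}

func (t *transcoder) appendRow(r string) { t.rows = append(t.rows, r) }

// processConcurrently gives each input partition its own clone and
// goroutine, then reports how many rows each clone buffered.
func processConcurrently(base *transcoder, parts [][]string) []int {
	counts := make([]int, len(parts))
	var wg sync.WaitGroup
	for i, part := range parts {
		wg.Add(1)
		// base.clone() is evaluated here, on the calling goroutine.
		go func(i int, part []string, tc *transcoder) {
			defer wg.Done()
			for _, r := range part {
				tc.appendRow(r) // no lock: tc is private to this goroutine
			}
			counts[i] = len(tc.rows) // each goroutine writes its own index
		}(i, part, base.clone())
	}
	wg.Wait()
	return counts
}

func main() {
	base := &transcoder{plan: &plan{paths: []string{"name", "items[*].id"}}}
	fmt.Println(processConcurrently(base, [][]string{{"a", "b"}, {"c"}, {"d", "e", "f"}}))
	// prints [2 1 3]
}
```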

Example

ExampleTranscoder_Clone demonstrates cloning a Transcoder for use in a separate goroutine. The clone has independent builders but shares the same schema configuration.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	clone, err := tc.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.Release()

	clone.Append(newProduct(md, "Gizmo", 5.00, 10))

	rec := clone.NewRecordBatch()
	defer rec.Release()
	fmt.Printf("clone rows: %d\n", rec.NumRows())

	origRec := tc.NewRecordBatch()
	defer origRec.Release()
	fmt.Printf("original rows: %d\n", origRec.NumRows())
}
Output:
clone rows: 1
original rows: 0
Example (WithHyperType)

ExampleTranscoder_Clone_withHyperType demonstrates cloning a Transcoder that uses HyperType. The clone shares the same HyperType (so profiling data is aggregated) but has independent Arrow builders. This is the recommended pattern for multi-goroutine pipelines.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildHyperExampleDescriptors constructs a proto schema with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(3), Type: messageType, TypeName: proto.String(".hyperexample.Inner"), Label: labelRep},
					{Name: proto.String("qty"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Outer"), fd.Messages().ByName("Inner")
}

func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(outerMD)
	msg.Set(outerMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		msg.Set(outerMD.Fields().ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	msg.Set(outerMD.Fields().ByName("qty"), protoreflect.ValueOfInt64(qty))
	list := msg.Mutable(outerMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(innerMD)
		item.Set(innerMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(innerMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	return msg
}

func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// Shared HyperType — both transcoders contribute profiling data.
	ht := bufarrowlib.NewHyperType(outerMD)

	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Clone for a second goroutine — shares HyperType + immutable Plan,
	// fresh Arrow builders and scratch buffers.
	clone, err := tc.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.Release()

	// Feed different data to each transcoder.
	msg1 := newOuterMessage(outerMD, innerMD, "Alpha", "", []struct {
		id    string
		price float64
	}{{"A1", 1.0}}, 0)
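	raw1, err := proto.Marshal(msg1)
	if err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenormRaw(raw1); err != nil {
		log.Fatal(err)
	}

	msg2 := newOuterMessage(outerMD, innerMD, "Bravo", "", []struct {
		id    string
		price float64
	}{{"B1", 2.0}, {"B2", 3.0}}, 0)
	raw2, err := proto.Marshal(msg2)
	if err != nil {
		log.Fatal(err)
	}
	if err := clone.AppendDenormRaw(raw2); err != nil {
		log.Fatal(err)
	}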

	// Each transcoder flushes independently.
	rec1 := tc.NewDenormalizerRecordBatch()
	defer rec1.Release()
	rec2 := clone.NewDenormalizerRecordBatch()
	defer rec2.Release()

	fmt.Printf("original: %d rows\n", rec1.NumRows())
	fmt.Printf("clone:    %d rows\n", rec2.NumRows())
}
Output:
original: 1 rows
clone:    2 rows
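The example above runs both transcoders on one goroutine. The sketch below places them on separate goroutines and models the shared HyperType as a single atomic counter that both update, while each clone's row count stays private. `sharedProfile` and `worker` are hypothetical stand-ins, because this page does not describe the real HyperType's profiling state. `atomic.Int64` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedProfile is a hypothetical stand-in for state a HyperType shares
// across clones: every transcoder adds to the same counter.
type sharedProfile struct {
	samples atomic.Int64
}

// worker models one clone: a private row count plus a pointer to the
// shared profile.
type worker struct {
	profile *sharedProfile
	rows    int
}

// ingest records one message that fans out into fanOut rows.
func (w *worker) ingest(fanOut int) {
	w.rows += fanOut         // private state: no synchronization needed
	w.profile.samples.Add(1) // shared state: updated atomically
}

// run mirrors the example: the original gets one message with one item,
// and the clone gets one message with two items, on separate goroutines.
func run() (origRows, cloneRows int, samples int64) {
	prof := &sharedProfile{}
	orig := &worker{profile: prof}
	clone := &worker{profile: prof}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); orig.ingest(1) }()
	go func() { defer wg.Done(); clone.ingest(2) }()
	wg.Wait() // makes both workers' writes visible before reading

	return orig.rows, clone.rows, prof.samples.Load()
}

func main() {
	o, c, s := run()
	fmt.Printf("original: %d rows, clone: %d rows, shared samples: %d\n", o, c, s)
}
```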

func (*Transcoder) DenormalizerBuilder

func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder

DenormalizerBuilder returns the denormalizer's Arrow array.RecordBuilder. This is exposed for callers who need to implement custom denormalization logic beyond what Transcoder.AppendDenorm provides. In most cases, prefer AppendDenorm for automatic fan-out and cross-join handling. Returns nil if no denormalizer plan was configured.
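The "cross-join handling" mentioned above presumably applies when a plan selects two independent repeated paths, such as `items[*].id` and `tags[*]`, and each output row pairs one value from each list. If that reading is correct, the row count is the product of the list lengths. Here is a plain-Go sketch with a hypothetical `crossJoin` helper. With an empty list it yields zero rows, which may differ from what the library actually does.

```go
package main

import "fmt"

// crossJoin returns every combination that takes one value from each
// input list, in order. With any empty list the result is empty (this
// sketch's behavior; the library's may differ).
func crossJoin(lists ...[]string) [][]string {
	result := [][]string{{}}
	for _, list := range lists {
		next := make([][]string, 0, len(result)*len(list))
		for _, prefix := range result {
			for _, v := range list {
				// Copy the prefix so rows never share a backing array.
				row := append(append([]string{}, prefix...), v)
				next = append(next, row)
			}
		}
		result = next
	}
	return result
}

func main() {
	rows := crossJoin([]string{"A", "B"}, []string{"red", "blue", "green"})
	fmt.Printf("rows: %d\n", len(rows))
	for _, r := range rows {
		fmt.Println(r)
	}
}
```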

Example

ExampleTranscoder_DenormalizerBuilder shows accessing the underlying RecordBuilder for custom denormalization logic.

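// buildExampleDescriptors is the helper shown in full in the
// DenormalizerSchema example; it returns the Order and Item descriptors.
orderMD, _ := buildExampleDescriptors()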
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

builder := tc.DenormalizerBuilder()
fmt.Printf("builder fields: %d\n", builder.Schema().NumFields())
fmt.Printf("field 0 type: %s\n", builder.Schema().Field(0).Type)
Output:
builder fields: 1
field 0 type: utf8

func (*Transcoder) DenormalizerSchema

func (s *Transcoder) DenormalizerSchema() *arrow.Schema

DenormalizerSchema returns the Arrow schema of the denormalized record. Returns nil if no denormalizer plan was configured.

Example

ExampleTranscoder_DenormalizerSchema demonstrates inspecting the Arrow schema of the denormalized record, showing column names, types, and nullability.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func main() {
	orderMD, _ := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("order_name")),
			pbpath.PlanPath("seq", pbpath.Alias("order_seq")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	schema := tc.DenormalizerSchema()
	for i, f := range schema.Fields() {
		fmt.Printf("  %d: %-12s %-10s nullable=%v\n", i, f.Name, f.Type, f.Nullable)
	}
}
Output:
  0: order_name   utf8       nullable=true
  1: order_seq    int64      nullable=true
  2: item_price   float64    nullable=true

func (*Transcoder) FieldNames

func (s *Transcoder) FieldNames() []string

FieldNames returns the top-level Arrow field names of the message schema.

Example

ExampleTranscoder_FieldNames shows retrieving top-level field names.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price qty]

func (*Transcoder) NewDenormalizerRecordBatch

func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch

NewDenormalizerRecordBatch returns the buffered denormalizer builder contents as an arrow.RecordBatch. The builder is reset and can be reused. Returns nil if no denormalizer plan was configured.
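"The builder is reset and can be reused" implies that the returned batch must not share memory that later appends overwrite. Arrow builders handle this internally. The plain-Go slice analogy below shows the pitfall that a naive reset would create. `builder`, `flushUnsafe`, and `flush` are hypothetical and do not describe arrow-go's implementation.

```go
package main

import "fmt"

// builder is a hypothetical column buffer used to model
// "return the buffered contents and reset the builder".
type builder struct{ vals []string }

func (b *builder) add(v string) { b.vals = append(b.vals, v) }

// flushUnsafe truncates in place. The returned slice shares its backing
// array with the builder, so later adds overwrite the "flushed" data.
func (b *builder) flushUnsafe() []string {
	out := b.vals
	b.vals = b.vals[:0]
	return out
}

// flush hands off the current backing array and starts a new one, so the
// returned batch stays valid while the builder is reused.
func (b *builder) flush() []string {
	out := b.vals
	b.vals = nil
	return out
}

func main() {
	var a builder
	a.add("A")
	a.add("B")
	first := a.flushUnsafe()
	a.add("C")         // writes into first's backing array
	fmt.Println(first) // prints [C B]

	var s builder
	s.add("A")
	s.add("B")
	kept := s.flush()
	s.add("C")        // goes into a fresh backing array
	fmt.Println(kept) // prints [A B]
}
```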

Example

ExampleTranscoder_NewDenormalizerRecordBatch shows flushing the denormalizer's builder into a record batch.

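// buildExampleDescriptors and newOrder are helpers from this package's
// example file; they are not shown in this snippet.
orderMD, itemMD := buildExampleDescriptors()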
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"A", 1.0}, {"B", 2.0}},
	nil, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("rows: %d\n", rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
	fmt.Println(rec.Column(0).(*array.String).Value(i))
}
Output:
rows: 2
A
B
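The two rows come from fan-out. The plan path `items[*].id` visits every element of the repeated `items` field, so one order with two items yields two rows. Below is a minimal stdlib sketch of that idea. It uses hypothetical `order`/`item` structs in place of the dynamicpb messages.

The sketch also copies the parent order id onto each row, which is what denormalization usually means. The example's plan, however, selects only the item id. How bufarrowlib treats an order with no items is not shown here, so the sketch's choice (emit no rows) is an assumption.

```go
package main

import "fmt"

// Hypothetical stand-ins for the Order/Item messages used in the example;
// the real example builds them with dynamicpb.
type item struct {
	id    string
	price float64
}

type order struct {
	id    string
	items []item
}

// denormRow is one row of the flat output table.
type denormRow struct {
	orderID string
	itemID  string
}

// fanOut emits one row per element of the repeated items field and copies
// the parent order's id onto every row. An order with no items emits no
// rows in this sketch (an assumption, not documented library behavior).
func fanOut(orders []order) []denormRow {
	var rows []denormRow
	for _, o := range orders {
		for _, it := range o.items {
			rows = append(rows, denormRow{orderID: o.id, itemID: it.id})
		}
	}
	return rows
}

func main() {
	rows := fanOut([]order{{id: "order-1", items: []item{{"A", 1.0}, {"B", 2.0}}}})
	fmt.Printf("rows: %d\n", len(rows))
	for _, r := range rows {
		fmt.Println(r.itemID)
	}
}
```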

func (*Transcoder) NewRecordBatch

func (s *Transcoder) NewRecordBatch() arrow.RecordBatch

NewRecordBatch returns the buffered builder contents as an arrow.RecordBatch. The builder is reset and can be reused.

Example

ExampleTranscoder_NewRecordBatch shows building an Arrow record batch from appended messages.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 3
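"The builder is reset and can be reused" means each call hands off only what was appended since the previous call. The sketch below shows that flush-and-reset pattern using a hypothetical two-column builder made of plain slices rather than Arrow builders. Two flushes separated by single appends give two one-row batches, not a one-row batch followed by a two-row batch.

```go
package main

import "fmt"

// batch is a hypothetical immutable snapshot of buffered rows.
type batch struct {
	names []string
	qtys  []int32
}

func (b batch) NumRows() int { return len(b.names) }

// builder buffers rows column by column until flushed.
type builder struct {
	names []string
	qtys  []int32
}

func (b *builder) Append(name string, qty int32) {
	b.names = append(b.names, name)
	b.qtys = append(b.qtys, qty)
}

// Flush hands the buffered columns to a new batch and resets the builder.
// Setting the slices to nil means later appends allocate fresh storage
// instead of writing into the batch that was just returned.
func (b *builder) Flush() batch {
	out := batch{names: b.names, qtys: b.qtys}
	b.names, b.qtys = nil, nil
	return out
}

func main() {
	var b builder
	b.Append("Widget", 5)
	rec1 := b.Flush()
	b.Append("Gadget", 2)
	rec2 := b.Flush()
	fmt.Println(rec1.NumRows(), rec2.NumRows()) // 1 1
}
```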

func (*Transcoder) Parquet

func (s *Transcoder) Parquet() *schema.Schema

Parquet returns the Parquet schema.Schema for the message.

Example

ExampleTranscoder_Parquet demonstrates inspecting the Parquet schema.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	pqSchema := tc.Parquet()
	fmt.Printf("parquet columns: %d\n", pqSchema.NumColumns())
	for i := 0; i < pqSchema.NumColumns(); i++ {
		fmt.Printf("  %s\n", pqSchema.Column(i).Name())
	}
}
Output:
parquet columns: 3
  name
  price
  qty
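For this flat message the Parquet columns match the proto fields one for one, because a Parquet schema counts only leaf columns. A nested message field would contribute one column per leaf rather than one per field. The sketch below walks a schema depth-first and collects dotted leaf paths. It uses a hypothetical `field` tree type, not the arrow-go `schema` package. Real Parquet schemas for repeated and map fields also insert intermediate group nodes (`list`/`element`, `key_value`), which the sketch ignores.

```go
package main

import (
	"fmt"
	"strings"
)

// field is a hypothetical schema node: a leaf has no children, a group
// (such as a nested message) has one or more.
type field struct {
	name     string
	children []field
}

// leafPaths returns the dotted path of every leaf, depth-first, which is
// what a Parquet schema counts as a column.
func leafPaths(fields []field, prefix []string) []string {
	var out []string
	for _, f := range fields {
		// Copy the prefix so sibling paths never share a backing array.
		path := append(append([]string{}, prefix...), f.name)
		if len(f.children) == 0 {
			out = append(out, strings.Join(path, "."))
			continue
		}
		out = append(out, leafPaths(f.children, path)...)
	}
	return out
}

func main() {
	product := []field{{name: "name"}, {name: "price"}, {name: "qty"}}
	fmt.Println(leafPaths(product, nil)) // [name price qty]

	// Hypothetical nested variant: a "dims" message with two leaves.
	nested := append(product, field{name: "dims", children: []field{{name: "w"}, {name: "h"}}})
	fmt.Println(len(leafPaths(nested, nil))) // 5
}
```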

func (*Transcoder) Proto

func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message

Proto decodes selected rows from an Arrow RecordBatch back into protobuf messages. Pass nil for rows to decode all rows.

Example

ExampleTranscoder_Proto demonstrates round-tripping: appending protobuf messages, building an Arrow record, and reconstructing them back as protobuf messages.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	msgs := tc.Proto(rec, nil) // nil = all rows
	fmt.Printf("recovered %d messages\n", len(msgs))
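	for _, m := range msgs {
		js, err := protojson.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}
		// protojson deliberately varies its whitespace, so compact the JSON
		// to keep the printed output deterministic.
		var c bytes.Buffer
		if err := json.Compact(&c, js); err != nil {
			log.Fatal(err)
		}
		fmt.Println(c.String())
	}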
}
Output:
recovered 2 messages
{"name":"Widget","price":9.99,"qty":5}
{"name":"Gadget","price":24.5,"qty":2}
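The `rows` argument selects which record rows to decode, and nil means every row. Below is a sketch of a hypothetical helper that resolves the argument into concrete indices. The bounds check and error are this sketch's own choice: `Proto` returns no error, and its behavior on out-of-range indices is not documented here. Under this reading, `tc.Proto(rec, []int{1})` in the example above would decode only the Gadget row.

```go
package main

import "fmt"

// resolveRows turns a rows argument into concrete indices. nil selects
// every row in order; otherwise the given indices are bounds-checked and
// returned as-is. Hypothetical helper, not part of bufarrowlib.
func resolveRows(numRows int, rows []int) ([]int, error) {
	if rows == nil {
		all := make([]int, numRows)
		for i := range all {
			all[i] = i
		}
		return all, nil
	}
	for _, r := range rows {
		if r < 0 || r >= numRows {
			return nil, fmt.Errorf("row %d out of range [0,%d)", r, numRows)
		}
	}
	return rows, nil
}

func main() {
	all, _ := resolveRows(2, nil)
	fmt.Println(all) // [0 1]
	some, _ := resolveRows(2, []int{1})
	fmt.Println(some) // [1]
	if _, err := resolveRows(2, []int{5}); err != nil {
		fmt.Println("error:", err)
	}
}
```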

func (*Transcoder) ReadParquet

func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)

ReadParquet reads the specified columns from Parquet source r and returns an Arrow RecordBatch. Pass nil for columns to read all columns. The returned RecordBatch must be released by the caller.

Example

ExampleTranscoder_ReadParquet demonstrates a full Parquet round-trip: write messages to Parquet, then read them back into an Arrow record.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var buf bytes.Buffer
	if err := tc.WriteParquet(&buf); err != nil {
		log.Fatal(err)
	}

	reader := bytes.NewReader(buf.Bytes())
	rec, err := tc.ReadParquet(context.Background(), reader, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer rec.Release()

	fmt.Printf("read back: %d rows, %d cols\n", rec.NumRows(), rec.NumCols())
}
Output:
read back: 2 rows, 3 cols

func (*Transcoder) Release

func (s *Transcoder) Release()

Release releases the reference on the underlying Arrow record builder.

Example

ExampleTranscoder_Release shows the standard cleanup pattern.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	// Release should always be deferred after construction.
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 1
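The example holds two separate references. The Transcoder's reference is dropped by `tc.Release()`, and the record batch's is dropped by `rec.Release()`. Both follow the Arrow Go Retain/Release convention: an object starts with one reference and frees its buffers when the count reaches zero. The following is a stdlib-only sketch of that counting. It is not bufarrowlib's or arrow-go's actual code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted mimics the Retain/Release convention: the creator holds one
// reference, each Retain adds one, and resources are freed when the count
// drops to zero.
type refCounted struct {
	refs  atomic.Int64
	freed atomic.Bool
}

func newRefCounted() *refCounted {
	r := &refCounted{}
	r.refs.Store(1) // the creator's reference
	return r
}

func (r *refCounted) Retain() { r.refs.Add(1) }

func (r *refCounted) Release() {
	if r.refs.Add(-1) == 0 {
		// A real implementation would return buffers to its allocator here.
		r.freed.Store(true)
	}
}

func main() {
	b := newRefCounted()
	b.Retain()                 // a second owner
	b.Release()                // first owner done; object still alive
	fmt.Println(b.freed.Load()) // false
	b.Release()                // last reference dropped
	fmt.Println(b.freed.Load()) // true
}
```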

func (*Transcoder) Schema

func (s *Transcoder) Schema() *arrow.Schema

Schema returns the Arrow arrow.Schema for the message.

Example

ExampleTranscoder_Schema demonstrates inspecting the Arrow schema derived from a protobuf message descriptor.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for i, f := range tc.Schema().Fields() {
		fmt.Printf("  %d: %-8s %s\n", i, f.Name, f.Type)
	}
}
Output:
  0: name     utf8
  1: price    float64
  2: qty      int32
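Reading the output against the `Product` definition gives the type correspondence for the three scalar kinds used here. The sketch below records that correspondence as a lookup table and reprints the same listing. It covers only these three kinds, because this example does not show the mappings for other protobuf types.

```go
package main

import "fmt"

// arrowTypeFor maps the protobuf scalar kinds in the Product example to the
// Arrow type names printed by tc.Schema(). Other kinds are deliberately
// omitted because their mapping is not shown in this example.
var arrowTypeFor = map[string]string{
	"string": "utf8",
	"double": "float64",
	"int32":  "int32",
}

func main() {
	fields := []struct{ name, kind string }{
		{"name", "string"}, {"price", "double"}, {"qty", "int32"},
	}
	for i, f := range fields {
		fmt.Printf("  %d: %-8s %s\n", i, f.name, arrowTypeFor[f.kind])
	}
}
```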

func (*Transcoder) WriteParquet

func (s *Transcoder) WriteParquet(w io.Writer) error

WriteParquet writes the messages appended so far (the builder's buffered contents) to w in Parquet format.

Example

ExampleTranscoder_WriteParquet demonstrates writing appended messages to Parquet and reading them back.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var buf bytes.Buffer
	if err := tc.WriteParquet(&buf); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("parquet bytes: %d\n", buf.Len())
	fmt.Println("wrote parquet successfully")
}
Output:
parquet bytes: 714
wrote parquet successfully

func (*Transcoder) WriteParquetRecords

func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error

WriteParquetRecords writes one or more Arrow record batches to w as a single Parquet file.

Example

ExampleTranscoder_WriteParquetRecords demonstrates writing multiple Arrow record batches to a single Parquet file.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec1 := tc.NewRecordBatch()
	defer rec1.Release()

	tc.Append(newProduct(md, "Gadget", 24.50, 2))
	rec2 := tc.NewRecordBatch()
	defer rec2.Release()

	var buf bytes.Buffer
	if err := tc.WriteParquetRecords(&buf, rec1, rec2); err != nil {
		log.Fatal(err)
	}

	fmt.Println("wrote 2 record batches to parquet")
}
Output:
wrote 2 record batches to parquet

Directories

Path Synopsis
gen
proto
pbpath
Package pbpath provides functionality for representing a sequence of protobuf reflection operations on a message, including parsing human-readable path strings and traversing messages along a path to collect values.
