bufarrowlib

package module
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 25, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

README

bufarrowLib 🦬


Go library to build Apache Arrow records from Protocol Buffers.

Features

Schema generation
  • Derive Arrow and Parquet schemas automatically from any protobuf message descriptor, including deeply nested and recursive messages (nesting beyond the maximum depth is reported as ErrMxDepth).
  • Construct a Transcoder from any protoreflect.MessageDescriptor (for example one obtained from generated Go types) with New, or from .proto files at runtime via NewFromFile.
  • Merge extra "custom" fields into the base schema with WithCustomMessage / WithCustomMessageFile — useful for enriching data with sidecar metadata.
Full-fidelity transcoding
  • Proto → Arrow: Append / AppendWithCustom populate an Arrow RecordBuilder that mirrors the full protobuf structure (structs, lists, maps, oneofs).
  • Arrow → Proto: Proto reconstructs typed protobuf messages from Arrow record batches.
  • Proto → Parquet → Arrow: WriteParquet / WriteParquetRecords write Arrow records to Parquet; ReadParquet reads them back.
Well-known & common type support

The denormalizer automatically maps protobuf well-known types and common types to flat Arrow scalars — no manual conversion needed.

Protobuf type                      Arrow type
google.protobuf.Timestamp          Timestamp(ms, UTC)
google.protobuf.Duration           Duration(ms)
google.protobuf.FieldMask          String (comma-joined paths)
google.protobuf.*Value wrappers    unwrapped scalar (BoolValue → Boolean, Int64Value → Int64, etc.)
google.type.Date                   Date32
google.type.TimeOfDay              Time64(µs)
google.type.Money                  String (protojson)
google.type.LatLng                 String (protojson)
google.type.Color                  String (protojson)
google.type.PostalAddress          String (protojson)
google.type.Interval               String (protojson)
OpenTelemetry AnyValue             Binary (proto-marshalled)
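Most of these mappings reduce to small unit conversions. The standard-library sketch below shows plausible conversions for four of them: a google.type.Date to Date32 (days since 1970-01-01), a google.type.TimeOfDay to Time64 microseconds since midnight, a google.protobuf.Duration to milliseconds, and a FieldMask to a comma-joined string. The helper names and the plain integer/string inputs are hypothetical stand-ins for the protobuf messages, and truncating sub-millisecond or sub-microsecond precision is an assumption; bufarrowlib's own code may round differently.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// dateToDate32 converts a calendar date (as in google.type.Date) to the
// Arrow Date32 representation: whole days since 1970-01-01.
func dateToDate32(year, month, day int) int32 {
	t := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
	return int32(t.Unix() / 86400) // exact, since t is midnight UTC
}

// timeOfDayToTime64us converts hours/minutes/seconds/nanos (as in
// google.type.TimeOfDay) to microseconds since midnight.
func timeOfDayToTime64us(hours, minutes, seconds, nanos int) int64 {
	return int64(hours)*3_600_000_000 +
		int64(minutes)*60_000_000 +
		int64(seconds)*1_000_000 +
		int64(nanos)/1_000 // drop sub-microsecond precision
}

// durationToMillis converts a (seconds, nanos) pair (as in
// google.protobuf.Duration) to milliseconds, truncating the remainder.
func durationToMillis(seconds int64, nanos int32) int64 {
	return seconds*1_000 + int64(nanos)/1_000_000
}

// fieldMaskToString joins FieldMask paths with commas.
func fieldMaskToString(paths []string) string {
	return strings.Join(paths, ",")
}

func main() {
	fmt.Println(dateToDate32(1970, 1, 2))                 // 1
	fmt.Println(timeOfDayToTime64us(1, 0, 0, 500_000))    // 3600000500
	fmt.Println(durationToMillis(2, 750_000_000))         // 2750
	fmt.Println(fieldMaskToString([]string{"a", "b.c"})) // a,b.c
}
```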
Denormalization

Use WithDenormalizerPlan to project selected protobuf field paths into a flat Arrow record — like SQL SELECT … FROM msg CROSS JOIN UNNEST(msg.items).

  • Paths are specified with the pbpath path language: "items[*].id", "tags[*]", "imp[0:3].banner.w", "name", etc.
  • Column aliases via pbpath.Alias("col_name").
  • Independent repeated-field fan-outs are cross-joined; empty groups emit a single null row (left-join semantics).
  • Fixed-index paths (items[0].id) broadcast as scalars; ranges and wildcards fan out.
  • All type mapping and append closures are compiled once at construction time for minimal per-message overhead.
tc, err := bufarrowlib.New(md, mem,
    bufarrowlib.WithDenormalizerPlan(
        pbpath.PlanPath("name",            pbpath.Alias("order_name")),
        pbpath.PlanPath("items[*].id",     pbpath.Alias("item_id")),
        pbpath.PlanPath("items[*].price",  pbpath.Alias("item_price")),
        pbpath.PlanPath("tags[*]",         pbpath.Alias("tag")),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

if err := tc.AppendDenorm(msg); err != nil { // 2 items × 3 tags → 6 rows
    log.Fatal(err)
}
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()
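The fan-out rules listed above (independent repeated groups are cross-joined, an empty group still yields one null row, scalar paths are broadcast to every row) can be modelled in a few lines of standard-library Go. crossJoin is a hypothetical helper that illustrates the semantics only; per the feature list, bufarrowlib compiles its plan up front and appends straight into Arrow builders rather than materialising rows like this.

```go
package main

import "fmt"

// crossJoin returns the Cartesian product of the given fan-out groups.
// An empty group contributes a single nil cell (left-join semantics), so
// it never eliminates the rows produced by the other groups.
func crossJoin(groups [][]any) [][]any {
	rows := [][]any{{}} // start with one empty row
	for _, g := range groups {
		if len(g) == 0 {
			g = []any{nil} // empty fan-out: one null cell
		}
		next := make([][]any, 0, len(rows)*len(g))
		for _, r := range rows {
			for _, v := range g {
				// copy r so rows never share a backing array
				next = append(next, append(append([]any{}, r...), v))
			}
		}
		rows = next
	}
	return rows
}

func main() {
	name := "order-1" // scalar path: broadcast to every row
	items := []any{"A", "B"}
	tags := []any{"rush", "fragile", "gift"}
	for _, r := range crossJoin([][]any{items, tags}) {
		fmt.Println(name, r[0], r[1]) // 2 items × 3 tags → 6 rows
	}
	fmt.Println(len(crossJoin([][]any{items, {}}))) // 2 (tag column is nil)
}
```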
Cloning

Transcoder.Clone creates an independent copy (separate builders and stencils) that can be used in another goroutine. All options — including denormalization plans — carry over.
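The clone-per-goroutine pattern looks roughly like this. Because Clone's exact signature is not shown in this section, the sketch uses a hypothetical transcoder type. Configuration (standing in for the schema and plan) is shared read-only between clones, while each clone gets its own builder state, so workers need no locks.

```go
package main

import (
	"fmt"
	"sync"
)

// transcoder is a hypothetical stand-in for a non-thread-safe encoder:
// columns plays the role of shared, read-only configuration; rows plays
// the role of mutable builder state.
type transcoder struct {
	columns []string
	rows    []string
}

// Clone shares the read-only configuration but gives the copy its own,
// empty builder state.
func (t *transcoder) Clone() *transcoder {
	return &transcoder{columns: t.columns}
}

func (t *transcoder) Append(msg string) { t.rows = append(t.rows, msg) }

// appendSharded gives every goroutine its own clone; each worker writes
// only its own slot in counts, so no mutex is needed.
func appendSharded(base *transcoder, shards [][]string) []int {
	counts := make([]int, len(shards))
	var wg sync.WaitGroup
	for i, shard := range shards {
		wg.Add(1)
		go func(i int, shard []string) {
			defer wg.Done()
			tc := base.Clone()
			for _, m := range shard {
				tc.Append(m)
			}
			counts[i] = len(tc.rows)
		}(i, shard)
	}
	wg.Wait()
	return counts
}

func main() {
	base := &transcoder{columns: []string{"id"}}
	fmt.Println(appendSharded(base, [][]string{{"a", "b"}, {"c"}, {"d", "e", "f"}})) // [2 1 3]
	fmt.Println(len(base.rows))                                                      // 0
}
```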

pbpath

The proto/pbpath subpackage is a standalone protobuf field-path engine that can be used independently of the rest of bufarrowlib.

  • Parse a dot-delimited path string against a message descriptor, with support for list wildcards ([*]), Python-style slices ([1:3], [-2:], [::2]), ranges, and negative indices.
  • Evaluate a compiled path against a live proto.Message to extract values, including fan-out across repeated fields.
  • Plan API — compile multiple paths into a trie-based execution plan that traverses shared prefixes only once, ideal for hot-path extraction of many fields from the same message type.
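pbpath describes its slices as Python-style. Assuming they follow Python's rules for a positive step (omitted bounds default to the ends, negative bounds count from the end, out-of-range bounds are clamped), index resolution can be sketched as follows. resolveSlice, resolveIndex and ip are hypothetical helpers, not part of pbpath's API, and negative steps are deliberately left out.

```go
package main

import "fmt"

// resolveSlice returns the indices selected by a Python-style slice
// [start:stop:step] over a list of length n. A nil bound means the bound
// was omitted. Only positive steps are handled; other steps yield nil.
func resolveSlice(n int, start, stop *int, step int) []int {
	if step <= 0 {
		return nil
	}
	// norm: negative bounds count from the end; clamp to [0, n].
	norm := func(p *int, def int) int {
		if p == nil {
			return def
		}
		i := *p
		if i < 0 {
			i += n
		}
		if i < 0 {
			i = 0
		} else if i > n {
			i = n
		}
		return i
	}
	lo, hi := norm(start, 0), norm(stop, n)
	var out []int
	for i := lo; i < hi; i += step {
		out = append(out, i)
	}
	return out
}

// resolveIndex maps a single, possibly negative, index to a position and
// reports whether that position is in range.
func resolveIndex(n, i int) (int, bool) {
	if i < 0 {
		i += n
	}
	return i, i >= 0 && i < n
}

// ip returns a pointer to i, for writing slice bounds inline.
func ip(i int) *int { return &i }

func main() {
	fmt.Println(resolveSlice(5, ip(1), ip(3), 1)) // [1:3]  → [1 2]
	fmt.Println(resolveSlice(5, ip(-2), nil, 1))  // [-2:]  → [3 4]
	fmt.Println(resolveSlice(5, nil, nil, 2))     // [::2]  → [0 2 4]
	fmt.Println(resolveIndex(5, -1))              // [-1]   → 4 true
}
```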

See the full pbpath README and pkg.go.dev reference for details.
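The prefix-sharing idea behind the Plan API can be illustrated with a plain trie over path segments. buildPlan and countNodes are hypothetical and ignore fan-out bookkeeping. They show only that paths such as items[*].id and items[*].price share the items[*] step, so one traversal visits 4 segments instead of 5.

```go
package main

import (
	"fmt"
	"strings"
)

// trieNode is one path segment in a hypothetical execution plan.
type trieNode struct {
	children map[string]*trieNode
	order    []string // insertion order, for deterministic traversal
	leaves   []int    // indices of the input paths that end here
}

func newNode() *trieNode { return &trieNode{children: map[string]*trieNode{}} }

// buildPlan inserts each dot-delimited path, reusing shared prefixes.
func buildPlan(paths []string) *trieNode {
	root := newNode()
	for i, p := range paths {
		n := root
		for _, seg := range strings.Split(p, ".") {
			child, ok := n.children[seg]
			if !ok {
				child = newNode()
				n.children[seg] = child
				n.order = append(n.order, seg)
			}
			n = child
		}
		n.leaves = append(n.leaves, i)
	}
	return root
}

// countNodes reports how many segments one traversal of the plan visits.
func countNodes(n *trieNode) int {
	total := 0
	for _, seg := range n.order {
		total += 1 + countNodes(n.children[seg])
	}
	return total
}

func main() {
	paths := []string{"items[*].id", "items[*].price", "name"}
	naive := 0
	for _, p := range paths {
		naive += len(strings.Split(p, "."))
	}
	fmt.Println(naive, countNodes(buildPlan(paths))) // 5 4
}
```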

🚀 Install

go get -u github.com/loicalleyne/bufarrowlib@latest

💡 Usage

import "github.com/loicalleyne/bufarrowlib"
Quick start — full record
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

tc.Append(msg)
rec := tc.NewRecordBatch()
defer rec.Release()

// Arrow schema & Parquet schema are available:
_ = tc.Schema()   // *arrow.Schema
_ = tc.Parquet()  // *schema.Schema
Quick start — denormalized record
tc, err := bufarrowlib.New(md, memory.DefaultAllocator,
    bufarrowlib.WithDenormalizerPlan(
        pbpath.PlanPath("items[*].id",    pbpath.Alias("item_id")),
        pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

if err := tc.AppendDenorm(msg); err != nil {
    log.Fatal(err)
}
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()
Quick start — Parquet round-trip
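// Write
var buf bytes.Buffer
tc.Append(msg)
if err := tc.WriteParquet(&buf); err != nil {
    log.Fatal(err)
}

// Read
rec, err := tc.ReadParquet(context.Background(), bytes.NewReader(buf.Bytes()), nil)
if err != nil {
    log.Fatal(err)
}
defer rec.Release()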
Quick start — .proto file at runtime
tc, err := bufarrowlib.NewFromFile(
    "path/to/schema.proto", "MyMessage",
    []string{"path/to/imports"},
    memory.DefaultAllocator,
)
if err != nil {
    log.Fatal(err)
}
defer tc.Release()

💫 Show your support

Give a ⭐️ if this project helped you! Feedback and PRs welcome.

Licence

bufarrowLib is released under the Apache 2.0 license. See LICENCE.txt

Documentation

Overview

Example (DenormToParquet)

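Example_denormToParquet shows the first half of a denormalize-to-Parquet workflow: nested protobuf messages are projected into a flat Arrow record, which is printed here for inspection. As the comments in the code note, the flat record can then be written to Parquet, for example with pqarrow.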

orderMD, itemMD := buildExampleDescriptors()

tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Append an order with 2 items × 2 tags → 4 denormalized rows.
msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"A", 1.50}, {"B", 2.75}},
	[]string{"rush", "fragile"}, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

// The denormalized record builder produces a flat schema.
fmt.Printf("schema: %v\n", tc.DenormalizerSchema().Field(0).Name)

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("denorm rows: %d (2 items × 2 tags)\n", rec.NumRows())

// Write the flat denormalized record to Parquet via a new Transcoder
// built from the denorm schema, or write directly using pqarrow.
// Here we show the denorm data for inspection:
for i := 0; i < int(rec.NumRows()); i++ {
	name := rec.Column(0).(*array.String).Value(i)
	id := rec.Column(1).(*array.String).Value(i)
	price := rec.Column(2).(*array.Float64).Value(i)
	tag := rec.Column(3).(*array.String).Value(i)
	fmt.Printf("  %s | %s | %.2f | %s\n", name, id, price, tag)
}
Output:
schema: order_name
denorm rows: 4 (2 items × 2 tags)
  order-1 | A | 1.50 | rush
  order-1 | A | 1.50 | fragile
  order-1 | B | 2.75 | rush
  order-1 | B | 2.75 | fragile
Example (ParquetRoundTrip)

Example_parquetRoundTrip shows a full round-trip: proto → Arrow → Parquet → Arrow → proto. This demonstrates that data survives serialisation intact.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Encode two products.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

// Proto → Arrow → Parquet (in-memory buffer).
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Parquet → Arrow.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

// Arrow → Proto.
msgs := tc.Proto(rec, nil)

fmt.Printf("round-tripped %d messages:\n", len(msgs))
for _, m := range msgs {
	js, _ := protojson.Marshal(m)
	var c bytes.Buffer
	json.Compact(&c, js)
	fmt.Printf("  %s\n", c.String())
}
Output:
round-tripped 2 messages:
  {"name":"Widget","price":9.99,"qty":5}
  {"name":"Gadget","price":24.5,"qty":2}
Example (ParquetSelectColumns)

Example_parquetSelectColumns shows reading only specific columns from a Parquet file — useful for analytics queries that touch a subset of fields.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Read only column 0 (name) and column 2 (qty).
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, []int{0, 2})
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

fmt.Printf("cols: %d, rows: %d\n", rec.NumCols(), rec.NumRows())
names := rec.Column(0).(*array.String)
qtys := rec.Column(1).(*array.Int32)
for i := 0; i < int(rec.NumRows()); i++ {
	fmt.Printf("  %s: %d\n", names.Value(i), qtys.Value(i))
}
Output:
cols: 2, rows: 2
  Widget: 5
  Gadget: 2
Example (ProtoFileToParquet)

Example_protoFileToParquet shows the end-to-end workflow when working with .proto files on disk: compile the schema, create a transcoder, populate messages, and write Parquet.

// Write an inline .proto to a temp file.
_, dir := writeProtoFile("event.proto", `syntax = "proto3";
message Event {
  string id      = 1;
  string action  = 2;
  int64  user_id = 3;
}
`)
defer os.RemoveAll(dir)

// Compile the .proto and look up the message descriptor.
fd, err := bufarrowlib.CompileProtoToFileDescriptor("event.proto", []string{dir})
if err != nil {
	log.Fatal(err)
}
md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Event")
if err != nil {
	log.Fatal(err)
}

// Create a Transcoder from the compiled descriptor.
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Build and append messages using dynamicpb.
for _, e := range []struct {
	id, action string
	uid        int64
}{
	{"e1", "click", 100},
	{"e2", "view", 200},
} {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("id"), protoreflect.ValueOfString(e.id))
	msg.Set(md.Fields().ByName("action"), protoreflect.ValueOfString(e.action))
	msg.Set(md.Fields().ByName("user_id"), protoreflect.ValueOfInt64(e.uid))
	tc.Append(msg)
}

var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

fmt.Printf("fields: %v\n", tc.FieldNames())
fmt.Printf("wrote parquet: %d bytes\n", buf.Len())
Output:
fields: [id action user_id]
wrote parquet: 688 bytes
Example (ProtoToParquetBatched)

Example_protoToParquetBatched shows writing protobuf messages to Parquet in batches — useful when processing a large stream and flushing periodically.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Batch 1: first two messages.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
batch1 := tc.NewRecordBatch()
defer batch1.Release()

// Batch 2: one more message.
tc.Append(newProduct(md, "Gizmo", 7.25, 12))
batch2 := tc.NewRecordBatch()
defer batch2.Release()

// Write both batches to a single Parquet file.
var buf bytes.Buffer
if err := tc.WriteParquetRecords(&buf, batch1, batch2); err != nil {
	log.Fatal(err)
}

// Read back and verify total row count.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

fmt.Printf("batches: 2, total rows read back: %d\n", rec.NumRows())
Output:
batches: 2, total rows read back: 3
Example (ProtoToParquetFile)

Example_protoToParquetFile shows the complete workflow of consuming protobuf messages and writing them to a Parquet file on disk.

md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Simulate consuming a stream of protobuf messages.
products := []struct {
	name  string
	price float64
	qty   int32
}{
	{"Widget", 9.99, 5},
	{"Gadget", 24.50, 2},
	{"Gizmo", 7.25, 12},
}
for _, p := range products {
	tc.Append(newProduct(md, p.name, p.price, p.qty))
}

// Write to a Parquet file.
f, err := os.CreateTemp("", "example-*.parquet")
if err != nil {
	log.Fatal(err)
}
name := f.Name()
defer os.Remove(name)

if err := tc.WriteParquet(f); err != nil {
	log.Fatal(err)
}
if err := f.Close(); err != nil {
	log.Fatal(err)
}

info, _ := os.Stat(name)
fmt.Printf("wrote %d messages to parquet (%d bytes)\n", len(products), info.Size())
Output:
wrote 3 messages to parquet (738 bytes)


Constants

This section is empty.

Variables

var (
	// ErrMxDepth is returned when the protobuf message nesting exceeds maxDepth.
	ErrMxDepth = errors.New("max depth reached, either the message is deeply nested or a circular dependency was introduced")
	// ErrPathNotFound is returned by node.getPath when a field name is not
	// found in the node's hash map.
	ErrPathNotFound = errors.New("path not found")
)

Sentinel errors returned during schema construction and path lookup.

Functions

func CompileProtoToFileDescriptor

func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)

CompileProtoToFileDescriptor compiles a .proto file at runtime using protocompile and returns the resulting FileDescriptor. importPaths are the directories searched for transitive imports.

Example

ExampleCompileProtoToFileDescriptor demonstrates compiling a .proto file from disk at runtime and inspecting its messages.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{dir})
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < fd.Messages().Len(); i++ {
		m := fd.Messages().Get(i)
		fmt.Printf("%s (%d fields)\n", m.Name(), m.Fields().Len())
	}
}
Output:
Item (2 fields)

func GetMessageDescriptorByName

func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)

GetMessageDescriptorByName looks up a top-level message by name in the given FileDescriptor. Returns an error if the message is not found.

Example

ExampleGetMessageDescriptorByName demonstrates looking up a specific message by name from a compiled FileDescriptor.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{dir})
	if err != nil {
		log.Fatal(err)
	}

	md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Item")
	if err != nil {
		log.Fatal(err)
	}

	for i := 0; i < md.Fields().Len(); i++ {
		f := md.Fields().Get(i)
		fmt.Printf("%s %s\n", f.Name(), f.Kind())
	}
}
Output:
name string
price double

func MergeMessageDescriptors

func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)

MergeMessageDescriptors merges two message descriptors into a new one with the specified name. It appends the fields from the second descriptor to the first, avoiding name conflicts. Field numbers from b are auto-renumbered starting after a's max field number to prevent wire-format collisions. Nested message types and enum types from b are also carried over. The resulting message descriptor is wrapped in a synthetic file descriptor to ensure it can be used independently.
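The renumbering rule can be sketched on plain (name, number) pairs. The field type and mergeFields are hypothetical. Because the documentation only says name conflicts are "avoided", the skip-duplicates policy below is an assumption; the real function may instead rename or reject conflicting fields. Numbering b's fields sequentially from a's maximum plus one is likewise assumed, which under this sketch gives region and batch_id the numbers 4 and 5 in the Product/CustomFields example.

```go
package main

import "fmt"

// field is a hypothetical (name, number) pair standing in for a
// protoreflect.FieldDescriptor.
type field struct {
	Name   string
	Number int32
}

// mergeFields appends b's fields after a's, renumbering them from
// max(a)+1 so wire-format field numbers cannot collide. In this sketch a
// field of b whose name already exists is skipped.
func mergeFields(a, b []field) []field {
	out := append([]field{}, a...)
	seen := make(map[string]bool, len(a)+len(b))
	var maxNum int32
	for _, f := range a {
		seen[f.Name] = true
		if f.Number > maxNum {
			maxNum = f.Number
		}
	}
	for _, f := range b {
		if seen[f.Name] {
			continue
		}
		maxNum++
		out = append(out, field{Name: f.Name, Number: maxNum})
		seen[f.Name] = true
	}
	return out
}

func main() {
	product := []field{{"name", 1}, {"price", 2}, {"qty", 3}}
	custom := []field{{"region", 1}, {"batch_id", 2}}
	for _, f := range mergeFields(product, custom) {
		fmt.Println(f.Number, f.Name)
	}
}
```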

Example

ExampleMergeMessageDescriptors demonstrates merging two message descriptors into one with combined fields.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	merged, err := bufarrowlib.MergeMessageDescriptors(baseMD, customMD, "Merged")
	if err != nil {
		log.Fatal(err)
	}

	fields := merged.Fields()
	for i := 0; i < fields.Len(); i++ {
		fmt.Println(fields.Get(i).Name())
	}
}
Output:
name
price
qty
region
batch_id

func ProtoKindToAppendFunc

func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc

ProtoKindToAppendFunc returns a closure that appends a protoreflect.Value of the appropriate kind to the given Arrow array builder. The builder must match the Arrow data type returned by ProtoKindToArrowType for the same field descriptor.

Returns nil if the field's kind is not a recognized scalar mapping.
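Returning a closure means the kind dispatch happens once, when the closure is built, rather than once per appended value. The sketch below shows that pattern with a hypothetical column type and string kind names in place of Arrow builders and protoreflect kinds. Like ProtoKindToAppendFunc, it returns nil for a kind it does not handle.

```go
package main

import "fmt"

// column is a hypothetical, minimal stand-in for an Arrow array builder.
type column struct {
	strings []string
	floats  []float64
}

// appendFuncFor resolves the field kind once and returns a closure that
// appends values of that kind to col, or nil for unsupported kinds.
func appendFuncFor(kind string, col *column) func(v any) {
	switch kind {
	case "string":
		return func(v any) { col.strings = append(col.strings, v.(string)) }
	case "double":
		return func(v any) { col.floats = append(col.floats, v.(float64)) }
	default:
		return nil
	}
}

func main() {
	var names column
	fn := appendFuncFor("string", &names) // resolved once, before the hot loop
	for _, s := range []string{"hello", "world"} {
		fn(s) // no per-value kind switch
	}
	fmt.Println(names.strings, appendFuncFor("group", &names) == nil) // [hello world] true
}
```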

Example

ExampleProtoKindToAppendFunc demonstrates obtaining a typed append closure for a protobuf field and using it to populate an Arrow builder.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	nameFD := md.Fields().ByName("name")

	dt := bufarrowlib.ProtoKindToArrowType(nameFD)
	builder := array.NewBuilder(memory.DefaultAllocator, dt)
	defer builder.Release()

	appendFn := bufarrowlib.ProtoKindToAppendFunc(nameFD, builder)
	appendFn(protoreflect.ValueOfString("hello"))
	appendFn(protoreflect.ValueOfString("world"))

	arr := builder.NewArray()
	defer arr.Release()

	fmt.Printf("len: %d\n", arr.Len())
	fmt.Println(arr.(*array.String).Value(0))
	fmt.Println(arr.(*array.String).Value(1))
}
Output:
len: 2
hello
world

func ProtoKindToArrowType

func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType

ProtoKindToArrowType returns the Arrow data type corresponding to a protobuf field descriptor's scalar kind. In addition to primitive kinds (bool, int32, string, etc.), the following well-known and common message types are recognised and mapped to flat Arrow scalars:

  • google.protobuf.Timestamp → Timestamp(ms, UTC)
  • google.protobuf.Duration → Duration(ms)
  • google.protobuf.FieldMask → String (comma-joined paths)
  • google.protobuf.*Value → unwrapped scalar (BoolValue→Boolean, etc.)
  • google.type.Date → Date32
  • google.type.TimeOfDay → Time64(µs)
  • google.type.Money → String (protojson)
  • google.type.LatLng → String (protojson)
  • google.type.Color → String (protojson)
  • google.type.PostalAddress → String (protojson)
  • google.type.Interval → String (protojson)
  • OpenTelemetry AnyValue → Binary (proto-marshalled)

Returns nil if the field's kind or message type is not a recognized scalar mapping (e.g. a generic message, map, or group).

TODO: add a WithTimestampUnit(arrow.TimeUnit) option to allow callers to override the default Millisecond precision.

Example

ExampleProtoKindToArrowType demonstrates mapping protobuf field kinds to Arrow data types.

package main

import (
	"fmt"
	"log"

	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()

	for i := 0; i < md.Fields().Len(); i++ {
		fd := md.Fields().Get(i)
		dt := bufarrowlib.ProtoKindToArrowType(fd)
		fmt.Printf("%-8s → %s\n", fd.Name(), dt)
	}
}
Output:
name     → utf8
price    → float64
qty      → int32

Types

type Opt

type Opt struct {
	// contains filtered or unexported fields
}

Opt holds the option values collected from Option functions and passed to New or NewFromFile. Fields are unexported; use the With* helpers.

type Option

type Option func(config)

Option is a functional option applied to New and NewFromFile to configure schema merging, denormalization, or other behaviours.
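Option follows Go's functional-options pattern: each With* helper returns a function that records a setting, and the constructor applies them in order and can then validate the result. The sketch below uses hypothetical names (config, option, withCustom, withCustomFile) and a pointer-based option for clarity, whereas the real Option type is func(config). It also assumes the documented mutual exclusion between WithCustomMessage and WithCustomMessageFile surfaces as a constructor error, which this section does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for bufarrowlib's unexported config.
type config struct {
	customMessage     string
	customMessageFile string
}

// option is a hypothetical functional option that mutates a config.
type option func(*config)

func withCustom(name string) option {
	return func(c *config) { c.customMessage = name }
}

func withCustomFile(path string) option {
	return func(c *config) { c.customMessageFile = path }
}

// newConfig applies the options in order, then validates the combination,
// here rejecting both custom-message sources being set together.
func newConfig(opts ...option) (*config, error) {
	c := &config{}
	for _, opt := range opts {
		opt(c)
	}
	if c.customMessage != "" && c.customMessageFile != "" {
		return nil, errors.New("custom message and custom message file are mutually exclusive")
	}
	return c, nil
}

func main() {
	_, err := newConfig(withCustom("Extra"), withCustomFile("extra.proto"))
	fmt.Println(err)
}
```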

func WithCustomMessage

func WithCustomMessage(msgDesc protoreflect.MessageDescriptor) Option

WithCustomMessage provides a protoreflect.MessageDescriptor whose fields will be merged into the base message schema. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessageFile.

Example

ExampleWithCustomMessage demonstrates the WithCustomMessage option.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price qty region batch_id]

func WithCustomMessageFile

func WithCustomMessageFile(protoFilePath, messageName string, importPaths []string) Option

WithCustomMessageFile specifies a .proto file and message name whose fields will be merged into the base message schema. The .proto file is compiled at schema creation time using protocompile. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessage.

Example

ExampleWithCustomMessageFile demonstrates augmenting a base schema with custom fields loaded from a .proto file on disk.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

const exampleCustomProto = `syntax = "proto3";
message Extra {
  string region = 1;
}
`

func main() {
	basePath, baseDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(baseDir)

	customPath, customDir := writeProtoFile("extra.proto", exampleCustomProto)
	defer os.RemoveAll(customDir)

	tc, err := bufarrowlib.NewFromFile(
		filepath.Base(basePath), "Item", []string{baseDir},
		memory.DefaultAllocator,
		bufarrowlib.WithCustomMessageFile(filepath.Base(customPath), "Extra", []string{customDir}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price region]

func WithDenormalizerPlan

func WithDenormalizerPlan(paths ...pbpath.PlanPathSpec) Option

WithDenormalizerPlan configures one or more protobuf field paths to project into a flat (denormalized) Arrow record. Each path is specified as a pbpath.PlanPathSpec created via pbpath.PlanPath, which supports per-path options such as pbpath.Alias and pbpath.StrictPath.

Paths that traverse repeated fields with a wildcard [*] or range [start:end] produce fan-out rows. Multiple independent fan-out groups are cross-joined; empty fan-out groups produce a single null row (left-join semantics).

Each leaf path must terminate at a scalar protobuf field or at a recognized well-known or common message type (for example google.protobuf.Timestamp or OpenTelemetry AnyValue). Paths ending at any other message type are rejected at schema creation time.
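The terminal-node rule amounts to a simple check per leaf. In this sketch the kind strings, checkLeaf, and the partial wellKnown set are hypothetical; the real validation works on protoreflect descriptors and recognizes the full set of mapped types.

```go
package main

import "fmt"

// wellKnown is a hypothetical, partial set of message types that map to a
// flat Arrow scalar.
var wellKnown = map[string]bool{
	"google.protobuf.Timestamp": true,
	"google.protobuf.Duration":  true,
	"google.type.Date":          true,
}

// checkLeaf accepts scalar kinds and recognized well-known messages, and
// rejects any other message-typed terminal node.
func checkLeaf(path, kind, messageName string) error {
	if kind != "message" || wellKnown[messageName] {
		return nil
	}
	return fmt.Errorf("path %q ends at message %s; extend it to a scalar field", path, messageName)
}

func main() {
	fmt.Println(checkLeaf("items[*].id", "string", ""))                          // <nil>
	fmt.Println(checkLeaf("created_at", "message", "google.protobuf.Timestamp")) // <nil>
	fmt.Println(checkLeaf("items[*]", "message", "example.Item"))
}
```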

Example

ExampleWithDenormalizerPlan demonstrates configuring a denormalization plan.

// buildExampleDescriptors and newOrder are the helpers defined in the AppendDenorm examples.
orderMD, itemMD := buildExampleDescriptors()

tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"X", 1.0}, {"Y", 2.0}},
	nil, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("rows: %d\n", rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
	name := rec.Column(0).(*array.String).Value(i)
	id := rec.Column(1).(*array.String).Value(i)
	fmt.Printf("  %s | %s\n", name, id)
}
Output:
rows: 2
  order-1 | X
  order-1 | Y

type Transcoder

type Transcoder struct {
	// contains filtered or unexported fields
}

Transcoder converts protobuf messages to Apache Arrow record batches and back. It holds the compiled message schema, an Arrow record builder for the full message ("stencil"), and optionally a separate denormalizer that projects selected paths into a flat Arrow record suitable for analytics.

A Transcoder is not safe for concurrent use; call Transcoder.Clone to create independent copies for parallel goroutines.

func New

func New(msgDesc protoreflect.MessageDescriptor, mem memory.Allocator, opts ...Option) (tc *Transcoder, err error)

New returns a new Transcoder from a pre-resolved message descriptor. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile. WithDenormalizerPlan configures a separate, flat Arrow record for analytics, whereas WithCustomMessage and WithCustomMessageFile extend the schema derived from the input protoreflect.MessageDescriptor.

Example

ExampleNew demonstrates creating a Transcoder, appending protobuf messages, and retrieving the result as an Arrow record batch.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
rows: 2, cols: 3
fields: [name price qty]
Example (WithCustomMessage)

ExampleNew_withCustomMessage demonstrates augmenting a protobuf schema with custom fields using WithCustomMessage, and populating both the base and custom fields via AppendWithCustom.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Printf("fields: %v\n", tc.FieldNames())

	base := newProduct(baseMD, "Widget", 9.99, 5)
	custom := dynamicpb.NewMessage(customMD)
	custom.Set(customMD.Fields().ByName("region"), protoreflect.ValueOfString("US-EAST"))
	custom.Set(customMD.Fields().ByName("batch_id"), protoreflect.ValueOfInt64(42))

	if err := tc.AppendWithCustom(base, custom); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
fields: [name price qty region batch_id]
rows: 1, cols: 5

func NewFromFile

func NewFromFile(protoFilePath, messageName string, importPaths []string, mem memory.Allocator, opts ...Option) (*Transcoder, error)

NewFromFile returns a new Transcoder by compiling a .proto file at runtime. protoFilePath is the path to the .proto file (the examples pass a file name relative to one of the importPaths), messageName is the top-level message to use, and importPaths are the directories to search for imports. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile.

Example

ExampleNewFromFile demonstrates creating a Transcoder directly from a .proto file on disk, without pre-compiling the descriptor yourself.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
)

// writeProtoFile writes proto content to a temp file and returns its path and
// parent directory for use as an import path.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		log.Fatal(err)
	}
	return filePath, dir
}

const exampleProto = `syntax = "proto3";
message Item {
  string name  = 1;
  double price = 2;
}
`

func main() {
	path, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	tc, err := bufarrowlib.NewFromFile(filepath.Base(path), "Item", []string{dir}, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Inspect the Arrow schema derived from the compiled .proto file.
	f0 := tc.Schema().Field(0)
	fmt.Printf("field 0: %s\n", f0.Name)
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
field 0: name
fields: [name price]

func (*Transcoder) Append

func (s *Transcoder) Append(value proto.Message)

Append appends a protobuf message to the transcoder's Arrow record builder. This method is not safe for concurrent use.

Example

ExampleTranscoder_Append shows the basic append-and-flush cycle.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Alpha", 1.0, 10))
	tc.Append(newProduct(md, "Bravo", 2.0, 20))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 2

func (*Transcoder) AppendDenorm

func (s *Transcoder) AppendDenorm(msg proto.Message) error

AppendDenorm evaluates the denormalizer plan against msg and appends the resulting denormalized rows to the denormalizer's Arrow record builder.

Fan-out groups are cross-joined: each group contributes max(len(branches), 1) rows, and the total row count is the product of all group counts. Empty fan-out groups (no branches) contribute 1 null row (left-join semantics).

This method is not safe for concurrent use.
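The row-count rule above reduces to a product. A minimal sketch; denormRowCount is a hypothetical helper that takes the number of branches each fan-out group yields for one message.

```go
package main

import "fmt"

// denormRowCount applies the AppendDenorm rule: each fan-out group
// contributes max(n, 1) rows and groups are cross-joined, so the
// per-group counts multiply. An empty group still contributes one
// (null) row, which is the left-join behaviour.
func denormRowCount(groupSizes []int) int {
	rows := 1
	for _, n := range groupSizes {
		if n < 1 {
			n = 1 // empty group: one null row
		}
		rows *= n
	}
	return rows
}

func main() {
	fmt.Println(denormRowCount([]int{2, 3})) // 6: the CrossJoin example
	fmt.Println(denormRowCount([]int{0, 2})) // 2: the LeftJoin example
	fmt.Println(denormRowCount(nil))         // 1: no fan-out groups
}
```

By this formula a plan with no fan-out paths yields exactly one row per message, since the product over zero groups is 1.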

Example

ExampleTranscoder_AppendDenorm demonstrates basic denormalization: projecting scalar and fan-out fields from a protobuf message into a flat Arrow record.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("order_name")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOrder(orderMD, itemMD, "order-1",
		[]struct {
			id    string
			price float64
		}{{"A", 1.50}, {"B", 2.75}},
		nil, 1,
	)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
	for i := 0; i < int(rec.NumRows()); i++ {
		name := rec.Column(0).(*array.String).Value(i)
		id := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf("  %s | %s | %.2f\n", name, id, price)
	}
}
Output:
rows: 2, cols: 3
  order-1 | A | 1.50
  order-1 | B | 2.75
Example (CrossJoin)

ExampleTranscoder_AppendDenorm_crossJoin demonstrates cross-join behaviour when two independent repeated fields are denormalized together.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOrder(orderMD, itemMD, "order-1",
		[]struct {
			id    string
			price float64
		}{{"A", 1.0}, {"B", 2.0}},
		[]string{"x", "y", "z"}, 1,
	)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d (2 items × 3 tags)\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		id := rec.Column(0).(*array.String).Value(i)
		tag := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  %s | %s\n", id, tag)
	}
}
Output:
rows: 6 (2 items × 3 tags)
  A | x
  A | y
  A | z
  B | x
  B | y
  B | z
Example (LeftJoin)

ExampleTranscoder_AppendDenorm_leftJoin demonstrates left-join semantics when a repeated field is empty: a single null row is produced for that group while the other group fans out normally.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	msg := dynamicpb.NewMessage(orderMD)
	msg.Set(orderMD.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(orderMD.Fields().ByName("seq"), protoreflect.ValueOfInt64(seq))
	list := msg.Mutable(orderMD.Fields().ByName("items")).List()
	for _, it := range items {
		item := dynamicpb.NewMessage(itemMD)
		item.Set(itemMD.Fields().ByName("id"), protoreflect.ValueOfString(it.id))
		item.Set(itemMD.Fields().ByName("price"), protoreflect.ValueOfFloat64(it.price))
		list.Append(protoreflect.ValueOfMessage(item))
	}
	tagList := msg.Mutable(orderMD.Fields().ByName("tags")).List()
	for _, tg := range tags {
		tagList.Append(protoreflect.ValueOfString(tg))
	}
	return msg
}

func main() {
	orderMD, itemMD := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
			pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Zero items, 2 tags → items group produces 1 null row → 1 × 2 = 2 rows
	msg := newOrder(orderMD, itemMD, "order-1", nil, []string{"x", "y"}, 1)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		idNull := rec.Column(0).IsNull(i)
		tag := rec.Column(1).(*array.String).Value(i)
		fmt.Printf("  item_id=null:%v | tag=%s\n", idNull, tag)
	}
}
Output:
rows: 2
  item_id=null:true | tag=x
  item_id=null:true | tag=y

func (*Transcoder) AppendWithCustom

func (s *Transcoder) AppendWithCustom(value proto.Message, custom proto.Message) error

AppendWithCustom appends a protobuf value merged with custom field values to the transcoder's builder. The custom proto.Message must conform to the message descriptor provided via WithCustomMessage or WithCustomMessageFile. Both messages are marshalled to bytes and unmarshalled into the merged stencil for appending. This method is not safe for concurrent use.

Example

ExampleTranscoder_AppendWithCustom shows appending a message that includes both base and custom fields.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

// buildCustomDescriptor constructs an inline custom-fields message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("CustomFields"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("region"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	base := newProduct(baseMD, "Widget", 9.99, 5)
	custom := dynamicpb.NewMessage(customMD)
	custom.Set(customMD.Fields().ByName("region"), protoreflect.ValueOfString("EU"))
	custom.Set(customMD.Fields().ByName("batch_id"), protoreflect.ValueOfInt64(7))

	if err := tc.AppendWithCustom(base, custom); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 5

func (*Transcoder) Clone

func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)

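Clone returns a new Transcoder with the same schema configuration and its own independent builders, backed by the allocator mem. Because Transcoder methods are not safe for concurrent use, give each goroutine its own clone.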
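A sketch of the one-clone-per-goroutine pattern, using a hypothetical builder type as a stand-in so it runs with the standard library alone. The real method is Clone(mem memory.Allocator) (*Transcoder, error); the stub drops the allocator and error for brevity. Clones are taken in the calling goroutine before the workers start, so the original is never touched concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// builder is a hypothetical stand-in for a non-thread-safe Transcoder:
// Append mutates internal state without any locking.
type builder struct{ rows []string }

func (b *builder) Clone() *builder { return &builder{} }
func (b *builder) Append(v string) { b.rows = append(b.rows, v) }
func (b *builder) NumRows() int    { return len(b.rows) }

// fanOut hands every worker its own clone, so no builder is shared
// between goroutines, and returns each clone's final row count.
func fanOut(base *builder, batches [][]string) []int {
	counts := make([]int, len(batches))
	var wg sync.WaitGroup
	for i, batch := range batches {
		wg.Add(1)
		// Clone in the calling goroutine, then pass the copy to the worker.
		go func(i int, batch []string, b *builder) {
			defer wg.Done()
			for _, v := range batch {
				b.Append(v)
			}
			counts[i] = b.NumRows() // each worker writes a distinct index
		}(i, batch, base.Clone())
	}
	wg.Wait()
	return counts
}

func main() {
	counts := fanOut(&builder{}, [][]string{{"a", "b"}, {"c"}, {}})
	fmt.Println(counts) // [2 1 0]
}
```

With a real Transcoder, each worker would also call NewRecordBatch on its own clone and Release both the record and the clone when done.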

Example

ExampleTranscoder_Clone demonstrates cloning a Transcoder for use in a separate goroutine. The clone has independent builders but shares the same schema configuration.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	clone, err := tc.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.Release()

	clone.Append(newProduct(md, "Gizmo", 5.00, 10))

	rec := clone.NewRecordBatch()
	defer rec.Release()
	fmt.Printf("clone rows: %d\n", rec.NumRows())

	origRec := tc.NewRecordBatch()
	defer origRec.Release()
	fmt.Printf("original rows: %d\n", origRec.NumRows())
}
Output:
clone rows: 1
original rows: 0

func (*Transcoder) DenormalizerBuilder

func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder

DenormalizerBuilder returns the denormalizer's Arrow array.RecordBuilder. It is exposed for callers who need custom denormalization logic beyond what Transcoder.AppendDenorm provides; in most cases, prefer AppendDenorm, which handles fan-out and cross-joins automatically. Returns nil if no denormalizer plan was configured.
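The fan-out that AppendDenorm automates can be modelled in plain Go. This is a conceptual sketch, not the library's code. It assumes that a plan selecting a scalar (`name`) and two independent repeated paths (`items[*].id`, `tags[*]`) yields the Cartesian product of the repeated values, with the scalar repeated on every row, as the package overview's `CROSS JOIN UNNEST` comparison suggests. The `order`, `row` and `denormalize` names are hypothetical.

```go
package main

import "fmt"

// order is a hypothetical Go mirror of the example Order message.
type order struct {
	name  string
	items []string // item ids
	tags  []string
}

// row is one flat output record: order_name, item_id, tag.
type row struct{ name, itemID, tag string }

// denormalize cross-joins the two repeated fields and repeats the scalar
// on every output row. Following SQL CROSS JOIN UNNEST semantics, an empty
// repeated field produces no rows; check the library's own behaviour for
// empty lists before relying on this.
func denormalize(o order) []row {
	var out []row
	for _, id := range o.items {
		for _, tag := range o.tags {
			out = append(out, row{o.name, id, tag})
		}
	}
	return out
}

func main() {
	rows := denormalize(order{
		name:  "order-1",
		items: []string{"A", "B"},
		tags:  []string{"x", "y", "z"},
	})
	fmt.Println(len(rows))        // 6 = 2 items x 3 tags
	fmt.Println(rows[0], rows[5]) // {order-1 A x} {order-1 B z}
}
```

With a single wildcard path such as `items[*].id`, the inner loop disappears and the row count equals the number of items, which matches the two rows produced for two items in the NewDenormalizerRecordBatch example.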

Example

ExampleTranscoder_DenormalizerBuilder shows accessing the underlying RecordBuilder for custom denormalization logic.

orderMD, _ := buildExampleDescriptors()
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

builder := tc.DenormalizerBuilder()
fmt.Printf("builder fields: %d\n", builder.Schema().NumFields())
fmt.Printf("schema: %s\n", builder.Schema().Field(0).Type)
Output:
builder fields: 1
schema: utf8

func (*Transcoder) DenormalizerSchema

func (s *Transcoder) DenormalizerSchema() *arrow.Schema

DenormalizerSchema returns the Arrow schema of the denormalized record. Returns nil if no denormalizer plan was configured.

Example

ExampleTranscoder_DenormalizerSchema demonstrates inspecting the Arrow schema of the denormalized record, showing column names, types, and nullability.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"github.com/loicalleyne/bufarrowlib/proto/pbpath"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildExampleDescriptors constructs an inline proto schema for examples:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string         name  = 1;
//	  repeated Item  items = 2;
//	  repeated string tags = 3;
//	  int64          seq   = 4;
//	}
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
	messageType := descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	labelRep := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("id"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("items"), Number: proto.Int32(2), Type: messageType, TypeName: proto.String(".example.Item"), Label: labelRep},
					{Name: proto.String("tags"), Number: proto.Int32(3), Type: stringType, Label: labelRep},
					{Name: proto.String("seq"), Number: proto.Int32(4), Type: int64Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Order"), fd.Messages().ByName("Item")
}

func main() {
	orderMD, _ := buildExampleDescriptors()

	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("order_name")),
			pbpath.PlanPath("seq", pbpath.Alias("order_seq")),
			pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	schema := tc.DenormalizerSchema()
	for i, f := range schema.Fields() {
		fmt.Printf("  %d: %-12s %-10s nullable=%v\n", i, f.Name, f.Type, f.Nullable)
	}
}
Output:
  0: order_name   utf8       nullable=true
  1: order_seq    int64      nullable=true
  2: item_price   float64    nullable=true

func (*Transcoder) FieldNames

func (s *Transcoder) FieldNames() []string

FieldNames returns the top-level Arrow field names of the message schema.

Example

ExampleTranscoder_FieldNames shows retrieving top-level field names.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output:
[name price qty]

func (*Transcoder) NewDenormalizerRecordBatch

func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch

NewDenormalizerRecordBatch returns the buffered denormalizer builder contents as an arrow.RecordBatch. The builder is reset and can be reused. Returns nil if no denormalizer plan was configured.

Example

ExampleTranscoder_NewDenormalizerRecordBatch shows flushing the denormalizer's builder into a record batch.

orderMD, itemMD := buildExampleDescriptors()
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
	bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	),
)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

msg := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"A", 1.0}, {"B", 2.0}},
	nil, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
	log.Fatal(err)
}

rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

fmt.Printf("rows: %d\n", rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
	fmt.Println(rec.Column(0).(*array.String).Value(i))
}
Output:
rows: 2
A
B

func (*Transcoder) NewRecordBatch

func (s *Transcoder) NewRecordBatch() arrow.RecordBatch

NewRecordBatch returns the buffered builder contents as an arrow.RecordBatch. The builder is reset and can be reused.
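The flush-and-reset contract is what lets the WriteParquetRecords example produce two one-row batches from a single Transcoder. A minimal model, assuming nothing about the library's internals: `builder` and `batch` are hypothetical types standing in for the Arrow RecordBuilder and RecordBatch.

```go
package main

import "fmt"

// batch is an immutable snapshot of buffered rows.
type batch struct{ rows []string }

// builder is a hypothetical stand-in for a record builder.
type builder struct{ pending []string }

func (b *builder) Append(v string) { b.pending = append(b.pending, v) }

// NewBatch hands the buffered rows to a new batch and resets the builder,
// so later appends start a fresh batch instead of growing the old one.
func (b *builder) NewBatch() batch {
	out := batch{rows: b.pending}
	b.pending = nil // the batch keeps the old slice; the builder starts empty
	return out
}

func main() {
	var b builder
	b.Append("Widget")
	rec1 := b.NewBatch()
	b.Append("Gadget")
	rec2 := b.NewBatch()
	empty := b.NewBatch()
	fmt.Println(len(rec1.rows), len(rec2.rows), len(empty.rows)) // 1 1 0
}
```

Under this model, flushing an empty builder yields a zero-row batch; the Clone example's `original rows: 0` shows the real Transcoder doing the same.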

Example

ExampleTranscoder_NewRecordBatch shows building an Arrow record batch from appended messages.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d, cols: %d\n", rec.NumRows(), rec.NumCols())
}
Output:
rows: 1, cols: 3

func (*Transcoder) Parquet

func (s *Transcoder) Parquet() *schema.Schema

Parquet returns the Parquet schema.Schema for the message.
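For a flat message like Product, the Parquet column count equals the number of top-level Arrow fields. Nested messages differ, because Parquet stores one column per primitive leaf rather than per top-level field. The sketch below counts leaves in a hypothetical `field` tree using dotted paths; it omits the intermediate group levels (such as list wrappers) that a real Parquet writer inserts for repeated fields, so real column paths will be longer.

```go
package main

import (
	"fmt"
	"strings"
)

// field is a hypothetical schema node: a primitive leaf when children is
// empty, a group (nested message) otherwise.
type field struct {
	name     string
	children []field
}

// leafPaths walks the tree depth-first and returns the dotted path of
// every primitive leaf; each leaf becomes one Parquet column.
func leafPaths(fields []field, prefix []string) []string {
	var out []string
	for _, f := range fields {
		// Copy prefix so sibling paths never share a backing array.
		path := append(append([]string{}, prefix...), f.name)
		if len(f.children) == 0 {
			out = append(out, strings.Join(path, "."))
			continue
		}
		out = append(out, leafPaths(f.children, path)...)
	}
	return out
}

func main() {
	// Mirrors the example Order message: name, items{id, price}, tags, seq.
	order := []field{
		{name: "name"},
		{name: "items", children: []field{{name: "id"}, {name: "price"}}},
		{name: "tags"},
		{name: "seq"},
	}
	fmt.Println(leafPaths(order, nil)) // [name items.id items.price tags seq]
}
```

Under this model, Order has four top-level Arrow fields but five Parquet leaf columns.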

Example

ExampleTranscoder_Parquet demonstrates inspecting the Parquet schema.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	pqSchema := tc.Parquet()
	fmt.Printf("parquet columns: %d\n", pqSchema.NumColumns())
	for i := 0; i < pqSchema.NumColumns(); i++ {
		fmt.Printf("  %s\n", pqSchema.Column(i).Name())
	}
}
Output:
parquet columns: 3
  name
  price
  qty

func (*Transcoder) Proto

func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message

Proto decodes selected rows from an Arrow RecordBatch back into protobuf messages. Pass nil for rows to decode all rows.
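The `rows` argument follows a common Go convention in which a nil slice means "everything". A hypothetical helper, `resolveRows`, makes the rule explicit. It is not part of bufarrowlib, and its bounds check is an assumption: the documentation does not say how out-of-range indices are handled.

```go
package main

import "fmt"

// resolveRows returns the row indices to decode from a batch of n rows:
// every row when sel is nil, otherwise sel itself after a bounds check.
// An empty but non-nil sel selects no rows.
func resolveRows(n int, sel []int) ([]int, error) {
	if sel == nil {
		all := make([]int, n)
		for i := range all {
			all[i] = i
		}
		return all, nil
	}
	for _, r := range sel {
		if r < 0 || r >= n {
			return nil, fmt.Errorf("row %d out of range [0, %d)", r, n)
		}
	}
	return sel, nil
}

func main() {
	all, _ := resolveRows(2, nil)
	some, _ := resolveRows(2, []int{1})
	_, err := resolveRows(2, []int{5})
	fmt.Println(all, some, err) // [0 1] [1] row 5 out of range [0, 2)
}
```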

Example

ExampleTranscoder_Proto demonstrates round-tripping: appending protobuf messages, building an Arrow record, and reconstructing them back as protobuf messages.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	msgs := tc.Proto(rec, nil) // nil = all rows
	fmt.Printf("recovered %d messages\n", len(msgs))
	for _, m := range msgs {
		js, _ := protojson.Marshal(m)
		var c bytes.Buffer
		json.Compact(&c, js)
		fmt.Println(c.String())
	}
}
Output:
recovered 2 messages
{"name":"Widget","price":9.99,"qty":5}
{"name":"Gadget","price":24.5,"qty":2}

func (*Transcoder) ReadParquet

func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)

ReadParquet reads the specified columns from the Parquet source r and returns them as an Arrow RecordBatch. Pass nil for columns to read all columns, as the example below does. The caller must release the returned RecordBatch.

Example

ExampleTranscoder_ReadParquet demonstrates a full Parquet round-trip: write messages to Parquet, then read them back into an Arrow record.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var buf bytes.Buffer
	if err := tc.WriteParquet(&buf); err != nil {
		log.Fatal(err)
	}

	reader := bytes.NewReader(buf.Bytes())
	rec, err := tc.ReadParquet(context.Background(), reader, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer rec.Release()

	fmt.Printf("read back: %d rows, %d cols\n", rec.NumRows(), rec.NumCols())
}
Output:
read back: 2 rows, 3 cols

func (*Transcoder) Release

func (s *Transcoder) Release()

Release releases the reference on the underlying Arrow record builder.
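Arrow's Go objects are reference-counted: `Retain` adds a reference, `Release` drops one, and memory returns to the allocator when the count reaches zero. That is why every example pairs `New` and `NewRecordBatch` with a deferred `Release`. The sketch below models the counting with `sync/atomic`; `refCounted` and its `freed` flag are hypothetical and stand in for returning buffers to a `memory.Allocator`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted is a hypothetical object whose resources are freed when the
// last reference is released.
type refCounted struct {
	refs  atomic.Int64
	freed bool
}

func newRefCounted() *refCounted {
	r := &refCounted{}
	r.refs.Store(1) // the creator holds the first reference
	return r
}

// Retain records an additional owner.
func (r *refCounted) Retain() { r.refs.Add(1) }

// Release drops one reference and frees the object at zero.
func (r *refCounted) Release() {
	switch n := r.refs.Add(-1); {
	case n == 0:
		r.freed = true // stand-in for returning buffers to the allocator
	case n < 0:
		panic("Release called more times than Retain")
	}
}

func main() {
	rec := newRefCounted()
	rec.Retain()           // a second owner keeps the object alive
	rec.Release()          // first owner done
	fmt.Println(rec.freed) // false
	rec.Release()          // last owner done
	fmt.Println(rec.freed) // true
}
```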

Example

ExampleTranscoder_Release shows the standard cleanup pattern.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	// Release should always be deferred after construction.
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec := tc.NewRecordBatch()
	defer rec.Release()

	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output:
rows: 1

func (*Transcoder) Schema

func (s *Transcoder) Schema() *arrow.Schema

Schema returns the Arrow arrow.Schema for the message.

Example

ExampleTranscoder_Schema demonstrates inspecting the Arrow schema derived from a protobuf message descriptor.

package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	for i, f := range tc.Schema().Fields() {
		fmt.Printf("  %d: %-8s %s\n", i, f.Name, f.Type)
	}
}
Output:
  0: name     utf8
  1: price    float64
  2: qty      int32

func (*Transcoder) WriteParquet

func (s *Transcoder) WriteParquet(w io.Writer) error

WriteParquet writes the rows currently buffered in the Transcoder's builder to w in Parquet format.

Example

ExampleTranscoder_WriteParquet demonstrates writing appended messages to an in-memory Parquet buffer. See ExampleTranscoder_ReadParquet for reading them back.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var buf bytes.Buffer
	if err := tc.WriteParquet(&buf); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("parquet bytes: %d\n", buf.Len())
	fmt.Println("wrote parquet successfully")
}
Output:
parquet bytes: 714
wrote parquet successfully

func (*Transcoder) WriteParquetRecords

func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error

WriteParquetRecords writes one or more Arrow record batches to w as a single Parquet file.

Example

ExampleTranscoder_WriteParquetRecords demonstrates writing multiple Arrow record batches to a single Parquet file.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/loicalleyne/bufarrowlib"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// buildSimpleDescriptor constructs an inline proto schema for examples:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	stringType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
	doubleType := descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
	int32Type := descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	labelOpt := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	fdp := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Product"),
				Field: []*descriptorpb.FieldDescriptorProto{
					{Name: proto.String("name"), Number: proto.Int32(1), Type: stringType, Label: labelOpt},
					{Name: proto.String("price"), Number: proto.Int32(2), Type: doubleType, Label: labelOpt},
					{Name: proto.String("qty"), Number: proto.Int32(3), Type: int32Type, Label: labelOpt},
				},
			},
		},
	}
	fd, err := protodesc.NewFile(fdp, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}

func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	msg := dynamicpb.NewMessage(md)
	msg.Set(md.Fields().ByName("name"), protoreflect.ValueOfString(name))
	msg.Set(md.Fields().ByName("price"), protoreflect.ValueOfFloat64(price))
	msg.Set(md.Fields().ByName("qty"), protoreflect.ValueOfInt32(qty))
	return msg
}

func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	rec1 := tc.NewRecordBatch()
	defer rec1.Release()

	tc.Append(newProduct(md, "Gadget", 24.50, 2))
	rec2 := tc.NewRecordBatch()
	defer rec2.Release()

	var buf bytes.Buffer
	if err := tc.WriteParquetRecords(&buf, rec1, rec2); err != nil {
		log.Fatal(err)
	}

	fmt.Println("wrote 2 record batches to parquet")
}
Output:
wrote 2 record batches to parquet

Directories

Path Synopsis
gen
proto
pbpath
Package pbpath provides functionality for representing a sequence of protobuf reflection operations on a message, including parsing human-readable path strings and traversing messages along a path to collect values.
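The path strings used throughout this page, such as `name` and `items[*].price`, combine field names with a `[*]` wildcard over repeated fields. The sketch below is a hypothetical mini-implementation of that idea, not pbpath's API. It parses only dotted names and `[*]`, and it traverses a nested `map[string]any` in place of a protobuf message, collecting every value the path reaches.

```go
package main

import (
	"fmt"
	"strings"
)

// step is one parsed path segment: a field name, optionally followed by [*].
type step struct {
	field    string
	wildcard bool
}

// parsePath splits "items[*].price" into [{items true} {price false}].
// Only plain names and the [*] wildcard are supported in this sketch.
func parsePath(p string) ([]step, error) {
	var steps []step
	for _, seg := range strings.Split(p, ".") {
		s := step{field: seg}
		if name, ok := strings.CutSuffix(seg, "[*]"); ok {
			s = step{field: name, wildcard: true}
		}
		if s.field == "" || strings.ContainsAny(s.field, "[]") {
			return nil, fmt.Errorf("unsupported segment %q", seg)
		}
		steps = append(steps, s)
	}
	return steps, nil
}

// collect walks msg along steps and returns every value reached; a
// wildcard fans out over each element of a list.
func collect(msg any, steps []step) []any {
	if len(steps) == 0 {
		return []any{msg}
	}
	m, ok := msg.(map[string]any)
	if !ok {
		return nil
	}
	v, ok := m[steps[0].field]
	if !ok {
		return nil
	}
	if !steps[0].wildcard {
		return collect(v, steps[1:])
	}
	list, _ := v.([]any)
	var out []any
	for _, elem := range list {
		out = append(out, collect(elem, steps[1:])...)
	}
	return out
}

func main() {
	order := map[string]any{
		"name": "order-1",
		"items": []any{
			map[string]any{"id": "A", "price": 1.0},
			map[string]any{"id": "B", "price": 2.0},
		},
	}
	steps, err := parsePath("items[*].price")
	if err != nil {
		panic(err)
	}
	fmt.Println(collect(order, steps)) // [1 2]
}
```

Collecting one value per wildcard element is what allows a denormalizer plan to turn `items[*].price` into one output row per item.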
