Documentation
¶
Overview ¶
Example (DenormToParquet) ¶
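Example_denormToParquet shows how to denormalize nested protobuf messages into a flat Arrow record batch that is ready to be written to Parquet. The Parquet write itself is not performed here; the example prints the denormalized rows for inspection.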
// Descriptors for the Order message and its nested Item message.
orderMD, itemMD := buildExampleDescriptors()

// Project four paths into the flat record; each path gets an alias that
// becomes the column name.
plan := bufarrowlib.WithDenormalizerPlan(
	pbpath.PlanPath("name", pbpath.Alias("order_name")),
	pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
	pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
	pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
)
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Append an order with 2 items × 2 tags → 4 denormalized rows.
items := []struct {
	id    string
	price float64
}{{"A", 1.50}, {"B", 2.75}}
tags := []string{"rush", "fragile"}
order := newOrder(orderMD, itemMD, "order-1", items, tags, 1)
if err := tc.AppendDenorm(order); err != nil {
	log.Fatal(err)
}

// The denormalized record builder produces a flat schema; print the name
// of its first field.
firstField := tc.DenormalizerSchema().Field(0)
fmt.Printf("schema: %v\n", firstField.Name)

batch := tc.NewDenormalizerRecordBatch()
defer batch.Release()
fmt.Printf("denorm rows: %d (2 items × 2 tags)\n", batch.NumRows())

// The flat record could now be written to Parquet via a new Transcoder
// built from the denorm schema, or directly using pqarrow.
// Here the denormalized rows are simply printed for inspection.
rowCount := int(batch.NumRows())
for row := 0; row < rowCount; row++ {
	orderName := batch.Column(0).(*array.String).Value(row)
	itemID := batch.Column(1).(*array.String).Value(row)
	itemPrice := batch.Column(2).(*array.Float64).Value(row)
	tagValue := batch.Column(3).(*array.String).Value(row)
	fmt.Printf(" %s | %s | %.2f | %s\n", orderName, itemID, itemPrice, tagValue)
}
Output:
schema: order_name
denorm rows: 4 (2 items × 2 tags)
 order-1 | A | 1.50 | rush
 order-1 | A | 1.50 | fragile
 order-1 | B | 2.75 | rush
 order-1 | B | 2.75 | fragile
Example (ParquetRoundTrip) ¶
Example_parquetRoundTrip shows a full round-trip: proto → Arrow → Parquet → Arrow → proto. This demonstrates that data survives serialisation intact.
// Build the example message descriptor and a Transcoder for it.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Encode two products.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))

// Proto → Arrow → Parquet (in-memory buffer).
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
	log.Fatal(err)
}

// Parquet → Arrow. A nil column list is passed, i.e. no column selection.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()

// Arrow → Proto. A nil row list is passed, i.e. no row selection.
msgs := tc.Proto(rec, nil)
fmt.Printf("round-tripped %d messages:\n", len(msgs))
for _, m := range msgs {
	// Surface marshalling failures instead of printing an empty line.
	js, err := protojson.Marshal(m)
	if err != nil {
		log.Fatal(err)
	}
	// Compact the JSON so the printed form does not depend on the
	// whitespace chosen by protojson.
	var c bytes.Buffer
	if err := json.Compact(&c, js); err != nil {
		log.Fatal(err)
	}
	fmt.Printf(" %s\n", c.String())
}
Output:
round-tripped 2 messages:
 {"name":"Widget","price":9.99,"qty":5}
 {"name":"Gadget","price":24.5,"qty":2}
Example (ParquetSelectColumns) ¶
Example_parquetSelectColumns shows reading only specific columns from a Parquet file — useful for analytics queries that touch a subset of fields.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Stage two products and serialise them to an in-memory Parquet file.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
var parquetBuf bytes.Buffer
if err := tc.WriteParquet(&parquetBuf); err != nil {
	log.Fatal(err)
}

// Read only column 0 (name) and column 2 (qty).
wanted := []int{0, 2}
src := bytes.NewReader(parquetBuf.Bytes())
rec, err := tc.ReadParquet(context.Background(), src, wanted)
if err != nil {
	log.Fatal(err)
}
defer rec.Release()
fmt.Printf("cols: %d, rows: %d\n", rec.NumCols(), rec.NumRows())

// In the projected record the selected columns are addressed as 0 and 1.
names := rec.Column(0).(*array.String)
qtys := rec.Column(1).(*array.Int32)
total := int(rec.NumRows())
for row := 0; row < total; row++ {
	fmt.Printf(" %s: %d\n", names.Value(row), qtys.Value(row))
}
Output:
cols: 2, rows: 2
 Widget: 5
 Gadget: 2
Example (ProtoFileToParquet) ¶
Example_protoFileToParquet shows the end-to-end workflow when working with .proto files on disk: compile the schema, create a transcoder, populate messages, and write Parquet.
// Materialise an inline .proto schema as a file in a temporary directory.
const eventProto = `syntax = "proto3";
message Event {
string id = 1;
string action = 2;
int64 user_id = 3;
}
`
_, dir := writeProtoFile("event.proto", eventProto)
defer os.RemoveAll(dir)

// Compile the .proto, using the temp directory as the only import path,
// and look up the Event message descriptor.
importPaths := []string{dir}
fd, err := bufarrowlib.CompileProtoToFileDescriptor("event.proto", importPaths)
if err != nil {
	log.Fatal(err)
}
md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Event")
if err != nil {
	log.Fatal(err)
}

// Create a Transcoder from the compiled descriptor.
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Build and append messages using dynamicpb.
events := []struct {
	id, action string
	uid        int64
}{
	{"e1", "click", 100},
	{"e2", "view", 200},
}
for _, ev := range events {
	fields := md.Fields()
	event := dynamicpb.NewMessage(md)
	event.Set(fields.ByName("id"), protoreflect.ValueOfString(ev.id))
	event.Set(fields.ByName("action"), protoreflect.ValueOfString(ev.action))
	event.Set(fields.ByName("user_id"), protoreflect.ValueOfInt64(ev.uid))
	tc.Append(event)
}

// Serialise everything appended so far into an in-memory Parquet file.
var out bytes.Buffer
if err := tc.WriteParquet(&out); err != nil {
	log.Fatal(err)
}
fmt.Printf("fields: %v\n", tc.FieldNames())
fmt.Printf("wrote parquet: %d bytes\n", out.Len())
Output:
fields: [id action user_id]
wrote parquet: 688 bytes
Example (ProtoToParquetBatched) ¶
Example_protoToParquetBatched shows writing protobuf messages to Parquet in batches — useful when processing a large stream and flushing periodically.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Batch 1: first two messages.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
first := tc.NewRecordBatch()
defer first.Release()

// Batch 2: one more message.
tc.Append(newProduct(md, "Gizmo", 7.25, 12))
second := tc.NewRecordBatch()
defer second.Release()

// Write both batches to a single Parquet file held in memory.
var out bytes.Buffer
if err := tc.WriteParquetRecords(&out, first, second); err != nil {
	log.Fatal(err)
}

// Read back and verify total row count.
src := bytes.NewReader(out.Bytes())
readBack, err := tc.ReadParquet(context.Background(), src, nil)
if err != nil {
	log.Fatal(err)
}
defer readBack.Release()
fmt.Printf("batches: 2, total rows read back: %d\n", readBack.NumRows())
Output: batches: 2, total rows read back: 3
Example (ProtoToParquetFile) ¶
Example_protoToParquetFile shows the complete workflow of consuming protobuf messages and writing them to a Parquet file on disk.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// Simulate consuming a stream of protobuf messages.
products := []struct {
	name  string
	price float64
	qty   int32
}{
	{"Widget", 9.99, 5},
	{"Gadget", 24.50, 2},
	{"Gizmo", 7.25, 12},
}
for _, p := range products {
	tc.Append(newProduct(md, p.name, p.price, p.qty))
}

// Write to a Parquet file in the default temp directory; the file is
// removed when the example returns.
f, err := os.CreateTemp("", "example-*.parquet")
if err != nil {
	log.Fatal(err)
}
name := f.Name()
defer os.Remove(name)
if err := tc.WriteParquet(f); err != nil {
	log.Fatal(err)
}
// The Close error is deliberately not treated as fatal.
// NOTE(review): the Parquet writer may already have closed f, in which
// case Close would report an "already closed" error — confirm.
_ = f.Close()

// Check the Stat error: on failure info is nil and info.Size() would panic.
info, err := os.Stat(name)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("wrote %d messages to parquet (%d bytes)\n", len(products), info.Size())
Output: wrote 3 messages to parquet (738 bytes)
Index ¶
- Variables
- func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)
- func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)
- func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)
- func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc
- func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType
- type Opt
- type Option
- type Transcoder
- func (s *Transcoder) Append(value proto.Message)
- func (s *Transcoder) AppendDenorm(msg proto.Message) error
- func (s *Transcoder) AppendWithCustom(value proto.Message, custom proto.Message) error
- func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)
- func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder
- func (s *Transcoder) DenormalizerSchema() *arrow.Schema
- func (s *Transcoder) FieldNames() []string
- func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch
- func (s *Transcoder) NewRecordBatch() arrow.RecordBatch
- func (s *Transcoder) Parquet() *schema.Schema
- func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message
- func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)
- func (s *Transcoder) Release()
- func (s *Transcoder) Schema() *arrow.Schema
- func (s *Transcoder) WriteParquet(w io.Writer) error
- func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error
Examples ¶
- Package (DenormToParquet)
- Package (ParquetRoundTrip)
- Package (ParquetSelectColumns)
- Package (ProtoFileToParquet)
- Package (ProtoToParquetBatched)
- Package (ProtoToParquetFile)
- CompileProtoToFileDescriptor
- GetMessageDescriptorByName
- MergeMessageDescriptors
- New
- New (WithCustomMessage)
- NewFromFile
- ProtoKindToAppendFunc
- ProtoKindToArrowType
- Transcoder.Append
- Transcoder.AppendDenorm
- Transcoder.AppendDenorm (CrossJoin)
- Transcoder.AppendDenorm (LeftJoin)
- Transcoder.AppendWithCustom
- Transcoder.Clone
- Transcoder.DenormalizerBuilder
- Transcoder.DenormalizerSchema
- Transcoder.FieldNames
- Transcoder.NewDenormalizerRecordBatch
- Transcoder.NewRecordBatch
- Transcoder.Parquet
- Transcoder.Proto
- Transcoder.ReadParquet
- Transcoder.Release
- Transcoder.Schema
- Transcoder.WriteParquet
- Transcoder.WriteParquetRecords
- WithCustomMessage
- WithCustomMessageFile
- WithDenormalizerPlan
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrMxDepth is returned when the protobuf message nesting exceeds maxDepth.
	ErrMxDepth = errors.New("max depth reached, either the message is deeply nested or a circular dependency was introduced")

	// ErrPathNotFound is returned by node.getPath when a field name is not
	// found in the node's hash map.
	ErrPathNotFound = errors.New("path not found")
)
Sentinel errors returned during schema construction and path lookup.
Functions ¶
func CompileProtoToFileDescriptor ¶
func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)
CompileProtoToFileDescriptor compiles a .proto file at runtime using protocompile and returns the resulting FileDescriptor. importPaths are the directories searched for transitive imports.
Example ¶
ExampleCompileProtoToFileDescriptor demonstrates compiling a .proto file from disk at runtime and inspecting its messages.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile stores content in a file called name inside a freshly
// created temporary directory. It returns the full path of the file and the
// directory itself, which callers can pass as a proto import path and remove
// when done. Any failure terminates the process via log.Fatal.
func writeProtoFile(name, content string) (filePath, dir string) {
	tmpDir, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	target := filepath.Join(tmpDir, name)
	writeErr := os.WriteFile(target, []byte(content), 0o644)
	if writeErr != nil {
		log.Fatal(writeErr)
	}
	return target, tmpDir
}
// exampleProto is the inline proto3 schema used by this example; it declares
// a single Item message with a string name and a double price.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// main writes the inline schema to a temporary directory, compiles it at
// runtime and prints every top-level message together with its field count.
func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	// The temp directory is the only import path searched.
	importPaths := []string{dir}
	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", importPaths)
	if err != nil {
		log.Fatal(err)
	}

	for idx := 0; idx < fd.Messages().Len(); idx++ {
		msg := fd.Messages().Get(idx)
		fieldCount := msg.Fields().Len()
		fmt.Printf("%s (%d fields)\n", msg.Name(), fieldCount)
	}
}
Output: Item (2 fields)
func GetMessageDescriptorByName ¶
func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)
GetMessageDescriptorByName looks up a top-level message by name in the given FileDescriptor. Returns an error if the message is not found.
Example ¶
ExampleGetMessageDescriptorByName demonstrates looking up a specific message by name from a compiled FileDescriptor.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile creates a new temporary directory, writes content into a
// file called name within it, and returns both the file path and the
// directory (usable as a proto import path). It calls log.Fatal if the
// directory cannot be created or the file cannot be written.
func writeProtoFile(name, content string) (filePath, dir string) {
	root, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	dest := filepath.Join(root, name)
	data := []byte(content)
	if wErr := os.WriteFile(dest, data, 0o644); wErr != nil {
		log.Fatal(wErr)
	}
	return dest, root
}
// exampleProto is the inline proto3 schema used by this example; it declares
// a single Item message with a string name and a double price.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// main compiles the inline schema from a temporary directory, looks up the
// Item message by name and prints each field's name and kind.
func main() {
	_, dir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(dir)

	importPaths := []string{dir}
	fd, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", importPaths)
	if err != nil {
		log.Fatal(err)
	}

	md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Item")
	if err != nil {
		log.Fatal(err)
	}

	for idx := 0; idx < md.Fields().Len(); idx++ {
		field := md.Fields().Get(idx)
		fmt.Printf("%s %s\n", field.Name(), field.Kind())
	}
}
Output:
name string
price double
func MergeMessageDescriptors ¶
func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)
MergeMessageDescriptors merges two message descriptors into a new one with the specified name. It appends the fields from the second descriptor to the first, avoiding name conflicts. Field numbers from b are auto-renumbered starting after a's max field number to prevent wire-format collisions. Nested message types and enum types from b are also carried over. The resulting message descriptor is wrapped in a synthetic file descriptor to ensure it can be used independently.
Example ¶
ExampleMergeMessageDescriptors demonstrates merging two message descriptors into one with combined fields.
package main
import (
"fmt"
"log"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, entirely in memory, the descriptor of the
// Product message used by the examples:
//
//	message Product {
//	    string name  = 1;
//	    double price = 2;
//	    int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			scalar("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// buildCustomDescriptor assembles, entirely in memory, the descriptor of the
// CustomFields message used by the examples:
//
//	message CustomFields {
//	    string region   = 1;
//	    int64  batch_id = 2;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	custom := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("region", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("batch_id", 2, descriptorpb.FieldDescriptorProto_TYPE_INT64),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{custom},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}
func main() {
baseMD := buildSimpleDescriptor()
customMD := buildCustomDescriptor()
merged, err := bufarrowlib.MergeMessageDescriptors(baseMD, customMD, "Merged")
if err != nil {
log.Fatal(err)
}
fields := merged.Fields()
for i := 0; i < fields.Len(); i++ {
fmt.Println(fields.Get(i).Name())
}
}
Output:
name
price
qty
region
batch_id
func ProtoKindToAppendFunc ¶
func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc
ProtoKindToAppendFunc returns a closure that appends a protoreflect.Value of the appropriate kind to the given Arrow array builder. The builder must match the Arrow data type returned by ProtoKindToArrowType for the same field descriptor.
Returns nil if the field's kind is not a recognized scalar mapping.
Example ¶
ExampleProtoKindToAppendFunc demonstrates obtaining a typed append closure for a protobuf field and using it to populate an Arrow builder.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, entirely in memory, the descriptor of the
// Product message used by the examples:
//
//	message Product {
//	    string name  = 1;
//	    double price = 2;
//	    int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			scalar("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// main obtains the append closure for the string field "name", feeds two
// values into a matching Arrow builder and prints the resulting array.
func main() {
	md := buildSimpleDescriptor()
	nameFD := md.Fields().ByName("name")

	// The builder must be created from the Arrow type that
	// ProtoKindToArrowType reports for the same field.
	arrowType := bufarrowlib.ProtoKindToArrowType(nameFD)
	bldr := array.NewBuilder(memory.DefaultAllocator, arrowType)
	defer bldr.Release()

	appendValue := bufarrowlib.ProtoKindToAppendFunc(nameFD, bldr)
	for _, word := range []string{"hello", "world"} {
		appendValue(protoreflect.ValueOfString(word))
	}

	arr := bldr.NewArray()
	defer arr.Release()
	fmt.Printf("len: %d\n", arr.Len())
	for idx := 0; idx < 2; idx++ {
		fmt.Println(arr.(*array.String).Value(idx))
	}
}
Output:
len: 2
hello
world
func ProtoKindToArrowType ¶
func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType
ProtoKindToArrowType returns the Arrow data type corresponding to a protobuf field descriptor's scalar kind. In addition to primitive kinds (bool, int32, string, etc.), the following well-known and common message types are recognised and mapped to flat Arrow scalars:
- google.protobuf.Timestamp → Timestamp(ms, UTC)
- google.protobuf.Duration → Duration(ms)
- google.protobuf.FieldMask → String (comma-joined paths)
- google.protobuf.*Value → unwrapped scalar (BoolValue→Boolean, etc.)
- google.type.Date → Date32
- google.type.TimeOfDay → Time64(µs)
- google.type.Money → String (protojson)
- google.type.LatLng → String (protojson)
- google.type.Color → String (protojson)
- google.type.PostalAddress → String (protojson)
- google.type.Interval → String (protojson)
- opentelemetry AnyValue → Binary (proto-marshalled)
Returns nil if the field's kind or message type is not a recognized scalar mapping (e.g. a generic message, map, or group).
TODO: add a WithTimestampUnit(arrow.TimeUnit) option to allow callers to override the default Millisecond precision.
Example ¶
ExampleProtoKindToArrowType demonstrates mapping protobuf field kinds to Arrow data types.
package main
import (
"fmt"
"log"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, entirely in memory, the descriptor of the
// Product message used by the examples:
//
//	message Product {
//	    string name  = 1;
//	    double price = 2;
//	    int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			scalar("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// main prints, for every field of the Product message, the field name
// (left-aligned in an 8-character column) and the Arrow type it maps to.
func main() {
	md := buildSimpleDescriptor()

	for idx := 0; idx < md.Fields().Len(); idx++ {
		field := md.Fields().Get(idx)
		arrowType := bufarrowlib.ProtoKindToArrowType(field)
		fmt.Printf("%-8s → %s\n", field.Name(), arrowType)
	}
}
Output:
name     → utf8
price    → float64
qty      → int32
Types ¶
type Opt ¶
type Opt struct {
// contains filtered or unexported fields
}
Opt holds the option values collected from Option functions and passed to New or NewFromFile. Fields are unexported; use the With* helpers.
type Option ¶
type Option func(config)
Option is a functional option applied to New and NewFromFile to configure schema merging, denormalization, or other behaviours.
func WithCustomMessage ¶
func WithCustomMessage(msgDesc protoreflect.MessageDescriptor) Option
WithCustomMessage provides a protoreflect.MessageDescriptor whose fields will be merged into the base message schema. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessageFile.
Example ¶
ExampleWithCustomMessage demonstrates the WithCustomMessage option.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, entirely in memory, the descriptor of the
// Product message used by the examples:
//
//	message Product {
//	    string name  = 1;
//	    double price = 2;
//	    int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			scalar("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// buildCustomDescriptor assembles, entirely in memory, the descriptor of the
// CustomFields message used by the examples:
//
//	message CustomFields {
//	    string region   = 1;
//	    int64  batch_id = 2;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	custom := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("region", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("batch_id", 2, descriptorpb.FieldDescriptorProto_TYPE_INT64),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{custom},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}
// main builds a Transcoder for Product with the CustomFields descriptor
// supplied through WithCustomMessage, then prints the transcoder's field names.
func main() {
	baseMD := buildSimpleDescriptor()
	customMD := buildCustomDescriptor()

	withCustom := bufarrowlib.WithCustomMessage(customMD)
	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator, withCustom)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	names := tc.FieldNames()
	fmt.Println(names)
}
Output: [name price qty region batch_id]
func WithCustomMessageFile ¶
WithCustomMessageFile specifies a .proto file and message name whose fields will be merged into the base message schema. The .proto file is compiled at schema creation time using protocompile. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessage.
Example ¶
ExampleWithCustomMessageFile demonstrates augmenting a base schema with custom fields loaded from a .proto file on disk.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile places content in a file called name under a newly created
// temporary directory and returns the file's path together with that
// directory, which can serve as a proto import path. It calls log.Fatal when
// either the directory or the file cannot be created.
func writeProtoFile(name, content string) (filePath, dir string) {
	scratch, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	out := filepath.Join(scratch, name)
	if wErr := os.WriteFile(out, []byte(content), 0o644); wErr != nil {
		log.Fatal(wErr)
	}

	filePath, dir = out, scratch
	return filePath, dir
}
// exampleProto is the inline proto3 schema for the base message; it declares
// a single Item message with a string name and a double price.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// exampleCustomProto is the inline proto3 schema for the custom fields; it
// declares a single Extra message with a string region.
const exampleCustomProto = `syntax = "proto3";
message Extra {
string region = 1;
}
`
// main writes the base and custom schemas to separate temporary directories,
// builds a Transcoder for Item with Extra supplied through
// WithCustomMessageFile, and prints the transcoder's field names.
func main() {
	basePath, baseDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(baseDir)
	customPath, customDir := writeProtoFile("extra.proto", exampleCustomProto)
	defer os.RemoveAll(customDir)

	// Both files are referenced by base name relative to their own
	// directory, which is passed as the import path.
	baseFile := filepath.Base(basePath)
	customFile := filepath.Base(customPath)
	withExtra := bufarrowlib.WithCustomMessageFile(customFile, "Extra", []string{customDir})

	tc, err := bufarrowlib.NewFromFile(baseFile, "Item", []string{baseDir}, memory.DefaultAllocator, withExtra)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Println(tc.FieldNames())
}
Output: [name price region]
func WithDenormalizerPlan ¶
func WithDenormalizerPlan(paths ...pbpath.PlanPathSpec) Option
WithDenormalizerPlan configures one or more protobuf field paths to project into a flat (denormalized) Arrow record. Each path is specified as a pbpath.PlanPathSpec created via pbpath.PlanPath, which supports per-path options such as pbpath.Alias and pbpath.StrictPath.
Paths that traverse repeated fields with a wildcard [*] or range [start:end] produce fan-out rows. Multiple independent fan-out groups are cross-joined; empty fan-out groups produce a single null row (left-join semantics).
Each leaf path must terminate at a scalar protobuf field or a recognized well-known message type (google.protobuf.Timestamp, otel AnyValue). Message-typed terminal nodes are rejected at schema creation time.
Example ¶
ExampleWithDenormalizerPlan demonstrates configuring a denormalization plan.
orderMD, itemMD := buildExampleDescriptors()

// Project the order name and the id of every item, each under an alias.
plan := bufarrowlib.WithDenormalizerPlan(
	pbpath.PlanPath("name", pbpath.Alias("order_name")),
	pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
)
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// One order with two items and no tags.
items := []struct {
	id    string
	price float64
}{{"X", 1.0}, {"Y", 2.0}}
order := newOrder(orderMD, itemMD, "order-1", items, nil, 1)
if err := tc.AppendDenorm(order); err != nil {
	log.Fatal(err)
}

batch := tc.NewDenormalizerRecordBatch()
defer batch.Release()
fmt.Printf("rows: %d\n", batch.NumRows())

rowCount := int(batch.NumRows())
for row := 0; row < rowCount; row++ {
	orderName := batch.Column(0).(*array.String).Value(row)
	itemID := batch.Column(1).(*array.String).Value(row)
	fmt.Printf(" %s | %s\n", orderName, itemID)
}
Output:
rows: 2
 order-1 | X
 order-1 | Y
type Transcoder ¶
type Transcoder struct {
// contains filtered or unexported fields
}
Transcoder converts protobuf messages to Apache Arrow record batches and back. It holds the compiled message schema, an Arrow record builder for the full message ("stencil"), and optionally a separate denormalizer that projects selected paths into a flat Arrow record suitable for analytics.
A Transcoder is not safe for concurrent use; call Transcoder.Clone to create independent copies for parallel goroutines.
func New ¶
func New(msgDesc protoreflect.MessageDescriptor, mem memory.Allocator, opts ...Option) (tc *Transcoder, err error)
New returns a new Transcoder from a pre-resolved message descriptor. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile. WithDenormalizerPlan creates a separate flat Arrow record for analytics, whilst WithCustomMessage and WithCustomMessageFile expand the schema of the protoreflect.MessageDescriptor used as input.
Example ¶
ExampleNew demonstrates creating a Transcoder, appending protobuf messages, and retrieving the result as an Arrow record batch.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, entirely in memory, the descriptor of the
// Product message used by the examples:
//
//	message Product {
//	    string name  = 1;
//	    double price = 2;
//	    int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if protodesc.NewFile rejects the
// file descriptor.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	// scalar describes one optional scalar field of the message.
	scalar := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  optional,
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			scalar("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			scalar("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			scalar("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md whose "name", "price" and
// "qty" fields are set to the supplied values. The fields are looked up by
// name on md.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main builds a Transcoder for the Product schema, appends two messages, and
// prints the dimensions of the resulting record batch plus the field names.
func main() {
	productMD := buildSimpleDescriptor()

	tc, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	widget := newProduct(productMD, "Widget", 9.99, 5)
	gadget := newProduct(productMD, "Gadget", 24.50, 2)
	tc.Append(widget)
	tc.Append(gadget)

	// Flush the appended rows into a record batch.
	batch := tc.NewRecordBatch()
	defer batch.Release()

	rows, cols := batch.NumRows(), batch.NumCols()
	fmt.Printf("rows: %d, cols: %d\n", rows, cols)
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
rows: 2, cols: 3
fields: [name price qty]
Example (WithCustomMessage) ¶
ExampleNew_withCustomMessage demonstrates augmenting a protobuf schema with custom fields using WithCustomMessage, and populating both the base and custom fields via AppendWithCustom.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the example Product schema in code and
// returns its message descriptor:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every field in this schema carries the optional label.
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return built.Messages().ByName("Product")
}
// buildCustomDescriptor assembles the example CustomFields schema in code and
// returns its message descriptor:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	// Both fields carry the optional label.
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	customFields := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("region"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("batch_id"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{customFields},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return built.Messages().ByName("CustomFields")
}
// newProduct returns a dynamic message of type md whose "name", "price" and
// "qty" fields are set to the supplied values. The fields are looked up by
// name on md.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main creates a Transcoder whose schema is the Product message extended
// with the CustomFields message, prints the combined field names, appends one
// base/custom message pair, and prints the resulting batch dimensions.
func main() {
	baseMD, customMD := buildSimpleDescriptor(), buildCustomDescriptor()

	customOpt := bufarrowlib.WithCustomMessage(customMD)
	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator, customOpt)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fmt.Printf("fields: %v\n", tc.FieldNames())

	// Build the base message and the companion message holding custom values.
	product := newProduct(baseMD, "Widget", 9.99, 5)
	customFields := customMD.Fields()
	extra := dynamicpb.NewMessage(customMD)
	extra.Set(customFields.ByName("region"), protoreflect.ValueOfString("US-EAST"))
	extra.Set(customFields.ByName("batch_id"), protoreflect.ValueOfInt64(42))

	err = tc.AppendWithCustom(product, extra)
	if err != nil {
		log.Fatal(err)
	}

	batch := tc.NewRecordBatch()
	defer batch.Release()
	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
}
Output:
fields: [name price qty region batch_id]
rows: 1, cols: 5
func NewFromFile ¶
func NewFromFile(protoFilePath, messageName string, importPaths []string, mem memory.Allocator, opts ...Option) (*Transcoder, error)
NewFromFile returns a new Transcoder by compiling a .proto file at runtime. protoFilePath is the path to the .proto file, messageName is the top-level message to use, and importPaths are the directories to search for imports. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile.
Example ¶
ExampleNewFromFile demonstrates creating a Transcoder directly from a .proto file on disk, without pre-compiling the descriptor yourself.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile creates a fresh temporary directory, writes content into a
// file called name inside it, and returns the file's full path together with
// the directory, which can serve as an import path. Any failure terminates
// the process via log.Fatal. The directory is not removed here.
func writeProtoFile(name, content string) (filePath, dir string) {
	tmpDir, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	target := filepath.Join(tmpDir, name)
	writeErr := os.WriteFile(target, []byte(content), 0o644)
	if writeErr != nil {
		log.Fatal(writeErr)
	}

	return target, tmpDir
}
// exampleProto is the proto3 source written to disk and compiled at runtime
// by this example. It declares a single message, Item, with a string field
// "name" and a double field "price".
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// main writes the example .proto file to a temporary directory, compiles it
// into a Transcoder with NewFromFile, and prints the first Arrow field name
// followed by all top-level field names.
func main() {
	protoPath, importDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(importDir)

	// The file is referenced by its base name and resolved against importDir.
	protoName := filepath.Base(protoPath)
	tc, err := bufarrowlib.NewFromFile(protoName, "Item", []string{importDir}, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Inspect the first Arrow field to confirm the schema was built.
	firstField := tc.Schema().Field(0)
	fmt.Printf("field 0: %s\n", firstField.Name)
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output:
field 0: name
fields: [name price]
func (*Transcoder) Append ¶
func (s *Transcoder) Append(value proto.Message)
Append appends a protobuf message to the transcoder's Arrow record builder. This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_Append shows the basic append-and-flush cycle.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the example Product schema in code and
// returns its message descriptor:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every field in this schema carries the optional label.
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return built.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md whose "name", "price" and
// "qty" fields are set to the supplied values. The fields are looked up by
// name on md.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main appends two Product messages to a Transcoder, flushes them into a
// record batch, and prints the number of rows.
func main() {
	productMD := buildSimpleDescriptor()

	tc, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	inputs := []struct {
		name  string
		price float64
		qty   int32
	}{
		{"Alpha", 1.0, 10},
		{"Bravo", 2.0, 20},
	}
	for _, in := range inputs {
		tc.Append(newProduct(productMD, in.name, in.price, in.qty))
	}

	batch := tc.NewRecordBatch()
	defer batch.Release()
	fmt.Printf("rows: %d\n", batch.NumRows())
}
Output: rows: 2
func (*Transcoder) AppendDenorm ¶
func (s *Transcoder) AppendDenorm(msg proto.Message) error
AppendDenorm evaluates the denormalizer plan against msg and appends the resulting denormalized rows to the denormalizer's Arrow record builder.
Fan-out groups are cross-joined: each group contributes max(len(branches), 1) rows, and the total row count is the product of all group counts. Empty fan-out groups (no branches) contribute 1 null row (left-join semantics).
This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_AppendDenorm demonstrates basic denormalization: projecting scalar and fan-out fields from a protobuf message into a flat Arrow record.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExampleDescriptors assembles the example Order/Item schema in code and
// returns the descriptors of both messages:
//
//	message Item  { string id = 1; double price = 2; }
//	message Order {
//	  string          name  = 1;
//	  repeated Item   items = 2;
//	  repeated string tags  = 3;
//	  int64           seq   = 4;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	repeated := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	strType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()

	item := &descriptorpb.DescriptorProto{
		Name: proto.String("Item"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
		},
	}

	order := &descriptorpb.DescriptorProto{
		Name: proto.String("Order"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:     proto.String("items"),
				Number:   proto.Int32(2),
				Type:     descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
				TypeName: proto.String(".example.Item"),
				Label:    repeated,
			},
			{Name: proto.String("tags"), Number: proto.Int32(3), Type: strType, Label: repeated},
			{
				Name:   proto.String("seq"),
				Number: proto.Int32(4),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{item, order},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// newOrder builds a dynamic Order message. It sets the "name" and "seq"
// scalars, appends one Item message (with "id" and "price") per element of
// items to the "items" list, and appends every tag to the "tags" list.
func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	orderFields, itemFields := orderMD.Fields(), itemMD.Fields()

	order := dynamicpb.NewMessage(orderMD)
	order.Set(orderFields.ByName("name"), protoreflect.ValueOfString(name))
	order.Set(orderFields.ByName("seq"), protoreflect.ValueOfInt64(seq))

	itemList := order.Mutable(orderFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(itemMD)
		entry.Set(itemFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		entry.Set(itemFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(entry))
	}

	tagList := order.Mutable(orderFields.ByName("tags")).List()
	for i := range tags {
		tagList.Append(protoreflect.ValueOfString(tags[i]))
	}
	return order
}
// main denormalizes one order holding two items into a flat record with the
// columns order_name, item_id and item_price, then prints every row.
func main() {
	orderMD, itemMD := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
	)
	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	lineItems := []struct {
		id    string
		price float64
	}{{"A", 1.50}, {"B", 2.75}}
	order := newOrder(orderMD, itemMD, "order-1", lineItems, nil, 1)

	err = tc.AppendDenorm(order)
	if err != nil {
		log.Fatal(err)
	}

	batch := tc.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
	for row, total := 0, int(batch.NumRows()); row < total; row++ {
		orderName := batch.Column(0).(*array.String).Value(row)
		itemID := batch.Column(1).(*array.String).Value(row)
		itemPrice := batch.Column(2).(*array.Float64).Value(row)
		fmt.Printf(" %s | %s | %.2f\n", orderName, itemID, itemPrice)
	}
}
Output:
rows: 2, cols: 3
 order-1 | A | 1.50
 order-1 | B | 2.75
Example (CrossJoin) ¶
ExampleTranscoder_AppendDenorm_crossJoin demonstrates cross-join behaviour when two independent repeated fields are denormalized together.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExampleDescriptors assembles the example Order/Item schema in code and
// returns the descriptors of both messages:
//
//	message Item  { string id = 1; double price = 2; }
//	message Order {
//	  string          name  = 1;
//	  repeated Item   items = 2;
//	  repeated string tags  = 3;
//	  int64           seq   = 4;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	repeated := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	strType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()

	item := &descriptorpb.DescriptorProto{
		Name: proto.String("Item"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
		},
	}

	order := &descriptorpb.DescriptorProto{
		Name: proto.String("Order"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:     proto.String("items"),
				Number:   proto.Int32(2),
				Type:     descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
				TypeName: proto.String(".example.Item"),
				Label:    repeated,
			},
			{Name: proto.String("tags"), Number: proto.Int32(3), Type: strType, Label: repeated},
			{
				Name:   proto.String("seq"),
				Number: proto.Int32(4),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{item, order},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// newOrder builds a dynamic Order message. It sets the "name" and "seq"
// scalars, appends one Item message (with "id" and "price") per element of
// items to the "items" list, and appends every tag to the "tags" list.
func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	orderFields, itemFields := orderMD.Fields(), itemMD.Fields()

	order := dynamicpb.NewMessage(orderMD)
	order.Set(orderFields.ByName("name"), protoreflect.ValueOfString(name))
	order.Set(orderFields.ByName("seq"), protoreflect.ValueOfInt64(seq))

	itemList := order.Mutable(orderFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(itemMD)
		entry.Set(itemFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		entry.Set(itemFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(entry))
	}

	tagList := order.Mutable(orderFields.ByName("tags")).List()
	for i := range tags {
		tagList.Append(protoreflect.ValueOfString(tags[i]))
	}
	return order
}
// main denormalizes an order with two items and three tags using a plan that
// fans out both repeated fields, then prints the resulting item/tag rows.
func main() {
	orderMD, itemMD := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
	)
	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	lineItems := []struct {
		id    string
		price float64
	}{{"A", 1.0}, {"B", 2.0}}
	order := newOrder(orderMD, itemMD, "order-1", lineItems, []string{"x", "y", "z"}, 1)

	err = tc.AppendDenorm(order)
	if err != nil {
		log.Fatal(err)
	}

	batch := tc.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d (2 items × 3 tags)\n", batch.NumRows())
	for row, total := 0, int(batch.NumRows()); row < total; row++ {
		itemID := batch.Column(0).(*array.String).Value(row)
		tagValue := batch.Column(1).(*array.String).Value(row)
		fmt.Printf(" %s | %s\n", itemID, tagValue)
	}
}
Output:
rows: 6 (2 items × 3 tags)
 A | x
 A | y
 A | z
 B | x
 B | y
 B | z
Example (LeftJoin) ¶
ExampleTranscoder_AppendDenorm_leftJoin demonstrates left-join semantics when a repeated field is empty: a single null row is produced for that group while the other group fans out normally.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExampleDescriptors assembles the example Order/Item schema in code and
// returns the descriptors of both messages:
//
//	message Item  { string id = 1; double price = 2; }
//	message Order {
//	  string          name  = 1;
//	  repeated Item   items = 2;
//	  repeated string tags  = 3;
//	  int64           seq   = 4;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	repeated := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	strType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()

	item := &descriptorpb.DescriptorProto{
		Name: proto.String("Item"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
		},
	}

	order := &descriptorpb.DescriptorProto{
		Name: proto.String("Order"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:     proto.String("items"),
				Number:   proto.Int32(2),
				Type:     descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
				TypeName: proto.String(".example.Item"),
				Label:    repeated,
			},
			{Name: proto.String("tags"), Number: proto.Int32(3), Type: strType, Label: repeated},
			{
				Name:   proto.String("seq"),
				Number: proto.Int32(4),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{item, order},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// newOrder builds a dynamic Order message. It sets the "name" and "seq"
// scalars, appends one Item message (with "id" and "price") per element of
// items to the "items" list, and appends every tag to the "tags" list.
func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	orderFields, itemFields := orderMD.Fields(), itemMD.Fields()

	order := dynamicpb.NewMessage(orderMD)
	order.Set(orderFields.ByName("name"), protoreflect.ValueOfString(name))
	order.Set(orderFields.ByName("seq"), protoreflect.ValueOfInt64(seq))

	itemList := order.Mutable(orderFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(itemMD)
		entry.Set(itemFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		entry.Set(itemFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(entry))
	}

	tagList := order.Mutable(orderFields.ByName("tags")).List()
	for i := range tags {
		tagList.Append(protoreflect.ValueOfString(tags[i]))
	}
	return order
}
// main denormalizes an order that has no items but two tags, then prints for
// each resulting row whether item_id is null alongside the tag value.
func main() {
	orderMD, itemMD := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
	)
	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// No items and two tags: the empty items group yields a single null row,
	// so the cross join is 1 × 2 = 2 rows.
	order := newOrder(orderMD, itemMD, "order-1", nil, []string{"x", "y"}, 1)
	err = tc.AppendDenorm(order)
	if err != nil {
		log.Fatal(err)
	}

	batch := tc.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d\n", batch.NumRows())
	for row, total := 0, int(batch.NumRows()); row < total; row++ {
		itemMissing := batch.Column(0).IsNull(row)
		tagValue := batch.Column(1).(*array.String).Value(row)
		fmt.Printf(" item_id=null:%v | tag=%s\n", itemMissing, tagValue)
	}
}
Output:
rows: 2
 item_id=null:true | tag=x
 item_id=null:true | tag=y
func (*Transcoder) AppendWithCustom ¶
AppendWithCustom appends a protobuf value merged with custom field values to the transcoder's builder. The custom proto.Message must conform to the message descriptor provided via WithCustomMessage or WithCustomMessageFile. Both messages are marshalled to bytes and unmarshalled into the merged stencil for appending. This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_AppendWithCustom shows appending a message that includes both base and custom fields.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the example Product schema in code and
// returns its message descriptor:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every field in this schema carries the optional label.
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return built.Messages().ByName("Product")
}
// buildCustomDescriptor assembles the example CustomFields schema in code and
// returns its message descriptor:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	// Both fields carry the optional label.
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	customFields := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("region"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("batch_id"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{customFields},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return built.Messages().ByName("CustomFields")
}
// newProduct returns a dynamic message of type md whose "name", "price" and
// "qty" fields are set to the supplied values. The fields are looked up by
// name on md.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main creates a Transcoder configured with the CustomFields message, appends
// one base/custom message pair with AppendWithCustom, and prints the
// dimensions of the resulting record batch.
func main() {
	baseMD, customMD := buildSimpleDescriptor(), buildCustomDescriptor()

	customOpt := bufarrowlib.WithCustomMessage(customMD)
	tc, err := bufarrowlib.New(baseMD, memory.DefaultAllocator, customOpt)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Build the base message and the companion message holding custom values.
	product := newProduct(baseMD, "Widget", 9.99, 5)
	customFields := customMD.Fields()
	extra := dynamicpb.NewMessage(customMD)
	extra.Set(customFields.ByName("region"), protoreflect.ValueOfString("EU"))
	extra.Set(customFields.ByName("batch_id"), protoreflect.ValueOfInt64(7))

	err = tc.AppendWithCustom(product, extra)
	if err != nil {
		log.Fatal(err)
	}

	batch := tc.NewRecordBatch()
	defer batch.Release()
	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
}
Output: rows: 1, cols: 5
func (*Transcoder) Clone ¶
func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)
Clone returns an identical Transcoder. Transcoder methods are not safe for concurrent use, so in concurrent code give each goroutine its own clone rather than sharing a single Transcoder.
Example ¶
ExampleTranscoder_Clone demonstrates cloning a Transcoder for use in a separate goroutine. The clone has independent builders but shares the same schema configuration.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the example Product schema in code and
// returns its message descriptor:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every field in this schema carries the optional label.
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return built.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md whose "name", "price" and
// "qty" fields are set to the supplied values. The fields are looked up by
// name on md.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main clones a Transcoder, appends one message to the clone only, and prints
// the row counts of the batches produced by the clone and by the original.
func main() {
	productMD := buildSimpleDescriptor()

	original, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer original.Release()

	cloned, err := original.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer cloned.Release()

	// Only the clone receives a message.
	gizmo := newProduct(productMD, "Gizmo", 5.00, 10)
	cloned.Append(gizmo)

	clonedBatch := cloned.NewRecordBatch()
	defer clonedBatch.Release()
	fmt.Printf("clone rows: %d\n", clonedBatch.NumRows())

	originalBatch := original.NewRecordBatch()
	defer originalBatch.Release()
	fmt.Printf("original rows: %d\n", originalBatch.NumRows())
}
Output:
clone rows: 1
original rows: 0
func (*Transcoder) DenormalizerBuilder ¶
func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder
DenormalizerBuilder returns the denormalizer's Arrow array.RecordBuilder. This is exposed for callers who need to implement custom denormalization logic beyond what Transcoder.AppendDenorm provides. In most cases prefer AppendDenorm for automatic fan-out and cross-join handling. Returns nil if no denormalizer plan was configured.
Example ¶
ExampleTranscoder_DenormalizerBuilder shows accessing the underlying RecordBuilder for custom denormalization logic.
// Only the Order descriptor is needed; the Item descriptor is discarded.
orderMD, _ := buildExampleDescriptors()
// Configure a denormalizer plan with a single column aliased "order_name".
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
bufarrowlib.WithDenormalizerPlan(
pbpath.PlanPath("name", pbpath.Alias("order_name")),
),
)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Fetch the denormalizer's record builder and inspect its schema.
builder := tc.DenormalizerBuilder()
fmt.Printf("builder fields: %d\n", builder.Schema().NumFields())
// Despite the "schema:" label, this prints the Arrow type of the first field.
fmt.Printf("schema: %s\n", builder.Schema().Field(0).Type)
Output:
builder fields: 1
schema: utf8
func (*Transcoder) DenormalizerSchema ¶
func (s *Transcoder) DenormalizerSchema() *arrow.Schema
DenormalizerSchema returns the Arrow schema of the denormalized record. Returns nil if no denormalizer plan was configured.
Example ¶
ExampleTranscoder_DenormalizerSchema demonstrates inspecting the Arrow schema of the denormalized record, showing column names, types, and nullability.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildExampleDescriptors assembles the example Order/Item schema in code and
// returns the descriptors of both messages:
//
//	message Item  { string id = 1; double price = 2; }
//	message Order {
//	  string          name  = 1;
//	  repeated Item   items = 2;
//	  repeated string tags  = 3;
//	  int64           seq   = 4;
//	}
//
// If the file descriptor cannot be built, the process exits via log.Fatalf.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	repeated := descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	strType := descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()

	item := &descriptorpb.DescriptorProto{
		Name: proto.String("Item"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
		},
	}

	order := &descriptorpb.DescriptorProto{
		Name: proto.String("Order"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strType, Label: optional},
			{
				Name:     proto.String("items"),
				Number:   proto.Int32(2),
				Type:     descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
				TypeName: proto.String(".example.Item"),
				Label:    repeated,
			},
			{Name: proto.String("tags"), Number: proto.Int32(3), Type: strType, Label: repeated},
			{
				Name:   proto.String("seq"),
				Number: proto.Int32(4),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{item, order},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// main builds a Transcoder for Order with a three-column denormalizer plan
// and prints the index, name, type and nullability of each field in the
// resulting denormalizer schema.
func main() {
	orderMD, _ := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("seq", pbpath.Alias("order_seq")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
	)
	tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	fields := tc.DenormalizerSchema().Fields()
	for idx := range fields {
		field := fields[idx]
		fmt.Printf(" %d: %-12s %-10s nullable=%v\n", idx, field.Name, field.Type, field.Nullable)
	}
}
Output: 0: order_name utf8 nullable=true 1: order_seq int64 nullable=true 2: item_price float64 nullable=true
func (*Transcoder) FieldNames ¶
func (s *Transcoder) FieldNames() []string
FieldNames returns the top-level Arrow field names of the message schema.
Example ¶
ExampleTranscoder_FieldNames shows retrieving top-level field names.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
func main() {
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
fmt.Println(tc.FieldNames())
}
Output: [name price qty]
func (*Transcoder) NewDenormalizerRecordBatch ¶
func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch
NewDenormalizerRecordBatch returns the buffered denormalizer builder contents as an arrow.RecordBatch. The builder is reset and can be reused. Returns nil if no denormalizer plan was configured.
Example ¶
ExampleTranscoder_NewDenormalizerRecordBatch shows flushing the denormalizer's builder into a record batch.
// Build a Transcoder whose denormalizer plan has a single column: the id of
// every item in the order.
orderMD, itemMD := buildExampleDescriptors()
plan := bufarrowlib.WithDenormalizerPlan(
	pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
)
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// One order carrying two items and no tags.
items := []struct {
	id    string
	price float64
}{{"A", 1.0}, {"B", 2.0}}
msg := newOrder(orderMD, itemMD, "order-1", items, nil, 1)

err = tc.AppendDenorm(msg)
if err != nil {
	log.Fatal(err)
}

// Flush the denormalizer builder into a record batch and print each id.
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()

rowCount := int(rec.NumRows())
fmt.Printf("rows: %d\n", rec.NumRows())
for row := 0; row < rowCount; row++ {
	fmt.Println(rec.Column(0).(*array.String).Value(row))
}
Output: rows: 2 A B
func (*Transcoder) NewRecordBatch ¶
func (s *Transcoder) NewRecordBatch() arrow.RecordBatch
NewRecordBatch returns the buffered builder contents as an arrow.RecordBatch. The builder is reset and can be reused.
Example ¶
ExampleTranscoder_NewRecordBatch shows building an Arrow record batch from appended messages.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md with its "name", "price"
// and "qty" fields set to the supplied values. The fields are looked up by
// name, so md must declare all three.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main appends a single Product to a Transcoder, flushes the builder into an
// Arrow record batch, and prints the batch's row and column counts.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	widget := newProduct(md, "Widget", 9.99, 5)
	tc.Append(widget)

	batch := tc.NewRecordBatch()
	defer batch.Release()

	rows, cols := batch.NumRows(), batch.NumCols()
	fmt.Printf("rows: %d, cols: %d\n", rows, cols)
}
Output: rows: 1, cols: 3
func (*Transcoder) Parquet ¶
func (s *Transcoder) Parquet() *schema.Schema
Parquet returns the Parquet schema.Schema for the message.
Example ¶
ExampleTranscoder_Parquet demonstrates inspecting the Parquet schema.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
func main() {
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
pqSchema := tc.Parquet()
fmt.Printf("parquet columns: %d\n", pqSchema.NumColumns())
for i := 0; i < pqSchema.NumColumns(); i++ {
fmt.Printf(" %s\n", pqSchema.Column(i).Name())
}
}
Output: parquet columns: 3 name price qty
func (*Transcoder) Proto ¶
func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message
Proto decodes selected rows from an Arrow RecordBatch back into protobuf messages. Pass nil for rows to decode all rows.
Example ¶
ExampleTranscoder_Proto demonstrates round-tripping: appending protobuf messages, building an Arrow record, and reconstructing them back as protobuf messages.
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md with its "name", "price"
// and "qty" fields set to the supplied values. The fields are looked up by
// name, so md must declare all three.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main round-trips two Product messages: it appends them to a Transcoder,
// flushes them into an Arrow record batch, decodes every row back into
// protobuf messages with Proto, and prints each recovered message as
// compact JSON.
//
// Any failure while building the Transcoder or rendering JSON terminates the
// program via log.Fatal instead of being silently dropped.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	msgs := tc.Proto(rec, nil) // nil = all rows
	fmt.Printf("recovered %d messages\n", len(msgs))
	for _, m := range msgs {
		js, err := protojson.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}
		// The protojson documentation warns that its output is not
		// byte-stable, so the JSON is compacted before printing to keep the
		// example output deterministic.
		var c bytes.Buffer
		if err := json.Compact(&c, js); err != nil {
			log.Fatal(err)
		}
		fmt.Println(c.String())
	}
}
Output: recovered 2 messages {"name":"Widget","price":9.99,"qty":5} {"name":"Gadget","price":24.5,"qty":2}
func (*Transcoder) ReadParquet ¶
func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)
ReadParquet reads the specified columns from Parquet source r and returns an Arrow RecordBatch. The returned RecordBatch must be released by the caller.
Example ¶
ExampleTranscoder_ReadParquet demonstrates a full Parquet round-trip: write messages to Parquet, then read them back into an Arrow record.
package main
import (
"bytes"
"context"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md with its "name", "price"
// and "qty" fields set to the supplied values. The fields are looked up by
// name, so md must declare all three.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main writes two Product messages to an in-memory Parquet buffer, reads the
// buffer back into an Arrow record batch with ReadParquet, and prints the
// batch's row and column counts.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	// Serialise the appended messages to Parquet in memory.
	var pq bytes.Buffer
	err = tc.WriteParquet(&pq)
	if err != nil {
		log.Fatal(err)
	}

	// A nil column list is passed for the columns argument.
	batch, err := tc.ReadParquet(context.Background(), bytes.NewReader(pq.Bytes()), nil)
	if err != nil {
		log.Fatal(err)
	}
	defer batch.Release()

	rows, cols := batch.NumRows(), batch.NumCols()
	fmt.Printf("read back: %d rows, %d cols\n", rows, cols)
}
Output: read back: 2 rows, 3 cols
func (*Transcoder) Release ¶
func (s *Transcoder) Release()
Release releases the reference on the underlying Arrow record builder.
Example ¶
ExampleTranscoder_Release shows the standard cleanup pattern.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md with its "name", "price"
// and "qty" fields set to the supplied values. The fields are looked up by
// name, so md must declare all three.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main shows the cleanup pattern for a Transcoder: Release is deferred
// immediately after a successful New, and the record batch obtained from
// NewRecordBatch is released the same way. It prints the batch's row count.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	// Release should always be deferred after construction.
	defer tc.Release()

	widget := newProduct(md, "Widget", 9.99, 5)
	tc.Append(widget)

	batch := tc.NewRecordBatch()
	defer batch.Release()

	rows := batch.NumRows()
	fmt.Printf("rows: %d\n", rows)
}
Output: rows: 1
func (*Transcoder) Schema ¶
func (s *Transcoder) Schema() *arrow.Schema
Schema returns the Arrow arrow.Schema for the message.
Example ¶
ExampleTranscoder_Schema demonstrates inspecting the Arrow schema derived from a protobuf message descriptor.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
func main() {
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
for i, f := range tc.Schema().Fields() {
fmt.Printf(" %d: %-8s %s\n", i, f.Name, f.Type)
}
}
Output: 0: name utf8 1: price float64 2: qty int32
func (*Transcoder) WriteParquet ¶
func (s *Transcoder) WriteParquet(w io.Writer) error
WriteParquet writes the current record to Parquet format on w.
Example ¶
ExampleTranscoder_WriteParquet demonstrates writing appended messages to an in-memory Parquet buffer and reporting the number of bytes written.
package main
import (
"bytes"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md with its "name", "price"
// and "qty" fields set to the supplied values. The fields are looked up by
// name, so md must declare all three.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main appends two Product messages to a Transcoder, writes them to an
// in-memory Parquet buffer with WriteParquet, and prints the number of bytes
// written followed by a success message.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	var pq bytes.Buffer
	err = tc.WriteParquet(&pq)
	if err != nil {
		log.Fatal(err)
	}

	size := pq.Len()
	fmt.Printf("parquet bytes: %d\n", size)
	fmt.Println("wrote parquet successfully")
}
Output: parquet bytes: 714 wrote parquet successfully
func (*Transcoder) WriteParquetRecords ¶
func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error
WriteParquetRecords writes one or many Arrow records to Parquet on w.
Example ¶
ExampleTranscoder_WriteParquetRecords demonstrates writing multiple Arrow record batches to a single Parquet file.
package main
import (
"bytes"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, at runtime, the descriptor for the small
// proto3 message used by the examples:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		f64Kind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: f64Kind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct returns a dynamic message of type md with its "name", "price"
// and "qty" fields set to the supplied values. The fields are looked up by
// name, so md must declare all three.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main produces two separate Arrow record batches, each holding one Product,
// and writes both to a single in-memory Parquet buffer with
// WriteParquetRecords.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// First batch: flush the builder right after the first append.
	tc.Append(newProduct(md, "Widget", 9.99, 5))
	first := tc.NewRecordBatch()
	defer first.Release()

	// Second batch: append again and flush a second time.
	tc.Append(newProduct(md, "Gadget", 24.50, 2))
	second := tc.NewRecordBatch()
	defer second.Release()

	var pq bytes.Buffer
	err = tc.WriteParquetRecords(&pq, first, second)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("wrote 2 record batches to parquet")
}
Output: wrote 2 record batches to parquet
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| gen | |
| proto | |
| pbpath | Package pbpath provides functionality for representing a sequence of protobuf reflection operations on a message, including parsing human-readable path strings and traversing messages along a path to collect values. |