Documentation
¶
Overview ¶
Package bufarrowlib converts protobuf messages to Apache Arrow record batches (and back), providing high-throughput ingestion pipelines and optional denormalization for analytics workloads.
Core types ¶
A Transcoder is the central type. It holds the compiled protobuf schema, an Arrow record builder for the full message, and an optional denormalizer that projects selected scalar paths into a flat Arrow record. Construct one with New (from a protoreflect.MessageDescriptor) or NewFromFile (from a .proto source file).
A HyperType wraps a compiled buf.build/go/hyperpb.MessageType and enables online profile-guided optimization (PGO). All Transcoder instances sharing a HyperType contribute profiling data, and a recompile atomically upgrades the parser for all of them.
Concurrency ¶
Transcoder methods are NOT safe for concurrent use. Call Transcoder.Clone to obtain independent copies for parallel goroutines. HyperType IS safe for concurrent use.
Hot-path guidance ¶
- Use Transcoder.AppendRaw / Transcoder.AppendDenormRaw with a HyperType for the fastest path from raw proto bytes to Arrow.
- Never call New or Transcoder.Clone inside a message-processing loop; construction is expensive (~300 µs). Pre-create workers and feed messages through channels.
- Always call defer rec.Release() on every arrow.RecordBatch returned by Transcoder.NewRecordBatch or Transcoder.NewDenormalizerRecordBatch.
Denormalization ¶
The denormalizer (configured via WithDenormalizerPlan) evaluates a set of protobuf field paths—potentially through repeated fields—and writes one flat Arrow record per cross-joined fan-out row. Configure it from a YAML file with NewTranscoderFromConfigFile or ParseDenormConfigFile + NewTranscoderFromConfig.
Example (DenormToParquet) ¶
Example_denormToParquet shows the complete workflow of denormalizing nested protobuf messages into a flat Arrow record and writing to Parquet.
orderMD, itemMD := buildExampleDescriptors()
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator,
bufarrowlib.WithDenormalizerPlan(
pbpath.PlanPath("name", pbpath.Alias("order_name")),
pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
),
)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Append an order with 2 items × 2 tags → 4 denormalized rows.
msg := newOrder(orderMD, itemMD, "order-1",
[]struct {
id string
price float64
}{{"A", 1.50}, {"B", 2.75}},
[]string{"rush", "fragile"}, 1,
)
if err := tc.AppendDenorm(msg); err != nil {
log.Fatal(err)
}
// The denormalized record builder produces a flat schema.
fmt.Printf("schema: %v\n", tc.DenormalizerSchema().Field(0).Name)
rec := tc.NewDenormalizerRecordBatch()
defer rec.Release()
fmt.Printf("denorm rows: %d (2 items × 2 tags)\n", rec.NumRows())
// Write the flat denormalized record to Parquet via a new Transcoder
// built from the denorm schema, or write directly using pqarrow.
// Here we show the denorm data for inspection:
for i := 0; i < int(rec.NumRows()); i++ {
name := rec.Column(0).(*array.String).Value(i)
id := rec.Column(1).(*array.String).Value(i)
price := rec.Column(2).(*array.Float64).Value(i)
tag := rec.Column(3).(*array.String).Value(i)
fmt.Printf(" %s | %s | %.2f | %s\n", name, id, price, tag)
}
Output: schema: order_name denorm rows: 4 (2 items × 2 tags) order-1 | A | 1.50 | rush order-1 | A | 1.50 | fragile order-1 | B | 2.75 | rush order-1 | B | 2.75 | fragile
Example (ParquetRoundTrip) ¶
Example_parquetRoundTrip shows a full round-trip: proto → Arrow → Parquet → Arrow → proto. This demonstrates that data survives serialisation intact.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Encode two products.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
// Proto → Arrow → Parquet (in-memory buffer).
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
log.Fatal(err)
}
// Parquet → Arrow.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
log.Fatal(err)
}
defer rec.Release()
// Arrow → Proto.
msgs := tc.Proto(rec, nil)
fmt.Printf("round-tripped %d messages:\n", len(msgs))
for _, m := range msgs {
js, _ := protojson.Marshal(m)
var c bytes.Buffer
json.Compact(&c, js)
fmt.Printf(" %s\n", c.String())
}
Output: round-tripped 2 messages: {"name":"Widget","price":9.99,"qty":5} {"name":"Gadget","price":24.5,"qty":2}
Example (ParquetSelectColumns) ¶
Example_parquetSelectColumns shows reading only specific columns from a Parquet file — useful for analytics queries that touch a subset of fields.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
log.Fatal(err)
}
// Read only column 0 (name) and column 2 (qty).
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, []int{0, 2})
if err != nil {
log.Fatal(err)
}
defer rec.Release()
fmt.Printf("cols: %d, rows: %d\n", rec.NumCols(), rec.NumRows())
names := rec.Column(0).(*array.String)
qtys := rec.Column(1).(*array.Int32)
for i := 0; i < int(rec.NumRows()); i++ {
fmt.Printf(" %s: %d\n", names.Value(i), qtys.Value(i))
}
Output: cols: 2, rows: 2 Widget: 5 Gadget: 2
Example (ProtoFileToParquet) ¶
Example_protoFileToParquet shows the end-to-end workflow when working with .proto files on disk: compile the schema, create a transcoder, populate messages, and write Parquet.
// Write an inline .proto to a temp file.
_, dir := writeProtoFile("event.proto", `syntax = "proto3";
message Event {
string id = 1;
string action = 2;
int64 user_id = 3;
}
`)
defer os.RemoveAll(dir)
// Compile the .proto and look up the message descriptor.
fd, err := bufarrowlib.CompileProtoToFileDescriptor("event.proto", []string{dir})
if err != nil {
log.Fatal(err)
}
md, err := bufarrowlib.GetMessageDescriptorByName(fd, "Event")
if err != nil {
log.Fatal(err)
}
// Create a Transcoder from the compiled descriptor.
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Build and append messages using dynamicpb.
for _, e := range []struct {
id, action string
uid int64
}{
{"e1", "click", 100},
{"e2", "view", 200},
} {
msg := dynamicpb.NewMessage(md)
msg.Set(md.Fields().ByName("id"), protoreflect.ValueOfString(e.id))
msg.Set(md.Fields().ByName("action"), protoreflect.ValueOfString(e.action))
msg.Set(md.Fields().ByName("user_id"), protoreflect.ValueOfInt64(e.uid))
tc.Append(msg)
}
var buf bytes.Buffer
if err := tc.WriteParquet(&buf); err != nil {
log.Fatal(err)
}
fmt.Printf("fields: %v\n", tc.FieldNames())
fmt.Printf("wrote parquet: %d bytes\n", buf.Len())
Output: fields: [id action user_id] wrote parquet: 688 bytes
Example (ProtoToParquetBatched) ¶
Example_protoToParquetBatched shows writing protobuf messages to Parquet in batches — useful when processing a large stream and flushing periodically.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Batch 1: first two messages.
tc.Append(newProduct(md, "Widget", 9.99, 5))
tc.Append(newProduct(md, "Gadget", 24.50, 2))
batch1 := tc.NewRecordBatch()
defer batch1.Release()
// Batch 2: one more message.
tc.Append(newProduct(md, "Gizmo", 7.25, 12))
batch2 := tc.NewRecordBatch()
defer batch2.Release()
// Write both batches to a single Parquet file.
var buf bytes.Buffer
if err := tc.WriteParquetRecords(&buf, batch1, batch2); err != nil {
log.Fatal(err)
}
// Read back and verify total row count.
reader := bytes.NewReader(buf.Bytes())
rec, err := tc.ReadParquet(context.Background(), reader, nil)
if err != nil {
log.Fatal(err)
}
defer rec.Release()
fmt.Printf("batches: 2, total rows read back: %d\n", rec.NumRows())
Output: batches: 2, total rows read back: 3
Example (ProtoToParquetFile) ¶
Example_protoToParquetFile shows the complete workflow of consuming protobuf messages and writing them to a Parquet file on disk.
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Simulate consuming a stream of protobuf messages.
products := []struct {
name string
price float64
qty int32
}{
{"Widget", 9.99, 5},
{"Gadget", 24.50, 2},
{"Gizmo", 7.25, 12},
}
for _, p := range products {
tc.Append(newProduct(md, p.name, p.price, p.qty))
}
// Write to a Parquet file.
f, err := os.CreateTemp("", "example-*.parquet")
if err != nil {
log.Fatal(err)
}
name := f.Name()
defer os.Remove(name)
if err := tc.WriteParquet(f); err != nil {
log.Fatal(err)
}
f.Close()
info, _ := os.Stat(name)
fmt.Printf("wrote %d messages to parquet (%d bytes)\n", len(products), info.Size())
Output: wrote 3 messages to parquet (738 bytes)
Index ¶
- Variables
- func ColumnsToPlanSpecs(columns []ColumnDef) ([]pbpath.PlanPathSpec, error)
- func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)
- func ExprKindToAppendFunc(kind protoreflect.Kind, b array.Builder) protoAppendFunc
- func ExprKindToArrowType(kind protoreflect.Kind) arrow.DataType
- func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)
- func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)
- func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc
- func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType
- type ArgDef
- type ColumnDef
- type CustomMsgConfig
- type DenormConfig
- type DenormPlanConfig
- type ExprDef
- type HyperType
- type HyperTypeOption
- type Opt
- type Option
- type ProtoConfig
- type Transcoder
- func New(msgDesc protoreflect.MessageDescriptor, mem memory.Allocator, opts ...Option) (tc *Transcoder, err error)
- func NewFromFile(protoFilePath, messageName string, importPaths []string, mem memory.Allocator, ...) (*Transcoder, error)
- func NewTranscoderFromConfig(cfg *DenormConfig, mem memory.Allocator) (*Transcoder, error)
- func NewTranscoderFromConfigFile(configPath string, mem memory.Allocator) (*Transcoder, error)
- func (s *Transcoder) Append(value proto.Message)
- func (s *Transcoder) AppendDenorm(msg proto.Message) error
- func (s *Transcoder) AppendDenormRaw(data []byte) error
- func (s *Transcoder) AppendDenormRawMerged(baseBytes, customBytes []byte) error
- func (s *Transcoder) AppendRaw(data []byte) error
- func (s *Transcoder) AppendRawMerged(baseBytes, customBytes []byte) error
- func (s *Transcoder) AppendWithCustom(value proto.Message, custom proto.Message) error
- func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)
- func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder
- func (s *Transcoder) DenormalizerSchema() *arrow.Schema
- func (s *Transcoder) FieldNames() []string
- func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch
- func (s *Transcoder) NewRecordBatch() arrow.RecordBatch
- func (s *Transcoder) Parquet() *schema.Schema
- func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message
- func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)
- func (s *Transcoder) Release()
- func (s *Transcoder) Schema() *arrow.Schema
- func (s *Transcoder) WriteParquet(w io.Writer) error
- func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error
Examples ¶
- Package (DenormToParquet)
- Package (ParquetRoundTrip)
- Package (ParquetSelectColumns)
- Package (ProtoFileToParquet)
- Package (ProtoToParquetBatched)
- Package (ProtoToParquetFile)
- CompileProtoToFileDescriptor
- GetMessageDescriptorByName
- HyperType.Recompile
- MergeMessageDescriptors
- New
- New (WithCustomMessage)
- NewFromFile
- NewHyperType
- NewHyperType (WithAutoRecompile)
- NewTranscoderFromConfig
- ParseDenormConfig
- ParseDenormConfig (NestedExpr)
- ProtoKindToAppendFunc
- ProtoKindToArrowType
- Transcoder.Append
- Transcoder.AppendDenorm
- Transcoder.AppendDenorm (Coalesce)
- Transcoder.AppendDenorm (ComposedExpr)
- Transcoder.AppendDenorm (Concat)
- Transcoder.AppendDenorm (CrossJoin)
- Transcoder.AppendDenorm (Default)
- Transcoder.AppendDenorm (Has)
- Transcoder.AppendDenorm (LeftJoin)
- Transcoder.AppendDenorm (Upper)
- Transcoder.AppendDenorm (WithExprAndFanout)
- Transcoder.AppendDenormRaw
- Transcoder.AppendDenormRaw (Batch)
- Transcoder.AppendRaw
- Transcoder.AppendWithCustom
- Transcoder.Clone
- Transcoder.Clone (WithHyperType)
- Transcoder.DenormalizerBuilder
- Transcoder.DenormalizerSchema
- Transcoder.FieldNames
- Transcoder.NewDenormalizerRecordBatch
- Transcoder.NewRecordBatch
- Transcoder.Parquet
- Transcoder.Proto
- Transcoder.ReadParquet
- Transcoder.Release
- Transcoder.Schema
- Transcoder.WriteParquet
- Transcoder.WriteParquetRecords
- WithCustomMessage
- WithCustomMessageFile
- WithDenormalizerPlan
- WithHyperType
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrMxDepth is returned when the protobuf message nesting exceeds maxDepth.
	ErrMxDepth = errors.New("max depth reached, either the message is deeply nested or a circular dependency was introduced")
	// ErrPathNotFound is returned by node.getPath when a field name is not
	// found in the node's hash map.
	ErrPathNotFound = errors.New("path not found")
)
Sentinel errors returned during schema construction and path lookup.
Functions ¶
func ColumnsToPlanSpecs ¶ added in v0.3.0
func ColumnsToPlanSpecs(columns []ColumnDef) ([]pbpath.PlanPathSpec, error)
ColumnsToPlanSpecs converts a slice of ColumnDef into []pbpath.PlanPathSpec ready for WithDenormalizerPlan.
func CompileProtoToFileDescriptor ¶
func CompileProtoToFileDescriptor(protoFilePath string, importPaths []string) (protoreflect.FileDescriptor, error)
CompileProtoToFileDescriptor compiles a .proto file at runtime using protocompile and returns the resulting FileDescriptor. importPaths are the directories searched for transitive imports.
Example ¶
ExampleCompileProtoToFileDescriptor demonstrates compiling a .proto file from disk at runtime and inspecting its messages.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile creates a fresh temporary directory, stores content in a
// file called name inside it, and returns both the file's full path and the
// directory (so the caller can pass the directory as an import path).
// Any filesystem failure terminates the program via log.Fatal.
func writeProtoFile(name, content string) (filePath, dir string) {
	tmpDir, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	target := filepath.Join(tmpDir, name)
	writeErr := os.WriteFile(target, []byte(content), 0o644)
	if writeErr != nil {
		log.Fatal(writeErr)
	}

	return target, tmpDir
}
// exampleProto is the proto3 source this example writes to disk and compiles;
// it declares a single message, Item, with a string name and a double price.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// main writes exampleProto to a temporary directory, compiles it at runtime,
// and prints every top-level message together with its field count.
func main() {
	_, protoDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(protoDir)

	fileDesc, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{protoDir})
	if err != nil {
		log.Fatal(err)
	}

	messages := fileDesc.Messages()
	for idx, count := 0, messages.Len(); idx < count; idx++ {
		msg := messages.Get(idx)
		fmt.Printf("%s (%d fields)\n", msg.Name(), msg.Fields().Len())
	}
}
Output: Item (2 fields)
func ExprKindToAppendFunc ¶ added in v0.2.0
func ExprKindToAppendFunc(kind protoreflect.Kind, b array.Builder) protoAppendFunc
ExprKindToAppendFunc returns a closure that appends a protoreflect.Value of the given kind to the Arrow array builder b. The builder must match the type returned by ExprKindToArrowType for the same kind.
This is the Expr-output counterpart of ProtoKindToAppendFunc; it handles only the primitive scalar kinds that Expr functions can produce. Returns nil for unsupported kinds.
func ExprKindToArrowType ¶ added in v0.2.0
func ExprKindToArrowType(kind protoreflect.Kind) arrow.DataType
ExprKindToArrowType returns the Arrow data type corresponding to a protoreflect.Kind. This is used for denormalizer columns whose type is determined by an Expr function's output kind rather than by a leaf field descriptor.
Only the primitive scalar kinds that Expr functions can produce are handled:
BoolKind   → Boolean
Int32Kind  → Int32
Int64Kind  → Int64
Uint32Kind → Uint32
Uint64Kind → Uint64
FloatKind  → Float32
DoubleKind → Float64
StringKind → String
BytesKind  → Binary
EnumKind   → Int32
Returns nil for message, group, or unrecognised kinds.
func GetMessageDescriptorByName ¶
func GetMessageDescriptorByName(fd protoreflect.FileDescriptor, messageName string) (protoreflect.MessageDescriptor, error)
GetMessageDescriptorByName looks up a top-level message by name in the given FileDescriptor. Returns an error if the message is not found.
Example ¶
ExampleGetMessageDescriptorByName demonstrates looking up a specific message by name from a compiled FileDescriptor.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile creates a fresh temporary directory, stores content in a
// file called name inside it, and returns both the file's full path and the
// directory (so the caller can pass the directory as an import path).
// Any filesystem failure terminates the program via log.Fatal.
func writeProtoFile(name, content string) (filePath, dir string) {
	tmpDir, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	target := filepath.Join(tmpDir, name)
	writeErr := os.WriteFile(target, []byte(content), 0o644)
	if writeErr != nil {
		log.Fatal(writeErr)
	}

	return target, tmpDir
}
// exampleProto is the proto3 source this example writes to disk and compiles;
// it declares a single message, Item, with a string name and a double price.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// main compiles exampleProto from a temporary directory, looks up the Item
// message by name, and prints each of its fields as "<name> <kind>".
func main() {
	_, protoDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(protoDir)

	fileDesc, err := bufarrowlib.CompileProtoToFileDescriptor("item.proto", []string{protoDir})
	if err != nil {
		log.Fatal(err)
	}

	itemDesc, err := bufarrowlib.GetMessageDescriptorByName(fileDesc, "Item")
	if err != nil {
		log.Fatal(err)
	}

	fields := itemDesc.Fields()
	for idx, count := 0, fields.Len(); idx < count; idx++ {
		field := fields.Get(idx)
		fmt.Printf("%s %s\n", field.Name(), field.Kind())
	}
}
Output: name string price double
func MergeMessageDescriptors ¶
func MergeMessageDescriptors(a, b protoreflect.MessageDescriptor, newName string) (protoreflect.MessageDescriptor, error)
MergeMessageDescriptors merges two message descriptors into a new one with the specified name. It appends the fields from the second descriptor to the first, avoiding name conflicts. Field numbers from b are auto-renumbered starting after a's max field number to prevent wire-format collisions. Nested message types and enum types from b are also carried over. The resulting message descriptor is wrapped in a synthetic file descriptor to ensure it can be used independently.
Example ¶
ExampleMergeMessageDescriptors demonstrates merging two message descriptors into one with combined fields.
package main
import (
"fmt"
"log"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles the example schema in memory and returns
// the descriptor of its only message:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The program exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// buildCustomDescriptor assembles the custom-fields schema in memory and
// returns the descriptor of its only message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
//
// The program exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	custom := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("region"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("batch_id"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{custom},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("CustomFields")
}
func main() {
baseMD := buildSimpleDescriptor()
customMD := buildCustomDescriptor()
merged, err := bufarrowlib.MergeMessageDescriptors(baseMD, customMD, "Merged")
if err != nil {
log.Fatal(err)
}
fields := merged.Fields()
for i := 0; i < fields.Len(); i++ {
fmt.Println(fields.Get(i).Name())
}
}
Output: name price qty region batch_id
func ProtoKindToAppendFunc ¶
func ProtoKindToAppendFunc(fd protoreflect.FieldDescriptor, b array.Builder) protoAppendFunc
ProtoKindToAppendFunc returns a closure that appends a protoreflect.Value of the appropriate kind to the given Arrow array builder. The builder must match the Arrow data type returned by ProtoKindToArrowType for the same field descriptor.
Returns nil if the field's kind is not a recognized scalar mapping.
Example ¶
ExampleProtoKindToAppendFunc demonstrates obtaining a typed append closure for a protobuf field and using it to populate an Arrow builder.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles the example schema in memory and returns
// the descriptor of its only message:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The program exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// main builds an Arrow array builder matching the Product.name field,
// appends two string values through the closure returned by
// ProtoKindToAppendFunc, and prints the resulting array's length and values.
func main() {
	nameField := buildSimpleDescriptor().Fields().ByName("name")

	arrowType := bufarrowlib.ProtoKindToArrowType(nameField)
	bld := array.NewBuilder(memory.DefaultAllocator, arrowType)
	defer bld.Release()

	appendValue := bufarrowlib.ProtoKindToAppendFunc(nameField, bld)
	for _, s := range []string{"hello", "world"} {
		appendValue(protoreflect.ValueOfString(s))
	}

	result := bld.NewArray()
	defer result.Release()

	fmt.Printf("len: %d\n", result.Len())
	strs := result.(*array.String)
	fmt.Println(strs.Value(0))
	fmt.Println(strs.Value(1))
}
Output: len: 2 hello world
func ProtoKindToArrowType ¶
func ProtoKindToArrowType(fd protoreflect.FieldDescriptor) arrow.DataType
ProtoKindToArrowType returns the Arrow data type corresponding to a protobuf field descriptor's scalar kind. In addition to primitive kinds (bool, int32, string, etc.), the following well-known and common message types are recognised and mapped to flat Arrow scalars:
- google.protobuf.Timestamp → Timestamp(ms, UTC)
- google.protobuf.Duration → Duration(ms)
- google.protobuf.FieldMask → String (comma-joined paths)
- google.protobuf.*Value → unwrapped scalar (BoolValue→Boolean, etc.)
- google.type.Date → Date32
- google.type.TimeOfDay → Time64(µs)
- google.type.Money → String (protojson)
- google.type.LatLng → String (protojson)
- google.type.Color → String (protojson)
- google.type.PostalAddress → String (protojson)
- google.type.Interval → String (protojson)
- opentelemetry AnyValue → Binary (proto-marshalled)
Returns nil if the field's kind or message type is not a recognized scalar mapping (e.g. a generic message, map, or group).
TODO: add a WithTimestampUnit(arrow.TimeUnit) option to allow callers to override the default Millisecond precision.
Example ¶
ExampleProtoKindToArrowType demonstrates mapping protobuf field kinds to Arrow data types.
package main
import (
"fmt"
"log"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles the example schema in memory and returns
// the descriptor of its only message:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The program exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// main prints, for every field of the Product message, the field name
// (left-aligned in 8 columns) and the Arrow data type it maps to.
func main() {
	fields := buildSimpleDescriptor().Fields()
	for idx, count := 0, fields.Len(); idx < count; idx++ {
		field := fields.Get(idx)
		arrowType := bufarrowlib.ProtoKindToArrowType(field)
		fmt.Printf("%-8s → %s\n", field.Name(), arrowType)
	}
}
Output: name → utf8 price → float64 qty → int32
Types ¶
type ArgDef ¶ added in v0.3.0
// ArgDef is one argument in an ExprDef. Exactly one field should be set.
type ArgDef struct {
	// Path is a protobuf field path (a leaf pbpath.PathRef).
	Path string `yaml:"path,omitempty"`
	// Literal is a scalar constant (string, int, float64, or bool).
	Literal any `yaml:"literal,omitempty"`
	// Expr is a nested expression sub-tree.
	Expr *ExprDef `yaml:"expr,omitempty"`
}
ArgDef is one argument in an ExprDef. Exactly one field should be set:
- Path: a protobuf field path (leaf pbpath.PathRef).
- Literal: a scalar constant (string, int, float64, or bool).
- Expr: a nested expression sub-tree.
type ColumnDef ¶ added in v0.3.0
// ColumnDef defines one output Arrow column in the denormalizer.
// Exactly one of Path or Expr must be set.
type ColumnDef struct {
	// Name is the output Arrow column name.
	Name string `yaml:"name"`
	// Path is a pbpath path string used when no Expr is needed.
	Path string `yaml:"path,omitempty"`
	// Strict makes out-of-bounds range/index access return an error instead
	// of being silently clamped. Only meaningful for path-based columns.
	Strict bool `yaml:"strict,omitempty"`
	// Expr defines a computed value from one or more source paths.
	Expr *ExprDef `yaml:"expr,omitempty"`
}
ColumnDef defines one output Arrow column in the denormalizer.
Exactly one of Path or Expr must be set:
- Path: a raw protobuf path string (e.g. "imp[*].bidfloor"). The column name in the output schema is taken from Name.
- Expr: a computed expression tree. All source paths come from the expression's leaf ArgDef entries; the Name field becomes the output column alias.
Strict only applies to path-based columns; it is ignored for expr columns.
type CustomMsgConfig ¶ added in v0.3.0
// CustomMsgConfig is the optional "custom_message" configuration section: it
// names a second .proto message whose fields are merged into the base schema.
type CustomMsgConfig struct {
	// File is the .proto file that declares the custom message
	// (YAML key "file").
	File string `yaml:"file"`
	// Message is the name of the message to merge (YAML key "message").
	Message string `yaml:"message"`
	// ImportPaths is read from the optional YAML key "import_paths".
	// NOTE(review): presumably the directories searched for imports when
	// compiling File — confirm against the loader.
	ImportPaths []string `yaml:"import_paths,omitempty"`
}
CustomMsgConfig optionally merges fields from a second .proto message into the base schema, enabling Transcoder.AppendWithCustom.
type DenormConfig ¶ added in v0.3.0
// DenormConfig is the top-level declarative configuration for building a
// Transcoder with a denormalizer plan from a YAML source.
type DenormConfig struct {
	// Proto is the "proto" section of the document.
	Proto ProtoConfig `yaml:"proto"`
	// CustomMessage is the optional "custom_message" section; nil when the
	// section is absent.
	CustomMessage *CustomMsgConfig `yaml:"custom_message,omitempty"`
	// Denormalizer is the "denormalizer" section holding the output columns.
	Denormalizer DenormPlanConfig `yaml:"denormalizer"`
}
DenormConfig is the top-level declarative configuration for building a Transcoder with a denormalizer plan from a YAML source.
Example YAML:
proto:
file: path/to/schema.proto
message: BidRequestEvent
import_paths:
- ./proto
# optional: merge an additional message's fields into the schema
custom_message:
file: path/to/extensions.proto
message: BidRequestExtension
denormalizer:
columns:
- name: auction_id
path: auction_id
- name: imp_id
path: imp[*].id
- name: floor_price
path: imp[*].bidfloor
strict: true
- name: has_video
expr:
func: has
args:
- path: imp[*].video.id
- name: full_imp_id
expr:
func: concat
sep: "-"
args:
- path: imp[*].id
- path: imp[*].banner.id
- name: region
expr:
func: default
args:
- path: user.geo.region
literal: "unknown"
func ParseDenormConfig ¶ added in v0.3.0
func ParseDenormConfig(r io.Reader) (*DenormConfig, error)
ParseDenormConfig decodes a YAML DenormConfig from r. Unknown fields are rejected to surface typos early.
Example ¶
ExampleParseDenormConfig demonstrates parsing a YAML denormalizer configuration from a strings.Reader and inspecting the result.
package main
import (
"fmt"
"log"
"strings"
"github.com/loicalleyne/bufarrowlib"
)
// main parses an inline YAML denormalizer configuration and prints the proto
// message name, the number of columns, and one summary line per column.
func main() {
	// src is the YAML document handed to ParseDenormConfig.
	// NOTE(review): the nesting indentation of this YAML appears to have been
	// lost in this listing — confirm against the original example source.
	src := `
proto:
file: schema.proto
message: Order
denormalizer:
columns:
- name: order_name
path: name
- name: item_id
path: items[*].id
- name: item_price
path: items[*].price
- name: has_tags
expr:
func: has
args:
- path: tags[*]
`
	cfg, err := bufarrowlib.ParseDenormConfig(strings.NewReader(src))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("proto:", cfg.Proto.Message)
	fmt.Println("columns:", len(cfg.Denormalizer.Columns))
	// Expression columns are reported by function name, path columns by path.
	for _, col := range cfg.Denormalizer.Columns {
		if col.Expr != nil {
			fmt.Printf(" %-18s expr:%s\n", col.Name, col.Expr.Func)
		} else {
			fmt.Printf(" %-18s path:%s\n", col.Name, col.Path)
		}
	}
}
Output: proto: Order columns: 4 order_name path:name item_id path:items[*].id item_price path:items[*].price has_tags expr:has
Example (NestedExpr) ¶
ExampleParseDenormConfig_nestedExpr shows a cond expression whose predicate is itself a nested expression (recursive bufarrowlib.ExprDef trees).
package main
import (
"fmt"
"log"
"strings"
"github.com/loicalleyne/bufarrowlib"
)
// main parses a configuration whose single column is a cond expression with a
// nested gt predicate, then prints the outer function, the predicate's
// function, and the two literal branches.
func main() {
	// src is the YAML document handed to ParseDenormConfig.
	// NOTE(review): the nesting indentation of this YAML appears to have been
	// lost in this listing — confirm against the original example source.
	src := `
proto:
file: schema.proto
message: Order
denormalizer:
columns:
- name: category
expr:
func: cond
args:
- expr:
func: gt
args:
- path: seq
- literal: 100
- literal: "premium"
- literal: "standard"
`
	cfg, err := bufarrowlib.ParseDenormConfig(strings.NewReader(src))
	if err != nil {
		log.Fatal(err)
	}
	// Args[0] is the predicate sub-expression; Args[1] and Args[2] are the
	// literals printed as the "then" and "else" branches.
	col := cfg.Denormalizer.Columns[0]
	fmt.Println("func:", col.Expr.Func)
	fmt.Println("predicate:", col.Expr.Args[0].Expr.Func)
	fmt.Println("then:", col.Expr.Args[1].Literal)
	fmt.Println("else:", col.Expr.Args[2].Literal)
}
Output: func: cond predicate: gt then: premium else: standard
func ParseDenormConfigFile ¶ added in v0.3.0
func ParseDenormConfigFile(path string) (*DenormConfig, error)
ParseDenormConfigFile reads and parses a YAML DenormConfig from a file.
type DenormPlanConfig ¶ added in v0.3.0
// DenormPlanConfig holds the ordered list of output columns for the
// denormalizer plan.
type DenormPlanConfig struct {
// Columns lists the output column definitions in order; it is read from
// the "columns" YAML key.
Columns []ColumnDef `yaml:"columns"`
}
DenormPlanConfig holds the ordered list of output columns for the denormalizer plan.
type ExprDef ¶ added in v0.3.0
// ExprDef describes a single node in a composable expression tree.
type ExprDef struct {
// Func is the name of the function this node applies (for example "cond",
// "has", or "gt").
Func string `yaml:"func"`
// Args is the ordered list of child ArgDef values.
Args []ArgDef `yaml:"args,omitempty"`
// Sep is a string parameter whose interpretation depends on Func: the
// separator for concat and list_concat, the affix for trim_prefix and
// trim_suffix, the time-format string for strptime and try_strptime, the
// part name for date_part, and the replacement character for mask.
Sep string `yaml:"sep,omitempty"`
// Literal is the first scalar constant: the fallback value for default and
// the ifTrue replacement value for coerce.
Literal any `yaml:"literal,omitempty"`
// Literal2 is the second scalar constant: the ifFalse replacement value
// for coerce.
Literal2 any `yaml:"literal2,omitempty"`
// Param is an integer parameter: the bucket size for bucket and the number
// of leading characters to keep unmasked for mask.
Param int `yaml:"param,omitempty"`
}
ExprDef describes a single node in a composable expression tree.
Supported func names ¶
┌──────────────┬──────────────────────────────────────────────────────────────────────┐
│ Category     │ func values                                                          │
├──────────────┼──────────────────────────────────────────────────────────────────────┤
│ Aggregation  │ coalesce, default                                                    │
│ Control flow │ cond                                                                 │
│ Predicates   │ has, eq, ne, lt, le, gt, ge                                          │
│ Arithmetic   │ add, sub, mul, div, mod, abs, ceil, floor, round, min, max           │
│ String       │ concat, upper, lower, trim, trim_prefix, trim_suffix, len            │
│ Cast         │ cast_int, cast_float, cast_string                                    │
│ Timestamp    │ age, strptime, try_strptime, extract_year, extract_month,            │
│              │ extract_day, extract_hour, extract_minute, extract_second            │
│ ETL          │ hash, epoch_to_date, date_part, bucket, mask, coerce, enum_name,     │
│              │ sum, distinct, list_concat                                           │
│ Logic        │ and, or, not                                                         │
└──────────────┴──────────────────────────────────────────────────────────────────────┘
Auxiliary fields ¶
- Args — ordered list of child ArgDef values.
- Sep — string parameter, interpretation depends on func:
- concat: separator string (e.g. "-", ",").
- trim_prefix, trim_suffix: the affix string to remove.
- strptime, try_strptime: the Go time-format string (e.g. "2006-01-02T15:04:05Z").
- date_part: part name ("year", "month", "day", "hour", "minute", "second", "epoch").
- list_concat: separator between collected values.
- mask: the replacement character (single rune, e.g. "*").
- Literal — first scalar constant:
- default: the fallback value when the child is null/zero.
- coerce: the ifTrue replacement value.
- YAML type determines Go type: string → string, integer → int64, float → float64, bool → bool.
- Literal2 — second scalar constant:
- coerce: the ifFalse replacement value (same type rules as Literal).
- Param — integer parameter:
- bucket: bucket size (the child value is divided into buckets of this width).
- mask: number of leading characters to keep unmasked (keepFirst); the number of trailing characters to keep is always 0 (use a cond+mask tree for both).
type HyperType ¶ added in v0.2.0
type HyperType struct {
// contains filtered or unexported fields
}
HyperType is a shared coordinator for a compiled hyperpb.MessageType that supports lock-free sharing across multiple Transcoder instances (including clones running in separate goroutines). It provides online profile-guided optimization (PGO) via hyperpb.Profile: all Transcoders using the same HyperType contribute profiling data, and a recompile atomically upgrades the parser for all of them.
A HyperType is safe for concurrent use. The underlying hyperpb.MessageType is immutable after compilation; HyperType.Recompile atomically swaps it.
func NewHyperType ¶ added in v0.2.0
func NewHyperType(md protoreflect.MessageDescriptor, opts ...HyperTypeOption) *HyperType
NewHyperType compiles a hyperpb.MessageType from md and returns a shared coordinator ready for use by one or more Transcoder instances. The compiled type can later be recompiled with profiling data for improved parse performance.
Example ¶
ExampleNewHyperType demonstrates creating a HyperType coordinator for high-performance raw-bytes ingestion. HyperType compiles a hyperpb parser from a message descriptor and can be shared across multiple Transcoders.
package main
import (
"fmt"
"log"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildHyperExampleDescriptors assembles the in-memory proto3 file
// "hyper_example.proto" (package hyperexample) used by the AppendRaw,
// AppendDenormRaw, and Expr examples, and returns the descriptors of its two
// messages, Outer first and Inner second:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string name = 1;
//	  string alt_name = 2;
//	  repeated Inner items = 3;
//	  int64 qty = 4;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		dblKind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		msgKind  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	inner := &descriptorpb.DescriptorProto{
		Name: proto.String("Inner"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: dblKind, Label: optional},
		},
	}
	outer := &descriptorpb.DescriptorProto{
		Name: proto.String("Outer"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: strKind, Label: optional},
			{Name: proto.String("items"), Number: proto.Int32(3), Type: msgKind, TypeName: proto.String(".hyperexample.Inner"), Label: repeated},
			{Name: proto.String("qty"), Number: proto.Int32(4), Type: i64Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("hyper_example.proto"),
		Package:     proto.String("hyperexample"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{inner, outer},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}

	messages := file.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// main compiles a HyperType for the Outer message and prints whether a
// compiled type is available along with the profiling sample rate.
func main() {
	outer, _ := buildHyperExampleDescriptors()

	// NewHyperType compiles the hyperpb parser once, up front.
	hyper := bufarrowlib.NewHyperType(outer)

	// The compiled type can be inspected through Type.
	compiled := hyper.Type() != nil
	fmt.Printf("type: %v\n", compiled)
	fmt.Printf("sample rate: %.2f\n", hyper.SampleRate())
}
Output: type: true sample rate: 0.01
Example (WithAutoRecompile) ¶
ExampleNewHyperType_withAutoRecompile demonstrates enabling automatic profile-guided recompilation. After the threshold number of messages, the parser is recompiled using collected profiling data.
package main
import (
"fmt"
"log"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildHyperExampleDescriptors assembles the in-memory proto3 file
// "hyper_example.proto" (package hyperexample) used by the AppendRaw,
// AppendDenormRaw, and Expr examples, and returns the descriptors of its two
// messages, Outer first and Inner second:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string name = 1;
//	  string alt_name = 2;
//	  repeated Inner items = 3;
//	  int64 qty = 4;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		dblKind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		msgKind  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	inner := &descriptorpb.DescriptorProto{
		Name: proto.String("Inner"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: dblKind, Label: optional},
		},
	}
	outer := &descriptorpb.DescriptorProto{
		Name: proto.String("Outer"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: strKind, Label: optional},
			{Name: proto.String("items"), Number: proto.Int32(3), Type: msgKind, TypeName: proto.String(".hyperexample.Inner"), Label: repeated},
			{Name: proto.String("qty"), Number: proto.Int32(4), Type: i64Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("hyper_example.proto"),
		Package:     proto.String("hyperexample"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{inner, outer},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}

	messages := file.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// main compiles a HyperType with automatic recompilation enabled and prints
// whether a compiled type is available along with the configured sample rate.
func main() {
	outer, _ := buildHyperExampleDescriptors()

	// Recompile every 10,000 messages, sampling 5% of them for profiling.
	autoRecompile := bufarrowlib.WithAutoRecompile(10_000, 0.05)
	hyper := bufarrowlib.NewHyperType(outer, autoRecompile)

	compiled := hyper.Type() != nil
	fmt.Printf("type: %v\n", compiled)
	fmt.Printf("sample rate: %.2f\n", hyper.SampleRate())
}
Output: type: true sample rate: 0.05
func (*HyperType) Profile ¶ added in v0.2.0
Profile returns the current hyperpb.Profile for recording parse statistics. Returns nil if profiling has not been initialized.
func (*HyperType) Recompile ¶ added in v0.2.0
Recompile recompiles the underlying hyperpb.MessageType using the collected profiling data. It atomically swaps the old profile for a fresh one (preventing double-recompile races), recompiles, and stores the new type. All Transcoder instances sharing this HyperType will pick up the new type on their next Transcoder.AppendRaw or Transcoder.AppendDenormRaw call.
Recompile is safe for concurrent use but is intentionally synchronous: the caller blocks until compilation finishes. For non-blocking recompile, wrap the call in a goroutine.
Returns an error if no profile data has been collected.
Example ¶
ExampleHyperType_Recompile demonstrates manual profile-guided recompilation. After processing a batch of messages, the profiling data is used to recompile the parser for better performance on subsequent batches.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildHyperExampleDescriptors assembles the in-memory proto3 file
// "hyper_example.proto" (package hyperexample) used by the AppendRaw,
// AppendDenormRaw, and Expr examples, and returns the descriptors of its two
// messages, Outer first and Inner second:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string name = 1;
//	  string alt_name = 2;
//	  repeated Inner items = 3;
//	  int64 qty = 4;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		dblKind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		msgKind  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	inner := &descriptorpb.DescriptorProto{
		Name: proto.String("Inner"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: dblKind, Label: optional},
		},
	}
	outer := &descriptorpb.DescriptorProto{
		Name: proto.String("Outer"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: strKind, Label: optional},
			{Name: proto.String("items"), Number: proto.Int32(3), Type: msgKind, TypeName: proto.String(".hyperexample.Inner"), Label: repeated},
			{Name: proto.String("qty"), Number: proto.Int32(4), Type: i64Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("hyper_example.proto"),
		Package:     proto.String("hyperexample"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{inner, outer},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}

	messages := file.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// newOuterMessage builds a dynamic Outer message. name and qty are always
// set, alt_name is set only when altName is non-empty, and one Inner message
// (id, price) is appended to the items list per element of items.
func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	outer := dynamicpb.NewMessage(outerMD)
	outerFields := outerMD.Fields()

	outer.Set(outerFields.ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		outer.Set(outerFields.ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	outer.Set(outerFields.ByName("qty"), protoreflect.ValueOfInt64(qty))

	itemList := outer.Mutable(outerFields.ByName("items")).List()
	for idx := range items {
		// innerMD is only consulted when there is at least one item.
		innerFields := innerMD.Fields()
		inner := dynamicpb.NewMessage(innerMD)
		inner.Set(innerFields.ByName("id"), protoreflect.ValueOfString(items[idx].id))
		inner.Set(innerFields.ByName("price"), protoreflect.ValueOfFloat64(items[idx].price))
		itemList.Append(protoreflect.ValueOfMessage(inner))
	}
	return outer
}
// main demonstrates manual profile-guided recompilation in three phases:
// it feeds 100 serialized messages through a Transcoder bound to a shared
// HyperType, recompiles the parser from the collected profile, and then
// processes one more message and prints the resulting row count.
func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// Manual recompile mode: threshold=0 disables auto-recompile,
	// rate=1.0 profiles 100% of messages for maximum accuracy.
	ht := bufarrowlib.NewHyperType(outerMD,
		bufarrowlib.WithAutoRecompile(0, 1.0),
	)
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Phase 1: Profile a representative batch.
	for i := 0; i < 100; i++ {
		msg := newOuterMessage(outerMD, innerMD,
			fmt.Sprintf("product-%d", i), "",
			[]struct {
				id    string
				price float64
			}{
				{fmt.Sprintf("item-%d", i), float64(i) + 0.99},
			}, int64(i),
		)
		// A marshalling failure would otherwise feed empty bytes to the
		// parser, so surface it instead of discarding it.
		raw, err := proto.Marshal(msg)
		if err != nil {
			log.Fatal(err)
		}
		// NOTE(review): any result of AppendDenormRaw is discarded here, as in
		// the original example — confirm its signature and check it if it
		// returns an error.
		tc.AppendDenormRaw(raw)
	}
	// Flush the profiling batch; its contents are not needed.
	rec := tc.NewDenormalizerRecordBatch()
	rec.Release()

	// Phase 2: Recompile with the collected profile.
	if err := ht.Recompile(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("recompiled successfully")

	// Phase 3: Process more data with the optimised parser.
	msg := newOuterMessage(outerMD, innerMD, "final", "", []struct {
		id    string
		price float64
	}{{"Z", 99.99}}, 1)
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}
	tc.AppendDenormRaw(raw)
	rec2 := tc.NewDenormalizerRecordBatch()
	defer rec2.Release()
	fmt.Printf("rows after recompile: %d\n", rec2.NumRows())
}
Output: recompiled successfully rows after recompile: 1
func (*HyperType) RecompileAsync ¶ added in v0.2.0
func (h *HyperType) RecompileAsync() <-chan struct{}
RecompileAsync spawns a goroutine to recompile asynchronously. The returned channel is closed when recompilation completes (or is skipped because another recompile is in progress). Errors are silently discarded; use HyperType.Recompile directly if error handling is needed.
func (*HyperType) RecordMessage ¶ added in v0.2.0
RecordMessage increments the message counter and returns true if the auto-recompile threshold has been reached and the caller should trigger HyperType.Recompile. Returns false if auto-recompile is disabled (threshold == 0) or the threshold has not been reached.
func (*HyperType) SampleRate ¶ added in v0.2.0
SampleRate returns the profiling sample rate.
func (*HyperType) Type ¶ added in v0.2.0
func (h *HyperType) Type() *hyperpb.MessageType
Type returns the current compiled hyperpb.MessageType. The returned pointer is safe to use until the next HyperType.Recompile call; callers should load it once per batch rather than caching it long-term.
type HyperTypeOption ¶ added in v0.2.0
type HyperTypeOption func(*hyperTypeConfig)
HyperTypeOption configures a HyperType during construction.
func WithAutoRecompile ¶ added in v0.2.0
func WithAutoRecompile(threshold int64, rate float64) HyperTypeOption
WithAutoRecompile enables automatic recompilation after threshold messages have been profiled. rate is the sampling fraction passed to hyperpb.WithRecordProfile (e.g. 0.01 for 1%). A threshold of 0 disables auto-recompile (the default); use HyperType.Recompile manually instead.
type Opt ¶
type Opt struct {
// contains filtered or unexported fields
}
Opt holds the option values collected from Option functions and passed to New or NewFromFile. Fields are unexported; use the With* helpers.
type Option ¶
type Option func(config)
Option is a functional option applied to New and NewFromFile to configure schema merging, denormalization, or other behaviours.
func WithCustomMessage ¶
func WithCustomMessage(msgDesc protoreflect.MessageDescriptor) Option
WithCustomMessage provides a protoreflect.MessageDescriptor whose fields will be merged into the base message schema. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessageFile.
Example ¶
ExampleWithCustomMessage demonstrates the WithCustomMessage option.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its only message:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		dblKind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: dblKind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// buildCustomDescriptor assembles the in-memory proto3 file
// "example_custom.proto" (package example) and returns the descriptor of its
// only message:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		i64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	)

	custom := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("region"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("batch_id"), Number: proto.Int32(2), Type: i64Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{custom},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("CustomFields")
}
// main creates a Transcoder whose schema merges the CustomFields message
// into the Product message and prints the resulting field names.
func main() {
	product := buildSimpleDescriptor()
	custom := buildCustomDescriptor()

	mergeCustom := bufarrowlib.WithCustomMessage(custom)
	tc, err := bufarrowlib.New(product, memory.DefaultAllocator, mergeCustom)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	names := tc.FieldNames()
	fmt.Println(names)
}
Output: [name price qty region batch_id]
func WithCustomMessageFile ¶
WithCustomMessageFile specifies a .proto file and message name whose fields will be merged into the base message schema. The .proto file is compiled at schema creation time using protocompile. The merged schema can be populated with Transcoder.AppendWithCustom. This option is mutually exclusive with WithCustomMessage.
Example ¶
ExampleWithCustomMessageFile demonstrates augmenting a base schema with custom fields loaded from a .proto file on disk.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile creates a fresh temporary directory (prefix
// "bufarrow-example"), writes content into a file called name inside it with
// mode 0o644, and returns the file's path together with the directory, which
// callers can use as an import path. Any failure terminates the process via
// log.Fatal; removing the directory afterwards is the caller's job.
func writeProtoFile(name, content string) (filePath, dir string) {
	tmpDir, mkErr := os.MkdirTemp("", "bufarrow-example")
	if mkErr != nil {
		log.Fatal(mkErr)
	}

	target := filepath.Join(tmpDir, name)
	writeErr := os.WriteFile(target, []byte(content), 0o644)
	if writeErr != nil {
		log.Fatal(writeErr)
	}
	return target, tmpDir
}
// exampleProto is the base schema written to disk by the example: a proto3
// file declaring message Item with a string name and a double price.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// exampleCustomProto is the custom-fields schema written to disk by the
// example: a proto3 file declaring message Extra with a string region.
const exampleCustomProto = `syntax = "proto3";
message Extra {
string region = 1;
}
`
// main writes the base and custom .proto files to temporary directories,
// builds a Transcoder from the base file with the custom message merged in,
// and prints the merged field names. Both temporary directories are removed
// on normal return.
func main() {
	itemFile, itemDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(itemDir)

	extraFile, extraDir := writeProtoFile("extra.proto", exampleCustomProto)
	defer os.RemoveAll(extraDir)

	// Files are addressed by base name relative to their import directory.
	withExtra := bufarrowlib.WithCustomMessageFile(filepath.Base(extraFile), "Extra", []string{extraDir})
	tc, err := bufarrowlib.NewFromFile(
		filepath.Base(itemFile), "Item", []string{itemDir},
		memory.DefaultAllocator,
		withExtra,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	names := tc.FieldNames()
	fmt.Println(names)
}
Output: [name price region]
func WithDenormalizerPlan ¶
func WithDenormalizerPlan(paths ...pbpath.PlanPathSpec) Option
WithDenormalizerPlan configures one or more protobuf field paths to project into a flat (denormalized) Arrow record. Each path is specified as a pbpath.PlanPathSpec created via pbpath.PlanPath, which supports per-path options such as pbpath.Alias and pbpath.StrictPath.
Paths that traverse repeated fields with a wildcard [*] or range [start:end] produce fan-out rows. Multiple independent fan-out groups are cross-joined; empty fan-out groups produce a single null row (left-join semantics).
Each leaf path must terminate at a scalar protobuf field or a recognized well-known message type (google.protobuf.Timestamp, otel AnyValue). Message-typed terminal nodes are rejected at schema creation time.
Example ¶
ExampleWithDenormalizerPlan demonstrates configuring a denormalization plan.
// Project the order name and every item id into a flat record; the items[*]
// wildcard fans out to one row per item.
orderMD, itemMD := buildExampleDescriptors()
plan := bufarrowlib.WithDenormalizerPlan(
	pbpath.PlanPath("name", pbpath.Alias("order_name")),
	pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
)
tc, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
if err != nil {
	log.Fatal(err)
}
defer tc.Release()

// One order carrying two items.
order := newOrder(orderMD, itemMD, "order-1",
	[]struct {
		id    string
		price float64
	}{{"X", 1.0}, {"Y", 2.0}},
	nil, 1,
)
if err = tc.AppendDenorm(order); err != nil {
	log.Fatal(err)
}

batch := tc.NewDenormalizerRecordBatch()
defer batch.Release()

rowCount := int(batch.NumRows())
fmt.Printf("rows: %d\n", rowCount)
for row := 0; row < rowCount; row++ {
	// Column 0 is order_name and column 1 is item_id, in plan order.
	orderName := batch.Column(0).(*array.String).Value(row)
	itemID := batch.Column(1).(*array.String).Value(row)
	fmt.Printf(" %s | %s\n", orderName, itemID)
}
Output: rows: 2 order-1 | X order-1 | Y
func WithHyperType ¶ added in v0.2.0
WithHyperType provides a shared HyperType coordinator for PGO-enabled raw-bytes ingestion via Transcoder.AppendRaw and Transcoder.AppendDenormRaw. The HyperType's compiled hyperpb.MessageType is used instead of compiling a new one, and all Transcoders sharing the same HyperType contribute profiling data for online recompilation.
Create a HyperType with NewHyperType and pass it to multiple New/Clone calls. Call HyperType.Recompile to upgrade the parser with collected profile data.
Example ¶
ExampleWithHyperType demonstrates the WithHyperType option, which connects a Transcoder to a shared HyperType coordinator for raw-bytes ingestion.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildHyperExampleDescriptors assembles the in-memory proto3 file
// "hyper_example.proto" (package hyperexample) used by the AppendRaw,
// AppendDenormRaw, and Expr examples, and returns the descriptors of its two
// messages, Outer first and Inner second:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string name = 1;
//	  string alt_name = 2;
//	  repeated Inner items = 3;
//	  int64 qty = 4;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		dblKind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		msgKind  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	inner := &descriptorpb.DescriptorProto{
		Name: proto.String("Inner"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("id"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: dblKind, Label: optional},
		},
	}
	outer := &descriptorpb.DescriptorProto{
		Name: proto.String("Outer"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("alt_name"), Number: proto.Int32(2), Type: strKind, Label: optional},
			{Name: proto.String("items"), Number: proto.Int32(3), Type: msgKind, TypeName: proto.String(".hyperexample.Inner"), Label: repeated},
			{Name: proto.String("qty"), Number: proto.Int32(4), Type: i64Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("hyper_example.proto"),
		Package:     proto.String("hyperexample"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{inner, outer},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}

	messages := file.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// newOuterMessage builds a dynamic Outer message. name and qty are always
// set, alt_name is set only when altName is non-empty, and one Inner message
// (id, price) is appended to the items list per element of items.
func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	outer := dynamicpb.NewMessage(outerMD)
	outerFields := outerMD.Fields()

	outer.Set(outerFields.ByName("name"), protoreflect.ValueOfString(name))
	if altName != "" {
		outer.Set(outerFields.ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	outer.Set(outerFields.ByName("qty"), protoreflect.ValueOfInt64(qty))

	itemList := outer.Mutable(outerFields.ByName("items")).List()
	for idx := range items {
		// innerMD is only consulted when there is at least one item.
		innerFields := innerMD.Fields()
		inner := dynamicpb.NewMessage(innerMD)
		inner.Set(innerFields.ByName("id"), protoreflect.ValueOfString(items[idx].id))
		inner.Set(innerFields.ByName("price"), protoreflect.ValueOfFloat64(items[idx].price))
		itemList.Append(protoreflect.ValueOfMessage(inner))
	}
	return outer
}
// main connects a Transcoder to a shared HyperType, appends one serialized
// Outer message through AppendRaw, and prints the number of rows in the
// resulting record batch.
func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()
	ht := bufarrowlib.NewHyperType(outerMD)

	// WithHyperType enables AppendRaw and AppendDenormRaw on the transcoder.
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newOuterMessage(outerMD, innerMD, "Test", "", nil, 42)
	// Surface marshalling failures instead of passing empty bytes on.
	raw, err := proto.Marshal(msg)
	if err != nil {
		log.Fatal(err)
	}

	// AppendRaw is now available because WithHyperType was provided.
	if err := tc.AppendRaw(raw); err != nil {
		log.Fatal(err)
	}
	rec := tc.NewRecordBatch()
	defer rec.Release()
	fmt.Printf("rows: %d\n", rec.NumRows())
}
Output: rows: 1
type ProtoConfig ¶ added in v0.3.0
// ProtoConfig identifies the .proto source file and the root message type.
type ProtoConfig struct {
// File is the path to the .proto file.
File string `yaml:"file"`
// Message is the top-level message name to use as the Transcoder root.
Message string `yaml:"message"`
// ImportPaths are additional directories searched when resolving imports
// within the .proto file.
ImportPaths []string `yaml:"import_paths,omitempty"`
}
ProtoConfig identifies the .proto source file and the root message type.
type Transcoder ¶
type Transcoder struct {
// contains filtered or unexported fields
}
Transcoder converts protobuf messages to Apache Arrow record batches and back. It holds the compiled message schema, an Arrow record builder for the full message ("stencil"), and optionally a separate denormalizer that projects selected paths into a flat Arrow record suitable for analytics.
A Transcoder is not safe for concurrent use; call Transcoder.Clone to create independent copies for parallel goroutines.
func New ¶
func New(msgDesc protoreflect.MessageDescriptor, mem memory.Allocator, opts ...Option) (tc *Transcoder, err error)
New returns a new Transcoder from a pre-resolved message descriptor. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile. WithDenormalizerPlan creates a separate flat Arrow record for analytics whilst WithCustomMessage/WithCustomMessageFile expand the schema of the protoreflect.MessageDescriptor used as input.
Example ¶
ExampleNew demonstrates creating a Transcoder, appending protobuf messages, and retrieving the result as an Arrow record batch.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its only message:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process exits via log.Fatalf if protodesc.NewFile rejects the file.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	var (
		strKind  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		dblKind  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		i32Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum()
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
	)

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{Name: proto.String("name"), Number: proto.Int32(1), Type: strKind, Label: optional},
			{Name: proto.String("price"), Number: proto.Int32(2), Type: dblKind, Label: optional},
			{Name: proto.String("qty"), Number: proto.Int32(3), Type: i32Kind, Label: optional},
		},
	}

	file, err := protodesc.NewFile(&descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return file.Messages().ByName("Product")
}
// newProduct builds a dynamic Product message from md with its name, price,
// and qty fields set to the given values.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()

	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main creates a Transcoder for the Product message, appends two products,
// and prints the row and column counts of the resulting record batch
// followed by the schema's field names.
func main() {
	productMD := buildSimpleDescriptor()

	tc, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	products := []*dynamicpb.Message{
		newProduct(productMD, "Widget", 9.99, 5),
		newProduct(productMD, "Gadget", 24.50, 2),
	}
	for _, p := range products {
		tc.Append(p)
	}

	batch := tc.NewRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
	fmt.Printf("fields: %v\n", tc.FieldNames())
}
Output: rows: 2, cols: 3 fields: [name price qty]
Example (WithCustomMessage) ¶
ExampleNew_withCustomMessage demonstrates augmenting a protobuf schema with custom fields using WithCustomMessage, and populating both the base and custom fields via AppendWithCustom.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, in memory, the descriptor for the proto3
// message used by the examples and returns it:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// buildCustomDescriptor assembles, in memory, the descriptor for the
// custom-fields message used by the example and returns it:
//
//	message CustomFields {
//	  string region   = 1;
//	  int64  batch_id = 2;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	customFields := &descriptorpb.DescriptorProto{
		Name: proto.String("CustomFields"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("region"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("batch_id"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_custom.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{customFields},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}
// newProduct creates a dynamic message for md and assigns the supplied
// values to its "name", "price" and "qty" fields.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main builds a Transcoder whose schema is the Product message augmented
// with the CustomFields message, prints the combined field names, appends
// one base/custom message pair and prints the shape of the flushed batch.
func main() {
	baseMD, customMD := buildSimpleDescriptor(), buildCustomDescriptor()

	transcoder, err := bufarrowlib.New(
		baseMD,
		memory.DefaultAllocator,
		bufarrowlib.WithCustomMessage(customMD),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	fmt.Printf("fields: %v\n", transcoder.FieldNames())

	product := newProduct(baseMD, "Widget", 9.99, 5)

	// Populate the custom-fields message that accompanies the base message.
	customFields := customMD.Fields()
	extra := dynamicpb.NewMessage(customMD)
	extra.Set(customFields.ByName("region"), protoreflect.ValueOfString("US-EAST"))
	extra.Set(customFields.ByName("batch_id"), protoreflect.ValueOfInt64(42))

	err = transcoder.AppendWithCustom(product, extra)
	if err != nil {
		log.Fatal(err)
	}

	batch := transcoder.NewRecordBatch()
	defer batch.Release()

	rows, cols := batch.NumRows(), batch.NumCols()
	fmt.Printf("rows: %d, cols: %d\n", rows, cols)
}
Output: fields: [name price qty region batch_id] rows: 1, cols: 5
func NewFromFile ¶
func NewFromFile(protoFilePath, messageName string, importPaths []string, mem memory.Allocator, opts ...Option) (*Transcoder, error)
NewFromFile returns a new Transcoder by compiling a .proto file at runtime. protoFilePath is the path to the .proto file, messageName is the top-level message to use, and importPaths are the directories to search for imports. Options include WithDenormalizerPlan, WithCustomMessage, and WithCustomMessageFile.
Example ¶
ExampleNewFromFile demonstrates creating a Transcoder directly from a .proto file on disk, without pre-compiling the descriptor yourself.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
)
// writeProtoFile writes content to a file called name inside a freshly
// created temporary directory. It returns the path of the written file and
// the directory containing it, which can be used as an import path.
//
// The caller owns the returned directory and should remove it when done.
// Any failure terminates the program via log.Fatal; if the write fails, the
// temporary directory is removed first so it is not left behind.
func writeProtoFile(name, content string) (filePath, dir string) {
	dir, err := os.MkdirTemp("", "bufarrow-example")
	if err != nil {
		log.Fatal(err)
	}
	filePath = filepath.Join(dir, name)
	if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
		// log.Fatal exits the process, so no caller gets a chance to clean
		// up the directory; remove it here on a best-effort basis.
		_ = os.RemoveAll(dir)
		log.Fatal(err)
	}
	return filePath, dir
}
// exampleProto is the proto3 source the example writes to disk: a single
// Item message with a string name and a double price, and no package clause.
const exampleProto = `syntax = "proto3";
message Item {
string name = 1;
double price = 2;
}
`
// main writes exampleProto to a temporary directory, compiles it at runtime
// with NewFromFile and prints the first Arrow schema field plus all field
// names.
func main() {
	protoPath, importDir := writeProtoFile("item.proto", exampleProto)
	defer os.RemoveAll(importDir)

	// The file is passed by its base name, with importDir as the only
	// import path.
	transcoder, err := bufarrowlib.NewFromFile(
		filepath.Base(protoPath),
		"Item",
		[]string{importDir},
		memory.DefaultAllocator,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	// Read the first field of the Arrow schema to verify the schema was built.
	firstField := transcoder.Schema().Field(0)
	fmt.Printf("field 0: %s\n", firstField.Name)
	fmt.Printf("fields: %v\n", transcoder.FieldNames())
}
Output: field 0: name fields: [name price]
func NewTranscoderFromConfig ¶ added in v0.3.0
func NewTranscoderFromConfig(cfg *DenormConfig, mem memory.Allocator) (*Transcoder, error)
NewTranscoderFromConfig builds a Transcoder from a parsed DenormConfig. mem is the Arrow memory allocator to use; pass nil for the default allocator.
Example ¶
ExampleNewTranscoderFromConfig demonstrates building a bufarrowlib.Transcoder from a bufarrowlib.DenormConfig and inspecting the resulting denormalizer schema. It uses the same Order/Item schema as in [ExampleTranscoder_AppendDenorm].
// writeProtoFile (defined in example_transcoder_test.go) creates a
// temporary .proto file and returns its path and the temp directory.
protoFile, dir := writeProtoFile("example.proto", `
syntax = "proto3";
package example;
message Item {
string id = 1;
double price = 2;
}
message Order {
string name = 1;
repeated Item items = 2;
repeated string tags = 3;
int64 seq = 4;
}
`)
// Remove the temporary directory (and the .proto file in it) on exit.
defer os.RemoveAll(dir)
// src is the YAML configuration: it points at the generated .proto file and
// the Order message, and declares four denormalizer columns — three plain
// path projections and one expression column (has_tags) that applies the
// "has" function to tags[*].
src := `
proto:
file: ` + protoFile + `
message: Order
denormalizer:
columns:
- name: order_name
path: name
- name: item_id
path: items[*].id
- name: item_price
path: items[*].price
- name: has_tags
expr:
func: has
args:
- path: tags[*]
`
// Parse the YAML text into a DenormConfig.
cfg, err := bufarrowlib.ParseDenormConfig(strings.NewReader(src))
if err != nil {
log.Fatal(err)
}
// Build the Transcoder described by the parsed configuration.
tc, err := bufarrowlib.NewTranscoderFromConfig(cfg, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
// Inspect the compiled denormalizer schema.
schema := tc.DenormalizerSchema()
fmt.Printf("columns: %d\n", schema.NumFields())
// Print each column's name (left-aligned, padded to 15 characters) and type.
for i := 0; i < schema.NumFields(); i++ {
f := schema.Field(i)
fmt.Printf(" %-15s %s\n", f.Name, f.Type)
}
Output: columns: 4 order_name utf8 item_id utf8 item_price float64 has_tags bool
func NewTranscoderFromConfigFile ¶ added in v0.3.0
func NewTranscoderFromConfigFile(configPath string, mem memory.Allocator) (*Transcoder, error)
NewTranscoderFromConfigFile reads a YAML file and calls NewTranscoderFromConfig.
func (*Transcoder) Append ¶
func (s *Transcoder) Append(value proto.Message)
Append appends a protobuf message to the transcoder's Arrow record builder. This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_Append shows the basic append-and-flush cycle.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, in memory, the descriptor for the proto3
// message used by the examples and returns it:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	optional := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:   proto.String("name"),
				Number: proto.Int32(1),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("price"),
				Number: proto.Int32(2),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
				Label:  optional,
			},
			{
				Name:   proto.String("qty"),
				Number: proto.Int32(3),
				Type:   descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
				Label:  optional,
			},
		},
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// newProduct creates a dynamic message for md and assigns the supplied
// values to its "name", "price" and "qty" fields.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	fields := md.Fields()
	product := dynamicpb.NewMessage(md)
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main shows the basic append-and-flush cycle: two Product messages are
// appended and the row count of the resulting record batch is printed.
func main() {
	productMD := buildSimpleDescriptor()

	transcoder, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	alpha := newProduct(productMD, "Alpha", 1.0, 10)
	transcoder.Append(alpha)
	bravo := newProduct(productMD, "Bravo", 2.0, 20)
	transcoder.Append(bravo)

	// Flush the appended messages into a record batch and release it on exit.
	batch := transcoder.NewRecordBatch()
	defer batch.Release()

	rows := batch.NumRows()
	fmt.Printf("rows: %d\n", rows)
}
Output: rows: 2
func (*Transcoder) AppendDenorm ¶
func (s *Transcoder) AppendDenorm(msg proto.Message) error
AppendDenorm evaluates the denormalizer plan against msg and appends the resulting denormalized rows to the denormalizer's Arrow record builder.
Fan-out groups are cross-joined: each group contributes max(len(branches), 1) rows, and the total row count is the product of all group counts. Empty fan-out groups (no branches) contribute 1 null row (left-join semantics).
This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_AppendDenorm demonstrates basic denormalization: projecting scalar and fan-out fields from a protobuf message into a flat Arrow record.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExampleDescriptors assembles, in memory, the proto3 schema used by the
// denormalization examples and returns the Order and Item descriptors:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string        name  = 1;
//	  repeated Item items = 2;
//	  repeated string tags = 3;
//	  int64         seq   = 4;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED

		strKind    = descriptorpb.FieldDescriptorProto_TYPE_STRING
		doubleKind = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		int64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64
		msgKind    = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field assembles a single field descriptor from its name, number,
	// type and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// The repeated message field additionally names the type it refers to.
	items := field("items", 2, msgKind, repeated)
	items.TypeName = proto.String(".example.Item")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, strKind, optional),
					field("price", 2, doubleKind, optional),
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, strKind, optional),
					items,
					field("tags", 3, strKind, repeated),
					field("seq", 4, int64Kind, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// newOrder builds a dynamic Order message. It sets the "name" and "seq"
// fields, appends one Item message per entry of items to the "items" list,
// and appends every entry of tags to the "tags" list.
func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	orderFields, itemFields := orderMD.Fields(), itemMD.Fields()

	order := dynamicpb.NewMessage(orderMD)
	order.Set(orderFields.ByName("name"), protoreflect.ValueOfString(name))
	order.Set(orderFields.ByName("seq"), protoreflect.ValueOfInt64(seq))

	itemList := order.Mutable(orderFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(itemMD)
		entry.Set(itemFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		entry.Set(itemFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(entry))
	}

	tagValues := order.Mutable(orderFields.ByName("tags")).List()
	for i := range tags {
		tagValues.Append(protoreflect.ValueOfString(tags[i]))
	}

	return order
}
// main denormalizes one Order holding two items into a flat record with the
// columns order_name, item_id and item_price, then prints every row.
func main() {
	orderMD, itemMD := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
	)
	transcoder, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	lineItems := []struct {
		id    string
		price float64
	}{{"A", 1.50}, {"B", 2.75}}
	order := newOrder(orderMD, itemMD, "order-1", lineItems, nil, 1)

	err = transcoder.AppendDenorm(order)
	if err != nil {
		log.Fatal(err)
	}

	batch := transcoder.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
	for row := 0; row < int(batch.NumRows()); row++ {
		// Column order follows the plan: order_name, item_id, item_price.
		orderName := batch.Column(0).(*array.String).Value(row)
		itemID := batch.Column(1).(*array.String).Value(row)
		itemPrice := batch.Column(2).(*array.Float64).Value(row)
		fmt.Printf(" %s | %s | %.2f\n", orderName, itemID, itemPrice)
	}
}
Output: rows: 2, cols: 3 order-1 | A | 1.50 order-1 | B | 2.75
Example (Coalesce) ¶
ExampleTranscoder_AppendDenorm_coalesce demonstrates FuncCoalesce in a denormalization plan. Coalesce returns the first non-zero value from multiple paths — useful when a field can come from different sources.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles, in memory, the proto3 schema used by
// the Expr-based denormalization examples and returns the Sale and LineItem
// descriptors:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED

		strKind    = descriptorpb.FieldDescriptorProto_TYPE_STRING
		doubleKind = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		int64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64
		msgKind    = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field assembles a single field descriptor from its name, number,
	// type and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// The repeated message field additionally names the type it refers to.
	items := field("items", 4, msgKind, repeated)
	items.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, strKind, optional),
					field("unit_price", 2, doubleKind, optional),
					field("quantity", 3, int64Kind, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, strKind, optional),
					field("email", 2, strKind, optional),
					field("region", 3, strKind, optional),
					items,
					field("timestamp", 5, int64Kind, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale builds a dynamic Sale message. The "customer", "email" and
// "region" fields are set only when the corresponding argument is non-empty;
// "timestamp" is always set, and one LineItem message is appended to "items"
// for every entry of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	saleFields, itemFields := saleMD.Fields(), lineItemMD.Fields()
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped
	// when empty.
	optionalStrings := [...]struct {
		field protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, s := range optionalStrings {
		if s.value == "" {
			continue
		}
		sale.Set(saleFields.ByName(s.field), protoreflect.ValueOfString(s.value))
	}

	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates FuncCoalesce in a denormalization plan: the "buyer"
// column takes the customer name and falls back to the email address when
// the customer is empty. Two Sale messages are denormalized and each
// resulting row is printed.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Coalesce: use customer name, fall back to email if customer is empty.
			pbpath.PlanPath("buyer",
				pbpath.WithExpr(pbpath.FuncCoalesce(
					pbpath.PathRef("customer"),
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("buyer"),
			),
			pbpath.PlanPath("region", pbpath.Alias("region")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Message 1: customer is set → "Alice" is used.
	msg1 := newSale(saleMD, lineItemMD, "Alice", "alice@example.com", "US", nil, 0)
	// AppendDenorm reports failures through its error result; stop rather
	// than print a silently incomplete batch.
	if err := tc.AppendDenorm(msg1); err != nil {
		log.Fatal(err)
	}

	// Message 2: customer is empty → email is used as fallback.
	msg2 := newSale(saleMD, lineItemMD, "", "bob@example.com", "EU", nil, 0)
	if err := tc.AppendDenorm(msg2); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		buyer := rec.Column(0).(*array.String).Value(i)
		region := rec.Column(1).(*array.String).Value(i)
		fmt.Printf(" buyer=%-18s region=%s\n", buyer, region)
	}
}
Output: buyer=Alice region=US buyer=bob@example.com region=EU
Example (ComposedExpr) ¶
ExampleTranscoder_AppendDenorm_composedExpr demonstrates composing multiple Expr functions together. This example creates a "display label" column that combines customer name and uppercase region, with a fallback for missing customer names.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles, in memory, the proto3 schema used by
// the Expr-based denormalization examples and returns the Sale and LineItem
// descriptors:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED

		strKind    = descriptorpb.FieldDescriptorProto_TYPE_STRING
		doubleKind = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		int64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64
		msgKind    = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field assembles a single field descriptor from its name, number,
	// type and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// The repeated message field additionally names the type it refers to.
	items := field("items", 4, msgKind, repeated)
	items.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, strKind, optional),
					field("unit_price", 2, doubleKind, optional),
					field("quantity", 3, int64Kind, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, strKind, optional),
					field("email", 2, strKind, optional),
					field("region", 3, strKind, optional),
					items,
					field("timestamp", 5, int64Kind, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale builds a dynamic Sale message. The "customer", "email" and
// "region" fields are set only when the corresponding argument is non-empty;
// "timestamp" is always set, and one LineItem message is appended to "items"
// for every entry of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	saleFields, itemFields := saleMD.Fields(), lineItemMD.Fields()
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped
	// when empty.
	optionalStrings := [...]struct {
		field protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, s := range optionalStrings {
		if s.value == "" {
			continue
		}
		sale.Set(saleFields.ByName(s.field), protoreflect.ValueOfString(s.value))
	}

	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates composing several Expr functions into a single
// "display" column and prints the value produced for each of two Sale
// messages.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Composed expression:
			//   Concat(": ", Coalesce(customer, email), Upper(region))
			// This reads as: join the first non-empty identifier with the
			// uppercased region, separated by ": ".
			pbpath.PlanPath("display",
				pbpath.WithExpr(pbpath.FuncConcat(": ",
					pbpath.FuncCoalesce(
						pbpath.PathRef("customer"),
						pbpath.PathRef("email"),
					),
					pbpath.FuncUpper(
						pbpath.PathRef("region"),
					),
				)),
				pbpath.Alias("display"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// AppendDenorm reports failures through its error result; stop rather
	// than print a silently incomplete batch.
	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Alice", "alice@ex.com", "us", nil, 0)); err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "", "bob@ex.com", "eu", nil, 0)); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(0).(*array.String).Value(i))
	}
}
Output: Alice: US bob@ex.com: EU
Example (Concat) ¶
ExampleTranscoder_AppendDenorm_concat demonstrates FuncConcat, which joins the string representations of multiple path values with a separator.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles, in memory, the proto3 schema used by
// the Expr-based denormalization examples and returns the Sale and LineItem
// descriptors:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED

		strKind    = descriptorpb.FieldDescriptorProto_TYPE_STRING
		doubleKind = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		int64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64
		msgKind    = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field assembles a single field descriptor from its name, number,
	// type and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// The repeated message field additionally names the type it refers to.
	items := field("items", 4, msgKind, repeated)
	items.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, strKind, optional),
					field("unit_price", 2, doubleKind, optional),
					field("quantity", 3, int64Kind, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, strKind, optional),
					field("email", 2, strKind, optional),
					field("region", 3, strKind, optional),
					items,
					field("timestamp", 5, int64Kind, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale builds a dynamic Sale message. The "customer", "email" and
// "region" fields are set only when the corresponding argument is non-empty;
// "timestamp" is always set, and one LineItem message is appended to "items"
// for every entry of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	saleFields, itemFields := saleMD.Fields(), lineItemMD.Fields()
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped
	// when empty.
	optionalStrings := [...]struct {
		field protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, s := range optionalStrings {
		if s.value == "" {
			continue
		}
		sale.Set(saleFields.ByName(s.field), protoreflect.ValueOfString(s.value))
	}

	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates FuncConcat: the "label" column joins the customer and
// region values with " / ". Two Sale messages are denormalized and the
// resulting label of each row is printed.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Concat: join customer and region with " / ".
			pbpath.PlanPath("label",
				pbpath.WithExpr(pbpath.FuncConcat(" / ",
					pbpath.PathRef("customer"),
					pbpath.PathRef("region"),
				)),
				pbpath.Alias("label"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// AppendDenorm reports failures through its error result; stop rather
	// than print a silently incomplete batch.
	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Alice", "", "US", nil, 0)); err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenorm(newSale(saleMD, lineItemMD, "Bob", "", "EU", nil, 0)); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()

	for i := 0; i < int(rec.NumRows()); i++ {
		fmt.Println(rec.Column(0).(*array.String).Value(i))
	}
}
Output: Alice / US Bob / EU
Example (CrossJoin) ¶
ExampleTranscoder_AppendDenorm_crossJoin demonstrates cross-join behaviour when two independent repeated fields are denormalized together.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExampleDescriptors assembles, in memory, the proto3 schema used by the
// denormalization examples and returns the Order and Item descriptors:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string        name  = 1;
//	  repeated Item items = 2;
//	  repeated string tags = 3;
//	  int64         seq   = 4;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED

		strKind    = descriptorpb.FieldDescriptorProto_TYPE_STRING
		doubleKind = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		int64Kind  = descriptorpb.FieldDescriptorProto_TYPE_INT64
		msgKind    = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field assembles a single field descriptor from its name, number,
	// type and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// The repeated message field additionally names the type it refers to.
	items := field("items", 2, msgKind, repeated)
	items.TypeName = proto.String(".example.Item")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, strKind, optional),
					field("price", 2, doubleKind, optional),
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, strKind, optional),
					items,
					field("tags", 3, strKind, repeated),
					field("seq", 4, int64Kind, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// newOrder builds a dynamic Order message. It sets the "name" and "seq"
// fields, appends one Item message per entry of items to the "items" list,
// and appends every entry of tags to the "tags" list.
func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	orderFields, itemFields := orderMD.Fields(), itemMD.Fields()

	order := dynamicpb.NewMessage(orderMD)
	order.Set(orderFields.ByName("name"), protoreflect.ValueOfString(name))
	order.Set(orderFields.ByName("seq"), protoreflect.ValueOfInt64(seq))

	itemList := order.Mutable(orderFields.ByName("items")).List()
	for i := range items {
		entry := dynamicpb.NewMessage(itemMD)
		entry.Set(itemFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		entry.Set(itemFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(entry))
	}

	tagValues := order.Mutable(orderFields.ByName("tags")).List()
	for i := range tags {
		tagValues.Append(protoreflect.ValueOfString(tags[i]))
	}

	return order
}
// main denormalizes one Order carrying two items and three tags through a
// plan with two wildcard paths, then prints the row count followed by the
// item_id / tag pair of every row in the resulting record batch.
func main() {
	orderDesc, itemDesc := buildExampleDescriptors()

	// One wildcard path per repeated field.
	idPath := pbpath.PlanPath("items[*].id", pbpath.Alias("item_id"))
	tagPath := pbpath.PlanPath("tags[*]", pbpath.Alias("tag"))

	tc, err := bufarrowlib.New(orderDesc, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(idPath, tagPath))
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	lineItems := []struct {
		id    string
		price float64
	}{{"A", 1.0}, {"B", 2.0}}
	order := newOrder(orderDesc, itemDesc, "order-1", lineItems, []string{"x", "y", "z"}, 1)
	if appendErr := tc.AppendDenorm(order); appendErr != nil {
		log.Fatal(appendErr)
	}

	batch := tc.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d (2 items × 3 tags)\n", batch.NumRows())

	// Column 0 is item_id and column 1 is tag, in plan order.
	ids := batch.Column(0).(*array.String)
	tags := batch.Column(1).(*array.String)
	for row, total := 0, int(batch.NumRows()); row < total; row++ {
		fmt.Printf(" %s | %s\n", ids.Value(row), tags.Value(row))
	}
}
Output: rows: 6 (2 items × 3 tags) A | x A | y A | z B | x B | y B | z
Example (Default) ¶
ExampleTranscoder_AppendDenorm_default demonstrates FuncDefault, which provides a literal fallback value when a field is zero/empty.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles the proto3 file "expr_example.proto"
// (package exprexample) in memory for the Expr-based denormalization examples
// and returns the descriptors of its two messages:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 4, kindMessage, repeated)
	itemsField.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, kindString, optional),
					field("unit_price", 2, kindDouble, optional),
					field("quantity", 3, kindInt64, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, kindString, optional),
					field("email", 2, kindString, optional),
					field("region", 3, kindString, optional),
					itemsField,
					field("timestamp", 5, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale assembles a dynamic Sale message from plain Go values.
//
// customer, email and region are only set when non-empty, so an empty
// argument leaves the corresponding field unset. timestamp is always set, and
// one LineItem (sku, unit_price, quantity) is appended to "items" per element
// of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped when empty.
	optionalStrings := []struct {
		name  protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, f := range optionalStrings {
		if f.value == "" {
			continue
		}
		sale.Set(saleMD.Fields().ByName(f.name), protoreflect.ValueOfString(f.value))
	}

	saleFields := saleMD.Fields()
	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		itemFields := lineItemMD.Fields()
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates FuncDefault: the "region" column falls back to the
// literal "UNKNOWN" when the message has no region. It appends two sales (one
// with a region, one without) and prints customer and region for every row.
//
// Every AppendDenorm error is checked so that a failed append aborts the
// example instead of silently producing a short record batch.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("customer", pbpath.Alias("customer")),
			// Default: if region is empty, use "UNKNOWN".
			pbpath.PlanPath("region",
				pbpath.WithExpr(pbpath.FuncDefault(
					pbpath.PathRef("region"),
					pbpath.ScalarString("UNKNOWN"),
				)),
				pbpath.Alias("region"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	sales := []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "", "US", nil, 0),
		newSale(saleMD, lineItemMD, "Bob", "", "", nil, 0), // no region
	}
	for _, sale := range sales {
		if err := tc.AppendDenorm(sale); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()
	for i := 0; i < int(rec.NumRows()); i++ {
		customer := rec.Column(0).(*array.String).Value(i)
		region := rec.Column(1).(*array.String).Value(i)
		fmt.Printf(" %s: %s\n", customer, region)
	}
}
Output: Alice: US Bob: UNKNOWN
Example (Has) ¶
ExampleTranscoder_AppendDenorm_has demonstrates FuncHas, which returns a boolean indicating whether a field has a non-zero value. This is useful for creating presence flag columns.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles the proto3 file "expr_example.proto"
// (package exprexample) in memory for the Expr-based denormalization examples
// and returns the descriptors of its two messages:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 4, kindMessage, repeated)
	itemsField.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, kindString, optional),
					field("unit_price", 2, kindDouble, optional),
					field("quantity", 3, kindInt64, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, kindString, optional),
					field("email", 2, kindString, optional),
					field("region", 3, kindString, optional),
					itemsField,
					field("timestamp", 5, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale assembles a dynamic Sale message from plain Go values.
//
// customer, email and region are only set when non-empty, so an empty
// argument leaves the corresponding field unset. timestamp is always set, and
// one LineItem (sku, unit_price, quantity) is appended to "items" per element
// of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped when empty.
	optionalStrings := []struct {
		name  protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, f := range optionalStrings {
		if f.value == "" {
			continue
		}
		sale.Set(saleMD.Fields().ByName(f.name), protoreflect.ValueOfString(f.value))
	}

	saleFields := saleMD.Fields()
	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		itemFields := lineItemMD.Fields()
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates FuncHas: the "has_email" column is a boolean computed
// from the email field. It appends two sales (one with an email, one without)
// and prints customer and has_email for every row.
//
// Every AppendDenorm error is checked so that a failed append aborts the
// example instead of silently producing a short record batch.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("customer", pbpath.Alias("customer")),
			// Has: does the email field have a non-empty value?
			pbpath.PlanPath("has_email",
				pbpath.WithExpr(pbpath.FuncHas(
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("has_email"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	sales := []proto.Message{
		newSale(saleMD, lineItemMD, "Alice", "alice@example.com", "", nil, 0),
		newSale(saleMD, lineItemMD, "Bob", "", "", nil, 0), // no email
	}
	for _, sale := range sales {
		if err := tc.AppendDenorm(sale); err != nil {
			log.Fatal(err)
		}
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()
	for i := 0; i < int(rec.NumRows()); i++ {
		customer := rec.Column(0).(*array.String).Value(i)
		hasEmail := rec.Column(1).(*array.Boolean).Value(i)
		fmt.Printf(" %s: has_email=%v\n", customer, hasEmail)
	}
}
Output: Alice: has_email=true Bob: has_email=false
Example (LeftJoin) ¶
ExampleTranscoder_AppendDenorm_leftJoin demonstrates left-join semantics when a repeated field is empty: a single null row is produced for that group while the other group fans out normally.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExampleDescriptors assembles the proto3 file "example.proto" (package
// example) in memory and returns the descriptors of its two messages:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string          name  = 1;
//	  repeated Item   items = 2;
//	  repeated string tags  = 3;
//	  int64           seq   = 4;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 2, kindMessage, repeated)
	itemsField.TypeName = proto.String(".example.Item")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, kindString, optional),
					field("price", 2, kindDouble, optional),
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, kindString, optional),
					itemsField,
					field("tags", 3, kindString, repeated),
					field("seq", 4, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// newOrder assembles a dynamic Order message from plain Go values.
//
// name and seq are always set. One Item message (id, price) is appended to
// the "items" list per element of items, and every entry of tags is appended
// to the "tags" list. Both lists are obtained via Mutable even when the
// corresponding input slice is empty.
func newOrder(orderMD, itemMD protoreflect.MessageDescriptor, name string, items []struct {
	id    string
	price float64
}, tags []string, seq int64) proto.Message {
	order := dynamicpb.NewMessage(orderMD)
	orderFields := orderMD.Fields()

	order.Set(orderFields.ByName("name"), protoreflect.ValueOfString(name))
	order.Set(orderFields.ByName("seq"), protoreflect.ValueOfInt64(seq))

	itemList := order.Mutable(orderFields.ByName("items")).List()
	for i := range items {
		itemFields := itemMD.Fields()
		entry := dynamicpb.NewMessage(itemMD)
		entry.Set(itemFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		entry.Set(itemFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(entry))
	}

	tagValues := order.Mutable(orderFields.ByName("tags")).List()
	for i := range tags {
		tagValues.Append(protoreflect.ValueOfString(tags[i]))
	}

	return order
}
// main demonstrates left-join semantics: an Order with no items and two tags
// is denormalized through a plan with one wildcard path per repeated field.
// It prints the row count and, for every row, whether item_id is null
// alongside the tag value.
func main() {
	orderDesc, itemDesc := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("tags[*]", pbpath.Alias("tag")),
	)
	tc, err := bufarrowlib.New(orderDesc, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Zero items, 2 tags → items group produces 1 null row → 1 × 2 = 2 rows
	order := newOrder(orderDesc, itemDesc, "order-1", nil, []string{"x", "y"}, 1)
	if appendErr := tc.AppendDenorm(order); appendErr != nil {
		log.Fatal(appendErr)
	}

	batch := tc.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d\n", batch.NumRows())

	// item_id is only probed for nullness, so it needs no type assertion.
	itemIDs := batch.Column(0)
	tags := batch.Column(1).(*array.String)
	for row, total := 0, int(batch.NumRows()); row < total; row++ {
		fmt.Printf(" item_id=null:%v | tag=%s\n", itemIDs.IsNull(row), tags.Value(row))
	}
}
Output: rows: 2 item_id=null:true | tag=x item_id=null:true | tag=y
Example (Upper) ¶
ExampleTranscoder_AppendDenorm_upper demonstrates FuncUpper, which converts a string field to upper case — useful for normalizing data during ingestion.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles the proto3 file "expr_example.proto"
// (package exprexample) in memory for the Expr-based denormalization examples
// and returns the descriptors of its two messages:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 4, kindMessage, repeated)
	itemsField.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, kindString, optional),
					field("unit_price", 2, kindDouble, optional),
					field("quantity", 3, kindInt64, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, kindString, optional),
					field("email", 2, kindString, optional),
					field("region", 3, kindString, optional),
					itemsField,
					field("timestamp", 5, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale assembles a dynamic Sale message from plain Go values.
//
// customer, email and region are only set when non-empty, so an empty
// argument leaves the corresponding field unset. timestamp is always set, and
// one LineItem (sku, unit_price, quantity) is appended to "items" per element
// of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped when empty.
	optionalStrings := []struct {
		name  protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, f := range optionalStrings {
		if f.value == "" {
			continue
		}
		sale.Set(saleMD.Fields().ByName(f.name), protoreflect.ValueOfString(f.value))
	}

	saleFields := saleMD.Fields()
	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		itemFields := lineItemMD.Fields()
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates FuncUpper: the single "region_upper" column holds the
// upper-cased region of the appended sale, which is then printed.
//
// The AppendDenorm error is checked because the code below reads row 0
// unconditionally; if the append failed there might be no such row.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("region_upper",
				pbpath.WithExpr(pbpath.FuncUpper(
					pbpath.PathRef("region"),
				)),
				pbpath.Alias("region_upper"),
			),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	sale := newSale(saleMD, lineItemMD, "Alice", "", "us-east", nil, 0)
	if err := tc.AppendDenorm(sale); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()
	fmt.Println(rec.Column(0).(*array.String).Value(0))
}
Output: US-EAST
Example (WithExprAndFanout) ¶
ExampleTranscoder_AppendDenorm_withExprAndFanout demonstrates combining Expr-based computed columns with fan-out (wildcard) paths. The expression columns broadcast as scalars while the repeated field fans out.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildExprExampleDescriptors assembles the proto3 file "expr_example.proto"
// (package exprexample) in memory for the Expr-based denormalization examples
// and returns the descriptors of its two messages:
//
//	message LineItem { string sku = 1; double unit_price = 2; int64 quantity = 3; }
//	message Sale {
//	  string            customer  = 1;
//	  string            email     = 2;
//	  string            region    = 3;
//	  repeated LineItem items     = 4;
//	  int64             timestamp = 5;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildExprExampleDescriptors() (saleMD, lineItemMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 4, kindMessage, repeated)
	itemsField.TypeName = proto.String(".exprexample.LineItem")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("expr_example.proto"),
		Package: proto.String("exprexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("LineItem"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("sku", 1, kindString, optional),
					field("unit_price", 2, kindDouble, optional),
					field("quantity", 3, kindInt64, optional),
				},
			},
			{
				Name: proto.String("Sale"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("customer", 1, kindString, optional),
					field("email", 2, kindString, optional),
					field("region", 3, kindString, optional),
					itemsField,
					field("timestamp", 5, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Sale"), messages.ByName("LineItem")
}
// newSale assembles a dynamic Sale message from plain Go values.
//
// customer, email and region are only set when non-empty, so an empty
// argument leaves the corresponding field unset. timestamp is always set, and
// one LineItem (sku, unit_price, quantity) is appended to "items" per element
// of items.
func newSale(saleMD, lineItemMD protoreflect.MessageDescriptor, customer, email, region string, items []struct {
	sku   string
	price float64
	qty   int64
}, ts int64) proto.Message {
	sale := dynamicpb.NewMessage(saleMD)

	// Optional string fields, written in declaration order and skipped when empty.
	optionalStrings := []struct {
		name  protoreflect.Name
		value string
	}{
		{"customer", customer},
		{"email", email},
		{"region", region},
	}
	for _, f := range optionalStrings {
		if f.value == "" {
			continue
		}
		sale.Set(saleMD.Fields().ByName(f.name), protoreflect.ValueOfString(f.value))
	}

	saleFields := saleMD.Fields()
	sale.Set(saleFields.ByName("timestamp"), protoreflect.ValueOfInt64(ts))

	lineItems := sale.Mutable(saleFields.ByName("items")).List()
	for i := range items {
		itemFields := lineItemMD.Fields()
		entry := dynamicpb.NewMessage(lineItemMD)
		entry.Set(itemFields.ByName("sku"), protoreflect.ValueOfString(items[i].sku))
		entry.Set(itemFields.ByName("unit_price"), protoreflect.ValueOfFloat64(items[i].price))
		entry.Set(itemFields.ByName("quantity"), protoreflect.ValueOfInt64(items[i].qty))
		lineItems.Append(protoreflect.ValueOfMessage(entry))
	}

	return sale
}
// main demonstrates combining an Expr-based computed column with wildcard
// fan-out paths: "buyer" is a coalesce of customer and email, while sku and
// price come from items[*]. It appends one sale with two line items and
// prints the row count followed by buyer, sku and price for every row.
//
// The AppendDenorm error is checked so that a failed append aborts the
// example instead of silently producing an empty record batch.
func main() {
	saleMD, lineItemMD := buildExprExampleDescriptors()
	tc, err := bufarrowlib.New(saleMD, memory.DefaultAllocator,
		bufarrowlib.WithDenormalizerPlan(
			// Computed column: coalesce customer/email (scalar, broadcasts)
			pbpath.PlanPath("buyer",
				pbpath.WithExpr(pbpath.FuncCoalesce(
					pbpath.PathRef("customer"),
					pbpath.PathRef("email"),
				)),
				pbpath.Alias("buyer"),
			),
			// Fan-out: one row per line item
			pbpath.PlanPath("items[*].sku", pbpath.Alias("sku")),
			pbpath.PlanPath("items[*].unit_price", pbpath.Alias("price")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	msg := newSale(saleMD, lineItemMD, "Alice", "", "US", []struct {
		sku   string
		price float64
		qty   int64
	}{
		{"WIDGET-001", 9.99, 2},
		{"GADGET-002", 24.50, 1},
	}, 0)
	if err := tc.AppendDenorm(msg); err != nil {
		log.Fatal(err)
	}

	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()
	fmt.Printf("rows: %d\n", rec.NumRows())
	for i := 0; i < int(rec.NumRows()); i++ {
		buyer := rec.Column(0).(*array.String).Value(i)
		sku := rec.Column(1).(*array.String).Value(i)
		price := rec.Column(2).(*array.Float64).Value(i)
		fmt.Printf(" %s | %-12s | %.2f\n", buyer, sku, price)
	}
}
Output: rows: 2 Alice | WIDGET-001 | 9.99 Alice | GADGET-002 | 24.50
func (*Transcoder) AppendDenormRaw ¶ added in v0.2.0
func (s *Transcoder) AppendDenormRaw(data []byte) error
AppendDenormRaw unmarshals raw protobuf bytes and appends the denormalized result to the transcoder's denormalizer Arrow record builder.
When a HyperType is configured (via WithHyperType), it uses the compiled parser with hyperpb.Shared for memory reuse and optional PGO profiling. Otherwise, it falls back to proto.Unmarshal with a dynamicpb stencil.
A denormalizer plan (via WithDenormalizerPlan) must be configured.
This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_AppendDenormRaw demonstrates high-performance raw-bytes denormalization. This combines hyperpb decoding with the Plan-based denormalizer to go directly from raw protobuf bytes to a flat Arrow record.
This is the fastest path for streaming protobuf data into analytics-ready flat tables.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildHyperExampleDescriptors assembles the proto3 file
// "hyper_example.proto" (package hyperexample) in memory, with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr, and
// returns the descriptors of its two messages:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 3, kindMessage, repeated)
	itemsField.TypeName = proto.String(".hyperexample.Inner")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, kindString, optional),
					field("price", 2, kindDouble, optional),
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, kindString, optional),
					field("alt_name", 2, kindString, optional),
					itemsField,
					field("qty", 4, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// newOuterMessage assembles a dynamic Outer message from plain Go values.
//
// name and qty are always set; alt_name is only set when altName is
// non-empty. One Inner message (id, price) is appended to "items" per element
// of items.
func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	outer := dynamicpb.NewMessage(outerMD)
	outerFields := outerMD.Fields()

	outer.Set(outerFields.ByName("name"), protoreflect.ValueOfString(name))
	if len(altName) > 0 {
		outer.Set(outerFields.ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	outer.Set(outerFields.ByName("qty"), protoreflect.ValueOfInt64(qty))

	inners := outer.Mutable(outerFields.ByName("items")).List()
	for i := range items {
		innerFields := innerMD.Fields()
		inner := dynamicpb.NewMessage(innerMD)
		inner.Set(innerFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		inner.Set(innerFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		inners.Append(protoreflect.ValueOfMessage(inner))
	}

	return outer
}
// main walks through raw-bytes denormalization: it builds a Transcoder with a
// HyperType and a three-column plan, marshals one Outer message to bytes,
// feeds those bytes to AppendDenormRaw, and prints the row count followed by
// product, item_id and item_price for every row.
func main() {
	outerDesc, innerDesc := buildHyperExampleDescriptors()

	// 1. Create a shared HyperType.
	hyper := bufarrowlib.NewHyperType(outerDesc)

	// 2. Create a Transcoder with HyperType + denormalization plan.
	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("product")),
		pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
	)
	tc, err := bufarrowlib.New(outerDesc, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(hyper), plan)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// 3. Marshal messages to raw bytes (simulating Kafka/gRPC input).
	lineItems := []struct {
		id    string
		price float64
	}{{"X", 1.50}, {"Y", 2.75}}
	outer := newOuterMessage(outerDesc, innerDesc, "Gadgets", "", lineItems, 10)
	payload, err := proto.Marshal(outer)
	if err != nil {
		log.Fatal(err)
	}

	// 4. Feed raw bytes directly into the denormalizer.
	if appendErr := tc.AppendDenormRaw(payload); appendErr != nil {
		log.Fatal(appendErr)
	}

	batch := tc.NewDenormalizerRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d\n", batch.NumRows())

	// Columns follow plan order: product, item_id, item_price.
	products := batch.Column(0).(*array.String)
	itemIDs := batch.Column(1).(*array.String)
	prices := batch.Column(2).(*array.Float64)
	for row, total := 0, int(batch.NumRows()); row < total; row++ {
		fmt.Printf(" %s | %s | %.2f\n", products.Value(row), itemIDs.Value(row), prices.Value(row))
	}
}
Output: rows: 2 Gadgets | X | 1.50 Gadgets | Y | 2.75
Example (Batch) ¶
ExampleTranscoder_AppendDenormRaw_batch demonstrates a typical batch processing pattern: feed many raw messages, then flush to a single Arrow record batch.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildHyperExampleDescriptors assembles the proto3 file
// "hyper_example.proto" (package hyperexample) in memory, with nested and
// repeated fields for demonstrating AppendRaw, AppendDenormRaw, and Expr, and
// returns the descriptors of its two messages:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string         name     = 1;
//	  string         alt_name = 2;
//	  repeated Inner items    = 3;
//	  int64          qty      = 4;
//	}
//
// If protodesc.NewFile rejects the file, the failure is reported with
// log.Fatalf.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	var (
		kindString  = descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum()
		kindDouble  = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum()
		kindInt64   = descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum()
		kindMessage = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum()
		optional    = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()
		repeated    = descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()
	)

	// field builds one field descriptor; TypeName is left unset and is filled
	// in below for the message-typed field.
	field := func(name string, number int32, kind *descriptorpb.FieldDescriptorProto_Type, label *descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind,
			Label:  label,
		}
	}

	itemsField := field("items", 3, kindMessage, repeated)
	itemsField.TypeName = proto.String(".hyperexample.Inner")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, kindString, optional),
					field("price", 2, kindDouble, optional),
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, kindString, optional),
					field("alt_name", 2, kindString, optional),
					itemsField,
					field("qty", 4, kindInt64, optional),
				},
			},
		},
	}

	built, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := built.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// newOuterMessage builds a dynamic Outer message from plain Go values.
// The name and qty fields are always set; alt_name is set only when altName is
// non-empty. Each entry of items is appended to the repeated "items" field as
// an Inner message carrying its id and price.
func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	outer := dynamicpb.NewMessage(outerMD)
	outerFields := outerMD.Fields()

	outer.Set(outerFields.ByName("name"), protoreflect.ValueOfString(name))
	if len(altName) > 0 {
		outer.Set(outerFields.ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	outer.Set(outerFields.ByName("qty"), protoreflect.ValueOfInt64(qty))

	itemList := outer.Mutable(outerFields.ByName("items")).List()
	for i := range items {
		innerFields := innerMD.Fields()
		inner := dynamicpb.NewMessage(innerMD)
		inner.Set(innerFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		inner.Set(innerFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(inner))
	}
	return outer
}
// main denormalizes a small batch of raw Outer messages with AppendDenormRaw
// and prints how many messages went in and how many flat rows came out.
func main() {
	// item is an alias for the anonymous element type newOuterMessage accepts,
	// so []item is the identical type and only shorter to spell.
	type item = struct {
		id    string
		price float64
	}

	outerMD, innerMD := buildHyperExampleDescriptors()
	ht := bufarrowlib.NewHyperType(outerMD)
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Simulate a batch of raw messages from a Kafka consumer.
	messages := []struct {
		name  string
		items []item
	}{
		{"Order-1", []item{{"A", 1.0}, {"B", 2.0}}},
		{"Order-2", []item{{"C", 3.0}}},
		{"Order-3", []item{{"D", 4.0}, {"E", 5.0}, {"F", 6.0}}},
	}
	for _, m := range messages {
		msg := newOuterMessage(outerMD, innerMD, m.name, "", m.items, 0)
		// Check the marshal error so bytes from a failed marshal are never
		// handed to the transcoder.
		raw, err := proto.Marshal(msg)
		if err != nil {
			log.Fatal(err)
		}
		if err := tc.AppendDenormRaw(raw); err != nil {
			log.Fatal(err)
		}
	}

	// Flush all accumulated rows into one Arrow record batch.
	rec := tc.NewDenormalizerRecordBatch()
	defer rec.Release()
	fmt.Printf("messages: %d, denorm rows: %d\n", len(messages), rec.NumRows())
}
Output: messages: 3, denorm rows: 6
func (*Transcoder) AppendDenormRawMerged ¶ added in v0.3.0
func (s *Transcoder) AppendDenormRawMerged(baseBytes, customBytes []byte) error
AppendDenormRawMerged concatenates base and custom serialized protobuf byte slices and appends the denormalized result to the transcoder's denormalizer Arrow record builder.
This follows the same byte-concatenation strategy as Transcoder.AppendRawMerged but routes the result through the denormalization engine.
Requires both a custom message (via WithCustomMessage or WithCustomMessageFile) and a denormalizer plan (via WithDenormalizerPlan).
This method is not safe for concurrent use.
func (*Transcoder) AppendRaw ¶ added in v0.2.0
func (s *Transcoder) AppendRaw(data []byte) error
AppendRaw unmarshals raw protobuf bytes using the HyperType's compiled parser and appends the result to the transcoder's Arrow record builder.
This method requires a HyperType configured via WithHyperType. It uses hyperpb.Shared for memory reuse and optionally records profiling data for online PGO. When the auto-recompile threshold is reached, the parser is recompiled inline.
This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_AppendRaw demonstrates high-performance raw-bytes ingestion using AppendRaw. This accepts raw protobuf wire bytes (e.g. from Kafka, gRPC, or a file) and decodes them using hyperpb's compiled parser — 2–3× faster than proto.Unmarshal with generated code.
AppendRaw populates the full Arrow record (like Append), while AppendDenormRaw populates the denormalized flat record (like AppendDenorm).
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildHyperExampleDescriptors assembles the in-memory proto3 file
// "hyper_example.proto" (package hyperexample) and returns the descriptors of
// its Outer and Inner messages. The schema mixes scalar, nested, and repeated
// fields for the AppendRaw, AppendDenormRaw, and Expr examples:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string name = 1;
//	  string alt_name = 2;
//	  repeated Inner items = 3;
//	  int64 qty = 4;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED
		str      = descriptorpb.FieldDescriptorProto_TYPE_STRING
		dbl      = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		i64      = descriptorpb.FieldDescriptorProto_TYPE_INT64
		message  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field builds one field descriptor from its name, number, type, and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// items is the only message-typed field, so it also carries a type name.
	items := field("items", 3, message, repeated)
	items.TypeName = proto.String(".hyperexample.Inner")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, str, optional),
					field("price", 2, dbl, optional),
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, str, optional),
					field("alt_name", 2, str, optional),
					items,
					field("qty", 4, i64, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// newOuterMessage builds a dynamic Outer message from plain Go values.
// The name and qty fields are always set; alt_name is set only when altName is
// non-empty. Each entry of items is appended to the repeated "items" field as
// an Inner message carrying its id and price.
func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	outer := dynamicpb.NewMessage(outerMD)
	outerFields := outerMD.Fields()

	outer.Set(outerFields.ByName("name"), protoreflect.ValueOfString(name))
	if len(altName) > 0 {
		outer.Set(outerFields.ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	outer.Set(outerFields.ByName("qty"), protoreflect.ValueOfInt64(qty))

	itemList := outer.Mutable(outerFields.ByName("items")).List()
	for i := range items {
		innerFields := innerMD.Fields()
		inner := dynamicpb.NewMessage(innerMD)
		inner.Set(innerFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		inner.Set(innerFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(inner))
	}
	return outer
}
// main feeds one marshalled Outer message through AppendRaw and prints the
// dimensions of the resulting Arrow record batch.
func main() {
	outerMD, innerMD := buildHyperExampleDescriptors()

	// Step 1: compile the parser once by creating a shared HyperType.
	hyper := bufarrowlib.NewHyperType(outerMD)

	// Step 2: build a Transcoder that uses that HyperType.
	transcoder, err := bufarrowlib.New(outerMD, memory.DefaultAllocator, bufarrowlib.WithHyperType(hyper))
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	// Step 3: marshal a message to wire bytes, standing in for a Kafka payload.
	items := []struct {
		id    string
		price float64
	}{{"A", 9.99}}
	widget := newOuterMessage(outerMD, innerMD, "Widget", "", items, 5)
	wire, err := proto.Marshal(widget)
	if err != nil {
		log.Fatal(err)
	}

	// Step 4: hand the raw bytes straight to the transcoder; no
	// proto.Unmarshal call is needed on this side.
	if err = transcoder.AppendRaw(wire); err != nil {
		log.Fatal(err)
	}

	batch := transcoder.NewRecordBatch()
	defer batch.Release()
	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
}
Output: rows: 1, cols: 4
func (*Transcoder) AppendRawMerged ¶ added in v0.3.0
func (s *Transcoder) AppendRawMerged(baseBytes, customBytes []byte) error
AppendRawMerged concatenates base and custom serialized protobuf byte slices and appends the merged result to the transcoder's Arrow record builder.
This works because MergeMessageDescriptors renumbers all custom fields above the base message's maximum field number, so the two wire-format byte slices have strictly disjoint field tags. Protobuf wire-format concatenation (base || custom) produces a valid merged message.
When a HyperType is configured (fast path), the concatenated bytes are parsed via hyperpb.Unmarshal. Otherwise (fallback), the bytes are decoded with proto.Unmarshal into a clone of the transcoder's internal custom-message stencil (stencilCustom).
Returns an error if no custom message was configured via WithCustomMessage or WithCustomMessageFile.
This method is not safe for concurrent use.
func (*Transcoder) AppendWithCustom ¶
AppendWithCustom appends a protobuf value merged with custom field values to the transcoder's builder. The custom proto.Message must conform to the message descriptor provided via WithCustomMessage or WithCustomMessageFile. Both messages are marshalled to bytes and unmarshalled into the merged stencil for appending. This method is not safe for concurrent use.
Example ¶
ExampleTranscoder_AppendWithCustom shows appending a message that includes both base and custom fields.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its single message:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every Product field is optional; only name, number, and type vary.
	specs := []struct {
		name   string
		number int32
		kind   descriptorpb.FieldDescriptorProto_Type
	}{
		{"name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING},
		{"price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE},
		{"qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32},
	}
	fields := make([]*descriptorpb.FieldDescriptorProto, 0, len(specs))
	for _, spec := range specs {
		fields = append(fields, &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(spec.name),
			Number: proto.Int32(spec.number),
			Type:   spec.kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		})
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{Name: proto.String("Product"), Field: fields},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// buildCustomDescriptor assembles the in-memory proto3 file
// "example_custom.proto" (package example) and returns the descriptor of its
// single custom-fields message:
//
//	message CustomFields {
//	  string region = 1;
//	  int64 batch_id = 2;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildCustomDescriptor() protoreflect.MessageDescriptor {
	// Both fields are optional; only name, number, and type vary.
	specs := []struct {
		name   string
		number int32
		kind   descriptorpb.FieldDescriptorProto_Type
	}{
		{"region", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING},
		{"batch_id", 2, descriptorpb.FieldDescriptorProto_TYPE_INT64},
	}
	fields := make([]*descriptorpb.FieldDescriptorProto, 0, len(specs))
	for _, spec := range specs {
		fields = append(fields, &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(spec.name),
			Number: proto.Int32(spec.number),
			Type:   spec.kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		})
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_custom.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{Name: proto.String("CustomFields"), Field: fields},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("CustomFields")
}
// newProduct returns a dynamic message of descriptor md with its "name",
// "price", and "qty" fields set from the corresponding arguments.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main appends one Product merged with CustomFields values and prints the
// dimensions of the resulting Arrow record batch.
func main() {
	productMD := buildSimpleDescriptor()
	extrasMD := buildCustomDescriptor()

	transcoder, err := bufarrowlib.New(productMD, memory.DefaultAllocator, bufarrowlib.WithCustomMessage(extrasMD))
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	widget := newProduct(productMD, "Widget", 9.99, 5)

	// The custom message carries the extra values appended alongside the
	// base Product.
	extras := dynamicpb.NewMessage(extrasMD)
	extraFields := extrasMD.Fields()
	extras.Set(extraFields.ByName("region"), protoreflect.ValueOfString("EU"))
	extras.Set(extraFields.ByName("batch_id"), protoreflect.ValueOfInt64(7))

	if err = transcoder.AppendWithCustom(widget, extras); err != nil {
		log.Fatal(err)
	}

	batch := transcoder.NewRecordBatch()
	defer batch.Release()
	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
}
Output: rows: 1, cols: 5
func (*Transcoder) Clone ¶
func (s *Transcoder) Clone(mem memory.Allocator) (tc *Transcoder, err error)
Clone returns an identical Transcoder. Use it to give each goroutine its own instance, because Transcoder methods are not safe for concurrent use.
The compiled denormalizer Plan is shared (it is immutable), but the Arrow builders, scratch buffers, and leaf scratch are freshly allocated so each clone can operate independently.
Example ¶
ExampleTranscoder_Clone demonstrates cloning a Transcoder for use in a separate goroutine. The clone has independent builders but shares the same schema configuration.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its single message:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every Product field is optional; only name, number, and type vary.
	specs := []struct {
		name   string
		number int32
		kind   descriptorpb.FieldDescriptorProto_Type
	}{
		{"name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING},
		{"price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE},
		{"qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32},
	}
	fields := make([]*descriptorpb.FieldDescriptorProto, 0, len(specs))
	for _, spec := range specs {
		fields = append(fields, &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(spec.name),
			Number: proto.Int32(spec.number),
			Type:   spec.kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		})
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{Name: proto.String("Product"), Field: fields},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// newProduct returns a dynamic message of descriptor md with its "name",
// "price", and "qty" fields set from the corresponding arguments.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main clones a Transcoder, appends a message to the clone only, and prints
// the row count of a batch flushed from each instance.
func main() {
	productMD := buildSimpleDescriptor()

	original, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer original.Release()

	worker, err := original.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer worker.Release()

	// Only the clone receives a message; the original is left untouched.
	gizmo := newProduct(productMD, "Gizmo", 5.00, 10)
	worker.Append(gizmo)

	workerBatch := worker.NewRecordBatch()
	defer workerBatch.Release()
	fmt.Printf("clone rows: %d\n", workerBatch.NumRows())

	originalBatch := original.NewRecordBatch()
	defer originalBatch.Release()
	fmt.Printf("original rows: %d\n", originalBatch.NumRows())
}
Output:
clone rows: 1
original rows: 0
Example (WithHyperType) ¶
ExampleTranscoder_Clone_withHyperType demonstrates cloning a Transcoder that uses HyperType. The clone shares the same HyperType (so profiling data is aggregated) but has independent Arrow builders. This is the recommended pattern for multi-goroutine pipelines.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildHyperExampleDescriptors assembles the in-memory proto3 file
// "hyper_example.proto" (package hyperexample) and returns the descriptors of
// its Outer and Inner messages. The schema mixes scalar, nested, and repeated
// fields for the AppendRaw, AppendDenormRaw, and Expr examples:
//
//	message Inner { string id = 1; double price = 2; }
//	message Outer {
//	  string name = 1;
//	  string alt_name = 2;
//	  repeated Inner items = 3;
//	  int64 qty = 4;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildHyperExampleDescriptors() (outerMD, innerMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED
		str      = descriptorpb.FieldDescriptorProto_TYPE_STRING
		dbl      = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		i64      = descriptorpb.FieldDescriptorProto_TYPE_INT64
		message  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field builds one field descriptor from its name, number, type, and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// items is the only message-typed field, so it also carries a type name.
	items := field("items", 3, message, repeated)
	items.TypeName = proto.String(".hyperexample.Inner")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("hyper_example.proto"),
		Package: proto.String("hyperexample"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Inner"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, str, optional),
					field("price", 2, dbl, optional),
				},
			},
			{
				Name: proto.String("Outer"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, str, optional),
					field("alt_name", 2, str, optional),
					items,
					field("qty", 4, i64, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Outer"), messages.ByName("Inner")
}
// newOuterMessage builds a dynamic Outer message from plain Go values.
// The name and qty fields are always set; alt_name is set only when altName is
// non-empty. Each entry of items is appended to the repeated "items" field as
// an Inner message carrying its id and price.
func newOuterMessage(outerMD, innerMD protoreflect.MessageDescriptor, name, altName string, items []struct {
	id    string
	price float64
}, qty int64) *dynamicpb.Message {
	outer := dynamicpb.NewMessage(outerMD)
	outerFields := outerMD.Fields()

	outer.Set(outerFields.ByName("name"), protoreflect.ValueOfString(name))
	if len(altName) > 0 {
		outer.Set(outerFields.ByName("alt_name"), protoreflect.ValueOfString(altName))
	}
	outer.Set(outerFields.ByName("qty"), protoreflect.ValueOfInt64(qty))

	itemList := outer.Mutable(outerFields.ByName("items")).List()
	for i := range items {
		innerFields := innerMD.Fields()
		inner := dynamicpb.NewMessage(innerMD)
		inner.Set(innerFields.ByName("id"), protoreflect.ValueOfString(items[i].id))
		inner.Set(innerFields.ByName("price"), protoreflect.ValueOfFloat64(items[i].price))
		itemList.Append(protoreflect.ValueOfMessage(inner))
	}
	return outer
}
// main clones a HyperType-backed Transcoder, feeds a different raw message to
// the original and to the clone, and prints the denormalized row count that
// each one flushes.
func main() {
	// item is an alias for the anonymous element type newOuterMessage accepts,
	// so []item is the identical type and only shorter to spell.
	type item = struct {
		id    string
		price float64
	}

	outerMD, innerMD := buildHyperExampleDescriptors()

	// Shared HyperType — both transcoders contribute profiling data.
	ht := bufarrowlib.NewHyperType(outerMD)
	tc, err := bufarrowlib.New(outerMD, memory.DefaultAllocator,
		bufarrowlib.WithHyperType(ht),
		bufarrowlib.WithDenormalizerPlan(
			pbpath.PlanPath("name", pbpath.Alias("product")),
			pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
		),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	// Clone for a second goroutine — shares HyperType + immutable Plan,
	// fresh Arrow builders and scratch buffers.
	clone, err := tc.Clone(memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer clone.Release()

	// Feed different data to each transcoder. Marshal and append errors are
	// checked so a failure cannot silently produce a wrong row count.
	msg1 := newOuterMessage(outerMD, innerMD, "Alpha", "", []item{{"A1", 1.0}}, 0)
	raw1, err := proto.Marshal(msg1)
	if err != nil {
		log.Fatal(err)
	}
	if err := tc.AppendDenormRaw(raw1); err != nil {
		log.Fatal(err)
	}

	msg2 := newOuterMessage(outerMD, innerMD, "Bravo", "", []item{{"B1", 2.0}, {"B2", 3.0}}, 0)
	raw2, err := proto.Marshal(msg2)
	if err != nil {
		log.Fatal(err)
	}
	if err := clone.AppendDenormRaw(raw2); err != nil {
		log.Fatal(err)
	}

	// Each transcoder flushes independently.
	rec1 := tc.NewDenormalizerRecordBatch()
	defer rec1.Release()
	rec2 := clone.NewDenormalizerRecordBatch()
	defer rec2.Release()
	fmt.Printf("original: %d rows\n", rec1.NumRows())
	fmt.Printf("clone: %d rows\n", rec2.NumRows())
}
Output:
original: 1 rows
clone: 2 rows
func (*Transcoder) DenormalizerBuilder ¶
func (s *Transcoder) DenormalizerBuilder() *array.RecordBuilder
DenormalizerBuilder returns the denormalizer's Arrow array.RecordBuilder. This is exposed for callers who need to implement custom denormalization logic beyond what Transcoder.AppendDenorm provides. In most cases prefer AppendDenorm for automatic fan-out and cross-join handling. Returns nil if no denormalizer plan was configured.
Example ¶
ExampleTranscoder_DenormalizerBuilder shows accessing the underlying RecordBuilder for custom denormalization logic.
// Build a transcoder whose denormalizer plan projects a single column.
orderMD, _ := buildExampleDescriptors()
plan := bufarrowlib.WithDenormalizerPlan(
	pbpath.PlanPath("name", pbpath.Alias("order_name")),
)
transcoder, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
if err != nil {
	log.Fatal(err)
}
defer transcoder.Release()

// Inspect the schema of the denormalizer's underlying RecordBuilder.
denormSchema := transcoder.DenormalizerBuilder().Schema()
fmt.Printf("builder fields: %d\n", denormSchema.NumFields())
fmt.Printf("schema: %s\n", denormSchema.Field(0).Type)
Output:
builder fields: 1
schema: utf8
func (*Transcoder) DenormalizerSchema ¶
func (s *Transcoder) DenormalizerSchema() *arrow.Schema
DenormalizerSchema returns the Arrow schema of the denormalized record. Returns nil if no denormalizer plan was configured.
Example ¶
ExampleTranscoder_DenormalizerSchema demonstrates inspecting the Arrow schema of the denormalized record, showing column names, types, and nullability.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"github.com/loicalleyne/bufarrowlib/proto/pbpath"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildExampleDescriptors assembles the in-memory proto3 file "example.proto"
// (package example) and returns the descriptors of its Order and Item
// messages:
//
//	message Item { string id = 1; double price = 2; }
//	message Order {
//	  string name = 1;
//	  repeated Item items = 2;
//	  repeated string tags = 3;
//	  int64 seq = 4;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildExampleDescriptors() (orderMD, itemMD protoreflect.MessageDescriptor) {
	const (
		optional = descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL
		repeated = descriptorpb.FieldDescriptorProto_LABEL_REPEATED
		str      = descriptorpb.FieldDescriptorProto_TYPE_STRING
		dbl      = descriptorpb.FieldDescriptorProto_TYPE_DOUBLE
		i64      = descriptorpb.FieldDescriptorProto_TYPE_INT64
		message  = descriptorpb.FieldDescriptorProto_TYPE_MESSAGE
	)

	// field builds one field descriptor from its name, number, type, and label.
	field := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type, label descriptorpb.FieldDescriptorProto_Label) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  label.Enum(),
		}
	}

	// items is the only message-typed field, so it also carries a type name.
	items := field("items", 2, message, repeated)
	items.TypeName = proto.String(".example.Item")

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{
				Name: proto.String("Item"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("id", 1, str, optional),
					field("price", 2, dbl, optional),
				},
			},
			{
				Name: proto.String("Order"),
				Field: []*descriptorpb.FieldDescriptorProto{
					field("name", 1, str, optional),
					items,
					field("tags", 3, str, repeated),
					field("seq", 4, i64, optional),
				},
			},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	messages := fd.Messages()
	return messages.ByName("Order"), messages.ByName("Item")
}
// main configures a three-column denormalizer plan and prints the index,
// name, type, and nullability of every field in the denormalized schema.
func main() {
	orderMD, _ := buildExampleDescriptors()

	plan := bufarrowlib.WithDenormalizerPlan(
		pbpath.PlanPath("name", pbpath.Alias("order_name")),
		pbpath.PlanPath("seq", pbpath.Alias("order_seq")),
		pbpath.PlanPath("items[*].price", pbpath.Alias("item_price")),
	)
	transcoder, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	columns := transcoder.DenormalizerSchema().Fields()
	for idx := 0; idx < len(columns); idx++ {
		col := columns[idx]
		fmt.Printf(" %d: %-12s %-10s nullable=%v\n", idx, col.Name, col.Type, col.Nullable)
	}
}
Output:
 0: order_name   utf8       nullable=true
 1: order_seq    int64      nullable=true
 2: item_price   float64    nullable=true
func (*Transcoder) FieldNames ¶
func (s *Transcoder) FieldNames() []string
FieldNames returns the top-level Arrow field names of the message schema.
Example ¶
ExampleTranscoder_FieldNames shows retrieving top-level field names.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its single message:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every Product field is optional; only name, number, and type vary.
	specs := []struct {
		name   string
		number int32
		kind   descriptorpb.FieldDescriptorProto_Type
	}{
		{"name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING},
		{"price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE},
		{"qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32},
	}
	fields := make([]*descriptorpb.FieldDescriptorProto, 0, len(specs))
	for _, spec := range specs {
		fields = append(fields, &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(spec.name),
			Number: proto.Int32(spec.number),
			Type:   spec.kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		})
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{Name: proto.String("Product"), Field: fields},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// main builds a Transcoder for the Product message and prints its top-level
// Arrow field names.
func main() {
	productMD := buildSimpleDescriptor()

	transcoder, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	names := transcoder.FieldNames()
	fmt.Println(names)
}
Output: [name price qty]
func (*Transcoder) NewDenormalizerRecordBatch ¶
func (s *Transcoder) NewDenormalizerRecordBatch() arrow.RecordBatch
NewDenormalizerRecordBatch returns the buffered denormalizer builder contents as an arrow.RecordBatch. The builder is reset and can be reused. Returns nil if no denormalizer plan was configured.
Example ¶
ExampleTranscoder_NewDenormalizerRecordBatch shows flushing the denormalizer's builder into a record batch.
// Build a transcoder whose denormalizer plan fans out over the item ids.
orderMD, itemMD := buildExampleDescriptors()
plan := bufarrowlib.WithDenormalizerPlan(
	pbpath.PlanPath("items[*].id", pbpath.Alias("item_id")),
)
transcoder, err := bufarrowlib.New(orderMD, memory.DefaultAllocator, plan)
if err != nil {
	log.Fatal(err)
}
defer transcoder.Release()

// One order with two items.
lineItems := []struct {
	id    string
	price float64
}{{"A", 1.0}, {"B", 2.0}}
order := newOrder(orderMD, itemMD, "order-1", lineItems, nil, 1)
if err = transcoder.AppendDenorm(order); err != nil {
	log.Fatal(err)
}

// Flush the denormalizer's builder and print every value of column 0.
batch := transcoder.NewDenormalizerRecordBatch()
defer batch.Release()
fmt.Printf("rows: %d\n", batch.NumRows())
for row := 0; row < int(batch.NumRows()); row++ {
	ids := batch.Column(0).(*array.String)
	fmt.Println(ids.Value(row))
}
Output:
rows: 2
A
B
func (*Transcoder) NewRecordBatch ¶
func (s *Transcoder) NewRecordBatch() arrow.RecordBatch
NewRecordBatch returns the buffered builder contents as an arrow.RecordBatch. The builder is reset and can be reused.
Example ¶
ExampleTranscoder_NewRecordBatch shows building an Arrow record batch from appended messages.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its single message:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every Product field is optional; only name, number, and type vary.
	specs := []struct {
		name   string
		number int32
		kind   descriptorpb.FieldDescriptorProto_Type
	}{
		{"name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING},
		{"price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE},
		{"qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32},
	}
	fields := make([]*descriptorpb.FieldDescriptorProto, 0, len(specs))
	for _, spec := range specs {
		fields = append(fields, &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(spec.name),
			Number: proto.Int32(spec.number),
			Type:   spec.kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		})
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{Name: proto.String("Product"), Field: fields},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// newProduct returns a dynamic message of descriptor md with its "name",
// "price", and "qty" fields set from the corresponding arguments.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()
	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main appends one Product message and prints the dimensions of the Arrow
// record batch flushed from the transcoder.
func main() {
	productMD := buildSimpleDescriptor()

	transcoder, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	widget := newProduct(productMD, "Widget", 9.99, 5)
	transcoder.Append(widget)

	batch := transcoder.NewRecordBatch()
	defer batch.Release()
	fmt.Printf("rows: %d, cols: %d\n", batch.NumRows(), batch.NumCols())
}
Output: rows: 1, cols: 3
func (*Transcoder) Parquet ¶
func (s *Transcoder) Parquet() *schema.Schema
Parquet returns the Parquet schema.Schema for the message.
Example ¶
ExampleTranscoder_Parquet demonstrates inspecting the Parquet schema.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles the in-memory proto3 file
// "example_transcoder.proto" (package example) and returns the descriptor of
// its single message:
//
//	message Product {
//	  string name = 1;
//	  double price = 2;
//	  int32 qty = 3;
//	}
//
// If protodesc.NewFile rejects the file, the error is logged and the process
// exits via log.Fatalf.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// Every Product field is optional; only name, number, and type vary.
	specs := []struct {
		name   string
		number int32
		kind   descriptorpb.FieldDescriptorProto_Type
	}{
		{"name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING},
		{"price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE},
		{"qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32},
	}
	fields := make([]*descriptorpb.FieldDescriptorProto, 0, len(specs))
	for _, spec := range specs {
		fields = append(fields, &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(spec.name),
			Number: proto.Int32(spec.number),
			Type:   spec.kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		})
	}

	file := &descriptorpb.FileDescriptorProto{
		Name:    proto.String("example_transcoder.proto"),
		Package: proto.String("example"),
		Syntax:  proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{
			{Name: proto.String("Product"), Field: fields},
		},
	}

	fd, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fd.Messages().ByName("Product")
}
// main builds a Transcoder for the Product message and prints the number of
// columns in its Parquet schema followed by each column's name.
func main() {
	productMD := buildSimpleDescriptor()

	transcoder, err := bufarrowlib.New(productMD, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	parquetSchema := transcoder.Parquet()
	fmt.Printf("parquet columns: %d\n", parquetSchema.NumColumns())
	for col := 0; col < parquetSchema.NumColumns(); col++ {
		column := parquetSchema.Column(col)
		fmt.Printf(" %s\n", column.Name())
	}
}
Output:
parquet columns: 3
 name
 price
 qty
func (*Transcoder) Proto ¶
func (s *Transcoder) Proto(r arrow.RecordBatch, rows []int) []proto.Message
Proto decodes selected rows from an Arrow RecordBatch back into protobuf messages. Pass nil for rows to decode all rows.
Example ¶
ExampleTranscoder_Proto demonstrates round-tripping: appending protobuf messages, building an Arrow record, and reconstructing them back as protobuf messages.
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, entirely in code, the descriptor for the
// proto3 message used by this example:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// optionalField describes one optional scalar field of Product.
	optionalField := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			optionalField("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			optionalField("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			optionalField("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fileDesc, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fileDesc.Messages().ByName("Product")
}
// newProduct returns a dynamic message created from md with its "name",
// "price" and "qty" fields set to the supplied values.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()

	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main round-trips two Product messages: it appends them to a Transcoder,
// builds an Arrow record batch, decodes every row back into protobuf
// messages, and prints each one as compact JSON.
//
// Any failure (constructing the Transcoder, marshalling a message to JSON, or
// compacting that JSON) terminates the program via the log package instead of
// being silently ignored.
func main() {
	md := buildSimpleDescriptor()
	tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer tc.Release()

	tc.Append(newProduct(md, "Widget", 9.99, 5))
	tc.Append(newProduct(md, "Gadget", 24.50, 2))

	rec := tc.NewRecordBatch()
	defer rec.Release()

	msgs := tc.Proto(rec, nil) // nil = all rows
	fmt.Printf("recovered %d messages\n", len(msgs))
	for _, m := range msgs {
		js, err := protojson.Marshal(m)
		if err != nil {
			log.Fatalf("protojson.Marshal: %v", err)
		}
		// protojson does not promise byte-stable output, so the JSON is
		// compacted before printing to keep the example output deterministic.
		var c bytes.Buffer
		if err := json.Compact(&c, js); err != nil {
			log.Fatalf("json.Compact: %v", err)
		}
		fmt.Println(c.String())
	}
}
Output:
recovered 2 messages
{"name":"Widget","price":9.99,"qty":5}
{"name":"Gadget","price":24.5,"qty":2}
func (*Transcoder) ReadParquet ¶
func (s *Transcoder) ReadParquet(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.RecordBatch, error)
ReadParquet reads the specified columns from Parquet source r and returns an Arrow RecordBatch. The returned RecordBatch must be released by the caller.
Example ¶
ExampleTranscoder_ReadParquet demonstrates a full Parquet round-trip: write messages to Parquet, then read them back into an Arrow record.
package main
import (
"bytes"
"context"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, entirely in code, the descriptor for the
// proto3 message used by this example:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// optionalField describes one optional scalar field of Product.
	optionalField := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			optionalField("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			optionalField("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			optionalField("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fileDesc, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fileDesc.Messages().ByName("Product")
}
// newProduct returns a dynamic message created from md with its "name",
// "price" and "qty" fields set to the supplied values.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()

	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main performs a Parquet round-trip: two Product messages are appended and
// written to an in-memory buffer, the buffer is read back into an Arrow
// record batch, and the resulting row and column counts are printed.
func main() {
	desc := buildSimpleDescriptor()
	transcoder, err := bufarrowlib.New(desc, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	transcoder.Append(newProduct(desc, "Widget", 9.99, 5))
	transcoder.Append(newProduct(desc, "Gadget", 24.50, 2))

	var parquetData bytes.Buffer
	err = transcoder.WriteParquet(&parquetData)
	if err != nil {
		log.Fatal(err)
	}

	// Passing nil for the column list, as the example does throughout.
	batch, err := transcoder.ReadParquet(context.Background(), bytes.NewReader(parquetData.Bytes()), nil)
	if err != nil {
		log.Fatal(err)
	}
	defer batch.Release()

	fmt.Printf("read back: %d rows, %d cols\n", batch.NumRows(), batch.NumCols())
}
Output: read back: 2 rows, 3 cols
func (*Transcoder) Release ¶
func (s *Transcoder) Release()
Release releases the reference on the underlying Arrow record builder.
Example ¶
ExampleTranscoder_Release shows the standard cleanup pattern.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, entirely in code, the descriptor for the
// proto3 message used by this example:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// optionalField describes one optional scalar field of Product.
	optionalField := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			optionalField("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			optionalField("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			optionalField("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fileDesc, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fileDesc.Messages().ByName("Product")
}
// newProduct returns a dynamic message created from md with its "name",
// "price" and "qty" fields set to the supplied values.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()

	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main shows the cleanup pattern: Release is deferred on the Transcoder right
// after it is constructed, and again on the record batch it produces. One
// Product is appended and the batch's row count is printed.
func main() {
	desc := buildSimpleDescriptor()
	transcoder, err := bufarrowlib.New(desc, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	// Defer Release immediately once construction has succeeded.
	defer transcoder.Release()

	widget := newProduct(desc, "Widget", 9.99, 5)
	transcoder.Append(widget)

	batch := transcoder.NewRecordBatch()
	defer batch.Release()

	fmt.Printf("rows: %d\n", batch.NumRows())
}
Output: rows: 1
func (*Transcoder) Schema ¶
func (s *Transcoder) Schema() *arrow.Schema
Schema returns the Arrow arrow.Schema for the message.
Example ¶
ExampleTranscoder_Schema demonstrates inspecting the Arrow schema derived from a protobuf message descriptor.
package main
import (
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)
// buildSimpleDescriptor assembles, entirely in code, the descriptor for the
// proto3 message used by this example:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// optionalField describes one optional scalar field of Product.
	optionalField := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			optionalField("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			optionalField("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			optionalField("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fileDesc, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fileDesc.Messages().ByName("Product")
}
func main() {
md := buildSimpleDescriptor()
tc, err := bufarrowlib.New(md, memory.DefaultAllocator)
if err != nil {
log.Fatal(err)
}
defer tc.Release()
for i, f := range tc.Schema().Fields() {
fmt.Printf(" %d: %-8s %s\n", i, f.Name, f.Type)
}
}
Output:
 0: name     utf8
 1: price    float64
 2: qty      int32
func (*Transcoder) WriteParquet ¶
func (s *Transcoder) WriteParquet(w io.Writer) error
WriteParquet writes the current record to Parquet format on w.
Example ¶
ExampleTranscoder_WriteParquet demonstrates writing appended messages to Parquet in an in-memory buffer and reporting the number of bytes written.
package main
import (
"bytes"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, entirely in code, the descriptor for the
// proto3 message used by this example:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// optionalField describes one optional scalar field of Product.
	optionalField := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			optionalField("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			optionalField("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			optionalField("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fileDesc, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fileDesc.Messages().ByName("Product")
}
// newProduct returns a dynamic message created from md with its "name",
// "price" and "qty" fields set to the supplied values.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()

	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main appends two Product messages, writes them as Parquet into an in-memory
// buffer, and prints the buffer's size followed by a confirmation line.
func main() {
	desc := buildSimpleDescriptor()
	transcoder, err := bufarrowlib.New(desc, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	transcoder.Append(newProduct(desc, "Widget", 9.99, 5))
	transcoder.Append(newProduct(desc, "Gadget", 24.50, 2))

	var parquetData bytes.Buffer
	err = transcoder.WriteParquet(&parquetData)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("parquet bytes: %d\n", parquetData.Len())
	fmt.Println("wrote parquet successfully")
}
Output:
parquet bytes: 714
wrote parquet successfully
func (*Transcoder) WriteParquetRecords ¶
func (s *Transcoder) WriteParquetRecords(w io.Writer, records ...arrow.RecordBatch) error
WriteParquetRecords writes one or more Arrow records to Parquet on w.
Example ¶
ExampleTranscoder_WriteParquetRecords demonstrates writing multiple Arrow record batches to a single Parquet file.
package main
import (
"bytes"
"fmt"
"log"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/loicalleyne/bufarrowlib"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
// buildSimpleDescriptor assembles, entirely in code, the descriptor for the
// proto3 message used by this example:
//
//	message Product {
//	  string name  = 1;
//	  double price = 2;
//	  int32  qty   = 3;
//	}
//
// The process is terminated via log.Fatalf if the file descriptor cannot be
// built.
func buildSimpleDescriptor() protoreflect.MessageDescriptor {
	// optionalField describes one optional scalar field of Product.
	optionalField := func(name string, number int32, kind descriptorpb.FieldDescriptorProto_Type) *descriptorpb.FieldDescriptorProto {
		return &descriptorpb.FieldDescriptorProto{
			Name:   proto.String(name),
			Number: proto.Int32(number),
			Type:   kind.Enum(),
			Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
		}
	}

	product := &descriptorpb.DescriptorProto{
		Name: proto.String("Product"),
		Field: []*descriptorpb.FieldDescriptorProto{
			optionalField("name", 1, descriptorpb.FieldDescriptorProto_TYPE_STRING),
			optionalField("price", 2, descriptorpb.FieldDescriptorProto_TYPE_DOUBLE),
			optionalField("qty", 3, descriptorpb.FieldDescriptorProto_TYPE_INT32),
		},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        proto.String("example_transcoder.proto"),
		Package:     proto.String("example"),
		Syntax:      proto.String("proto3"),
		MessageType: []*descriptorpb.DescriptorProto{product},
	}

	fileDesc, err := protodesc.NewFile(file, nil)
	if err != nil {
		log.Fatalf("protodesc.NewFile: %v", err)
	}
	return fileDesc.Messages().ByName("Product")
}
// newProduct returns a dynamic message created from md with its "name",
// "price" and "qty" fields set to the supplied values.
func newProduct(md protoreflect.MessageDescriptor, name string, price float64, qty int32) *dynamicpb.Message {
	product := dynamicpb.NewMessage(md)
	fields := md.Fields()

	product.Set(fields.ByName("name"), protoreflect.ValueOfString(name))
	product.Set(fields.ByName("price"), protoreflect.ValueOfFloat64(price))
	product.Set(fields.ByName("qty"), protoreflect.ValueOfInt32(qty))
	return product
}
// main produces two separate Arrow record batches, each holding one Product,
// and writes both to a single in-memory Parquet buffer with one
// WriteParquetRecords call.
func main() {
	desc := buildSimpleDescriptor()
	transcoder, err := bufarrowlib.New(desc, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}
	defer transcoder.Release()

	// First batch: the Widget row.
	transcoder.Append(newProduct(desc, "Widget", 9.99, 5))
	firstBatch := transcoder.NewRecordBatch()
	defer firstBatch.Release()

	// Second batch: the Gadget row.
	transcoder.Append(newProduct(desc, "Gadget", 24.50, 2))
	secondBatch := transcoder.NewRecordBatch()
	defer secondBatch.Release()

	var parquetData bytes.Buffer
	err = transcoder.WriteParquetRecords(&parquetData, firstBatch, secondBatch)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("wrote 2 record batches to parquet")
}
Output: wrote 2 record batches to parquet
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package main provides C shared-library exports for bufarrowlib.
|
Package main provides C shared-library exports for bufarrowlib. |
|
cmd
|
|
|
pbpath-playground
command
pbpq is a local web playground for testing pbpath Pipeline queries against protobuf messages.
|
pbpq is a local web playground for testing pbpath Pipeline queries against protobuf messages. |
|
gen
|
|
|
proto
|
|
|
pbpath
Package pbpath provides functionality for representing a sequence of protobuf reflection operations on a message, including parsing human-readable path strings and traversing messages along a path to collect values.
|
Package pbpath provides functionality for representing a sequence of protobuf reflection operations on a message, including parsing human-readable path strings and traversing messages along a path to collect values. |