parquet

package
v1.0.0 Latest
Warning: This package is not in the latest version of its module.
Published: May 14, 2026 License: MIT Imports: 15 Imported by: 1

README

Parquet Package

The parquet package provides Parquet file writing capabilities for efficient data storage and analytics.

Features

  • Parquet File Writing: Write data to Parquet format
  • Buffered Writing: Batch messages for efficient I/O
  • Schema Validation: Automatic schema-based sanitization
  • Compression Support: GZIP and other compression codecs
  • Storage Integration: Works with local, OSS, SFTP storage
  • UTF-8 Sanitization: Automatic invalid UTF-8 handling

Main Components

ParquetDataService

Main service for writing Parquet files:

  • WriteMessages(msgs): Write messages to Parquet file
  • Start(ctx): Start continuous processing loop
  • Configurable buffer size and duration
  • Automatic file rotation

ParquetSetting

Configuration for Parquet writing:

  • FsKey: Storage filesystem key
  • Folder: Output directory
  • FilenamePattern: File naming pattern
  • BufferSize: Batch size for buffering
  • BufferDur: Time-based flush interval
  • Compress: Compression codec (e.g., GZIP)
  • Ackfile: Generate acknowledgment files

Schema Handling

  • Automatic schema inference from Go types
  • Message sanitization by schema
  • UTF-8 validation and correction

Usage

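// Create the service settings.
settings := &parquet.ParquetSetting{
    Folder:     "data",
    BufferSize: 10000,
    BufferDur:  30 * time.Minute,
    Compress:   "GZIP",
}

// channel carries the values to write; MyType is the caller's record type.
channel := make(chan MyType)
service := parquet.NewParquetDataServiceT[MyType](settings, "chunk_%s.parquet", channel)

// Start the processing loop in the background. Start returns an error,
// so log it rather than discarding it.
go func() {
    if err := service.Start(ctx); err != nil {
        log.Printf("parquet service stopped: %v", err)
    }
}()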

Dependencies

  • Parquet-go library
  • Storage package for filesystem abstraction
  • Messaging package for error handling
  • Zap for logging

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ParquetDataService

type ParquetDataService struct {
	Setting *ParquetSetting
	Schema  *parquet.Schema
	Raw     chan any
	Filter  func(msg []any) []any
}

func NewParquetDataServiceBySchema

func NewParquetDataServiceBySchema(setting *ParquetSetting, ss *parquet.Schema, c chan any) *ParquetDataService

func NewParquetDataServiceT

func NewParquetDataServiceT[T any](settings *ParquetSetting, filenamePattern string, c chan T) *ParquetDataService

func (*ParquetDataService) Start

func (p *ParquetDataService) Start(ctx context.Context) error

func (*ParquetDataService) WriteMessages

func (p *ParquetDataService) WriteMessages(msgs []any) (string, error)

type ParquetSetting

type ParquetSetting struct {
	FsKey           string // key used to load the filesystem (FS) settings
	Folder          string
	FilenamePattern string
	BufferSize      int // batch size for buffering
	BufferDur       time.Duration
	Compress        string // compression codec (e.g., GZIP)
	Ackfile         bool   // whether to generate an acknowledgment file
}

func DefaultParquetSetting

func DefaultParquetSetting() *ParquetSetting
