parquet

package
v0.11.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2025 License: MIT Imports: 16 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Folder4FailedMsgs = "./data/.error"
	AckExt            = ".ack"
)

Functions

func ToOssFile

func ToOssFile(filename string) string

Types

type DefaultPersistEvent

type DefaultPersistEvent struct {
	Folder  string
	Ackfile bool
}

func (*DefaultPersistEvent) OnPersistDone

func (d *DefaultPersistEvent) OnPersistDone(data []any, filename string)

func (*DefaultPersistEvent) OnPersistFailed

func (d *DefaultPersistEvent) OnPersistFailed(data []any, failed error)

type OssEventService

type OssEventService struct {
	DefaultPersistEvent
	Endpoint  string
	ID        string
	Secret    string
	Bucket    string
	Prefix    string // prefix for object key
	Cleanup   bool
	ToOssFile func(filename string) string
	// contains filtered or unexported fields
}

func (*OssEventService) OnPersistDone

func (d *OssEventService) OnPersistDone(data []any, filename string)

type ParquetDataService

type ParquetDataService struct {
	Setting *ParquetSetting
	Schema  *parquet.Schema
	Raw     chan any
	Filter  func(msg []any) []any
	Event   PersistEvent
}

func NewParquetDataService

func NewParquetDataService(setting *ParquetSetting, s *parquet.Schema) (*ParquetDataService, error)

func NewParquetDataServiceBySchema

func NewParquetDataServiceBySchema(setting *ParquetSetting, ss *parquet.Schema, c chan any) *ParquetDataService

func NewParquetDataServiceT

func NewParquetDataServiceT[T any](settings *ParquetSetting, filenamePattern string, c chan T) *ParquetDataService

func (*ParquetDataService) Start

func (p *ParquetDataService) Start(ctx context.Context) error

func (*ParquetDataService) WriteMessages

func (p *ParquetDataService) WriteMessages(msgs []any) (string, error)

type ParquetSetting

type ParquetSetting struct {
	Folder          string
	FilenamePattern string
	BufferSize      int // settings for batch
	BufferDur       time.Duration
	Compress        string // should enable compression
	Ackfile         bool   // should generate ack file.

}

func DefaultParquetSetting

func DefaultParquetSetting() *ParquetSetting

type PersistEvent

type PersistEvent interface {
	OnPersistFailed(data []any, err error)
	OnPersistDone(data []any, filename string)
}

PersistEvent is the interface that defines the callback functions for parquet persistence.

func NewOssEventService

func NewOssEventService(logger *zap.Logger) (PersistEvent, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL