package files

Version: v0.39.1
Warning

This package is not in the latest version of its module.

Published: Feb 27, 2026 · License: MIT · Imports: 20 · Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CSVFormatOption

type CSVFormatOption func(*csvFormat)

CSVFormatOption configures the CSV format behavior.

func Gzip

func Gzip(level int) CSVFormatOption

Gzip enables gzip compression with the provided level.

func WithCompression

func WithCompression(opt CSVFormatOption) CSVFormatOption

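WithCompression returns opt unchanged. It exists only to keep compression-related options visibly grouped at the call site, as in NewCSVFormat(WithCompression(Gzip(level))).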

type Format

type Format interface {
	Extension() string
	Write(w io.Writer, schema *arrow.Schema, rows []map[string]any) error
}

Format defines how data is serialized to files.
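To show what satisfying Format involves, here is a minimal CSV-style implementation. To keep it standard-library only, the sketch swaps *arrow.Schema for a plain []string of column names. That substitution, the csvSketch type, the ".csv" extension string and the render helper are all assumptions for illustration. Values are written in column order, and a missing or nil value becomes an empty field.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// Format mirrors the documented interface, except that *arrow.Schema is
// replaced by a list of column names so the sketch needs no Arrow dependency.
type Format interface {
	Extension() string
	Write(w io.Writer, columns []string, rows []map[string]any) error
}

// csvSketch writes a header row followed by one record per row.
type csvSketch struct{}

func (csvSketch) Extension() string { return ".csv" }

func (csvSketch) Write(w io.Writer, columns []string, rows []map[string]any) error {
	cw := csv.NewWriter(w)
	if err := cw.Write(columns); err != nil {
		return err
	}
	record := make([]string, len(columns))
	for _, row := range rows {
		for i, col := range columns {
			v, ok := row[col]
			if !ok || v == nil {
				record[i] = "" // missing or null value -> empty field
				continue
			}
			record[i] = fmt.Sprint(v)
		}
		if err := cw.Write(record); err != nil {
			return err
		}
	}
	cw.Flush() // csv.Writer buffers; flush before checking for errors
	return cw.Error()
}

var _ Format = csvSketch{}

// render serializes rows into a string, which is handy for trying a Format out.
func render(f Format, columns []string, rows []map[string]any) (string, error) {
	var sb strings.Builder
	if err := f.Write(&sb, columns, rows); err != nil {
		return "", err
	}
	return sb.String(), nil
}
```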

func NewCSVFormat

func NewCSVFormat(opts ...CSVFormatOption) Format

NewCSVFormat creates a new CSV format implementation.

type SpoolDriver

type SpoolDriver struct {
	// contains filtered or unexported fields
}

SpoolDriver is a warehouse.Driver that writes analytics data directly to disk files and periodically uploads them to object storage.

Disk is the source of truth for persisted data. An in-memory streams map maintains per-stream state (createdAt, activeSizeBytes) used to evaluate sealing triggers.

func NewSpoolDriver

func NewSpoolDriver(
	ctx context.Context,
	uploader Uploader,
	format Format,
	spoolDir string,
	opts ...SpoolOption,
) *SpoolDriver

NewSpoolDriver creates a new spool driver that writes rows directly to disk and periodically uploads files.

func (*SpoolDriver) AddColumn

func (sd *SpoolDriver) AddColumn(table string, field *arrow.Field) error

AddColumn is a no-op for spool drivers.

func (*SpoolDriver) Close

func (sd *SpoolDriver) Close() error

Close gracefully shuts down the driver.

func (*SpoolDriver) CreateTable

func (sd *SpoolDriver) CreateTable(table string, schema *arrow.Schema) error

CreateTable is a no-op for spool drivers.

func (*SpoolDriver) MissingColumns

func (sd *SpoolDriver) MissingColumns(table string, schema *arrow.Schema) ([]*arrow.Field, error)

MissingColumns always returns an empty slice for spool drivers.

func (*SpoolDriver) Write

func (sd *SpoolDriver) Write(ctx context.Context, table string, schema *arrow.Schema, rows []map[string]any) error

Write appends rows to the on-disk file immediately. All file operations are guarded by a mutex, so concurrent calls are serialized and their writes cannot interleave.

type SpoolOption

type SpoolOption func(*SpoolDriver)

SpoolOption is a functional option for configuring SpoolDriver.

func WithFlushOnClose added in v0.39.1

func WithFlushOnClose(v bool) SpoolOption

WithFlushOnClose configures whether Close seals and uploads all active segments. Enable it when the spool directory is on ephemeral storage, where unsealed data would otherwise be lost on shutdown. With persistent storage, active segments are recovered on the next startup.

func WithMaxSegmentAge

func WithMaxSegmentAge(d time.Duration) SpoolOption

WithMaxSegmentAge sets the maximum segment age before sealing.

func WithMaxSegmentSize

func WithMaxSegmentSize(n int64) SpoolOption

WithMaxSegmentSize sets the maximum segment size in bytes before sealing.

func WithSealCheckInterval

func WithSealCheckInterval(d time.Duration) SpoolOption

WithSealCheckInterval sets how often to evaluate sealing triggers.

type Uploader

type Uploader interface {
	Upload(ctx context.Context, localPath, remoteKey string) error
}

Uploader handles uploading files to a destination.

func NewBlobUploader

func NewBlobUploader(bucket *blob.Bucket) Uploader

NewBlobUploader creates a new Uploader that uploads files to a blob bucket.

func NewFilesystemUploader

func NewFilesystemUploader(destDir string) (Uploader, error)

NewFilesystemUploader creates a new Uploader that moves files to a destination directory.
