Documentation ¶
Index ¶
- type CSVFormatOption
- type Format
- type SpoolDriver
- func (sd *SpoolDriver) AddColumn(table string, field *arrow.Field) error
- func (sd *SpoolDriver) Close() error
- func (sd *SpoolDriver) CreateTable(table string, schema *arrow.Schema) error
- func (sd *SpoolDriver) MissingColumns(table string, schema *arrow.Schema) ([]*arrow.Field, error)
- func (sd *SpoolDriver) Write(ctx context.Context, table string, schema *arrow.Schema, rows []map[string]any) error
- type SpoolOption
- type Uploader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CSVFormatOption ¶
type CSVFormatOption func(*csvFormat)
CSVFormatOption configures the CSV format behavior.
func Gzip ¶
func Gzip(level int) CSVFormatOption
Gzip enables gzip compression with the provided level.
func WithCompression ¶
func WithCompression(opt CSVFormatOption) CSVFormatOption
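WithCompression is a pass-through for the option it is given. It exists only to keep compression settings grouped at the call site, for example WithCompression(Gzip(level)).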
type Format ¶
type Format interface {
	Extension() string
	Write(w io.Writer, schema *arrow.Schema, rows []map[string]any) error
}
Format defines how data is serialized to files.
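To illustrate the shape of a Format implementation, here is a minimal sketch of a JSON Lines format. It is not part of this package: jsonlFormat and render are hypothetical names, and the schema parameter is reduced to an ordered list of column names so the example runs without the Arrow dependency. Whether Extension is expected to include a leading dot is not stated above, so the value returned here is a guess.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Format mirrors the interface above, except that the schema is simplified
// to column names (an assumption made only for this sketch).
type Format interface {
	Extension() string
	Write(w io.Writer, columns []string, rows []map[string]any) error
}

// jsonlFormat is a hypothetical Format that emits one JSON object per line.
type jsonlFormat struct{}

// Extension returns the file extension; the leading-dot convention is unknown.
func (jsonlFormat) Extension() string { return "jsonl" }

// Write encodes each row as a JSON object restricted to the given columns.
func (jsonlFormat) Write(w io.Writer, columns []string, rows []map[string]any) error {
	enc := json.NewEncoder(w) // Encode appends a newline after each value
	for _, row := range rows {
		// Keys outside the column list are dropped; absent columns become null.
		out := make(map[string]any, len(columns))
		for _, c := range columns {
			out[c] = row[c]
		}
		if err := enc.Encode(out); err != nil {
			return fmt.Errorf("encode row: %w", err)
		}
	}
	return nil
}

// render runs a Format against an in-memory buffer and returns the bytes as text.
func render(f Format, columns []string, rows []map[string]any) (string, error) {
	var buf bytes.Buffer
	if err := f.Write(&buf, columns, rows); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := render(jsonlFormat{}, []string{"id", "name"}, []map[string]any{
		{"id": 1, "name": "a", "extra": true},
		{"id": 2},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Taking an io.Writer rather than a file path keeps the serialization logic independent of where the bytes end up, which is also what makes it easy to test against a buffer.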
func NewCSVFormat ¶
func NewCSVFormat(opts ...CSVFormatOption) Format
NewCSVFormat creates a new CSV format implementation.
type SpoolDriver ¶
type SpoolDriver struct {
	// contains filtered or unexported fields
}
SpoolDriver is a warehouse.Driver that writes analytics data directly to disk files and periodically uploads them to object storage.
Disk is the source of truth for persisted data. An in-memory streams map maintains per-stream state (createdAt, activeSizeBytes) used to evaluate sealing triggers.
func NewSpoolDriver ¶
func NewSpoolDriver(
	ctx context.Context,
	uploader Uploader,
	format Format,
	spoolDir string,
	opts ...SpoolOption,
) *SpoolDriver
NewSpoolDriver creates a new spool driver that writes rows directly to disk and periodically uploads files.
func (*SpoolDriver) AddColumn ¶
func (sd *SpoolDriver) AddColumn(table string, field *arrow.Field) error
AddColumn is a no-op for spool drivers.
func (*SpoolDriver) Close ¶
func (sd *SpoolDriver) Close() error
Close gracefully shuts down the driver.
func (*SpoolDriver) CreateTable ¶
func (sd *SpoolDriver) CreateTable(table string, schema *arrow.Schema) error
CreateTable is a no-op for spool drivers.
func (*SpoolDriver) MissingColumns ¶
func (sd *SpoolDriver) MissingColumns(table string, schema *arrow.Schema) ([]*arrow.Field, error)
MissingColumns always returns an empty slice for spool drivers.
type SpoolOption ¶
type SpoolOption func(*SpoolDriver)
SpoolOption is a functional option for configuring SpoolDriver.
func WithFlushOnClose ¶ added in v0.39.1
func WithFlushOnClose(v bool) SpoolOption
WithFlushOnClose configures whether Close seals and uploads all active segments. Enable this for ephemeral storage where data would be lost on shutdown. With persistent storage, active segments are recovered on next startup.
func WithMaxSegmentAge ¶
func WithMaxSegmentAge(d time.Duration) SpoolOption
WithMaxSegmentAge sets the maximum segment age before sealing.
func WithMaxSegmentSize ¶
func WithMaxSegmentSize(n int64) SpoolOption
WithMaxSegmentSize sets the maximum segment size in bytes before sealing.
func WithSealCheckInterval ¶
func WithSealCheckInterval(d time.Duration) SpoolOption
WithSealCheckInterval sets how often to evaluate sealing triggers.
type Uploader ¶
Uploader handles uploading files to a destination.
func NewBlobUploader ¶
NewBlobUploader creates a new Uploader that uploads files to a blob bucket.
func NewFilesystemUploader ¶
NewFilesystemUploader creates a new Uploader that moves files to a destination directory.