cdc

package
v0.0.24 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Defaults applied by CDCConfig.WithDefaults and CompactionConfig.WithDefaults
// when the corresponding field is left at its zero value.
const (
	DefaultParquetCompression      = "zstd"    // seeds CDCConfig.ParquetCompression
	DefaultParquetCompressionLevel = 3         // seeds CDCConfig.ParquetCompressionLevel
	DefaultMinRecords              = 20000     // seeds CDCConfig.MinRecords (flush threshold on unflushed rows)
	DefaultMaxAgeMs                = 3600000 // 1 hour; seeds CDCConfig.MaxAgeMs
	DefaultEstimatedRowBytes       = 1024      // seeds CDCConfig.EstimatedRowBytes (rough per-row size)
	DefaultMaxBatchBytes           = int64(50 * 1024 * 1024) // 50 MiB; seeds CDCConfig.MaxBatchBytes
	DefaultBatchSize               = 10000     // seeds CDCConfig.BatchSize (rows per snapshot)
	DefaultTargetBaseSizeMB        = 256       // seeds CompactionConfig.TargetBaseSizeMB
	DefaultTargetFileSizeMB        = 256       // seeds CDCConfig.TargetFileSizeMB
	DefaultMaxBatchSize            = 10000000 // 10M rows max; seeds CDCConfig.MaxBatchSize
	DefaultMaxDeltaSizeMB          = 50        // seeds CompactionConfig.MaxDeltaSizeMB
	DefaultDirtyRatioPct           = 5         // seeds CompactionConfig.DirtyRatioPct
	DefaultMaxRetries              = 5         // seeds CompactionConfig.MaxRetries
	DefaultBaseBackoffMs           = 100       // seeds CompactionConfig.BaseBackoff (milliseconds)
	DefaultMaxBackoffMs            = 10000     // seeds CompactionConfig.MaxBackoff (milliseconds)
	DefaultPGSSLMode               = "require" // seeds CDCConfig.PGSSLMode
)

Default constants applied by the WithDefaults methods when configuration fields are left unset.

Variables

This section is empty.

Functions

func AcquireSchemaLock

func AcquireSchemaLock(ctx context.Context, db *sql.DB, schemaID int16) (bool, error)

AcquireSchemaLock tries to grab an advisory lock for the schema to avoid concurrent flush/compaction on the same schema.

func BuildBasePath

func BuildBasePath(prefix string, schemaID int16, minRowID, maxRowID string) string

BuildBasePath returns the canonical base file path for a schema. Base files use min/max row_id naming to indicate the row range covered.

func BuildBaseTempPath

func BuildBaseTempPath(prefix string, schemaID int16, fileUUID string) string

BuildBaseTempPath returns the temp path for a base file during init.

func BuildDeltaPath

func BuildDeltaPath(prefix string, schemaID int16, fileUUID string) string

BuildDeltaPath returns the canonical delta file path for a schema.

func BuildTempPath

func BuildTempPath(prefix string, schemaID int16, fileUUID string) string

BuildTempPath returns the temp path for a delta file.

func CopyTmpToFinal

func CopyTmpToFinal(ctx context.Context, client S3ObjectClient, bucket, tmpKey, finalKey string, logger *zap.Logger) error

CopyTmpToFinal copies a parquet file from tmp key to final key and deletes tmp.

func GetChangeLogStats

func GetChangeLogStats(ctx context.Context, db *sql.DB, table string, schemaID int16) (int64, int64, error)

GetChangeLogStats returns count and oldest changed_at for unflushed rows.

func MarkFlushed

func MarkFlushed(ctx context.Context, db *sql.DB, table string, schemaID int16, snapshot int64, flushedAt int64) (int64, error)

MarkFlushed updates flushed_at for rows up to the snapshot.

func MarkFlushedIDsAtSnapshot

func MarkFlushedIDsAtSnapshot(ctx context.Context, db *sql.DB, table string, schemaID int16, rowIDs []uuid.UUID, snapshot int64, flushedAt int64) ([]uuid.UUID, error)

MarkFlushedIDsAtSnapshot updates flushed_at for specific row_ids only if their changed_at is at or before the snapshot. It returns the row_ids actually marked flushed.

func ReleaseSchemaLock

func ReleaseSchemaLock(ctx context.Context, db *sql.DB, schemaID int16) error

ReleaseSchemaLock releases the advisory lock for the schema.

func RunOnce

func RunOnce(ctx context.Context, cfg CDCConfig, s3Client S3ObjectClient, dryRun bool, logger *zap.Logger, schemaRegistry forma.SchemaRegistry) error

RunOnce performs one full pass over schemas and attempts flush where needed. Caller may provide an S3ObjectClient; when nil, AWS config will be loaded from environment (still respecting cfg.S3Region). Optional schemaRegistry enables schema-aware projections in DuckDB export.

func SelectBatchRowIDs

func SelectBatchRowIDs(ctx context.Context, db *sql.DB, table string, schemaID int16, batchSize int) ([]uuid.UUID, int64, error)

SelectBatchRowIDs picks up to batchSize row_ids for flushing and returns a snapshot cutoff (ms) used for exporting.

Types

type CDCConfig

type CDCConfig struct {
	// Table names
	ChangeLogTable  string // change_log table tracking unflushed row changes
	EntityMainTable string // entity_main table read during export
	EAVDataTable    string // eav_data table read during export

	// Thresholds
	MinRecords int   // flush when unflushed rows >= MinRecords
	MaxAgeMs   int64 // flush when oldest unflushed row age >= MaxAgeMs
	BatchSize  int   // maximum rows per snapshot

	// Postgres connection
	PGHost     string
	PGPort     int
	PGUser     string
	PGPassword string // ignored path when PGUseIAM is true? — NOTE(review): confirm auth precedence
	PGDB       string
	PGUseIAM   bool // presumably enables IAM-based Postgres auth — confirm token source
	PGSSLMode  string

	// DuckDB export options
	DuckDBPath              string        // optional on-disk duckdb; empty for :memory:
	DuckThreads             int           // PRAGMA threads
	DuckMemLimit            string        // e.g. "4GB"
	QueryTimeout            time.Duration // timeout for duckdb export
	ParquetCompression      string        // e.g. "zstd"
	ParquetCompressionLevel int           // codec level if supported
	EstimatedRowBytes       int           // rough row size estimate for batch sizing
	MaxBatchBytes           int64         // optional cap to limit batch size by bytes

	// Init-specific options (cdc-init base file export)
	TargetFileSizeMB int // target parquet file size in MB (0 = use BatchSize)
	MaxBatchSize     int // maximum rows per batch to cap memory usage

	// S3
	S3Bucket   string // destination bucket for exported parquet files
	S3Prefix   string // prefix inside bucket for delta files
	S3Endpoint string // custom endpoint (e.g. MinIO); empty presumably means AWS default — confirm
	S3Region   string
	S3UseSSL   bool
	S3UsePath  bool // path style addressing

	// Manifest (optional - when set, flush updates manifest after export)
	ManifestPrefix   string // root prefix for manifests in S3
	ManifestTemplate string // path template, e.g. "manifest/{{.SchemaID}}.json"
}

CDCConfig controls change_log flushing and export behavior. S3 client is injected separately via RunOnce parameter to allow callers to provide either AWS or MinIO implementations.

func (CDCConfig) WithDefaults

func (c CDCConfig) WithDefaults() CDCConfig

WithDefaults returns a copy of CDCConfig with missing fields set to defaults.

type CompactionConfig

// Zero-valued fields are replaced with package defaults by WithDefaults.
type CompactionConfig struct {
	SchemaID         int16         // optional filter; 0 means all
	ManifestPath     string        // s3 path or fs path to manifest JSON for the schema
	TargetBaseSizeMB int           // default 256
	MaxDeltaSizeMB   int           // default 50
	DirtyRatioPct    int           // default 5 (rewrite when updated rows/base rows > 5%)
	RunInterval      time.Duration // scheduler interval when used in a loop
	TempPrefix       string        // temp path prefix for rewrites
	MaxParallelFiles int           // optional parallelism for rewrites

	// Backoff parameters for S3/manifest operations
	MaxRetries  int           // default 5
	BaseBackoff time.Duration // default 100ms
	MaxBackoff  time.Duration // default 10s
}

CompactionConfig controls Base/Delta maintenance.

func (CompactionConfig) WithDefaults

func (c CompactionConfig) WithDefaults() CompactionConfig

WithDefaults returns a copy of CompactionConfig with missing fields set to defaults.

type DuckExporter

// Construct via NewDuckExporter, which opens the DuckDB connection and
// configures pragmas and extensions.
type DuckExporter struct {
	DB     *sql.DB     // open DuckDB connection used for COPY exports
	Logger *zap.Logger // logger for export operations
	Config CDCConfig // keep config for compression settings
}

DuckExporter handles DuckDB interactions for exporting snapshots to S3 temp path.

func NewDuckExporter

func NewDuckExporter(ctx context.Context, cfg CDCConfig, s3AccessKey, s3Secret string, logger *zap.Logger) (*DuckExporter, error)

NewDuckExporter opens a DuckDB connection and configures pragmas and extensions.

func (*DuckExporter) ExportBaseFileToTmp

func (e *DuckExporter) ExportBaseFileToTmp(ctx context.Context, pgConnStr string, s3TmpPath string, schemaID int16, rowIDs []uuid.UUID, attrCache forma.SchemaAttributeCache) error

ExportBaseFileToTmp exports existing entity_main + eav_data rows directly to a base parquet file. Unlike delta export, this does NOT use change_log - it reads directly from entity_main. s3TmpPath is the destination like 's3://bucket/base/<schema_id>/_tmp/<tmp_uuid>.parquet'

func (*DuckExporter) ExportSnapshotToTmp

func (e *DuckExporter) ExportSnapshotToTmp(ctx context.Context, pgConnStr string, s3TmpPath string, schemaID int16, snapshotTS int64, rowIDs []uuid.UUID, attrCache forma.SchemaAttributeCache) error

ExportSnapshotToTmp builds an export SQL and runs COPY to the provided s3TmpPath. s3TmpPath is the destination like 's3://bucket/prefix/<schema_id>/_tmp/<tmp_uuid>.parquet'

type Manifest

type Manifest struct {
	SchemaID int16           `json:"schema_id"` // schema this manifest describes
	Base     []ManifestEntry `json:"base,omitempty"`  // base files, named by min/max row_id range
	Delta    []ManifestEntry `json:"delta,omitempty"` // incremental delta files
}

Manifest captures the files for a schema and their stats.

type ManifestConfig

type ManifestConfig struct {
	Bucket       string // S3 bucket holding manifest objects
	Prefix       string // root prefix for manifests (e.g., manifest/<project>/)
	PathTemplate string // optional template per schema, e.g., "manifest/{{.SchemaID}}.json"
}

ManifestConfig describes where manifests are stored and how to resolve parquet paths.

type ManifestEntry

type ManifestEntry struct {
	Path        string `json:"path"` // parquet file path for this entry
	MinRowID    string `json:"min_row_id,omitempty"` // row_id range covered — presumably set for base files; confirm
	MaxRowID    string `json:"max_row_id,omitempty"`
	MinTimeSlot int64  `json:"min_time_slot,omitempty"` // time range covered — semantics not visible here; confirm units
	MaxTimeSlot int64  `json:"max_time_slot,omitempty"`
	SizeBytes   int64  `json:"size_bytes,omitempty"` // object size in bytes
	Level       string `json:"level,omitempty"` // "base" or "delta"
}

ManifestEntry describes a parquet file tracked for a schema. It can represent either a base or delta file.

type S3FullClient

type S3FullClient interface {
	S3ObjectClient
	// GetObject and PutObject mirror the AWS SDK v2 s3.Client method
	// signatures so the concrete SDK client satisfies this interface.
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

S3FullClient extends S3ObjectClient with GetObject and PutObject for manifest operations.

type S3ObjectClient

type S3ObjectClient interface {
	// Both methods mirror the AWS SDK v2 s3.Client signatures, so either the
	// SDK client or a MinIO-backed implementation can be injected.
	CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error)
	DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
}

S3ObjectClient is a minimal interface for copy + delete used by the CDC flusher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL