Documentation
¶
Index ¶
- Constants
- func AcquireSchemaLock(ctx context.Context, db *sql.DB, schemaID int16) (bool, error)
- func BuildBasePath(prefix string, schemaID int16, minRowID, maxRowID string) string
- func BuildBaseTempPath(prefix string, schemaID int16, fileUUID string) string
- func BuildDeltaPath(prefix string, schemaID int16, fileUUID string) string
- func BuildTempPath(prefix string, schemaID int16, fileUUID string) string
- func CopyTmpToFinal(ctx context.Context, client S3ObjectClient, bucket, tmpKey, finalKey string, ...) error
- func GetChangeLogStats(ctx context.Context, db *sql.DB, table string, schemaID int16) (int64, int64, error)
- func MarkFlushed(ctx context.Context, db *sql.DB, table string, schemaID int16, snapshot int64, ...) (int64, error)
- func MarkFlushedIDsAtSnapshot(ctx context.Context, db *sql.DB, table string, schemaID int16, ...) ([]uuid.UUID, error)
- func ReleaseSchemaLock(ctx context.Context, db *sql.DB, schemaID int16) error
- func RunOnce(ctx context.Context, cfg CDCConfig, s3Client S3ObjectClient, dryRun bool, ...) error
- func SelectBatchRowIDs(ctx context.Context, db *sql.DB, table string, schemaID int16, batchSize int) ([]uuid.UUID, int64, error)
- type CDCConfig
- type CompactionConfig
- type DuckExporter
- type Manifest
- type ManifestConfig
- type ManifestEntry
- type S3FullClient
- type S3ObjectClient
Constants ¶
const (
	DefaultParquetCompression      = "zstd"
	DefaultParquetCompressionLevel = 3
	DefaultMinRecords              = 20000
	DefaultMaxAgeMs                = 3600000 // 1 hour
	DefaultEstimatedRowBytes       = 1024
	DefaultMaxBatchBytes           = int64(50 * 1024 * 1024)
	DefaultBatchSize               = 10000
	DefaultTargetBaseSizeMB        = 256
	DefaultTargetFileSizeMB        = 256
	DefaultMaxBatchSize            = 10000000 // 10M rows max
	DefaultMaxDeltaSizeMB          = 50
	DefaultDirtyRatioPct           = 5
	DefaultMaxRetries              = 5
	DefaultBaseBackoffMs           = 100
	DefaultMaxBackoffMs            = 10000
	DefaultPGSSLMode               = "require"
)
Default values applied by the WithDefaults methods when the corresponding config fields are left unset.
Variables ¶
This section is empty.
Functions ¶
func AcquireSchemaLock ¶
AcquireSchemaLock tries to grab an advisory lock for the schema to avoid concurrent flush/compaction on the same schema.
func BuildBasePath ¶
BuildBasePath returns the canonical base file path for a schema. Base files use min/max row_id naming to indicate the row range covered.
func BuildBaseTempPath ¶
BuildBaseTempPath returns the temp path for a base file during init.
func BuildDeltaPath ¶
BuildDeltaPath returns the canonical delta file path for a schema.
func BuildTempPath ¶
BuildTempPath returns the temp path for a delta file.
func CopyTmpToFinal ¶
func CopyTmpToFinal(ctx context.Context, client S3ObjectClient, bucket, tmpKey, finalKey string, logger *zap.Logger) error
CopyTmpToFinal copies a parquet file from tmp key to final key and deletes tmp.
func GetChangeLogStats ¶
func GetChangeLogStats(ctx context.Context, db *sql.DB, table string, schemaID int16) (int64, int64, error)
GetChangeLogStats returns count and oldest changed_at for unflushed rows.
func MarkFlushed ¶
func MarkFlushed(ctx context.Context, db *sql.DB, table string, schemaID int16, snapshot int64, flushedAt int64) (int64, error)
MarkFlushed updates flushed_at for rows up to the snapshot.
func MarkFlushedIDsAtSnapshot ¶
func MarkFlushedIDsAtSnapshot(ctx context.Context, db *sql.DB, table string, schemaID int16, rowIDs []uuid.UUID, snapshot int64, flushedAt int64) ([]uuid.UUID, error)
MarkFlushedIDsAtSnapshot updates flushed_at for specific row_ids only if their changed_at is at or before the snapshot. It returns the row_ids actually marked flushed.
func ReleaseSchemaLock ¶
ReleaseSchemaLock releases the advisory lock for the schema.
func RunOnce ¶
func RunOnce(ctx context.Context, cfg CDCConfig, s3Client S3ObjectClient, dryRun bool, logger *zap.Logger, schemaRegistry forma.SchemaRegistry) error
RunOnce performs one full pass over schemas and attempts flush where needed. Caller may provide an S3ObjectClient; when nil, AWS config will be loaded from environment (still respecting cfg.S3Region). Optional schemaRegistry enables schema-aware projections in DuckDB export.
Types ¶
type CDCConfig ¶
type CDCConfig struct {
// Table names
ChangeLogTable string
EntityMainTable string
EAVDataTable string
// Thresholds
MinRecords int // flush when unflushed rows >= MinRecords
MaxAgeMs int64 // flush when oldest unflushed row age >= MaxAgeMs
BatchSize int // maximum rows per snapshot
// Postgres connection
PGHost string
PGPort int
PGUser string
PGPassword string
PGDB string
PGUseIAM bool
PGSSLMode string
// DuckDB export options
DuckDBPath string // optional on-disk duckdb; empty for :memory:
DuckThreads int // PRAGMA threads
DuckMemLimit string // e.g. "4GB"
QueryTimeout time.Duration // timeout for duckdb export
ParquetCompression string // e.g. "zstd"
ParquetCompressionLevel int // codec level if supported
EstimatedRowBytes int // rough row size estimate for batch sizing
MaxBatchBytes int64 // optional cap to limit batch size by bytes
// Init-specific options (cdc-init base file export)
TargetFileSizeMB int // target parquet file size in MB (0 = use BatchSize)
MaxBatchSize int // maximum rows per batch to cap memory usage
// S3
S3Bucket string
S3Prefix string // prefix inside bucket for delta files
S3Endpoint string
S3Region string
S3UseSSL bool
S3UsePath bool // path style addressing
// Manifest (optional - when set, flush updates manifest after export)
ManifestPrefix string // root prefix for manifests in S3
ManifestTemplate string // path template, e.g. "manifest/{{.SchemaID}}.json"
}
CDCConfig controls change_log flushing and export behavior. S3 client is injected separately via RunOnce parameter to allow callers to provide either AWS or MinIO implementations.
func (CDCConfig) WithDefaults ¶
WithDefaults returns a copy of CDCConfig with missing fields set to defaults.
type CompactionConfig ¶
type CompactionConfig struct {
SchemaID int16 // optional filter; 0 means all
ManifestPath string // s3 path or fs path to manifest JSON for the schema
TargetBaseSizeMB int // default 256
MaxDeltaSizeMB int // default 50
DirtyRatioPct int // default 5 (rewrite when updated rows/base rows > 5%)
RunInterval time.Duration // scheduler interval when used in a loop
TempPrefix string // temp path prefix for rewrites
MaxParallelFiles int // optional parallelism for rewrites
// Backoff parameters for S3/manifest operations
MaxRetries int // default 5
BaseBackoff time.Duration // default 100ms
MaxBackoff time.Duration // default 10s
}
CompactionConfig controls Base/Delta maintenance.
func (CompactionConfig) WithDefaults ¶
func (c CompactionConfig) WithDefaults() CompactionConfig
WithDefaults returns a copy of CompactionConfig with missing fields set to defaults.
type DuckExporter ¶
type DuckExporter struct {
DB *sql.DB
Logger *zap.Logger
Config CDCConfig // keep config for compression settings
}
DuckExporter handles DuckDB interactions for exporting snapshots to S3 temp path.
func NewDuckExporter ¶
func NewDuckExporter(ctx context.Context, cfg CDCConfig, s3AccessKey, s3Secret string, logger *zap.Logger) (*DuckExporter, error)
NewDuckExporter opens a DuckDB connection and configures pragmas and extensions.
func (*DuckExporter) ExportBaseFileToTmp ¶
func (e *DuckExporter) ExportBaseFileToTmp(ctx context.Context, pgConnStr string, s3TmpPath string, schemaID int16, rowIDs []uuid.UUID, attrCache forma.SchemaAttributeCache) error
ExportBaseFileToTmp exports existing entity_main + eav_data rows directly to a base parquet file. Unlike delta export, this does NOT use change_log - it reads directly from entity_main. s3TmpPath is the destination like 's3://bucket/base/<schema_id>/_tmp/<tmp_uuid>.parquet'
func (*DuckExporter) ExportSnapshotToTmp ¶
func (e *DuckExporter) ExportSnapshotToTmp(ctx context.Context, pgConnStr string, s3TmpPath string, schemaID int16, snapshotTS int64, rowIDs []uuid.UUID, attrCache forma.SchemaAttributeCache) error
ExportSnapshotToTmp builds an export SQL and runs COPY to the provided s3TmpPath. s3TmpPath is the destination like 's3://bucket/prefix/<schema_id>/_tmp/<tmp_uuid>.parquet'
type Manifest ¶
type Manifest struct {
SchemaID int16 `json:"schema_id"`
Base []ManifestEntry `json:"base,omitempty"`
Delta []ManifestEntry `json:"delta,omitempty"`
}
Manifest captures the files for a schema and their stats.
type ManifestConfig ¶
type ManifestConfig struct {
Bucket string
Prefix string // root prefix for manifests (e.g., manifest/<project>/)
PathTemplate string // optional template per schema, e.g., "manifest/{{.SchemaID}}.json"
}
ManifestConfig describes where manifests are stored and how to resolve parquet paths.
type ManifestEntry ¶
type ManifestEntry struct {
Path string `json:"path"`
MinRowID string `json:"min_row_id,omitempty"`
MaxRowID string `json:"max_row_id,omitempty"`
MinTimeSlot int64 `json:"min_time_slot,omitempty"`
MaxTimeSlot int64 `json:"max_time_slot,omitempty"`
SizeBytes int64 `json:"size_bytes,omitempty"`
Level string `json:"level,omitempty"` // "base" or "delta"
}
ManifestEntry describes a parquet file tracked for a schema. It can represent either a base or delta file.
type S3FullClient ¶
type S3FullClient interface {
S3ObjectClient
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}
S3FullClient extends S3ObjectClient with GetObject and PutObject for manifest operations.
type S3ObjectClient ¶
type S3ObjectClient interface {
CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
}
S3ObjectClient is a minimal interface for copy + delete used by the CDC flusher.