package internal

v0.0.24
Published: Feb 17, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AdvancedQueryTemplateDuckDB = template.Must(template.New("optimizedQueryDuckDB").Funcs(template.FuncMap{
	"add": func(a, b int) int { return a + b },
}).Parse(`
-- PRAGMA & tuning
PRAGMA memory_limit='4GB';
PRAGMA threads=4;

-- Parameters:
-- $SCHEMA_ID       : integer
-- $PG_CONN         : postgres connection string for postgres_scan
-- $PG_WHERE_CLAUSE : pushdown predicate for entity_main (physical columns)
-- $LOGICAL_WHERE_CLAUSE : logical predicate for final filtering (DuckDB / Parquet columns)
-- $S3_PATHS        : comma-separated paths for read_parquet()
-- $PAGE_SIZE, $OFFSET

WITH
-- Dirty set: rows currently present/dirty in PG change_log (flushed_at = 0)
dirty_ids AS (
  SELECT row_id
  FROM postgres_scan($PG_CONN, 'change_log', 'flushed_at = 0')
  WHERE schema_id = $SCHEMA_ID
),

-- S3 source (Cold/Warm). Read Parquet files and apply logical filters + anti-join
s3_source AS (
  SELECT
    row_id,
    ltbase_created_at AS created_at,
    ltbase_updated_at AS ver_ts,
    ltbase_deleted_at AS deleted_ts,

    -- Logical columns (native in Parquet)
    name,
    age,
    tag

  FROM read_parquet($S3_PATHS)
  WHERE
    ($LOGICAL_WHERE_CLAUSE)
    -- Anti-join: exclude rows that are present in the Dirty Set (PG hot buffer)
    AND row_id NOT IN (SELECT row_id FROM dirty_ids)
),

-- PG source (Hot). Use postgres_scan with pushdown for entity_main, pivot EAV attributes.
pg_source AS (
  SELECT
    m.ltbase_row_id AS row_id,
    m.ltbase_created_at AS created_at,
    cl.changed_at AS ver_ts,
    cl.deleted_at AS deleted_ts,

    -- Explicit casts to align PG types with Parquet schema
    CAST(m.text_01 AS VARCHAR) AS name,
    CAST(m.integer_01 AS INTEGER) AS age,

    -- EAV pivot (explicit casts). Replace attr_id constants with dynamic mapping if needed.
    MAX(CASE WHEN e.attr_id = 205 THEN CAST(e.value_text AS VARCHAR) END) AS tag

  FROM postgres_scan($PG_CONN, 'change_log', 'flushed_at = 0') cl

  -- Pushdown: restrict entity_main at the scan-level using $PG_WHERE_CLAUSE
  JOIN postgres_scan($PG_CONN,
    'SELECT * FROM entity_main_dev
     WHERE ltbase_schema_id = ' || $SCHEMA_ID || '
       AND (' || $PG_WHERE_CLAUSE || ')'
  ) m
    ON cl.schema_id = m.ltbase_schema_id
    AND cl.row_id = m.ltbase_row_id

  LEFT JOIN postgres_scan($PG_CONN, 'eav_data_dev') e
    ON cl.schema_id = e.schema_id AND cl.row_id = e.row_id

  WHERE cl.schema_id = $SCHEMA_ID
  GROUP BY m.ltbase_row_id, m.ltbase_created_at, cl.changed_at, cl.deleted_at, m.text_01, m.integer_01
),

-- Union warm/cold S3 data with hot PG data
unified AS (
  SELECT * FROM s3_source
  UNION ALL
  SELECT * FROM pg_source
)

-- Final selection:
-- - Apply final logical filters to ensure EAV & other logical predicates are respected
-- - Remove soft-deleted rows
-- - Deduplicate using Last-Write-Wins (ver_ts ordering)
SELECT
  row_id,
  name,
  age,
  tag,
  created_at
FROM unified
WHERE
  ($LOGICAL_WHERE_CLAUSE)
  AND (deleted_ts IS NULL OR deleted_ts = 0)

-- Deduplicate: keep most recent version per row_id
QUALIFY ROW_NUMBER() OVER (PARTITION BY row_id ORDER BY ver_ts DESC) = 1

ORDER BY created_at DESC
LIMIT $PAGE_SIZE OFFSET $OFFSET;
`))

AdvancedQueryTemplateDuckDB is the DuckDB SQL template used for federated queries. Placeholders expected to be substituted by the renderer:

$SCHEMA_ID, $PG_CONN, $PG_WHERE_CLAUSE, $LOGICAL_WHERE_CLAUSE, $S3_PATHS, $PAGE_SIZE, $OFFSET
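
A minimal rendering sketch, assuming the $-placeholders are substituted as plain strings after the Go template executes; the real renderer (see BuildDuckDBQuery and RenderDuckDBQuery) must quote and parameterize values safely, and every value below is illustrative:

func renderAdvancedQuerySketch() (string, error) {
	var buf bytes.Buffer // import "bytes" and "strings"
	if err := AdvancedQueryTemplateDuckDB.Execute(&buf, nil); err != nil {
		return "", err
	}
	// Illustrative substitution only; do not splice untrusted input.
	r := strings.NewReplacer(
		"$SCHEMA_ID", "42",
		"$PG_CONN", "'dbname=app host=localhost'",
		"$PG_WHERE_CLAUSE", "m.integer_01 > 18",
		"$LOGICAL_WHERE_CLAUSE", "age > 18",
		"$S3_PATHS", "'s3://bucket/schema_42/*.parquet'",
		"$PAGE_SIZE", "100",
		"$OFFSET", "0",
	)
	return r.Replace(buf.String()), nil
}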
View Source
var ErrListInOrderBy = fmt.Errorf("LIST type attributes cannot be used in ORDER BY")

ErrListInOrderBy is returned when a LIST type attribute is used in ORDER BY.

Functions

func AppendDirtyExclusion added in v0.0.23

func AppendDirtyExclusion(baseClause string, dirtyIDs []uuid.UUID) (string, []any)

AppendDirtyExclusion adds a NOT IN clause excluding dirty row ids. dirtyIDs are converted to strings for DuckDB parameterization using ? placeholders.
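
A usage sketch; the fragment shown in the comment is the expected shape, not a verbatim contract:

func dirtyExclusionSketch(dirty []uuid.UUID) {
	clause, args := AppendDirtyExclusion("age > ?", dirty)
	// clause now carries an exclusion appended to the base clause,
	// roughly: (age > ?) AND row_id NOT IN (?, ?)
	// args holds only the dirty ids (as strings), ready to append
	// after the base clause's own args.
	_, _ = clause, args
}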

func BuildDuckDBQuery added in v0.0.23

func BuildDuckDBQuery(tpl *template.Template, params any, q *FederatedAttributeQuery, dirtyIDs []uuid.UUID, dual *DualClauses) (string, []any, error)

BuildDuckDBQuery prepares a DuckDB SQL string and its arguments for a federated query. It accepts optional DualClauses produced by ToDualClauses; when provided, it uses DuckClause and DuckArgs as the base WHERE clause and injects PgMainClause into the template params so the template (or tests) can observe the pushdown fragment. Dirty-ID exclusions are appended to the DuckDB clause regardless of source.

func BuildListPredicate added in v0.0.23

func BuildListPredicate(column, operator, value string, elementType forma.ValueType) (string, any, error)

BuildListPredicate generates a DuckDB predicate for LIST column operations. Supported operators:

  • equals: list_contains(col, value) - checks if value is in the list
  • not_equals: NOT list_contains(col, value)
  • contains: list_any_match(col, x -> x LIKE '%value%')
  • starts_with: list_any_match(col, x -> x LIKE 'value%')
  • gt/gte/lt/lte: list_any_match(col, x -> x OP value)

Returns the SQL fragment and the parameter value to bind.
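
A sketch for the "contains" mapping above (column name and value are illustrative; forma.ValueTypeText is the element type):

func listPredicateSketch() error {
	frag, arg, err := BuildListPredicate("tag", "contains", "go", forma.ValueTypeText)
	if err != nil {
		return err
	}
	// Per the mapping above, frag should resemble a list_any_match
	// lambda over the "tag" column, with arg = "go" bound as the value.
	_, _ = frag, arg
	return nil
}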

func CastExpression added in v0.0.23

func CastExpression(columnOrExpr string, v forma.ValueType) string

CastExpression returns a DuckDB-safe CAST expression for a column or expression. The caller is responsible for ensuring the identifier/expression is safe (e.g. using ident helper).

func DecodeBase32ToUUID

func DecodeBase32ToUUID(s string) (uuid.UUID, error)

func DecodeFromBase32

func DecodeFromBase32(s string) ([]byte, error)

func EmitLatency added in v0.0.23

func EmitLatency(ctx context.Context, stage string, ms int64)

EmitLatency records a latency measurement, in milliseconds, for a named stage. Metric name: "fed_query_latency_histogram" with label {"stage": "<translation|execution|streaming>"}

func EmitPushdownEfficiency added in v0.0.23

func EmitPushdownEfficiency(ctx context.Context, schemaID int16, ratio float64)

EmitPushdownEfficiency records pushdown efficiency as a ratio (float64). Metric name: "fed_query_pushdown_efficiency" with label {"schema_id": "<id>"}

func EmitRowCount added in v0.0.23

func EmitRowCount(ctx context.Context, source string, rows int64)

EmitRowCount records row counts per source. Metric name: "fed_query_row_count" with label {"source": "pg"|"s3"|"duckdb"}

func EncodeToBase32

func EncodeToBase32(data []byte) string

func EncodeUUIDToBase32

func EncodeUUIDToBase32(id uuid.UUID) string
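
The four Base32 helpers carry no doc comments; they appear to pair up as encode/decode inverses, as in this round-trip sketch:

func base32RoundTripSketch(id uuid.UUID) error {
	encoded := EncodeUUIDToBase32(id)
	decoded, err := DecodeBase32ToUUID(encoded)
	if err != nil {
		return err
	}
	if decoded != id {
		return fmt.Errorf("round-trip mismatch: %s != %s", decoded, id)
	}
	return nil
}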

func FilterAttributes added in v0.0.18

func FilterAttributes(attributes map[string]any, attrs []string) map[string]any

FilterAttributes filters a map of attributes based on the requested attribute paths. If attrs is nil or empty, returns the original attributes unchanged. Supports nested paths like "contact.name" or "contact.phone".
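
A sketch of nested-path filtering (keys and values are illustrative):

func filterAttributesSketch() {
	attrs := map[string]any{
		"name": "Ada",
		"contact": map[string]any{
			"name":  "office",
			"phone": "555-0100",
		},
	}
	filtered := FilterAttributes(attrs, []string{"contact.phone"})
	// Expected: only the nested phone survives, i.e.
	// map[string]any{"contact": map[string]any{"phone": "555-0100"}}
	_ = filtered
}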

func FilterDataRecord added in v0.0.18

func FilterDataRecord(record *forma.DataRecord, attrs []string) *forma.DataRecord

FilterDataRecord applies attribute filtering to a forma.DataRecord. Returns a new DataRecord with filtered attributes.

func GenerateDuckDBWhereClause added in v0.0.23

func GenerateDuckDBWhereClause(q *FederatedAttributeQuery) (string, []any, error)

GenerateDuckDBWhereClause produces a minimal DuckDB WHERE clause for a FederatedAttributeQuery. It is an intentionally small helper for the initial integration: it supports CompositeCondition with KvCondition children and translates simple operators. It returns the clause and a list of args suitable for database/sql parameter placeholders ($1/$2-style indexing is left to later templating).

NOTE: This is a minimal implementation to allow compilation and unit testing of the rendering logic. Full query translation (including EAV-to-column mapping and proper parameter indexing) will be implemented in follow-up tasks.

func GenerateDuckDBWhereClauseWithExclusions added in v0.0.23

func GenerateDuckDBWhereClauseWithExclusions(q *FederatedAttributeQuery, dirtyIDs []uuid.UUID) (string, []any, error)

GenerateDuckDBWhereClauseWithExclusions builds a DuckDB WHERE clause for the query and appends an exclusion for dirty row ids (to be used as an anti-join).

func IsListType added in v0.0.23

func IsListType(v forma.ValueType) bool

IsListType returns true if the ValueType represents a list/array.

func MapKeys

func MapKeys[K comparable, V any](m map[K]V) []K

MapKeys extracts all keys from a map and returns them as a slice. The order of keys is non-deterministic due to map iteration.

func MapValueTypeToDuckDBType added in v0.0.23

func MapValueTypeToDuckDBType(v forma.ValueType) string

MapValueTypeToDuckDBType maps forma.ValueType to a DuckDB SQL type string. For LIST types, returns the element type only (caller must wrap in LIST(...) if needed).

func MapValueTypeToListDuckDBType added in v0.0.23

func MapValueTypeToListDuckDBType(elementType forma.ValueType) string

MapValueTypeToListDuckDBType returns the DuckDB LIST type for an array of the given element type. e.g., ValueTypeText -> "LIST(VARCHAR)", ValueTypeInteger -> "LIST(INTEGER)"

func MapValues

func MapValues[K comparable, V any](m map[K]V) []V

MapValues extracts all values from a map and returns them as a slice. The order of values is non-deterministic due to map iteration.
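
A short sketch of both helpers; sort the result when a stable order is needed:

func mapHelpersSketch() {
	m := map[string]int16{"person": 100, "order": 101}
	names := MapKeys(m)  // []string, order non-deterministic
	ids := MapValues(m)  // []int16, order non-deterministic
	sort.Strings(names)  // import "sort" for a stable order
	_, _ = names, ids
}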

func MergeTemplateParamsWithDirtyIDs added in v0.0.23

func MergeTemplateParamsWithDirtyIDs(params any, dirtyIDs []uuid.UUID) any

MergeTemplateParamsWithDirtyIDs injects two helper fields into the provided params map (if map[string]any): HasDirtyIDs (bool) and DirtyIDsCSV (string). This is a convenience for callers that render the DuckDB SQL template and want to optionally include a dirty_ids CTE.
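
A sketch of the injected fields (the link to RenderDirtyIDsValuesCSV is an assumption based on the documented CSV format):

func mergeDirtyParamsSketch(dirty []uuid.UUID) {
	params := map[string]any{"SchemaID": int16(42)}
	merged := MergeTemplateParamsWithDirtyIDs(params, dirty)
	// merged (still a map[string]any) should now carry:
	//   HasDirtyIDs = true
	//   DirtyIDsCSV = a VALUES-list fragment, e.g. "('id1')"
	// so a template can guard:
	//   {{if .HasDirtyIDs}} dirty_ids AS (VALUES {{.DirtyIDsCSV}}) {{end}}
	_ = merged
}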

func NewEntityManager

func NewEntityManager(
	transformer PersistentRecordTransformer,
	repository PersistentRecordRepository,
	registry forma.SchemaRegistry,
	config *forma.Config,
) forma.EntityManager

NewEntityManager creates a new EntityManager instance

func NewFileSchemaRegistry

func NewFileSchemaRegistry(pool *pgxpool.Pool, schemaTable string, schemaDir string) (forma.SchemaRegistry, error)

NewFileSchemaRegistry creates a new schema registry that reads schema mappings from a PostgreSQL table and loads attribute definitions from JSON files.

Parameters:

  • pool: PostgreSQL connection pool
  • schemaTable: Name of the schema_registry table (e.g., "schema_registry_1234567890")
  • schemaDir: Directory containing the *_attributes.json files

func NewFileSchemaRegistryFromDirectory added in v0.0.15

func NewFileSchemaRegistryFromDirectory(schemaDir string) (forma.SchemaRegistry, error)

NewFileSchemaRegistryFromDirectory creates a schema registry that scans a directory for schema files and auto-assigns IDs (starting from 100). This mode does not require a database connection.

Parameters:

  • schemaDir: Directory containing the schema files (*.json and *_attributes.json)

func PostgresHealthCheck added in v0.0.23

func PostgresHealthCheck(ctx context.Context, dsn string, timeout time.Duration) error

PostgresHealthCheck attempts to connect and ping a Postgres instance using a DSN. timeout may be 0 to use a sensible default (5s).

func RegisterTelemetryEmitter added in v0.0.23

func RegisterTelemetryEmitter(fn telemetryEmitter)

RegisterTelemetryEmitter registers a custom emitter function. Callers (e.g. service wiring) can provide an OpenTelemetry-backed emitter or a test meter.

func RenderDirtyIDsValuesCSV added in v0.0.23

func RenderDirtyIDsValuesCSV(dirtyIDs []uuid.UUID) string

RenderDirtyIDsValuesCSV builds a VALUES-list fragment suitable for embedding into a SQL template, e.g. VALUES {{.DirtyIDsCSV}}. Example output for two ids: "('id1'),('id2')"

func RenderDuckDBQuery added in v0.0.23

func RenderDuckDBQuery(tpl *template.Template, params any, whereArgs []any) (string, []any, error)

RenderDuckDBQuery renders a DuckDB SQL template (which uses "?" placeholders) and combines the provided whereArgs (typically from GenerateDuckDBWhereClause) with the template-collected args. The order is: whereArgs first, then template args.

func RenderS3ParquetPath added in v0.0.23

func RenderS3ParquetPath(tmpl string, schemaID int16) (string, error)

RenderS3ParquetPath interpolates a simple Go template for parquet path rendering. Example template: "s3://bucket/path/schema_{{.SchemaID}}/data.parquet"
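
Usage, reusing the doc's own example template:

func parquetPathSketch() (string, error) {
	tmpl := "s3://bucket/path/schema_{{.SchemaID}}/data.parquet"
	return RenderS3ParquetPath(tmpl, 42)
	// -> "s3://bucket/path/schema_42/data.parquet"
}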

func RenderSQLTemplate added in v0.0.23

func RenderSQLTemplate(tpl *template.Template, data any) (string, []any, error)

RenderSQLTemplate is a convenience helper for a one-shot render: it executes the template and returns the rendered SQL with the collected args.

func S3HealthCheck added in v0.0.23

func S3HealthCheck(ctx context.Context, cfg forma.DuckDBConfig, timeout time.Duration) error

S3HealthCheck attempts a best-effort HTTP ping against the configured S3 endpoint. This is intentionally lightweight and non-authoritative: it will only succeed for endpoints that accept anonymous HEAD/GET requests (e.g., some MinIO setups). For AWS S3 this will often return 403 but is still useful to validate DNS/resolution and TLS.
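
A startup wiring sketch combining both checks (the DSN and timeouts are illustrative):

func startupHealthChecksSketch(ctx context.Context, dsn string, cfg forma.DuckDBConfig) error {
	// A zero timeout falls back to the documented 5s default.
	if err := PostgresHealthCheck(ctx, dsn, 0); err != nil {
		return fmt.Errorf("postgres health check: %w", err)
	}
	// Best-effort only; see the S3HealthCheck caveats above.
	if err := S3HealthCheck(ctx, cfg, 2*time.Second); err != nil {
		return fmt.Errorf("s3 health check: %w", err)
	}
	return nil
}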

func SanitizeIdentifier added in v0.0.24

func SanitizeIdentifier(name string) string

func SetGlobalDuckDBCircuitBreaker added in v0.0.23

func SetGlobalDuckDBCircuitBreaker(cb *CircuitBreaker)

SetGlobalDuckDBCircuitBreaker registers the global breaker used for DuckDB-related operations.

func ToDuckDBParam added in v0.0.23

func ToDuckDBParam(value any, v forma.ValueType) (any, error)

ToDuckDBParam converts a Go value to the form expected by DuckDB drivers for the given value type. Examples:

  • uuid.UUID -> string
  • time.Time -> time.Time (TIMESTAMP)
  • numeric types -> float64

func ToFloat64 added in v0.0.19

func ToFloat64(v any) (float64, bool)

ToFloat64 is an exported helper that behaves like the legacy optimizer helper: it returns (float64, bool), where the bool indicates success.

func ValidateDuckDBConfig added in v0.0.23

func ValidateDuckDBConfig(cfg forma.DuckDBConfig) error

ValidateDuckDBConfig performs basic sanity checks on user-provided DuckDB configuration.

func ValidateOrderByAttributesForListTypes added in v0.0.23

func ValidateOrderByAttributesForListTypes(orderBy []AttributeOrder) error

ValidateOrderByAttributesForListTypes checks if resolved AttributeOrder entries are LIST types. This variant works with the internal AttributeOrder type used after attribute resolution.

func ValidateOrderByForListTypes added in v0.0.23

func ValidateOrderByForListTypes(orderBy []forma.OrderBy, getValueType func(attrName string) (forma.ValueType, bool)) error

ValidateOrderByForListTypes checks if any ORDER BY attributes are LIST types. Returns an error if a LIST type is found (LIST columns cannot be used in ORDER BY). Uses forma.OrderBy (the DSL type with Attribute string field).

func ValidatePostgresConfig added in v0.0.23

func ValidatePostgresConfig(cfg forma.DatabaseConfig) error

ValidatePostgresConfig performs basic sanity checks on Postgres-related settings.

func ValidateS3Config added in v0.0.23

func ValidateS3Config(cfg forma.DuckDBConfig) error

ValidateS3Config performs basic sanity checks on S3-related DuckDB settings.

Types

type AtomicBatchPersistentRecordRepository added in v0.0.24

type AtomicBatchPersistentRecordRepository interface {
	BatchInsertPersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
	BatchUpdatePersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
	BatchDeletePersistentRecords(ctx context.Context, tables StorageTables, keys []PersistentRecordKey) error
}

type AttributeConverter

type AttributeConverter struct {
	// contains filtered or unexported fields
}

AttributeConverter provides conversion between EntityAttribute and EAVRecord

func NewAttributeConverter

func NewAttributeConverter(registry forma.SchemaRegistry) *AttributeConverter

NewAttributeConverter creates a new AttributeConverter instance

func (*AttributeConverter) FromEAVRecord

func (c *AttributeConverter) FromEAVRecord(record EAVRecord, valueType forma.ValueType) (EntityAttribute, error)

FromEAVRecord converts an EAVRecord to an EntityAttribute

func (*AttributeConverter) FromEAVRecords

func (c *AttributeConverter) FromEAVRecords(records []EAVRecord) ([]EntityAttribute, error)

FromEAVRecords converts a slice of EAVRecords to EntityAttributes

func (*AttributeConverter) ToEAVRecord

func (c *AttributeConverter) ToEAVRecord(attr EntityAttribute, rowID uuid.UUID) (EAVRecord, error)

ToEAVRecord converts an EntityAttribute to an EAVRecord

func (*AttributeConverter) ToEAVRecords

func (c *AttributeConverter) ToEAVRecords(attributes []EntityAttribute, rowID uuid.UUID) ([]EAVRecord, error)

ToEAVRecords converts a slice of EntityAttributes to EAVRecords

type AttributeOrder

type AttributeOrder struct {
	AttrID          int16
	ValueType       forma.ValueType
	SortOrder       forma.SortOrder
	StorageLocation forma.AttributeStorageLocation // main or eav
	ColumnName      string                         // main table column name if StorageLocation == main
}

AttributeOrder specifies how to sort by a particular attribute.

func (*AttributeOrder) AttrIDInt

func (ao *AttributeOrder) AttrIDInt() int

AttrIDInt returns the attribute ID as an int (for template compatibility).

func (*AttributeOrder) Desc

func (ao *AttributeOrder) Desc() bool

Desc returns true if the sort order is descending.

func (*AttributeOrder) IsMainColumn

func (ao *AttributeOrder) IsMainColumn() bool

IsMainColumn returns true if the attribute is stored in the main table.

func (*AttributeOrder) MainColumnName

func (ao *AttributeOrder) MainColumnName() string

MainColumnName returns the column name in the main table.

func (*AttributeOrder) ValueColumn

func (ao *AttributeOrder) ValueColumn() string

ValueColumn returns the EAV table column name for this attribute's value type.

type AttributeQuery

type AttributeQuery struct {
	SchemaID        int16            `json:"schemaId"`
	Condition       forma.Condition  `json:"condition,omitempty"`
	OrderBy         []forma.OrderBy  `json:"orderBy"`
	AttributeOrders []AttributeOrder `json:"attributeOrders"`
	Limit           int              `json:"limit"`
	Offset          int              `json:"offset"`
}

type CircuitBreaker added in v0.0.23

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker is a lightweight in-memory circuit breaker.

func GetDuckDBCircuitBreaker added in v0.0.23

func GetDuckDBCircuitBreaker() *CircuitBreaker

GetDuckDBCircuitBreaker returns the global breaker instance (may be nil).

func NewCircuitBreaker added in v0.0.23

func NewCircuitBreaker(threshold int, window, openDuration time.Duration) *CircuitBreaker

NewCircuitBreaker creates a configured circuit breaker.

func (*CircuitBreaker) IsOpen added in v0.0.23

func (cb *CircuitBreaker) IsOpen() bool

IsOpen returns true if the breaker is currently open.

func (*CircuitBreaker) RecordFailure added in v0.0.23

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failure occurrence and opens the breaker if threshold exceeded.

func (*CircuitBreaker) RecordSuccess added in v0.0.23

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess resets failure history when operations succeed.
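
A wiring-and-guard sketch; the threshold/window semantics are inferred from the constructor's parameter names, and the guarded operation is hypothetical:

func breakerGuardSketch(ctx context.Context, runQuery func(context.Context) error) error {
	// Wiring, once at startup: presumably opens after 5 failures
	// within a one-minute window, then stays open for 30 seconds.
	SetGlobalDuckDBCircuitBreaker(NewCircuitBreaker(5, time.Minute, 30*time.Second))

	cb := GetDuckDBCircuitBreaker()
	if cb != nil && cb.IsOpen() {
		return fmt.Errorf("duckdb breaker open: refusing query")
	}
	err := runQuery(ctx) // the DuckDB operation being guarded
	if cb != nil {
		if err != nil {
			cb.RecordFailure()
		} else {
			cb.RecordSuccess()
		}
	}
	return err
}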

type DBPersistentRecordRepository added in v0.0.23

type DBPersistentRecordRepository struct {
	// contains filtered or unexported fields
}

func NewDBPersistentRecordRepository added in v0.0.23

func NewDBPersistentRecordRepository(pool persistentRecordPool, metadataCache *MetadataCache, duckDBClient *DuckDBClient) *DBPersistentRecordRepository

func (*DBPersistentRecordRepository) BatchDeletePersistentRecords added in v0.0.24

func (r *DBPersistentRecordRepository) BatchDeletePersistentRecords(ctx context.Context, tables StorageTables, keys []PersistentRecordKey) error

func (*DBPersistentRecordRepository) BatchInsertPersistentRecords added in v0.0.24

func (r *DBPersistentRecordRepository) BatchInsertPersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error

func (*DBPersistentRecordRepository) BatchUpdatePersistentRecords added in v0.0.24

func (r *DBPersistentRecordRepository) BatchUpdatePersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error

func (*DBPersistentRecordRepository) DeletePersistentRecord added in v0.0.23

func (r *DBPersistentRecordRepository) DeletePersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) error

func (*DBPersistentRecordRepository) ExecuteDuckDBFederatedQuery added in v0.0.23

func (r *DBPersistentRecordRepository) ExecuteDuckDBFederatedQuery(
	ctx context.Context,
	tables StorageTables,
	q *FederatedAttributeQuery,
	limit, offset int,
	attributeOrders []AttributeOrder,
	opts *FederatedQueryOptions,
) ([]*PersistentRecord, int64, error)

ExecuteDuckDBFederatedQuery runs the DuckDB optimized query template using the provided FederatedAttributeQuery. It fetches dirty IDs from the Postgres change_log (if available), injects exclusions into the DuckDB WHERE clause, executes the query against the global DuckDB client, and returns matched PersistentRecords along with the total record count.

Note: This implementation performs a best-effort scan of columns produced by the optimized query template. It mirrors the column ordering used by the Postgres template:

  • main table projection (entity_main columns, order defined by entityMainColumnDescriptors)
  • attributes_json (TEXT)
  • total_records (bigint)
  • total_pages (bigint)
  • current_page (int)

func (*DBPersistentRecordRepository) ExecuteFederatedPaginatedQuery added in v0.0.23

func (r *DBPersistentRecordRepository) ExecuteFederatedPaginatedQuery(
	ctx context.Context,
	tables StorageTables,
	fq *FederatedAttributeQuery,
	limit, offset int,
	attributeOrders []AttributeOrder,
	opts *FederatedQueryOptions,
) ([]*PersistentRecord, int64, error)

ExecuteFederatedPaginatedQuery performs a federated fetch across Postgres (hot) and DuckDB (cold/warm), merges results with last-write-wins semantics, and returns the requested page plus an accurate total deduplicated across sources.

Notes:

  • This is an MVP coordinator: it caps per-source fetches (opts.MaxRows or a default) to avoid OOM.
  • For very large result sets, a keys-only two-phase approach should be implemented later.
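
A call sketch that also requests an execution plan (attribute orders omitted; page size illustrative):

func federatedPageSketch(ctx context.Context, r *DBPersistentRecordRepository, tables StorageTables, fq *FederatedAttributeQuery) error {
	opts := &FederatedQueryOptions{
		MaxRows:              10000,
		IncludeExecutionPlan: true,
	}
	records, total, err := r.ExecuteFederatedPaginatedQuery(ctx, tables, fq, 100, 0, nil, opts)
	if err != nil {
		return err
	}
	if plan := opts.ExecutionPlan; plan != nil {
		// Inspect e.g. plan.Timings["total"], plan.Merge.Strategy, plan.Sources.
		_ = plan
	}
	_, _ = records, total
	return nil
}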

func (*DBPersistentRecordRepository) FetchDirtyRowIDs added in v0.0.23

func (r *DBPersistentRecordRepository) FetchDirtyRowIDs(ctx context.Context, changeLogTable string, schemaID int16) ([]uuid.UUID, error)

FetchDirtyRowIDs returns all row_ids present in the change_log with flushed_at = 0 for the given schema. The federated query coordinator can use this to exclude dirty rows from columnar/DuckDB reads (anti-join).

func (*DBPersistentRecordRepository) GetPersistentRecord added in v0.0.23

func (r *DBPersistentRecordRepository) GetPersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) (*PersistentRecord, error)

func (*DBPersistentRecordRepository) InsertPersistentRecord added in v0.0.23

func (r *DBPersistentRecordRepository) InsertPersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error

func (*DBPersistentRecordRepository) QueryPersistentRecords added in v0.0.23

func (r *DBPersistentRecordRepository) QueryPersistentRecords(ctx context.Context, query *PersistentRecordQuery) (*PersistentRecordPage, error)

func (*DBPersistentRecordRepository) QueryPersistentRecordsFederated added in v0.0.23

func (r *DBPersistentRecordRepository) QueryPersistentRecordsFederated(ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, opts *FederatedQueryOptions) (*PersistentRecordPage, error)

QueryPersistentRecordsFederated performs a federated query across configured data tiers. Backwards compatible: hot-only hints delegate to QueryPersistentRecords.

func (*DBPersistentRecordRepository) StreamDuckDBFederatedQuery added in v0.0.23

func (r *DBPersistentRecordRepository) StreamDuckDBFederatedQuery(
	ctx context.Context,
	tables StorageTables,
	q *FederatedAttributeQuery,
	limit, offset int,
	attributeOrders []AttributeOrder,
	opts *FederatedQueryOptions,
	rowHandler func(context.Context, *PersistentRecord) error,
) (int64, error)

StreamDuckDBFederatedQuery streams DuckDB federated query results using a rowHandler callback. It reuses the same rowHandler semantics as Postgres' StreamOptimizedQuery to avoid loading the entire result set into memory.

func (*DBPersistentRecordRepository) StreamOptimizedQuery added in v0.0.23

func (r *DBPersistentRecordRepository) StreamOptimizedQuery(
	ctx context.Context,
	tables StorageTables,
	schemaID int16,
	clause string,
	args []any,
	limit, offset int,
	attributeOrders []AttributeOrder,
	useMainTableAsAnchor bool,
	rowHandler func(*PersistentRecord) error,
) (int64, error)

func (*DBPersistentRecordRepository) UpdatePersistentRecord added in v0.0.23

func (r *DBPersistentRecordRepository) UpdatePersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error

type DBPool added in v0.0.19

type DBPool interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}

DBPool is a minimal interface for the methods MetadataLoader needs.

type DataSourcePlan added in v0.0.23

type DataSourcePlan struct {
	// Tier indicates the logical data tier (hot/warm/cold).
	Tier DataTier

	// Engine indicates the execution engine, e.g., "postgres" or "duckdb".
	Engine string

	// SQL optionally contains the generated SQL fragment or rendered template used.
	// For privacy/performance reasons this may be truncated by the repository.
	SQL string

	// RowEstimate is the planner's estimated rows to be scanned/returned (if available).
	RowEstimate int64

	// PredicatePushdown indicates whether predicates were pushed to the source.
	PredicatePushdown bool

	// ActualRows contains the actual rows returned from this source (filled post-execution).
	ActualRows int64

	// DurationMs measures execution time for this source in milliseconds.
	DurationMs int64

	// Reason provides human-readable explanation for selection/behavior.
	Reason string
}

DataSourcePlan captures per-source execution details.

type DataTier added in v0.0.23

type DataTier string

const (
	DataTierHot  DataTier = "hot"
	DataTierWarm DataTier = "warm"
	DataTierCold DataTier = "cold"
)

type DualClauses added in v0.0.23

type DualClauses struct {
	PgClause     string // existing EAV-based clause (EXISTS...)
	PgArgs       []any
	PgMainClause string // predicates that can be pushed into entity_main (m.*)
	PgMainArgs   []any

	DuckClause string
	DuckArgs   []any
}

DualClauses contains SQL fragments and argument lists for both Postgres and DuckDB.

func ToDualClauses added in v0.0.23

func ToDualClauses(
	condition forma.Condition,
	eavTable string,
	schemaID int16,
	cache forma.SchemaAttributeCache,
	paramIndex *int,
) (DualClauses, error)

ToDualClauses generates Postgres and DuckDB WHERE fragments for the given condition.

  • PgClause reuses the existing SQLGenerator (EAV-based EXISTS expressions).
  • PgMainClause contains predicates suitable for entity_main pushdown.
  • DuckClause maps attributes to column names when available and emits a simple DuckDB-style clause.

Note: DuckDB placeholders are "?" and args are returned in order; Postgres uses $n placeholders.

type DuckDBClient added in v0.0.23

type DuckDBClient struct {
	DB *sql.DB
	// contains filtered or unexported fields
}

DuckDBClient wraps a database/sql DB opened with the DuckDB driver.

func NewDuckDBClient added in v0.0.23

func NewDuckDBClient(cfg forma.DuckDBConfig) (*DuckDBClient, error)

NewDuckDBClient creates and configures a DuckDB client according to the provided config. It attempts to load common extensions (httpfs/parquet) and configure S3 access via PRAGMA when requested.
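
A construction sketch; forma.DuckDBConfig fields are not shown on this page, so the config is passed through unpopulated here:

func duckDBClientSketch(ctx context.Context, cfg forma.DuckDBConfig) error {
	client, err := NewDuckDBClient(cfg)
	if err != nil {
		return err
	}
	defer client.Close()
	if err := client.HealthCheck(ctx); err != nil {
		return err
	}
	// client.DB is a plain *sql.DB and can be queried directly.
	var one int
	return client.DB.QueryRowContext(ctx, "SELECT 1").Scan(&one)
}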

func (*DuckDBClient) Close added in v0.0.23

func (c *DuckDBClient) Close() error

Close closes the underlying DuckDB DB.

func (*DuckDBClient) HealthCheck added in v0.0.23

func (c *DuckDBClient) HealthCheck(ctx context.Context) error

HealthCheck performs a simple query to validate the DuckDB connection and basic runtime pragmas.

type DuckDBRenderHints added in v0.0.23

type DuckDBRenderHints struct {
	// S3ParquetPathTemplate is a template (with placeholders) for locating parquet files in S3.
	// Example: "s3://bucket/path/schema_{{.SchemaID}}/data.parquet"
	S3ParquetPathTemplate string

	// TimeEncodingHint indicates how date/time values should be encoded in DuckDB side.
	// e.g. "unix_ms" or "iso8601"
	TimeEncodingHint string
}

DuckDBRenderHints provides optional parameters that guide DuckDB SQL generation.

type EAVRecord

type EAVRecord struct {
	SchemaID     int16
	RowID        uuid.UUID // UUID v7, identifies data row
	AttrID       int16     // Attribute ID from schema definition
	ArrayIndices string    // Comma-separated array indices (e.g., "0", "1,2", or "" for non-arrays)
	ValueText    *string   // For valueType: "text"
	ValueNumeric *float64  // For valueType: "numeric"
}

EAVRecord represents one attribute value row in the EAV table; the value is stored in the Value* field matching the attribute's value type.

type EntityAttribute

type EntityAttribute struct {
	SchemaID     int16
	RowID        uuid.UUID // UUID v7, identifies data row
	AttrID       int16
	ArrayIndices string
	ValueType    forma.ValueType
	Value        any
}

func (*EntityAttribute) BigInt

func (ea *EntityAttribute) BigInt() (*int64, error)

func (*EntityAttribute) Bool

func (ea *EntityAttribute) Bool() (*bool, error)

func (*EntityAttribute) Date

func (ea *EntityAttribute) Date() (*time.Time, error)

func (*EntityAttribute) DateTime

func (ea *EntityAttribute) DateTime() (*time.Time, error)

func (*EntityAttribute) Integer

func (ea *EntityAttribute) Integer() (*int32, error)

func (*EntityAttribute) Numeric

func (ea *EntityAttribute) Numeric() (*float64, error)

func (*EntityAttribute) SmallInt

func (ea *EntityAttribute) SmallInt() (*int16, error)

func (*EntityAttribute) Text

func (ea *EntityAttribute) Text() (*string, error)

func (*EntityAttribute) UUID

func (ea *EntityAttribute) UUID() (*uuid.UUID, error)

type ExecutionPlan added in v0.0.23

type ExecutionPlan struct {
	// Routing decision snapshot (which tiers were considered/selected)
	Routing RoutingDecision

	// Per-source plans for each data source touched by the federated execution.
	Sources []DataSourcePlan

	// Merge describes the merge-on-read strategy applied to results across tiers.
	Merge MergePlan

	// Timings: coarse-grained durations in milliseconds for major stages.
	// Keys typically: "translate", "postgres_fetch", "duckdb_fetch", "merge", "total"
	Timings map[string]int64

	// Notes and warnings captured during planning/execution.
	Notes []string
}

ExecutionPlan is a diagnostic structure capturing the federated query execution choices and timings. It is intended for debugging and observability only.

type FederatedAttributeQuery added in v0.0.23

type FederatedAttributeQuery struct {
	AttributeQuery

	// PreferredTiers is an ordered list of preferred data tiers to query.
	// Example: []DataTier{DataTierHot, DataTierWarm, DataTierCold}
	PreferredTiers []DataTier

	// PreferHot indicates strong preference for reading from the hot (Postgres) tier
	// when the same data exists in multiple tiers.
	PreferHot bool

	// UseMainAsAnchor controls whether the main table (entity_main) should be used
	// as the anchor for predicate pushdown. This mirrors existing repository logic.
	UseMainAsAnchor bool

	// DuckDBHints carries optional DuckDB-specific rendering hints, e.g. external
	// parquet path templates or casting preferences.
	DuckDBHints *DuckDBRenderHints
}

FederatedAttributeQuery extends AttributeQuery with federated-specific hints. It embeds the existing AttributeQuery so it remains compatible with existing code paths.

type FederatedQueryOptions added in v0.0.23

type FederatedQueryOptions struct {
	// MaxRows limits the number of rows read from remote/columnar sources per shard.
	MaxRows int

	// Parallelism controls how many parallel DuckDB scan workers to use.
	Parallelism int

	// AllowPartialDegradedMode if true will allow executing the query with only a subset
	// of data tiers available (useful for the early MVP).
	AllowPartialDegradedMode bool

	// IncludeExecutionPlan, when true, instructs the repository to collect an execution
	// plan for debugging/observability: the repository allocates and populates an
	// ExecutionPlan and assigns it to the ExecutionPlan field below for callers to inspect.
	IncludeExecutionPlan bool

	// ExecutionPlan is populated by the repository when IncludeExecutionPlan==true.
	// Callers should pass a non-nil opts pointer and inspect this field after the call.
	ExecutionPlan *ExecutionPlan
}

FederatedQueryOptions contains runtime options for federated execution.

type ListOperatorMapping added in v0.0.23

type ListOperatorMapping struct {
	// Operator is the DSL operator name (equals, contains, starts_with, gt, etc.)
	Operator string
	// DuckDBExpr is a template for the DuckDB expression.
	// Placeholders: {{.Column}} for the column name, {{.Value}} for the parameter.
	DuckDBExpr string
	// RequiresLambda indicates if the expression uses a lambda (x -> predicate)
	RequiresLambda bool
}

ListOperatorMapping defines how DSL operators map to DuckDB LIST functions. DSL operators for LIST columns use list_contains or list_any_match patterns.

type MergePlan added in v0.0.23

type MergePlan struct {
	// Strategy name, e.g., "last-write-wins"
	Strategy string

	// PreferHot indicates whether preferHot tiebreaker was used.
	PreferHot bool

	// DedupKeys lists the keys used for deduplication (typically SchemaID:RowID).
	DedupKeys []string

	// DurationMs time spent merging in milliseconds.
	DurationMs int64

	// Notes optional additional details about attribute-level merging.
	Notes []string
}

MergePlan describes merge-on-read semantics used to combine tiered results.

type MetadataCache

type MetadataCache struct {
	// contains filtered or unexported fields
}

MetadataCache holds all metadata mappings for fast lookups

func NewMetadataCache

func NewMetadataCache() *MetadataCache

NewMetadataCache creates a new metadata cache

func (*MetadataCache) GetSchemaCache

func (mc *MetadataCache) GetSchemaCache(schemaName string) (forma.SchemaAttributeCache, bool)

GetSchemaCache retrieves the schema attribute cache for a schema (thread-safe)

func (*MetadataCache) GetSchemaCacheByID added in v0.0.10

func (mc *MetadataCache) GetSchemaCacheByID(schemaID int16) (forma.SchemaAttributeCache, bool)

func (*MetadataCache) GetSchemaID

func (mc *MetadataCache) GetSchemaID(schemaName string) (int16, bool)

GetSchemaID retrieves schema ID by name (thread-safe)

func (*MetadataCache) GetSchemaName

func (mc *MetadataCache) GetSchemaName(schemaID int16) (string, bool)

GetSchemaName retrieves schema name by ID (thread-safe)

func (*MetadataCache) ListSchemas

func (mc *MetadataCache) ListSchemas() []string

ListSchemas returns all schema names (thread-safe)

type MetadataLoader

type MetadataLoader struct {
	// contains filtered or unexported fields
}

MetadataLoader loads schema and attribute metadata from database and JSON files

func NewMetadataLoader

func NewMetadataLoader(pool DBPool, schemaTableName, schemaDirectory string) *MetadataLoader

NewMetadataLoader creates a new metadata loader

func (*MetadataLoader) LoadMetadata

func (ml *MetadataLoader) LoadMetadata(ctx context.Context) (*MetadataCache, error)

LoadMetadata loads all metadata and returns a cache

type PersistentRecord

type PersistentRecord struct {
	SchemaID        int16
	RowID           uuid.UUID
	TextItems       map[string]string // e.g., "text_01" -> "Hello"
	Int16Items      map[string]int16
	Int32Items      map[string]int32
	Int64Items      map[string]int64
	Float64Items    map[string]float64
	UUIDItems       map[string]uuid.UUID
	CreatedAt       int64
	UpdatedAt       int64
	DeletedAt       *int64
	OtherAttributes []EAVRecord // EAV attributes not in hot table
}

PersistentRecord represents a persisted storage record, combining the entity_main columns with the EAV attributes.

func MergePersistentRecordsByTier added in v0.0.23

func MergePersistentRecordsByTier(inputs map[DataTier][]*PersistentRecord, preferHot bool) ([]*PersistentRecord, error)

MergePersistentRecordsByTier performs a merge-on-read across multiple data tiers. Inputs are provided as a map from DataTier -> slice of *PersistentRecord. Last-write-wins semantics are applied using PersistentRecord.UpdatedAt and ChangeLog flushed state.

Behavior:

  • Records are deduplicated by (SchemaID, RowID).
  • For each key, the record with the highest UpdatedAt is chosen. If equal and preferHot==true, the record coming from the Hot tier is chosen.
  • If a record originates from the ChangeLog buffer (flushed_at == 0) it is considered the authoritative hot source and wins ties regardless of UpdatedAt.
  • The chosen record is returned with OtherAttributes merged across all source tiers for that (SchemaID, RowID) with attribute-level deduplication.
  • Attributes are deduplicated by (AttrID, ArrayIndices).
  • For an attribute present in multiple source records, the attribute from the record with the latest UpdatedAt is chosen. Ties are resolved using preferHot and deterministic tier ordering.
  • Result slice is sorted by SchemaID then RowID for deterministic output.
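
A sketch merging a hot slice with a cold slice under these rules:

func mergeByTierSketch(hot, cold []*PersistentRecord) ([]*PersistentRecord, error) {
	inputs := map[DataTier][]*PersistentRecord{
		DataTierHot:  hot,
		DataTierCold: cold,
	}
	// preferHot=true: on equal UpdatedAt, the hot-tier record wins.
	return MergePersistentRecordsByTier(inputs, true)
}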

type PersistentRecordFederatedQuerier added in v0.0.24

type PersistentRecordFederatedQuerier interface {
	// QueryPersistentRecordsFederated performs a federated query across configured data tiers
	// (Postgres hot and DuckDB/S3 warm/cold). Implementations MUST preserve backwards
	// compatibility for OLTP-only callers: if the federated query hints indicate hot-only
	// execution, this SHOULD delegate to QueryPersistentRecords.
	QueryPersistentRecordsFederated(ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, opts *FederatedQueryOptions) (*PersistentRecordPage, error)
}

type PersistentRecordKey added in v0.0.24

type PersistentRecordKey struct {
	SchemaID int16
	RowID    uuid.UUID
}

type PersistentRecordPage

type PersistentRecordPage struct {
	Records      []*PersistentRecord
	TotalRecords int64
	TotalPages   int
	CurrentPage  int
}

type PersistentRecordQuery

type PersistentRecordQuery struct {
	Tables          StorageTables
	SchemaID        int16
	Condition       forma.Condition
	AttributeOrders []AttributeOrder
	Limit           int
	Offset          int
}

type PersistentRecordReader added in v0.0.24

type PersistentRecordReader interface {
	GetPersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) (*PersistentRecord, error)
	QueryPersistentRecords(ctx context.Context, query *PersistentRecordQuery) (*PersistentRecordPage, error)
}

type PersistentRecordTransformer

type PersistentRecordTransformer interface {
	ToPersistentRecord(ctx context.Context, schemaID int16, rowID uuid.UUID, jsonData any) (*PersistentRecord, error)
	FromPersistentRecord(ctx context.Context, record *PersistentRecord) (map[string]any, error)
}

func NewPersistentRecordTransformer

func NewPersistentRecordTransformer(registry forma.SchemaRegistry) PersistentRecordTransformer

NewPersistentRecordTransformer creates a new PersistentRecordTransformer instance

type PersistentRecordWriter added in v0.0.24

type PersistentRecordWriter interface {
	InsertPersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
	UpdatePersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
	DeletePersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) error
}

type RelationDescriptor added in v0.0.18

type RelationDescriptor struct {
	ChildSchema        string
	ChildPath          string
	ParentSchema       string
	ParentPath         string
	ForeignKeyAttr     string
	ParentIDAttr       string
	ForeignKeyRequired bool
}

RelationDescriptor captures how a child schema derives fields from a parent schema.

type RelationIndex added in v0.0.18

type RelationIndex struct {
	// contains filtered or unexported fields
}

RelationIndex stores parent-child relations keyed by child schema name.

func LoadRelationIndex added in v0.0.18

func LoadRelationIndex(schemaDir string) (*RelationIndex, error)

LoadRelationIndex parses JSON schema files in schemaDir and builds a relation index. If the directory is missing or no relations are found, it returns an empty index.

func (*RelationIndex) Relations added in v0.0.18

func (idx *RelationIndex) Relations(schema string) []RelationDescriptor

Relations returns descriptors for a child schema.

func (*RelationIndex) StripComputedFields added in v0.0.18

func (idx *RelationIndex) StripComputedFields(schema string, data map[string]any) map[string]any

StripComputedFields removes relation-backed attributes from the payload before persistence.

type RoutingDecision added in v0.0.23

type RoutingDecision struct {
	Tiers           []DataTier
	UseDuckDB       bool
	Reason          string
	MaxScanRows     int
	QueryTimeout    time.Duration
	AllowS3Fallback bool
}

RoutingDecision indicates which tiers to query and whether to prefer DuckDB.

func EvaluateRoutingPolicy added in v0.0.23

EvaluateRoutingPolicy makes a routing decision based on config, query hints and options.

type SQLGenerator

type SQLGenerator struct{}

SQLGenerator converts parsed conditions into SQL fragments and argument lists.

func NewSQLGenerator

func NewSQLGenerator() *SQLGenerator

NewSQLGenerator constructs a SQLGenerator.

func (*SQLGenerator) ToSQLClauses added in v0.0.24

func (g *SQLGenerator) ToSQLClauses(
	condition forma.Condition,
	eavTable string,
	schemaID int16,
	cache forma.SchemaAttributeCache,
	paramIndex *int,
) (string, []any, error)

ToSQLClauses builds the SQL clause and arguments for a condition tree.

func (*SQLGenerator) ToSqlClauses

func (g *SQLGenerator) ToSqlClauses(
	condition forma.Condition,
	eavTable string,
	schemaID int16,
	cache forma.SchemaAttributeCache,
	paramIndex *int,
) (string, []any, error)

ToSqlClauses is kept for backward compatibility.

type SQLRenderer added in v0.0.23

type SQLRenderer struct {
	// contains filtered or unexported fields
}

SQLRenderer renders text/template SQL templates while collecting parameter values and providing a safe identifier helper to avoid SQL injection.

func NewSQLRenderer added in v0.0.23

func NewSQLRenderer() *SQLRenderer

func (*SQLRenderer) Ident added in v0.0.23

func (r *SQLRenderer) Ident(name string) (string, error)

Ident validates a SQL identifier (table/column) and returns it quoted.

func (*SQLRenderer) Param added in v0.0.23

func (r *SQLRenderer) Param(v any) string

Param appends a value to the renderer's args and returns a "?" placeholder to be inserted into the template.

func (*SQLRenderer) Render added in v0.0.23

func (r *SQLRenderer) Render(tpl *template.Template, data any) (string, []any, error)

Render executes tpl with data while providing the template functions:

  • param: adds a param and returns "?" placeholder
  • ident: validates and returns a quoted identifier

It returns the rendered SQL and the collected args slice.
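
A template sketch using both helpers. The stub FuncMap only lets the template compile at parse time; it is an assumption here that Render installs the real param/ident implementations before executing. Table name and predicate are illustrative:

func rendererSketch() (string, []any, error) {
	tpl := template.Must(template.New("q").Funcs(template.FuncMap{
		"param": func(any) string { return "" },                  // replaced by Render
		"ident": func(string) (string, error) { return "", nil }, // replaced by Render
	}).Parse(`SELECT row_id FROM {{ident "entity_main_dev"}} WHERE age > {{param .MinAge}}`))

	sql, args, err := NewSQLRenderer().Render(tpl, map[string]any{"MinAge": 18})
	// Expected shape: SELECT row_id FROM "entity_main_dev" WHERE age > ?  with args = [18]
	return sql, args, err
}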

type SchemaMetadata

type SchemaMetadata struct {
	SchemaName    string                    `json:"schema_name"`
	SchemaID      int16                     `json:"schema_id"`
	SchemaVersion int                       `json:"schema_version"`
	Attributes    []forma.AttributeMetadata `json:"attributes"`
}

SchemaMetadata aggregates attribute mappings for a schema version.

type Set

type Set[T comparable] struct {
	// contains filtered or unexported fields
}

Set is a generic data structure that represents a collection of unique items. It uses a map internally for O(1) operations.

func NewSet

func NewSet[T comparable]() *Set[T]

NewSet creates and returns a new empty Set.

func (*Set[T]) Add

func (s *Set[T]) Add(item T)

Add inserts an item into the set. If the item already exists, it has no effect.

func (*Set[T]) Clear

func (s *Set[T]) Clear()

Clear removes all items from the set.

func (*Set[T]) Contains

func (s *Set[T]) Contains(item T) bool

Contains checks if an item exists in the set.

func (*Set[T]) Remove

func (s *Set[T]) Remove(item T)

Remove deletes an item from the set. If the item doesn't exist, it has no effect.

func (*Set[T]) Size

func (s *Set[T]) Size() int

Size returns the number of items in the set.

func (*Set[T]) ToSlice

func (s *Set[T]) ToSlice() []T

ToSlice converts the set to a slice containing all items. The order of items is non-deterministic due to map iteration.

type StorageTables

type StorageTables struct {
	EntityMain     string
	EAVData        string
	ChangeLog      string
	SchemaRegistry string
}

type Transformer

type Transformer interface {
	// Single object conversion
	ToAttributes(ctx context.Context, schemaID int16, rowID uuid.UUID, jsonData any) ([]EntityAttribute, error)
	FromAttributes(ctx context.Context, attributes []EntityAttribute) (map[string]any, error)

	// Batch operations
	BatchToAttributes(ctx context.Context, schemaID int16, jsonObjects []any) ([]EntityAttribute, error)
	BatchFromAttributes(ctx context.Context, attributes []EntityAttribute) ([]map[string]any, error)

	// Validation
	ValidateAgainstSchema(ctx context.Context, jsonSchema any, jsonData any) error
}

func NewTransformer

func NewTransformer(registry forma.SchemaRegistry) Transformer

NewTransformer creates a new Transformer instance backed by the provided schema registry.

Directories

Path Synopsis
federated
Package federated provides custom assertions for E2E testing.
