Documentation
¶
Index ¶
- Variables
- func AppendDirtyExclusion(baseClause string, dirtyIDs []uuid.UUID) (string, []any)
- func BuildDuckDBQuery(tpl *template.Template, params any, q *FederatedAttributeQuery, ...) (string, []any, error)
- func BuildListPredicate(column, operator, value string, elementType forma.ValueType) (string, any, error)
- func CastExpression(columnOrExpr string, v forma.ValueType) string
- func DecodeBase32ToUUID(s string) (uuid.UUID, error)
- func DecodeFromBase32(s string) ([]byte, error)
- func EmitLatency(ctx context.Context, stage string, ms int64)
- func EmitPushdownEfficiency(ctx context.Context, schemaID int16, ratio float64)
- func EmitRowCount(ctx context.Context, source string, rows int64)
- func EncodeToBase32(data []byte) string
- func EncodeUUIDToBase32(id uuid.UUID) string
- func FilterAttributes(attributes map[string]any, attrs []string) map[string]any
- func FilterDataRecord(record *forma.DataRecord, attrs []string) *forma.DataRecord
- func GenerateDuckDBWhereClause(q *FederatedAttributeQuery) (string, []any, error)
- func GenerateDuckDBWhereClauseWithExclusions(q *FederatedAttributeQuery, dirtyIDs []uuid.UUID) (string, []any, error)
- func IsListType(v forma.ValueType) bool
- func MapKeys[K comparable, V any](m map[K]V) []K
- func MapValueTypeToDuckDBType(v forma.ValueType) string
- func MapValueTypeToListDuckDBType(elementType forma.ValueType) string
- func MapValues[K comparable, V any](m map[K]V) []V
- func MergeTemplateParamsWithDirtyIDs(params any, dirtyIDs []uuid.UUID) any
- func NewEntityManager(transformer PersistentRecordTransformer, repository PersistentRecordRepository, ...) forma.EntityManager
- func NewFileSchemaRegistry(pool *pgxpool.Pool, schemaTable string, schemaDir string) (forma.SchemaRegistry, error)
- func NewFileSchemaRegistryFromDirectory(schemaDir string) (forma.SchemaRegistry, error)
- func PostgresHealthCheck(ctx context.Context, dsn string, timeout time.Duration) error
- func RegisterTelemetryEmitter(fn telemetryEmitter)
- func RenderDirtyIDsValuesCSV(dirtyIDs []uuid.UUID) string
- func RenderDuckDBQuery(tpl *template.Template, params any, whereArgs []any) (string, []any, error)
- func RenderS3ParquetPath(tmpl string, schemaID int16) (string, error)
- func RenderSQLTemplate(tpl *template.Template, data any) (string, []any, error)
- func S3HealthCheck(ctx context.Context, cfg forma.DuckDBConfig, timeout time.Duration) error
- func SanitizeIdentifier(name string) string
- func SetGlobalDuckDBCircuitBreaker(cb *CircuitBreaker)
- func ToDuckDBParam(value any, v forma.ValueType) (any, error)
- func ToFloat64(v any) (float64, bool)
- func ValidateDuckDBConfig(cfg forma.DuckDBConfig) error
- func ValidateOrderByAttributesForListTypes(orderBy []AttributeOrder) error
- func ValidateOrderByForListTypes(orderBy []forma.OrderBy, ...) error
- func ValidatePostgresConfig(cfg forma.DatabaseConfig) error
- func ValidateS3Config(cfg forma.DuckDBConfig) error
- type AtomicBatchPersistentRecordRepository
- type AttributeConverter
- func (c *AttributeConverter) FromEAVRecord(record EAVRecord, valueType forma.ValueType) (EntityAttribute, error)
- func (c *AttributeConverter) FromEAVRecords(records []EAVRecord) ([]EntityAttribute, error)
- func (c *AttributeConverter) ToEAVRecord(attr EntityAttribute, rowID uuid.UUID) (EAVRecord, error)
- func (c *AttributeConverter) ToEAVRecords(attributes []EntityAttribute, rowID uuid.UUID) ([]EAVRecord, error)
- type AttributeOrder
- type AttributeQuery
- type CircuitBreaker
- type DBPersistentRecordRepository
- func (r *DBPersistentRecordRepository) BatchDeletePersistentRecords(ctx context.Context, tables StorageTables, keys []PersistentRecordKey) error
- func (r *DBPersistentRecordRepository) BatchInsertPersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
- func (r *DBPersistentRecordRepository) BatchUpdatePersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
- func (r *DBPersistentRecordRepository) DeletePersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) error
- func (r *DBPersistentRecordRepository) ExecuteDuckDBFederatedQuery(ctx context.Context, tables StorageTables, q *FederatedAttributeQuery, ...) ([]*PersistentRecord, int64, error)
- func (r *DBPersistentRecordRepository) ExecuteFederatedPaginatedQuery(ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, ...) ([]*PersistentRecord, int64, error)
- func (r *DBPersistentRecordRepository) FetchDirtyRowIDs(ctx context.Context, changeLogTable string, schemaID int16) ([]uuid.UUID, error)
- func (r *DBPersistentRecordRepository) GetPersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) (*PersistentRecord, error)
- func (r *DBPersistentRecordRepository) InsertPersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
- func (r *DBPersistentRecordRepository) QueryPersistentRecords(ctx context.Context, query *PersistentRecordQuery) (*PersistentRecordPage, error)
- func (r *DBPersistentRecordRepository) QueryPersistentRecordsFederated(ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, ...) (*PersistentRecordPage, error)
- func (r *DBPersistentRecordRepository) StreamDuckDBFederatedQuery(ctx context.Context, tables StorageTables, q *FederatedAttributeQuery, ...) (int64, error)
- func (r *DBPersistentRecordRepository) StreamOptimizedQuery(ctx context.Context, tables StorageTables, schemaID int16, clause string, ...) (int64, error)
- func (r *DBPersistentRecordRepository) UpdatePersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
- type DBPool
- type DataSourcePlan
- type DataTier
- type DualClauses
- type DuckDBClient
- type DuckDBRenderHints
- type EAVRecord
- type EntityAttribute
- func (ea *EntityAttribute) BigInt() (*int64, error)
- func (ea *EntityAttribute) Bool() (*bool, error)
- func (ea *EntityAttribute) Date() (*time.Time, error)
- func (ea *EntityAttribute) DateTime() (*time.Time, error)
- func (ea *EntityAttribute) Integer() (*int32, error)
- func (ea *EntityAttribute) Numeric() (*float64, error)
- func (ea *EntityAttribute) SmallInt() (*int16, error)
- func (ea *EntityAttribute) Text() (*string, error)
- func (ea *EntityAttribute) UUID() (*uuid.UUID, error)
- type ExecutionPlan
- type FederatedAttributeQuery
- type FederatedQueryOptions
- type ListOperatorMapping
- type MergePlan
- type MetadataCache
- func (mc *MetadataCache) GetSchemaCache(schemaName string) (forma.SchemaAttributeCache, bool)
- func (mc *MetadataCache) GetSchemaCacheByID(schemaID int16) (forma.SchemaAttributeCache, bool)
- func (mc *MetadataCache) GetSchemaID(schemaName string) (int16, bool)
- func (mc *MetadataCache) GetSchemaName(schemaID int16) (string, bool)
- func (mc *MetadataCache) ListSchemas() []string
- type MetadataLoader
- type PersistentRecord
- type PersistentRecordFederatedQuerier
- type PersistentRecordKey
- type PersistentRecordPage
- type PersistentRecordQuery
- type PersistentRecordReader
- type PersistentRecordRepository
- type PersistentRecordTransformer
- type PersistentRecordWriter
- type RelationDescriptor
- type RelationIndex
- type RoutingDecision
- type SQLGenerator
- type SQLRenderer
- type SchemaMetadata
- type Set
- type StorageTables
- type Transformer
Constants ¶
This section is empty.
Variables ¶
var AdvancedQueryTemplateDuckDB = template.Must(template.New("optimizedQueryDuckDB").Funcs(template.FuncMap{ "add": func(a, b int) int { return a + b }, }).Parse(` -- PRAGMA & tuning PRAGMA memory_limit='4GB'; PRAGMA threads=4; -- Parameters: -- $SCHEMA_ID : integer -- $PG_CONN : postgres connection string for postgres_scan -- $PG_WHERE_CLAUSE : pushdown predicate for entity_main (physical columns) -- $LOGICAL_WHERE_CLAUSE : logical predicate for final filtering (DuckDB / Parquet columns) -- $S3_PATHS : comma-separated paths for read_parquet() -- $PAGE_SIZE, $OFFSET WITH -- Dirty set: rows currently present/dirty in PG change_log (flushed_at = 0) dirty_ids AS ( SELECT row_id FROM postgres_scan($PG_CONN, 'change_log', 'flushed_at = 0') WHERE schema_id = $SCHEMA_ID ), -- S3 source (Cold/Warm). Read Parquet files and apply logical filters + anti-join s3_source AS ( SELECT row_id, ltbase_created_at AS created_at, ltbase_updated_at AS ver_ts, ltbase_deleted_at AS deleted_ts, -- Logical columns (native in Parquet) name, age, tag FROM read_parquet($S3_PATHS) WHERE ($LOGICAL_WHERE_CLAUSE) -- Anti-join: exclude rows that are present in the Dirty Set (PG hot buffer) AND row_id NOT IN (SELECT row_id FROM dirty_ids) ), -- PG source (Hot). Use postgres_scan with pushdown for entity_main, pivot EAV attributes. pg_source AS ( SELECT m.ltbase_row_id AS row_id, m.ltbase_created_at AS created_at, cl.changed_at AS ver_ts, cl.deleted_at AS deleted_ts, -- Explicit casts to align PG types with Parquet schema CAST(m.text_01 AS VARCHAR) AS name, CAST(m.integer_01 AS INTEGER) AS age, -- EAV pivot (explicit casts). Replace attr_id constants with dynamic mapping if needed. 
MAX(CASE WHEN e.attr_id = 205 THEN CAST(e.value_text AS VARCHAR) END) AS tag FROM postgres_scan($PG_CONN, 'change_log', 'flushed_at = 0') cl -- Pushdown: restrict entity_main at the scan-level using $PG_WHERE_CLAUSE JOIN postgres_scan($PG_CONN, 'SELECT * FROM entity_main_dev WHERE ltbase_schema_id = ' || $SCHEMA_ID || ' AND (' || $PG_WHERE_CLAUSE || ')' ) m ON cl.schema_id = m.ltbase_schema_id AND cl.row_id = m.ltbase_row_id LEFT JOIN postgres_scan($PG_CONN, 'eav_data_dev') e ON cl.schema_id = e.schema_id AND cl.row_id = e.row_id WHERE cl.schema_id = $SCHEMA_ID GROUP BY m.ltbase_row_id, m.ltbase_created_at, cl.changed_at, cl.deleted_at, m.text_01, m.integer_01 ), -- Union warm/cold S3 data with hot PG data unified AS ( SELECT * FROM s3_source UNION ALL SELECT * FROM pg_source ) -- Final selection: -- - Apply final logical filters to ensure EAV & other logical predicates are respected -- - Remove soft-deleted rows -- - Deduplicate using Last-Write-Wins (ver_ts ordering) SELECT row_id, name, age, tag, created_at FROM unified WHERE ($LOGICAL_WHERE_CLAUSE) AND (deleted_ts IS NULL OR deleted_ts = 0) -- Deduplicate: keep most recent version per row_id QUALIFY ROW_NUMBER() OVER (PARTITION BY row_id ORDER BY ver_ts DESC) = 1 ORDER BY created_at DESC LIMIT $PAGE_SIZE OFFSET $OFFSET; `))
AdvancedQueryTemplateDuckDB is the DuckDB SQL template used for federated queries. Placeholders expected to be substituted by the renderer:
$SCHEMA_ID, $PG_CONN, $PG_WHERE_CLAUSE, $LOGICAL_WHERE_CLAUSE, $S3_PATHS, $PAGE_SIZE, $OFFSET
var ErrListInOrderBy = fmt.Errorf("LIST type attributes cannot be used in ORDER BY")
ErrListInOrderBy is returned when a LIST type attribute is used in ORDER BY.
Functions ¶
func AppendDirtyExclusion ¶ added in v0.0.23
AppendDirtyExclusion adds a NOT IN clause excluding dirty row ids. dirtyIDs are converted to strings for DuckDB parameterization using ? placeholders.
func BuildDuckDBQuery ¶ added in v0.0.23
func BuildDuckDBQuery(tpl *template.Template, params any, q *FederatedAttributeQuery, dirtyIDs []uuid.UUID, dual *DualClauses) (string, []any, error)
BuildDuckDBQuery prepares a DuckDB SQL string and its arguments for a federated query. It accepts optional DualClauses produced by ToDualClauses; when provided it will use the DuckClause and DuckArgs as the base where clause and inject PgMainClause into template params so the template (or tests) can observe the pushdown fragment. Dirty-ID exclusions are appended to the DuckDB clause regardless of source.
func BuildListPredicate ¶ added in v0.0.23
func BuildListPredicate(column, operator, value string, elementType forma.ValueType) (string, any, error)
BuildListPredicate generates a DuckDB predicate for LIST column operations. Supported operators:
- equals: list_contains(col, value) - checks if value is in the list
- not_equals: NOT list_contains(col, value)
- contains: list_any_match(col, x -> x LIKE '%value%')
- starts_with: list_any_match(col, x -> x LIKE 'value%')
- gt/gte/lt/lte: list_any_match(col, x -> x OP value)
Returns the SQL fragment and the parameter value to bind.
func CastExpression ¶ added in v0.0.23
CastExpression returns a DuckDB-safe CAST expression for a column or expression. The caller is responsible for ensuring the identifier/expression is safe (e.g. using ident helper).
func DecodeFromBase32 ¶
func EmitLatency ¶ added in v0.0.23
EmitLatency records a latency measure (milliseconds) for a named stage. name: "fed_query_latency_histogram" with label {"stage": "<translation|execution|streaming>"}
func EmitPushdownEfficiency ¶ added in v0.0.23
EmitPushdownEfficiency records pushdown efficiency as a ratio (float64). name: "fed_query_pushdown_efficiency" with label {"schema_id": "<id>"}
func EmitRowCount ¶ added in v0.0.23
EmitRowCount records row counts per source. name: "fed_query_row_count" with label {"source": "pg"|"s3"|"duckdb"}
func EncodeToBase32 ¶
func EncodeUUIDToBase32 ¶
func FilterAttributes ¶ added in v0.0.18
FilterAttributes filters a map of attributes based on the requested attribute paths. If attrs is nil or empty, returns the original attributes unchanged. Supports nested paths like "contact.name" or "contact.phone".
func FilterDataRecord ¶ added in v0.0.18
func FilterDataRecord(record *forma.DataRecord, attrs []string) *forma.DataRecord
FilterDataRecord applies attribute filtering to a forma.DataRecord. Returns a new DataRecord with filtered attributes.
func GenerateDuckDBWhereClause ¶ added in v0.0.23
func GenerateDuckDBWhereClause(q *FederatedAttributeQuery) (string, []any, error)
GenerateDuckDBWhereClause produces a minimal DuckDB WHERE clause for a FederatedAttributeQuery. This is an intentionally small helper for the initial integration: it supports CompositeCondition with KvCondition children and translates simple operators. It returns the clause and a list of args suitable for use with database/sql parameter placeholders ($1, $2 style are left for later templating).
NOTE: This is a minimal implementation to allow compilation and unit testing of rendering logic. Full query translation (including EAV-to-column mapping and proper parameter indexing) will be implemented in follow-up tasks.
func GenerateDuckDBWhereClauseWithExclusions ¶ added in v0.0.23
func GenerateDuckDBWhereClauseWithExclusions(q *FederatedAttributeQuery, dirtyIDs []uuid.UUID) (string, []any, error)
GenerateDuckDBWhereClauseWithExclusions builds a DuckDB WHERE clause for the query and appends an exclusion for dirty row ids (to be used as an anti-join).
func IsListType ¶ added in v0.0.23
IsListType returns true if the ValueType represents a list/array.
func MapKeys ¶
func MapKeys[K comparable, V any](m map[K]V) []K
MapKeys extracts all keys from a map and returns them as a slice. The order of keys is non-deterministic due to map iteration.
func MapValueTypeToDuckDBType ¶ added in v0.0.23
MapValueTypeToDuckDBType maps forma.ValueType to a DuckDB SQL type string. For LIST types, returns the element type only (caller must wrap in LIST(...) if needed).
func MapValueTypeToListDuckDBType ¶ added in v0.0.23
MapValueTypeToListDuckDBType returns the DuckDB LIST type for an array of the given element type. e.g., ValueTypeText -> "LIST(VARCHAR)", ValueTypeInteger -> "LIST(INTEGER)"
func MapValues ¶
func MapValues[K comparable, V any](m map[K]V) []V
MapValues extracts all values from a map and returns them as a slice. The order of values is non-deterministic due to map iteration.
func MergeTemplateParamsWithDirtyIDs ¶ added in v0.0.23
MergeTemplateParamsWithDirtyIDs injects two helper fields into the provided params map (if map[string]any): HasDirtyIDs (bool) and DirtyIDsCSV (string). This is a convenience for callers that render the DuckDB SQL template and want to optionally include a dirty_ids CTE.
func NewEntityManager ¶
func NewEntityManager( transformer PersistentRecordTransformer, repository PersistentRecordRepository, registry forma.SchemaRegistry, config *forma.Config, ) forma.EntityManager
NewEntityManager creates a new EntityManager instance
func NewFileSchemaRegistry ¶
func NewFileSchemaRegistry(pool *pgxpool.Pool, schemaTable string, schemaDir string) (forma.SchemaRegistry, error)
NewFileSchemaRegistry creates a new schema registry that reads schema mappings from a PostgreSQL table and loads attribute definitions from JSON files.
Parameters:
- pool: PostgreSQL connection pool
- schemaTable: Name of the schema_registry table (e.g., "schema_registry_1234567890")
- schemaDir: Directory containing the *_attributes.json files
func NewFileSchemaRegistryFromDirectory ¶ added in v0.0.15
func NewFileSchemaRegistryFromDirectory(schemaDir string) (forma.SchemaRegistry, error)
NewFileSchemaRegistryFromDirectory creates a schema registry that scans a directory for schema files and auto-assigns IDs (starting from 100). This mode does not require a database connection.
Parameters:
- schemaDir: Directory containing the schema files (*.json and *_attributes.json)
func PostgresHealthCheck ¶ added in v0.0.23
PostgresHealthCheck attempts to connect and ping a Postgres instance using a DSN. timeout may be 0 to use a sensible default (5s).
func RegisterTelemetryEmitter ¶ added in v0.0.23
func RegisterTelemetryEmitter(fn telemetryEmitter)
RegisterTelemetryEmitter registers a custom emitter function. Callers (e.g. service wiring) can provide an OpenTelemetry-backed emitter or a test meter.
func RenderDirtyIDsValuesCSV ¶ added in v0.0.23
RenderDirtyIDsValuesCSV builds a VALUES-list fragment suitable for embedding into a SQL template like: VALUES {{.DirtyIDsCSV}} . Example output for two ids: "('id1'),('id2')"
func RenderDuckDBQuery ¶ added in v0.0.23
RenderDuckDBQuery renders a DuckDB SQL template (which uses "?" placeholders) and combines the provided whereArgs (typically from GenerateDuckDBWhereClause) with the template-collected args. The order is: whereArgs first, then template args.
func RenderS3ParquetPath ¶ added in v0.0.23
RenderS3ParquetPath interpolates a simple Go template for parquet path rendering. Example template: "s3://bucket/path/schema_{{.SchemaID}}/data.parquet"
func RenderSQLTemplate ¶ added in v0.0.23
RenderSQLTemplate is a convenience helper that renders a SQL template in one shot, returning the SQL string and its collected arguments.
func S3HealthCheck ¶ added in v0.0.23
S3HealthCheck attempts a best-effort HTTP ping against the configured S3 endpoint. This is intentionally lightweight and non-authoritative: it will only succeed for endpoints that accept anonymous HEAD/GET requests (e.g., some MinIO setups). For AWS S3 this will often return 403 but is still useful to validate DNS/resolution and TLS.
func SanitizeIdentifier ¶ added in v0.0.24
func SetGlobalDuckDBCircuitBreaker ¶ added in v0.0.23
func SetGlobalDuckDBCircuitBreaker(cb *CircuitBreaker)
SetGlobalDuckDBCircuitBreaker registers the global breaker used for DuckDB-related operations.
func ToDuckDBParam ¶ added in v0.0.23
ToDuckDBParam converts a Go value to the form expected by DuckDB drivers for the given value type. Examples:
- uuid.UUID -> string
- time.Time -> time.Time (TIMESTAMP)
- numeric types -> float64
func ToFloat64 ¶ added in v0.0.19
ToFloat64 is an exported helper that behaves like the legacy optimizer helper: it returns (float64, bool) where bool indicates success.
func ValidateDuckDBConfig ¶ added in v0.0.23
func ValidateDuckDBConfig(cfg forma.DuckDBConfig) error
ValidateDuckDBConfig performs basic sanity checks on user-provided DuckDB configuration.
func ValidateOrderByAttributesForListTypes ¶ added in v0.0.23
func ValidateOrderByAttributesForListTypes(orderBy []AttributeOrder) error
ValidateOrderByAttributesForListTypes checks if resolved AttributeOrder entries are LIST types. This variant works with the internal AttributeOrder type used after attribute resolution.
func ValidateOrderByForListTypes ¶ added in v0.0.23
func ValidateOrderByForListTypes(orderBy []forma.OrderBy, getValueType func(attrName string) (forma.ValueType, bool)) error
ValidateOrderByForListTypes checks if any ORDER BY attributes are LIST types. Returns an error if a LIST type is found (LIST columns cannot be used in ORDER BY). Uses forma.OrderBy (the DSL type with Attribute string field).
func ValidatePostgresConfig ¶ added in v0.0.23
func ValidatePostgresConfig(cfg forma.DatabaseConfig) error
ValidatePostgresConfig performs basic sanity checks on Postgres-related settings.
func ValidateS3Config ¶ added in v0.0.23
func ValidateS3Config(cfg forma.DuckDBConfig) error
ValidateS3Config performs basic sanity checks on S3-related DuckDB settings.
Types ¶
type AtomicBatchPersistentRecordRepository ¶ added in v0.0.24
type AtomicBatchPersistentRecordRepository interface {
BatchInsertPersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
BatchUpdatePersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
BatchDeletePersistentRecords(ctx context.Context, tables StorageTables, keys []PersistentRecordKey) error
}
type AttributeConverter ¶
type AttributeConverter struct {
// contains filtered or unexported fields
}
AttributeConverter provides conversion between EntityAttribute and EAVRecord
func NewAttributeConverter ¶
func NewAttributeConverter(registry forma.SchemaRegistry) *AttributeConverter
NewAttributeConverter creates a new AttributeConverter instance
func (*AttributeConverter) FromEAVRecord ¶
func (c *AttributeConverter) FromEAVRecord(record EAVRecord, valueType forma.ValueType) (EntityAttribute, error)
FromEAVRecord converts an EAVRecord to an EntityAttribute
func (*AttributeConverter) FromEAVRecords ¶
func (c *AttributeConverter) FromEAVRecords(records []EAVRecord) ([]EntityAttribute, error)
FromEAVRecords converts a slice of EAVRecords to EntityAttributes
func (*AttributeConverter) ToEAVRecord ¶
func (c *AttributeConverter) ToEAVRecord(attr EntityAttribute, rowID uuid.UUID) (EAVRecord, error)
ToEAVRecord converts an EntityAttribute to an EAVRecord
func (*AttributeConverter) ToEAVRecords ¶
func (c *AttributeConverter) ToEAVRecords(attributes []EntityAttribute, rowID uuid.UUID) ([]EAVRecord, error)
ToEAVRecords converts a slice of EntityAttributes to EAVRecords
type AttributeOrder ¶
type AttributeOrder struct {
AttrID int16
ValueType forma.ValueType
SortOrder forma.SortOrder
StorageLocation forma.AttributeStorageLocation // main or eav
ColumnName string // main table column name if StorageLocation == main
}
AttributeOrder specifies how to sort by a particular attribute.
func (*AttributeOrder) AttrIDInt ¶
func (ao *AttributeOrder) AttrIDInt() int
AttrIDInt returns the attribute ID as an int (for template compatibility).
func (*AttributeOrder) Desc ¶
func (ao *AttributeOrder) Desc() bool
Desc returns true if the sort order is descending.
func (*AttributeOrder) IsMainColumn ¶
func (ao *AttributeOrder) IsMainColumn() bool
IsMainColumn returns true if the attribute is stored in the main table.
func (*AttributeOrder) MainColumnName ¶
func (ao *AttributeOrder) MainColumnName() string
MainColumnName returns the column name in the main table.
func (*AttributeOrder) ValueColumn ¶
func (ao *AttributeOrder) ValueColumn() string
ValueColumn returns the EAV table column name for this attribute's value type.
type AttributeQuery ¶
type CircuitBreaker ¶ added in v0.0.23
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker is a lightweight in-memory circuit breaker.
func GetDuckDBCircuitBreaker ¶ added in v0.0.23
func GetDuckDBCircuitBreaker() *CircuitBreaker
GetDuckDBCircuitBreaker returns the global breaker instance (may be nil).
func NewCircuitBreaker ¶ added in v0.0.23
func NewCircuitBreaker(threshold int, window, openDuration time.Duration) *CircuitBreaker
NewCircuitBreaker creates a configured circuit breaker.
func (*CircuitBreaker) IsOpen ¶ added in v0.0.23
func (cb *CircuitBreaker) IsOpen() bool
IsOpen returns true if the breaker is currently open.
func (*CircuitBreaker) RecordFailure ¶ added in v0.0.23
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failure occurrence and opens the breaker if threshold exceeded.
func (*CircuitBreaker) RecordSuccess ¶ added in v0.0.23
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess resets failure history when operations succeed.
type DBPersistentRecordRepository ¶ added in v0.0.23
type DBPersistentRecordRepository struct {
// contains filtered or unexported fields
}
func NewDBPersistentRecordRepository ¶ added in v0.0.23
func NewDBPersistentRecordRepository(pool persistentRecordPool, metadataCache *MetadataCache, duckDBClient *DuckDBClient) *DBPersistentRecordRepository
func (*DBPersistentRecordRepository) BatchDeletePersistentRecords ¶ added in v0.0.24
func (r *DBPersistentRecordRepository) BatchDeletePersistentRecords(ctx context.Context, tables StorageTables, keys []PersistentRecordKey) error
func (*DBPersistentRecordRepository) BatchInsertPersistentRecords ¶ added in v0.0.24
func (r *DBPersistentRecordRepository) BatchInsertPersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
func (*DBPersistentRecordRepository) BatchUpdatePersistentRecords ¶ added in v0.0.24
func (r *DBPersistentRecordRepository) BatchUpdatePersistentRecords(ctx context.Context, tables StorageTables, records []*PersistentRecord) error
func (*DBPersistentRecordRepository) DeletePersistentRecord ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) DeletePersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) error
func (*DBPersistentRecordRepository) ExecuteDuckDBFederatedQuery ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) ExecuteDuckDBFederatedQuery( ctx context.Context, tables StorageTables, q *FederatedAttributeQuery, limit, offset int, attributeOrders []AttributeOrder, opts *FederatedQueryOptions, ) ([]*PersistentRecord, int64, error)
ExecuteDuckDBFederatedQuery runs the DuckDB optimized query template using the provided FederatedAttributeQuery. It fetches dirty IDs from the Postgres change_log (if available), injects exclusions into the DuckDB WHERE clause, executes the query against the global DuckDB client, and returns matched PersistentRecords along with the total record count.
Note: This implementation performs a best-effort scan of columns produced by the optimized query template. It mirrors the column ordering used by the Postgres template:
- main table projection (entity_main columns, order defined by entityMainColumnDescriptors)
- attributes_json (TEXT)
- total_records (bigint)
- total_pages (bigint)
- current_page (int)
func (*DBPersistentRecordRepository) ExecuteFederatedPaginatedQuery ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) ExecuteFederatedPaginatedQuery( ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, limit, offset int, attributeOrders []AttributeOrder, opts *FederatedQueryOptions, ) ([]*PersistentRecord, int64, error)
ExecuteFederatedPaginatedQuery performs a federated fetch across Postgres (hot) and DuckDB (cold/warm), merges results with last-write-wins semantics, and returns the requested page plus an accurate total deduplicated across sources.
Notes:
- This is an MVP coordinator: it caps per-source fetches (opts.MaxRows or default) to avoid OOM.
- For very large result sets, a keys-only two-phase approach should be implemented later.
func (*DBPersistentRecordRepository) FetchDirtyRowIDs ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) FetchDirtyRowIDs(ctx context.Context, changeLogTable string, schemaID int16) ([]uuid.UUID, error)
FetchDirtyRowIDs returns all row_ids present in the change_log with flushed_at = 0 for the given schema. This can be used by federated query coordinator to exclude dirty rows from columnar/duckdb reads (anti-join).
func (*DBPersistentRecordRepository) GetPersistentRecord ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) GetPersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) (*PersistentRecord, error)
func (*DBPersistentRecordRepository) InsertPersistentRecord ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) InsertPersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
func (*DBPersistentRecordRepository) QueryPersistentRecords ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) QueryPersistentRecords(ctx context.Context, query *PersistentRecordQuery) (*PersistentRecordPage, error)
func (*DBPersistentRecordRepository) QueryPersistentRecordsFederated ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) QueryPersistentRecordsFederated(ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, opts *FederatedQueryOptions) (*PersistentRecordPage, error)
QueryPersistentRecordsFederated performs a federated query across configured data tiers. Backwards compatible: hot-only hints delegate to QueryPersistentRecords.
func (*DBPersistentRecordRepository) StreamDuckDBFederatedQuery ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) StreamDuckDBFederatedQuery( ctx context.Context, tables StorageTables, q *FederatedAttributeQuery, limit, offset int, attributeOrders []AttributeOrder, opts *FederatedQueryOptions, rowHandler func(context.Context, *PersistentRecord) error, ) (int64, error)
StreamDuckDBFederatedQuery streams DuckDB federated query results using a rowHandler callback. It reuses the same rowHandler semantics as Postgres' StreamOptimizedQuery to avoid loading the entire result set into memory.
func (*DBPersistentRecordRepository) StreamOptimizedQuery ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) StreamOptimizedQuery( ctx context.Context, tables StorageTables, schemaID int16, clause string, args []any, limit, offset int, attributeOrders []AttributeOrder, useMainTableAsAnchor bool, rowHandler func(*PersistentRecord) error, ) (int64, error)
func (*DBPersistentRecordRepository) UpdatePersistentRecord ¶ added in v0.0.23
func (r *DBPersistentRecordRepository) UpdatePersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
type DataSourcePlan ¶ added in v0.0.23
type DataSourcePlan struct {
// Tier indicates the logical data tier (hot/warm/cold).
Tier DataTier
// Engine indicates the execution engine, e.g., "postgres" or "duckdb".
Engine string
// SQL optionally contains the generated SQL fragment or rendered template used.
// For privacy/performance reasons this may be truncated by the repository.
SQL string
// RowEstimate is the planner's estimated rows to be scanned/returned (if available).
RowEstimate int64
// PredicatePushdown indicates whether predicates were pushed to the source.
PredicatePushdown bool
// ActualRows contains the actual rows returned from this source (filled post-execution).
ActualRows int64
// DurationMs measures execution time for this source in milliseconds.
DurationMs int64
// Reason provides human-readable explanation for selection/behavior.
Reason string
}
DataSourcePlan captures per-source execution details.
type DualClauses ¶ added in v0.0.23
type DualClauses struct {
PgClause string // existing EAV-based clause (EXISTS...)
PgArgs []any
PgMainClause string // predicates that can be pushed into entity_main (m.*)
PgMainArgs []any
DuckClause string
DuckArgs []any
}
DualClauses contains SQL fragments and argument lists for both Postgres and DuckDB.
func ToDualClauses ¶ added in v0.0.23
func ToDualClauses( condition forma.Condition, eavTable string, schemaID int16, cache forma.SchemaAttributeCache, paramIndex *int, ) (DualClauses, error)
ToDualClauses generates Postgres and DuckDB WHERE fragments for the given condition. - PgClause reuses existing SQLGenerator (EAV-based EXISTS expressions). - PgMainClause contains predicates suitable for entity_main pushdown. - DuckClause maps attributes to column names when available and emits a simple DuckDB-style clause. Note: DuckDB placeholders are "?" and args are returned in order. Postgres uses $n placeholders.
type DuckDBClient ¶ added in v0.0.23
DuckDBClient wraps a database/sql DB opened with the DuckDB driver.
func NewDuckDBClient ¶ added in v0.0.23
func NewDuckDBClient(cfg forma.DuckDBConfig) (*DuckDBClient, error)
NewDuckDBClient creates and configures a DuckDB client according to the provided config. It attempts to load common extensions (httpfs/parquet) and configure S3 access via PRAGMA when requested.
func (*DuckDBClient) Close ¶ added in v0.0.23
func (c *DuckDBClient) Close() error
Close closes the underlying DuckDB DB.
func (*DuckDBClient) HealthCheck ¶ added in v0.0.23
func (c *DuckDBClient) HealthCheck(ctx context.Context) error
HealthCheck performs a simple query to validate the DuckDB connection and basic runtime pragmas.
type DuckDBRenderHints ¶ added in v0.0.23
type DuckDBRenderHints struct {
// S3ParquetPathTemplate is a template (with placeholders) for locating parquet files in S3.
// Example: "s3://bucket/path/schema_{{.SchemaID}}/data.parquet"
S3ParquetPathTemplate string
// TimeEncodingHint indicates how date/time values should be encoded in DuckDB side.
// e.g. "unix_ms" or "iso8601"
TimeEncodingHint string
}
DuckDBRenderHints provides optional parameters that guide DuckDB SQL generation.
type EAVRecord ¶
type EAVRecord struct {
SchemaID int16
RowID uuid.UUID // UUID v7, identifies data row
AttrID int16 // Attribute ID from schema definition
ArrayIndices string // Comma-separated array indices (e.g., "0", "1,2", or "" for non-arrays)
ValueText *string // For valueType: "text"
ValueNumeric *float64 // For valueType: "numeric"
}
EAVRecord represents a single attribute value stored as a row of the EAV table; the ValueText/ValueNumeric fields indicate the value type of the row.
type EntityAttribute ¶
type EntityAttribute struct {
SchemaID int16
RowID uuid.UUID // UUID v7, identifies data row
AttrID int16
ArrayIndices string
ValueType forma.ValueType
Value any
}
func (*EntityAttribute) BigInt ¶
func (ea *EntityAttribute) BigInt() (*int64, error)
func (*EntityAttribute) Bool ¶
func (ea *EntityAttribute) Bool() (*bool, error)
func (*EntityAttribute) Integer ¶
func (ea *EntityAttribute) Integer() (*int32, error)
func (*EntityAttribute) Numeric ¶
func (ea *EntityAttribute) Numeric() (*float64, error)
func (*EntityAttribute) SmallInt ¶
func (ea *EntityAttribute) SmallInt() (*int16, error)
func (*EntityAttribute) Text ¶
func (ea *EntityAttribute) Text() (*string, error)
type ExecutionPlan ¶ added in v0.0.23
type ExecutionPlan struct {
// Routing decision snapshot (which tiers were considered/selected)
Routing RoutingDecision
// Per-source plans for each data source touched by the federated execution.
Sources []DataSourcePlan
// Merge describes the merge-on-read strategy applied to results across tiers.
Merge MergePlan
// Timings: coarse-grained durations in milliseconds for major stages.
// Keys typically: "translate", "postgres_fetch", "duckdb_fetch", "merge", "total"
Timings map[string]int64
// Notes and warnings captured during planning/execution.
Notes []string
}
ExecutionPlan is a diagnostic structure capturing the federated query execution choices and timings. It is intended for debugging and observability only.
type FederatedAttributeQuery ¶ added in v0.0.23
type FederatedAttributeQuery struct {
AttributeQuery
// PreferredTiers is an ordered list of preferred data tiers to query.
// Example: []DataTier{DataTierHot, DataTierWarm, DataTierCold}
PreferredTiers []DataTier
// PreferHot indicates strong preference for reading from the hot (Postgres) tier
// when the same data exists in multiple tiers.
PreferHot bool
// UseMainAsAnchor controls whether the main table (entity_main) should be used
// as the anchor for predicate pushdown. This mirrors existing repository logic.
UseMainAsAnchor bool
// DuckDBHints carries optional DuckDB-specific rendering hints, e.g. external
// parquet path templates or casting preferences.
DuckDBHints *DuckDBRenderHints
}
FederatedAttributeQuery extends AttributeQuery with federated-specific hints. It embeds the existing AttributeQuery so it remains compatible with existing code paths.
type FederatedQueryOptions ¶ added in v0.0.23
type FederatedQueryOptions struct {
// MaxRows limits the number of rows read from remote/columnar sources per shard.
MaxRows int
// Parallelism controls how many parallel DuckDB scan workers to use.
Parallelism int
// AllowPartialDegradedMode if true will allow executing the query with only a subset
// of data tiers available (useful for the early MVP).
AllowPartialDegradedMode bool
// IncludeExecutionPlan when true instructs the repository to collect an execution plan
// for debugging/observability. If set, the repository allocates and populates an
// ExecutionPlan and assigns it to the ExecutionPlan field (below) so callers may inspect it.
IncludeExecutionPlan bool
// ExecutionPlan is populated by the repository when IncludeExecutionPlan==true.
// Callers should pass a non-nil opts pointer and inspect this field after call.
ExecutionPlan *ExecutionPlan
}
FederatedQueryOptions contains runtime options for federated execution.
type ListOperatorMapping ¶ added in v0.0.23
type ListOperatorMapping struct {
// Operator is the DSL operator name (equals, contains, starts_with, gt, etc.)
Operator string
// DuckDBExpr is a template for the DuckDB expression.
// Placeholders: {{.Column}} for the column name, {{.Value}} for the parameter.
DuckDBExpr string
// RequiresLambda indicates if the expression uses a lambda (x -> predicate)
RequiresLambda bool
}
ListOperatorMapping defines how DSL operators map to DuckDB LIST functions. DSL operators for LIST columns use list_contains or list_any_match patterns.
type MergePlan ¶ added in v0.0.23
type MergePlan struct {
// Strategy name, e.g., "last-write-wins"
Strategy string
// PreferHot indicates whether preferHot tiebreaker was used.
PreferHot bool
// DedupKeys lists the keys used for deduplication (typically SchemaID:RowID).
DedupKeys []string
// DurationMs time spent merging in milliseconds.
DurationMs int64
// Notes optional additional details about attribute-level merging.
Notes []string
}
MergePlan describes merge-on-read semantics used to combine tiered results.
type MetadataCache ¶
type MetadataCache struct {
// contains filtered or unexported fields
}
MetadataCache holds all metadata mappings for fast lookups
func NewMetadataCache ¶
func NewMetadataCache() *MetadataCache
NewMetadataCache creates a new metadata cache
func (*MetadataCache) GetSchemaCache ¶
func (mc *MetadataCache) GetSchemaCache(schemaName string) (forma.SchemaAttributeCache, bool)
GetSchemaCache retrieves the schema attribute cache for a schema (thread-safe)
func (*MetadataCache) GetSchemaCacheByID ¶ added in v0.0.10
func (mc *MetadataCache) GetSchemaCacheByID(schemaID int16) (forma.SchemaAttributeCache, bool)
func (*MetadataCache) GetSchemaID ¶
func (mc *MetadataCache) GetSchemaID(schemaName string) (int16, bool)
GetSchemaID retrieves schema ID by name (thread-safe)
func (*MetadataCache) GetSchemaName ¶
func (mc *MetadataCache) GetSchemaName(schemaID int16) (string, bool)
GetSchemaName retrieves schema name by ID (thread-safe)
func (*MetadataCache) ListSchemas ¶
func (mc *MetadataCache) ListSchemas() []string
ListSchemas returns all schema names (thread-safe)
type MetadataLoader ¶
type MetadataLoader struct {
// contains filtered or unexported fields
}
MetadataLoader loads schema and attribute metadata from database and JSON files
func NewMetadataLoader ¶
func NewMetadataLoader(pool DBPool, schemaTableName, schemaDirectory string) *MetadataLoader
NewMetadataLoader creates a new metadata loader
func (*MetadataLoader) LoadMetadata ¶
func (ml *MetadataLoader) LoadMetadata(ctx context.Context) (*MetadataCache, error)
LoadMetadata loads all metadata and returns a cache
type PersistentRecord ¶
type PersistentRecord struct {
SchemaID int16
RowID uuid.UUID
TextItems map[string]string // e.g., "text_01" -> "Hello"
Int16Items map[string]int16
Int32Items map[string]int32
Int64Items map[string]int64
Float64Items map[string]float64
UUIDItems map[string]uuid.UUID
CreatedAt int64
UpdatedAt int64
DeletedAt *int64
OtherAttributes []EAVRecord // EAV attributes not in hot table
}
PersistentRecord is a struct representing a record in persistent storage, combining the `entity main` (hot-column) fields and the EAV attributes.
func MergePersistentRecordsByTier ¶ added in v0.0.23
func MergePersistentRecordsByTier(inputs map[DataTier][]*PersistentRecord, preferHot bool) ([]*PersistentRecord, error)
MergePersistentRecordsByTier performs a merge-on-read across multiple data tiers. Inputs are provided as a map from DataTier -> slice of *PersistentRecord. Last-write-wins semantics are applied using PersistentRecord.UpdatedAt and ChangeLog flushed state.
Behavior:
- Records are deduplicated by (SchemaID, RowID).
- For each key, the record with the highest UpdatedAt is chosen. If equal and preferHot==true, the record coming from the Hot tier is chosen.
- If a record originates from the ChangeLog buffer (flushed_at == 0) it is considered the authoritative hot source and wins ties regardless of UpdatedAt.
- The chosen record is returned with OtherAttributes merged across all source tiers for that (SchemaID, RowID) with attribute-level deduplication.
- Attributes are deduplicated by (AttrID, ArrayIndices).
- For an attribute present in multiple source records, the attribute from the record with the latest UpdatedAt is chosen. Ties are resolved using preferHot and deterministic tier ordering.
- Result slice is sorted by SchemaID then RowID for deterministic output.
type PersistentRecordFederatedQuerier ¶ added in v0.0.24
type PersistentRecordFederatedQuerier interface {
// QueryPersistentRecordsFederated performs a federated query across configured data tiers
// (Postgres hot and DuckDB/S3 warm/cold). Implementations MUST preserve backwards
// compatibility for OLTP-only callers: if the federated query hints indicate hot-only
// execution, this SHOULD delegate to QueryPersistentRecords.
QueryPersistentRecordsFederated(ctx context.Context, tables StorageTables, fq *FederatedAttributeQuery, opts *FederatedQueryOptions) (*PersistentRecordPage, error)
}
type PersistentRecordKey ¶ added in v0.0.24
type PersistentRecordPage ¶
type PersistentRecordPage struct {
Records []*PersistentRecord
TotalRecords int64
TotalPages int
CurrentPage int
}
type PersistentRecordQuery ¶
type PersistentRecordQuery struct {
Tables StorageTables
SchemaID int16
Condition forma.Condition
AttributeOrders []AttributeOrder
Limit int
Offset int
}
type PersistentRecordReader ¶ added in v0.0.24
type PersistentRecordReader interface {
GetPersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) (*PersistentRecord, error)
QueryPersistentRecords(ctx context.Context, query *PersistentRecordQuery) (*PersistentRecordPage, error)
}
type PersistentRecordRepository ¶
type PersistentRecordRepository interface {
PersistentRecordWriter
PersistentRecordReader
PersistentRecordFederatedQuerier
}
type PersistentRecordTransformer ¶
type PersistentRecordTransformer interface {
ToPersistentRecord(ctx context.Context, schemaID int16, rowID uuid.UUID, jsonData any) (*PersistentRecord, error)
FromPersistentRecord(ctx context.Context, record *PersistentRecord) (map[string]any, error)
}
func NewPersistentRecordTransformer ¶
func NewPersistentRecordTransformer(registry forma.SchemaRegistry) PersistentRecordTransformer
NewPersistentRecordTransformer creates a new PersistentRecordTransformer instance
type PersistentRecordWriter ¶ added in v0.0.24
type PersistentRecordWriter interface {
InsertPersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
UpdatePersistentRecord(ctx context.Context, tables StorageTables, record *PersistentRecord) error
DeletePersistentRecord(ctx context.Context, tables StorageTables, schemaID int16, rowID uuid.UUID) error
}
type RelationDescriptor ¶ added in v0.0.18
type RelationDescriptor struct {
ChildSchema string
ChildPath string
ParentSchema string
ParentPath string
ForeignKeyAttr string
ParentIDAttr string
ForeignKeyRequired bool
}
RelationDescriptor captures how a child schema derives fields from a parent schema.
type RelationIndex ¶ added in v0.0.18
type RelationIndex struct {
// contains filtered or unexported fields
}
RelationIndex stores parent-child relations keyed by child schema name.
func LoadRelationIndex ¶ added in v0.0.18
func LoadRelationIndex(schemaDir string) (*RelationIndex, error)
LoadRelationIndex parses JSON schema files in schemaDir and builds a relation index. If the directory is missing or no relations are found, it returns an empty index.
func (*RelationIndex) Relations ¶ added in v0.0.18
func (idx *RelationIndex) Relations(schema string) []RelationDescriptor
Relations returns descriptors for a child schema.
func (*RelationIndex) StripComputedFields ¶ added in v0.0.18
StripComputedFields removes relation-backed attributes from the payload before persistence.
type RoutingDecision ¶ added in v0.0.23
type RoutingDecision struct {
Tiers []DataTier
UseDuckDB bool
Reason string
MaxScanRows int
QueryTimeout time.Duration
AllowS3Fallback bool
}
RoutingDecision indicates which tiers to query and whether to prefer DuckDB.
func EvaluateRoutingPolicy ¶ added in v0.0.23
func EvaluateRoutingPolicy(cfg forma.DuckDBConfig, fq *FederatedAttributeQuery, opts *FederatedQueryOptions) RoutingDecision
EvaluateRoutingPolicy makes a routing decision based on config, query hints and options.
type SQLGenerator ¶
type SQLGenerator struct{}
SQLGenerator converts parsed conditions into SQL fragments and argument lists.
func NewSQLGenerator ¶
func NewSQLGenerator() *SQLGenerator
NewSQLGenerator constructs a SQLGenerator.
func (*SQLGenerator) ToSQLClauses ¶ added in v0.0.24
func (g *SQLGenerator) ToSQLClauses( condition forma.Condition, eavTable string, schemaID int16, cache forma.SchemaAttributeCache, paramIndex *int, ) (string, []any, error)
ToSQLClauses builds the SQL clause and arguments for a condition tree.
func (*SQLGenerator) ToSqlClauses ¶
func (g *SQLGenerator) ToSqlClauses( condition forma.Condition, eavTable string, schemaID int16, cache forma.SchemaAttributeCache, paramIndex *int, ) (string, []any, error)
ToSqlClauses is kept for backward compatibility.
type SQLRenderer ¶ added in v0.0.23
type SQLRenderer struct {
// contains filtered or unexported fields
}
SQLRenderer renders text/template SQL templates while collecting parameter values and providing a safe identifier helper to avoid SQL injection.
func NewSQLRenderer ¶ added in v0.0.23
func NewSQLRenderer() *SQLRenderer
func (*SQLRenderer) Ident ¶ added in v0.0.23
func (r *SQLRenderer) Ident(name string) (string, error)
Ident validates a SQL identifier (table/column) and returns it quoted.
func (*SQLRenderer) Param ¶ added in v0.0.23
func (r *SQLRenderer) Param(v any) string
Param appends a value to the renderer's args and returns a "?" placeholder to be inserted into the template.
type SchemaMetadata ¶
type SchemaMetadata struct {
SchemaName string `json:"schema_name"`
SchemaID int16 `json:"schema_id"`
SchemaVersion int `json:"schema_version"`
Attributes []forma.AttributeMetadata `json:"attributes"`
}
SchemaMetadata aggregates attribute mappings for a schema version.
type Set ¶
type Set[T comparable] struct { // contains filtered or unexported fields }
Set is a generic data structure that represents a collection of unique items. It uses a map internally for O(1) operations.
func (*Set[T]) Add ¶
func (s *Set[T]) Add(item T)
Add inserts an item into the set. If the item already exists, it has no effect.
type StorageTables ¶
type Transformer ¶
type Transformer interface {
// Single object conversion
ToAttributes(ctx context.Context, schemaID int16, rowID uuid.UUID, jsonData any) ([]EntityAttribute, error)
FromAttributes(ctx context.Context, attributes []EntityAttribute) (map[string]any, error)
// Batch operations
BatchToAttributes(ctx context.Context, schemaID int16, jsonObjects []any) ([]EntityAttribute, error)
BatchFromAttributes(ctx context.Context, attributes []EntityAttribute) ([]map[string]any, error)
// Validation
ValidateAgainstSchema(ctx context.Context, jsonSchema any, jsonData any) error
}
func NewTransformer ¶
func NewTransformer(registry forma.SchemaRegistry) Transformer
NewTransformer creates a new Transformer instance backed by the provided schema registry.
Source Files
¶
- advanced_query_template.go
- advanced_query_template_duckdb.go
- attribute_converter.go
- attribute_filter.go
- base32.go
- bool_numeric.go
- circuit_breaker.go
- collections.go
- dirtyids.go
- dualpath_sql_generator.go
- dualpath_sql_helpers.go
- duckdb_conn.go
- duckdb_sql_generator.go
- duckdb_template_renderer.go
- duckdb_type_mapper.go
- entity_batch_service.go
- entity_crud_service.go
- entity_manager.go
- entity_manager_batch.go
- entity_manager_crud.go
- entity_manager_helpers.go
- entity_manager_query.go
- entity_manager_relations.go
- entity_query_service.go
- entity_relation_service.go
- federated_interfaces.go
- federated_merge.go
- federated_pagination.go
- federated_routing.go
- file_schema_registry.go
- interfaces.go
- metadata_loader.go
- metadata_parser.go
- metadata_types.go
- numeric_conversion.go
- persistent_transformer.go
- postgres_condition_helpers.go
- postgres_duckdb_query.go
- postgres_duckdb_query_helpers.go
- postgres_health.go
- postgres_persistent_repository.go
- postgres_persistent_repository_batch.go
- postgres_persistent_repository_columns.go
- postgres_persistent_repository_eav.go
- postgres_persistent_repository_main_table.go
- postgres_persistent_repository_query.go
- postgres_row_scanner.go
- relation_index.go
- s3_health.go
- schema_metadata_cache.go
- schema_parser.go
- sql_generator.go
- sql_helpers.go
- sql_template_renderer.go
- telemetry.go
- transformer.go
- transformer_array.go
- types.go
- utils.go