Documentation
¶
Index ¶
- Constants
- Variables
- func DenormaliseDecimalColumns(spec *AdaptedTableSpec, dialect StorageDialect, colVals []interface{})
- func GenerateAdaptedSchemasTableSQL(dialect StorageDialect) string
- func GenerateCreateTableSQL(spec *AdaptedTableSpec, dialect StorageDialect) string
- func GenerateIndexSQL(spec *AdaptedTableSpec, dialect StorageDialect) []string
- func ListStores() []string
- func MigrateAdaptedTable(ctx context.Context, db *sql.DB, registry *AdaptedRegistry, entity string, ...) error
- func NormaliseDecimalColumns(spec *AdaptedTableSpec, dialect StorageDialect, colVals []interface{}) error
- func PartitionData(spec *AdaptedTableSpec, data map[string]interface{}) (columnValues []interface{}, extra map[string]interface{})
- func ReassembleData(spec *AdaptedTableSpec, columnValues []interface{}, ...) map[string]interface{}
- func RegisterAdaptedTable(ctx context.Context, db *sql.DB, registry *AdaptedRegistry, entity string, ...) error
- func RegisterStore(name string, factory StoreFactory)
- type AdaptedRegistry
- type AdaptedTableSpec
- type AdaptiveLock
- func (al *AdaptiveLock) Engaged() bool
- func (al *AdaptiveLock) Lock() bool
- func (al *AdaptiveLock) RLock() bool
- func (al *AdaptiveLock) RUnlock()
- func (al *AdaptiveLock) RecordFailure()
- func (al *AdaptiveLock) RecordSuccess()
- func (al *AdaptiveLock) SetThreshold(threshold int)
- func (al *AdaptiveLock) Stop()
- func (al *AdaptiveLock) Threshold() int
- func (al *AdaptiveLock) Unlock()
- type AggregateQueryable
- type Batcher
- type ColumnChange
- type ColumnDef
- type CommitAppend
- type CommitAppendResult
- type CommitRequest
- type CommitResult
- type CommitUpdate
- type CommitUpdateResult
- type EntityLister
- type FieldIntrospector
- type FieldQueryable
- type FilterableStore
- type GraphEdge
- type GraphEdgeScanner
- type GraphIntegrity
- type GraphNeighbors
- type IDGenerator
- type IndexDef
- type InfoProvider
- type JSONFileStore
- func (s *JSONFileStore) Close() error
- func (s *JSONFileStore) Commit(_ context.Context, _ CommitRequest) (CommitResult, error)
- func (s *JSONFileStore) Config() StoreConfig
- func (s *JSONFileStore) Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)
- func (s *JSONFileStore) Delete(ctx context.Context, entity string, id int) error
- func (s *JSONFileStore) Exists(ctx context.Context, entity string, id int) bool
- func (s *JSONFileStore) FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)
- func (s *JSONFileStore) Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)
- func (s *JSONFileStore) GetEntityDir(entity string) string
- func (s *JSONFileStore) Info() StoreInfo
- func (s *JSONFileStore) List(ctx context.Context, entity string) ([]map[string]interface{}, error)
- func (s *JSONFileStore) ListEntities(ctx context.Context) ([]string, error)
- func (s *JSONFileStore) NextID(ctx context.Context, entity string) (int, error)
- func (s *JSONFileStore) Patch(ctx context.Context, entity string, id int, patchData map[string]interface{}) error
- func (s *JSONFileStore) PatchValidated(ctx context.Context, entity string, id int, patchData map[string]interface{}, ...) error
- func (s *JSONFileStore) Ping(ctx context.Context) error
- func (s *JSONFileStore) Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)
- func (s *JSONFileStore) Search(ctx context.Context, entity string, field string, query string, ...) ([]map[string]interface{}, error)
- func (s *JSONFileStore) Update(ctx context.Context, entity string, id int, data map[string]interface{}) error
- type Migrator
- type PagedLister
- type PagedResult
- type QueryCapabilities
- type Queryable
- type SQLiteConfig
- type SQLiteStorageDialect
- func (d *SQLiteStorageDialect) ColumnType(jsonType, format string, precision, scale int) string
- func (d *SQLiteStorageDialect) CreateIndexSQL(spec *AdaptedTableSpec) []string
- func (d *SQLiteStorageDialect) CreateTableSQL(spec *AdaptedTableSpec) string
- func (d *SQLiteStorageDialect) DeleteSQL(spec *AdaptedTableSpec) string
- func (d *SQLiteStorageDialect) DenormaliseDecimal(value string, precision, scale int) string
- func (d *SQLiteStorageDialect) ExistsSQL(spec *AdaptedTableSpec) string
- func (d *SQLiteStorageDialect) InsertSQL(spec *AdaptedTableSpec, hasExtra bool) (string, []string)
- func (d *SQLiteStorageDialect) MetadataTableSQL() string
- func (d *SQLiteStorageDialect) Name() string
- func (d *SQLiteStorageDialect) NormaliseDecimal(value string, precision, scale int) (string, error)
- func (d *SQLiteStorageDialect) Placeholder(_ int) string
- func (d *SQLiteStorageDialect) SelectAllSQL(spec *AdaptedTableSpec) string
- func (d *SQLiteStorageDialect) SelectSQL(spec *AdaptedTableSpec) string
- func (d *SQLiteStorageDialect) SupportsNativeDecimalAggregation() bool
- func (d *SQLiteStorageDialect) UpdateSQL(spec *AdaptedTableSpec, versionCheck bool) string
- type SQLiteStore
- func (s *SQLiteStore) AdaptedColumnInfo(entity, jsonField string) (colName string, scale int, isDecimal bool, ok bool)
- func (s *SQLiteStore) AdaptedRegistry() *AdaptedRegistry
- func (s *SQLiteStore) AdaptedTableName(entity string) (string, bool)
- func (s *SQLiteStore) AggregateQuery(ctx context.Context, sql string, args []interface{}, aliases []string) ([]map[string]interface{}, error)
- func (s *SQLiteStore) Capabilities() QueryCapabilities
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) Commit(ctx context.Context, req CommitRequest) (CommitResult, error)
- func (s *SQLiteStore) Config() StoreConfig
- func (s *SQLiteStore) ContentionLock() *AdaptiveLock
- func (s *SQLiteStore) CountEntities(ctx context.Context, entity string) (int, error)
- func (s *SQLiteStore) Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)
- func (s *SQLiteStore) DB() *sql.DB
- func (s *SQLiteStore) Delete(ctx context.Context, entity string, id int) error
- func (s *SQLiteStore) Exists(ctx context.Context, entity string, id int) bool
- func (s *SQLiteStore) FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)
- func (s *SQLiteStore) Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)
- func (s *SQLiteStore) GraphTenantIDs(ctx context.Context) ([]uint16, error)
- func (s *SQLiteStore) Info() StoreInfo
- func (s *SQLiteStore) IsAdaptedEntity(entity string) bool
- func (s *SQLiteStore) List(ctx context.Context, entity string) ([]map[string]interface{}, error)
- func (s *SQLiteStore) ListEntities(ctx context.Context) ([]string, error)
- func (s *SQLiteStore) ListPaged(ctx context.Context, entity string, limit, offset int) (*PagedResult, error)
- func (s *SQLiteStore) ListWithFields(ctx context.Context, entity string, fields []string) ([]map[string]interface{}, error)
- func (s *SQLiteStore) ListWithFieldsAndFilter(ctx context.Context, entity string, fields []string, ...) ([]map[string]interface{}, error)
- func (s *SQLiteStore) Patch(ctx context.Context, entity string, id int, updates map[string]interface{}) error
- func (s *SQLiteStore) PatchValidated(ctx context.Context, entity string, id int, updates map[string]interface{}, ...) error
- func (s *SQLiteStore) Ping(ctx context.Context) error
- func (s *SQLiteStore) QueryWithFields(ctx context.Context, sqlQuery string, args []interface{}, fields []string) ([]map[string]interface{}, error)
- func (s *SQLiteStore) QueryWithPlan(ctx context.Context, sqlQuery string, args []interface{}) ([]map[string]interface{}, error)
- func (s *SQLiteStore) ReaderDB() *sql.DB
- func (s *SQLiteStore) RebuildGraph(ctx context.Context) error
- func (s *SQLiteStore) RegisterAdaptedEntity(ctx context.Context, entity string, schema map[string]interface{}) error
- func (s *SQLiteStore) Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)
- func (s *SQLiteStore) ScanGraphEdges(ctx context.Context, tenantID uint16, fn func(GraphEdge) error) error
- func (s *SQLiteStore) Search(ctx context.Context, entity string, field string, query string, ...) ([]map[string]interface{}, error)
- func (s *SQLiteStore) StorageDialectFor(entity string) StorageDialect
- func (s *SQLiteStore) Update(ctx context.Context, entity string, id int, data map[string]interface{}) error
- func (s *SQLiteStore) VerifyGraphIntegrity(ctx context.Context) error
- func (s *SQLiteStore) WithLogger(logger zerolog.Logger) *SQLiteStore
- type SQLiteTenantPersister
- type SchemaBrowser
- type SchemaDiff
- type SchemaIntrospector
- type Searcher
- type StmtCache
- type StorageDialect
- type Store
- type StoreConfig
- type StoreFactory
- type StoreInfo
- type TenantIDLister
Constants ¶
const DefaultStmtCacheSize = 256
DefaultStmtCacheSize is the maximum number of prepared statements kept in the cache before LRU eviction. Each entry holds a *sql.Stmt against the reader pool. For a typical olu workload the number of distinct SQL shapes is bounded (tens, not thousands), so 256 is generous while keeping memory bounded.
Variables ¶
var (
// ErrNotFound is returned when an entity is not found
ErrNotFound = errors.New("entity not found")
// ErrAlreadyExists is returned when an entity already exists
ErrAlreadyExists = errors.New("entity already exists")
// ErrInvalidEntity is returned when entity name is invalid
ErrInvalidEntity = errors.New("invalid entity name")
// ErrInvalidID is returned when ID is invalid
ErrInvalidID = errors.New("invalid ID")
// ErrConflict is returned when an optimistic concurrency check fails
ErrConflict = errors.New("version conflict")
// ErrNotSupported is returned when a storage backend does not implement
// a particular operation. Handlers that receive this error should map it
// to an appropriate HTTP 501 Not Implemented response.
ErrNotSupported = errors.New("operation not supported by this storage backend")
)
Functions ¶
func DenormaliseDecimalColumns ¶
func DenormaliseDecimalColumns(spec *AdaptedTableSpec, dialect StorageDialect, colVals []interface{})
DenormaliseDecimalColumns applies dialect-specific decimal denormalisation to column values read from the database. This converts scaled integers back to client-facing decimal strings. Called on the read path (get, list) after SQL scan.
func GenerateAdaptedSchemasTableSQL ¶
func GenerateAdaptedSchemasTableSQL(dialect StorageDialect) string
GenerateAdaptedSchemasTableSQL returns the DDL for the metadata table that tracks adapted table schemas, using the given dialect.
func GenerateCreateTableSQL ¶
func GenerateCreateTableSQL(spec *AdaptedTableSpec, dialect StorageDialect) string
GenerateCreateTableSQL produces the CREATE TABLE statement for an adapted table using the given dialect.
func GenerateIndexSQL ¶
func GenerateIndexSQL(spec *AdaptedTableSpec, dialect StorageDialect) []string
GenerateIndexSQL produces CREATE INDEX statements for the adapted table using the given dialect.
func MigrateAdaptedTable ¶
func MigrateAdaptedTable( ctx context.Context, db *sql.DB, registry *AdaptedRegistry, entity string, newSchema map[string]interface{}, dialect StorageDialect, ) error
MigrateAdaptedTable applies schema changes to an existing adapted table. It computes the diff between the stored spec and the new schema, then:
- Rejects type changes (incompatible, require manual intervention)
- Adds new columns via ALTER TABLE ADD COLUMN
- Drops removed columns via ALTER TABLE DROP COLUMN (SQLite 3.35+), after migrating any existing data to the _extra overflow column
- Updates indexes (drop old, create new)
- Updates the metadata row in adapted_table_schemas
- Updates the in-memory registry
The entire migration runs in a single transaction.
func NormaliseDecimalColumns ¶
func NormaliseDecimalColumns(spec *AdaptedTableSpec, dialect StorageDialect, colVals []interface{}) error
NormaliseDecimalColumns applies dialect-specific decimal normalisation to column values produced by PartitionData. This transforms validated decimal strings into scaled integers for storage. Called on the write path (create, update) before SQL execution.
func PartitionData ¶
func PartitionData(spec *AdaptedTableSpec, data map[string]interface{}) (columnValues []interface{}, extra map[string]interface{})
PartitionData separates a data map into schema-column values and overflow. Returns:
- columnValues: ordered values matching spec.Columns, ready for INSERT
- extra: map of fields not in the schema (nil if there are none or if spec.HasExtra is false)
REF fields are decomposed: {"type":"REF","entity":"users","id":42} becomes two column values: "users" (for REF_{field}_entity) and 42 (for REF_{field}_id).
func ReassembleData ¶
func ReassembleData(spec *AdaptedTableSpec, columnValues []interface{}, extra map[string]interface{}, id int, version int) map[string]interface{}
ReassembleData reconstructs a map[string]interface{} from column values and an optional overflow map. This is the inverse of PartitionData.
func RegisterAdaptedTable ¶
func RegisterAdaptedTable(ctx context.Context, db *sql.DB, registry *AdaptedRegistry, entity string, schema map[string]interface{}, dialect StorageDialect) error
RegisterAdaptedTable derives a table spec from a JSON Schema, creates the table and indexes in the database, and records the spec in the adapted_table_schemas metadata table.
If the table already exists with the same schema hash, this is a no-op. If the schema has changed, the caller must handle migration separately (Phase 4 of the design).
func RegisterStore ¶
func RegisterStore(name string, factory StoreFactory)
RegisterStore registers a new store implementation
Types ¶
type AdaptedRegistry ¶
type AdaptedRegistry struct {
// contains filtered or unexported fields
}
AdaptedRegistry tracks which entity types have adapted tables.
func LoadAdaptedRegistry ¶
LoadAdaptedRegistry reads the adapted_table_schemas metadata table and populates the registry. Called at store startup.
func NewAdaptedRegistry ¶
func NewAdaptedRegistry() *AdaptedRegistry
NewAdaptedRegistry creates an empty registry.
func (*AdaptedRegistry) Entities ¶
func (r *AdaptedRegistry) Entities() []string
Entities returns a sorted list of all adapted entity types.
func (*AdaptedRegistry) Get ¶
func (r *AdaptedRegistry) Get(entity string) *AdaptedTableSpec
Get returns the adapted table spec for an entity, or nil if the entity uses blob storage.
func (*AdaptedRegistry) IsAdapted ¶
func (r *AdaptedRegistry) IsAdapted(entity string) bool
IsAdapted reports whether an entity type has an adapted table.
func (*AdaptedRegistry) Set ¶
func (r *AdaptedRegistry) Set(entity string, spec *AdaptedTableSpec)
Set registers an adapted table spec for an entity.
type AdaptedTableSpec ¶
type AdaptedTableSpec struct {
Entity string `json:"entity"` // Entity type name
Columns []ColumnDef `json:"columns"` // Ordered column definitions
SchemaHash string `json:"schema_hash"` // SHA-256 of canonical schema JSON
HasExtra bool `json:"has_extra"` // Whether _extra overflow column is present
Indexes []IndexDef `json:"indexes"` // Indexes to create
}
AdaptedTableSpec describes the full column layout of an adapted table.
func DeriveAdaptedTableSpec ¶
func DeriveAdaptedTableSpec(entity string, schema map[string]interface{}, dialect StorageDialect) (*AdaptedTableSpec, error)
DeriveAdaptedTableSpec examines a JSON Schema document and produces a complete AdaptedTableSpec describing the adapted table layout.
The dialect parameter determines backend-specific column types.
This is a convenience wrapper that creates a SchemaIntrospector from the raw JSON Schema map. For direct use with queryfy (future), call DeriveAdaptedTableSpecFrom with a queryfy-backed introspector.
func DeriveAdaptedTableSpecFrom ¶
func DeriveAdaptedTableSpecFrom(entity string, schema SchemaIntrospector, dialect StorageDialect, schemaHash string) (*AdaptedTableSpec, error)
DeriveAdaptedTableSpecFrom derives an AdaptedTableSpec from a SchemaIntrospector. This is the backend-agnostic core that works with any schema representation (JSON Schema maps, queryfy objects, or anything else that implements SchemaIntrospector).
func (*AdaptedTableSpec) ColumnByName ¶
func (s *AdaptedTableSpec) ColumnByName(name string) (ColumnDef, bool)
ColumnByName returns the ColumnDef for a given SQL column name.
func (*AdaptedTableSpec) ColumnNames ¶
func (s *AdaptedTableSpec) ColumnNames() []string
ColumnNames returns all column names in order (excluding system columns).
func (*AdaptedTableSpec) FieldToColumn ¶
func (s *AdaptedTableSpec) FieldToColumn(jsonField string) []string
FieldToColumn maps a JSON field name to its column name(s). For REF fields, this returns two names: REF_{field}_entity, REF_{field}_id. For all other fields, it returns a single name equal to the field name.
func (*AdaptedTableSpec) IsSchemaField ¶
func (s *AdaptedTableSpec) IsSchemaField(jsonField string) bool
IsSchemaField reports whether a JSON field name is a declared schema field.
func (*AdaptedTableSpec) TableName ¶
func (s *AdaptedTableSpec) TableName() string
TableName returns the SQL table name for this adapted table.
type AdaptiveLock ¶
type AdaptiveLock struct {
// contains filtered or unexported fields
}
AdaptiveLock provides a mutex that automatically engages under contention.
Under normal load, operations proceed without any Go-side locking, relying on SQLite's WAL mode and busy_timeout for concurrency control. When the write success rate drops below a configurable threshold (indicating heavy contention), the lock engages a sync.RWMutex to serialise writes and eliminate SQLITE_BUSY errors entirely. When contention subsides, the lock disengages and returns to lock-free operation.
This gives the best of both worlds: full WAL concurrency under normal load, and guaranteed zero errors under burst load.
func NewAdaptiveLock ¶
func NewAdaptiveLock(threshold int) *AdaptiveLock
NewAdaptiveLock creates an adaptive lock with the given success-rate threshold. threshold is expressed as a percentage (e.g. 95 means engage the mutex when the success rate drops below 95%). Valid range: 0-100. 0 disables the lock entirely; 100 keeps it permanently engaged (equivalent to a plain mutex).
func (*AdaptiveLock) Engaged ¶
func (al *AdaptiveLock) Engaged() bool
Engaged returns whether the mutex is currently engaged.
func (*AdaptiveLock) Lock ¶
func (al *AdaptiveLock) Lock() bool
Lock acquires a write lock if the adaptive lock is engaged. Returns true if the lock was acquired (caller must call Unlock).
func (*AdaptiveLock) RLock ¶
func (al *AdaptiveLock) RLock() bool
RLock acquires a read lock if the adaptive lock is engaged. Returns true if the lock was acquired (caller must call RUnlock).
func (*AdaptiveLock) RUnlock ¶
func (al *AdaptiveLock) RUnlock()
RUnlock releases the read lock. Only call if RLock returned true.
func (*AdaptiveLock) RecordFailure ¶
func (al *AdaptiveLock) RecordFailure()
RecordFailure records a failed (SQLITE_BUSY) operation. If the threshold is non-zero, this immediately engages the lock — the burst is happening now and waiting for the monitor tick would let more failures through. The monitor goroutine handles disengagement once the window is clean.
func (*AdaptiveLock) RecordSuccess ¶
func (al *AdaptiveLock) RecordSuccess()
RecordSuccess records a successful operation.
func (*AdaptiveLock) SetThreshold ¶
func (al *AdaptiveLock) SetThreshold(threshold int)
SetThreshold dynamically updates the success-rate threshold at runtime. threshold is a percentage (0-100). Takes effect on the next monitor tick.
func (*AdaptiveLock) Stop ¶
func (al *AdaptiveLock) Stop()
Stop terminates the background monitor goroutine.
func (*AdaptiveLock) Threshold ¶
func (al *AdaptiveLock) Threshold() int
Threshold returns the current threshold as a percentage (0-100).
func (*AdaptiveLock) Unlock ¶
func (al *AdaptiveLock) Unlock()
Unlock releases the write lock. Only call if Lock returned true.
type AggregateQueryable ¶
type AggregateQueryable interface {
// AggregateQuery executes a pre-built aggregate SQL query and returns
// the grouped results as maps. Unlike QueryWithPlan (which scans
// data+_version blobs), this scans arbitrary columns/expressions
// and returns them by alias.
AggregateQuery(ctx context.Context, sql string, args []interface{}, aliases []string) ([]map[string]interface{}, error)
// IsAdaptedEntity reports whether the given entity uses an adapted
// table (column-per-field storage). The OQL planner uses this to
// decide whether aggregate push-down is possible.
IsAdaptedEntity(entity string) bool
// AdaptedColumnInfo returns the SQL column name for a JSON field in
// an adapted entity. Returns colName == "" and ok == false if the
// entity is not adapted or the field is not a known column. For
// decimal columns, also returns the scale so the caller can
// denormalise aggregated values.
AdaptedColumnInfo(entity, jsonField string) (colName string, scale int, isDecimal bool, ok bool)
// AdaptedTableName returns the SQL table name for an adapted entity.
// Returns ("", false) if the entity is not adapted.
AdaptedTableName(entity string) (string, bool)
// StorageDialectFor returns the StorageDialect for the given entity,
// or nil if the entity is not adapted. This allows SQL generators to
// access dialect methods without importing backend-specific types.
StorageDialectFor(entity string) StorageDialect
}
AggregateQueryable is an optional interface for storage backends that support native GROUP BY + aggregate push-down for adapted tables.
When an entity has an adapted table, aggregate queries can run entirely in SQL against native columns instead of fetching all rows into Go. The OQL executor checks for this interface via type assertion.
type Batcher ¶
type Batcher interface {
BatchCreate(ctx context.Context, entity string, items []map[string]interface{}) ([]int, error)
BatchDelete(ctx context.Context, entity string, ids []int) error
}
Batcher defines optional batch operation support
type ColumnChange ¶
type ColumnChange struct {
Name string
OldSQLType string
NewSQLType string
OldType string
NewType string
}
ColumnChange records an incompatible type change for a column.
type ColumnDef ¶
type ColumnDef struct {
Name string `json:"name"` // Column name (e.g., "age", "REF_author_entity")
JSONField string `json:"json_field"` // Original JSON field name (e.g., "age", "author")
Type string `json:"type"` // JSON Schema type: string, integer, number, boolean, array, object
Format string `json:"format"` // JSON Schema format: "", "decimal", "ref", "email", etc.
SQLType string `json:"sql_type"` // Backend-specific SQL type (e.g., "TEXT", "INTEGER", "REAL")
Required bool `json:"required"` // Whether the field is in the schema's required array
Precision int `json:"precision"` // For decimal: total significant digits
Scale int `json:"scale"` // For decimal: digits after decimal point
IsREF bool `json:"is_ref"` // True if this column is part of a REF decomposition
}
ColumnDef describes a single column in an adapted table.
type CommitAppend ¶
type CommitAppend struct {
Entity string `json:"entity"`
ID *int `json:"id,omitempty"`
Data map[string]interface{} `json:"data"`
}
CommitAppend describes one record to insert in a Commit operation. If ID is nil, the backend auto-generates an ID. If ID is non-nil and a record with that ID already exists in the entity type, ErrAlreadyExists is returned and the entire commit is rolled back.
type CommitAppendResult ¶
CommitAppendResult describes one inserted record in a Commit response. ID is always set; for auto-generated IDs it contains the assigned value.
type CommitRequest ¶
type CommitRequest struct {
Update CommitUpdate `json:"update"`
Append []CommitAppend `json:"append"`
}
CommitRequest is the payload for the atomic commit endpoint. It performs one conditional upsert (Update) and one or more unconditional inserts (Append) in a single storage transaction.
type CommitResult ¶
type CommitResult struct {
Update CommitUpdateResult `json:"update"`
Appended []CommitAppendResult `json:"appended"`
}
CommitResult is returned on a successful Commit.
type CommitUpdate ¶
type CommitUpdate struct {
Entity string `json:"entity"`
ID int `json:"id"`
Version *int `json:"version,omitempty"`
Data map[string]interface{} `json:"data"`
}
CommitUpdate describes the entity to upsert in a Commit operation. If Version is non-nil, the write is conditional: it proceeds only if the stored _version equals *Version. A mismatch returns ErrConflict.
type CommitUpdateResult ¶
type CommitUpdateResult struct {
Entity string `json:"entity"`
ID int `json:"id"`
Created bool `json:"created"`
Version int `json:"version"`
}
CommitUpdateResult describes the outcome of the upsert in a Commit. Created is true when a new record was inserted; false when an existing record was overwritten. Version is the _version value after the commit.
type EntityLister ¶
EntityLister defines optional entity type listing support
type FieldIntrospector ¶
type FieldIntrospector interface {
// JSONType returns the JSON Schema type string:
// "string", "integer", "number", "boolean", "array", "object"
JSONType() string
// Format returns the declared format ("email", "decimal", "ref", "")
Format() string
// EnumValues returns the declared enum values, or nil if none.
EnumValues() []string
// Meta returns a metadata value by key (e.g., "decimalPrecision").
// Returns (nil, false) if the key is not set.
Meta(key string) (interface{}, bool)
}
FieldIntrospector provides read access to a single field's type and constraints. This maps to queryfy's per-type schema introspection (StringSchema.FormatType, NumberSchema.RangeConstraints, etc.).
type FieldQueryable ¶
type FieldQueryable interface {
// ListWithFields returns all records for an entity type, but each
// record contains only the specified fields plus _version. Fields
// not present in a particular record's JSON are omitted from the
// returned map (no null padding).
//
// For adapted entities this falls through to the regular List path
// (adapted tables already select native columns efficiently).
ListWithFields(ctx context.Context, entity string, fields []string) ([]map[string]interface{}, error)
// QueryWithFields executes a pre-built SQL query (WHERE push-down)
// and returns results with only the specified fields extracted from
// the data blob. Like QueryWithPlan but avoids full deserialisation.
QueryWithFields(ctx context.Context, sqlQuery string, args []interface{}, fields []string) ([]map[string]interface{}, error)
}
FieldQueryable is an optional interface for storage backends that support selective field extraction from blob entities.
Instead of deserialising every JSON blob into a full map, a FieldQueryable backend can extract only the requested fields during the scan loop. For blob entities this avoids allocating maps with dozens of unused keys.
The OQL executor checks for this interface via type assertion. If the store satisfies FieldQueryable and the query's SELECT list names specific fields (not SELECT *), the executor may call ListWithFields instead of List.
type FilterableStore ¶
type FilterableStore interface {
FieldQueryable
// ListWithFieldsAndFilter returns records for an entity type,
// extracting only the specified fields and applying the predicate
// set during tokenisation. Rows that fail the predicates are never
// materialised as maps.
//
// The caller must ensure that fields includes all columns needed
// for the SELECT list. Predicate fields may or may not overlap
// with output fields.
ListWithFieldsAndFilter(ctx context.Context, entity string, fields []string, preds *jsonic.PredicateSet) ([]map[string]interface{}, error)
}
FilterableStore is an optional extension of FieldQueryable that supports predicate evaluation during JSON tokenisation (B4 push-down).
Instead of extracting all rows and filtering in Go afterward, a FilterableStore evaluates simple predicates inline during the token walk, skipping map allocation for rows that don't match.
The OQL executor checks for this interface via type assertion. If the store satisfies FilterableStore and the WHERE clause can be expressed as a jsonic.PredicateSet, the executor passes predicates down.
type GraphEdge ¶
type GraphEdge struct {
SourceEntity string
SourceID int
TargetEntity string
TargetID int
Relationship string
}
GraphEdge holds the five columns of one row from the graph edges table.
type GraphEdgeScanner ¶
type GraphEdgeScanner interface {
ScanGraphEdges(ctx context.Context, tenantID uint16, fn func(GraphEdge) error) error
}
GraphEdgeScanner is an optional interface for storage backends that can stream graph edges directly from their edge table without deserialising full entity JSON. Implementing this interface enables O(edges) startup graph hydration instead of O(entities × JSON size).
ScanGraphEdges calls fn once per edge row. Iteration stops on the first non-nil error returned by fn. A nil error from ScanGraphEdges means all rows were scanned (or fn stopped iteration early with a sentinel — callers must define their own sentinel if needed).
tenantID scopes the scan to a specific tenant's edge table. Pass 0 for the default (tenant-0) table. Future SQL backends may extend this to scan all tenant tables in a single call; the current SQLite implementation scans one tenant at a time, matching the existing startup scope.
type GraphIntegrity ¶
type GraphIntegrity interface {
VerifyGraphIntegrity(ctx context.Context) error
RebuildGraph(ctx context.Context) error
}
GraphIntegrity defines optional graph integrity checking
type GraphNeighbors ¶
type GraphNeighbors interface {
}
GraphNeighbors defines optional graph neighbor queries
type IDGenerator ¶
IDGenerator defines interface for ID generation strategies
type IndexDef ¶
type IndexDef struct {
Name string `json:"name"` // Index name
Columns []string `json:"columns"` // Column names
Unique bool `json:"unique"` // Whether the index is unique
}
IndexDef describes an index on an adapted table.
type InfoProvider ¶
type InfoProvider interface {
Info() StoreInfo
}
InfoProvider allows stores to provide metadata about their capabilities
type JSONFileStore ¶
type JSONFileStore struct {
// contains filtered or unexported fields
}
JSONFileStore implements Store interface using JSON files
func NewJSONFileStore ¶
func NewJSONFileStore(baseDir, schema string) (*JSONFileStore, error)
NewJSONFileStore creates a new JSON file-based storage
func (*JSONFileStore) Close ¶
func (s *JSONFileStore) Close() error
Close closes the storage (cleanup if needed)
func (*JSONFileStore) Commit ¶
func (s *JSONFileStore) Commit(_ context.Context, _ CommitRequest) (CommitResult, error)
Commit is not supported by the jsonfile backend. The jsonfile backend does not provide true transactional atomicity and has been deprecated for production use. This method exists solely to satisfy the Store interface; callers will receive ErrNotSupported, which the HTTP handler maps to 501 Not Implemented (OLU-CM009).
Note: this comment also describes Commit as performing an atomic-as-possible upsert + append sequence on the jsonfile backend (true atomicity is not available in the file system), acquiring per-entity ID locks in sorted order to prevent races and rolling back completed writes on failure. That description contradicts the paragraph above and appears to be stale; confirm against the implementation before relying on it.
func (*JSONFileStore) Config ¶
func (s *JSONFileStore) Config() StoreConfig
Config returns the store's StoreConfig.
func (*JSONFileStore) Create ¶
func (s *JSONFileStore) Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)
Create creates a new entity with auto-generated ID
func (*JSONFileStore) FullTextSearch ¶
func (s *JSONFileStore) FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)
FullTextSearch is not supported by JSONFileStore; it returns empty results.
func (*JSONFileStore) Get ¶
func (s *JSONFileStore) Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)
Get retrieves an entity by ID
func (*JSONFileStore) GetEntityDir ¶
func (s *JSONFileStore) GetEntityDir(entity string) string
GetEntityDir returns the directory path for an entity
func (*JSONFileStore) Info ¶
func (s *JSONFileStore) Info() StoreInfo
Info returns store information
func (*JSONFileStore) ListEntities ¶
func (s *JSONFileStore) ListEntities(ctx context.Context) ([]string, error)
ListEntities returns all entity types in the schema
func (*JSONFileStore) Patch ¶
func (s *JSONFileStore) Patch(ctx context.Context, entity string, id int, patchData map[string]interface{}) error
Patch partially updates an entity
func (*JSONFileStore) PatchValidated ¶
func (s *JSONFileStore) PatchValidated(ctx context.Context, entity string, id int, patchData map[string]interface{}, validate func(merged map[string]interface{}) error) error
PatchValidated merges patch data and optionally validates the merged result. Note: JSONFileStore does not provide transactional isolation for Patch — concurrent patches may still race. For production multi-tenant use, use SQLite.
func (*JSONFileStore) Ping ¶
func (s *JSONFileStore) Ping(ctx context.Context) error
Ping verifies that the storage directory is accessible.
func (*JSONFileStore) Save ¶
func (s *JSONFileStore) Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)
Save upserts an entity with the caller-specified ID. Returns (true, nil) when a new file was created, (false, nil) when an existing file was overwritten.
Optimistic concurrency: if data contains "_version", the overwrite path is conditional. If the stored _version differs from the expected value, Save returns ErrConflict without writing. On a successful overwrite, _version is incremented. The create path always starts at _version = 1.
func (*JSONFileStore) Search ¶
func (s *JSONFileStore) Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)
Search implements field-based search
func (*JSONFileStore) Update ¶
func (s *JSONFileStore) Update(ctx context.Context, entity string, id int, data map[string]interface{}) error
Update replaces an entity completely. Optimistic concurrency: if data contains "_version", the write is conditional on the stored version matching. Returns ErrConflict on mismatch, ErrNotFound if the entity does not exist.
type Migrator ¶
type Migrator interface {
Migrate(ctx context.Context) error
Version(ctx context.Context) (int, error)
}
Migrator defines optional schema migration support. It is useful for database backends.
type PagedLister ¶
type PagedLister interface {
// ListPaged returns a single page of entities, plus the total count.
// limit and offset are applied at the storage layer (SQL LIMIT/OFFSET).
ListPaged(ctx context.Context, entity string, limit, offset int) (*PagedResult, error)
}
PagedLister is an optional interface for storage backends that support server-side pagination. Backends that implement this avoid loading every record into memory for paginated list requests.
type PagedResult ¶
PagedResult holds a page of results plus the total count.
type QueryCapabilities ¶
type QueryCapabilities struct {
Where bool // Can filter with json_extract predicates
OrderBy bool // Can sort by json_extract fields
Limit bool // Can apply TOP/LIMIT
Count bool // Can return entity count without full scan
}
QueryCapabilities reports what a storage backend can handle natively via predicate push-down. Used by the OQL planner to decide which operations to delegate to the storage engine versus executing in Go.
type Queryable ¶
type Queryable interface {
// Capabilities reports which query operations this backend handles
// natively. The planner will not attempt to push down an operation
// unless the corresponding capability is true.
Capabilities() QueryCapabilities
// CountEntities returns the number of records for an entity type
// without fetching the records themselves. Used by the planner to
// determine whether push-down is worthwhile (the fixed overhead of
// a push-down query exceeds Go-side cost for small datasets).
CountEntities(ctx context.Context, entity string) (int, error)
// QueryWithPlan executes a pre-built SQL query with parameterised
// arguments and returns the results as maps, in the same format as
// List(). The SQL is generated by the OQL planner's SQL generator
// and always selects from the data column with json_extract().
QueryWithPlan(ctx context.Context, sql string, args []interface{}) ([]map[string]interface{}, error)
}
Queryable is an optional interface for storage backends that support predicate push-down. Backends that do not implement it receive the full Go-side execution path for all operations.
The OQL planner checks for this interface via type assertion. If the store satisfies Queryable and the entity cardinality exceeds the push-down threshold, the planner may generate SQL to delegate WHERE, ORDER BY, and LIMIT operations to the storage engine.
type SQLiteConfig ¶
type SQLiteConfig struct {
DBPath string
EnableWAL bool // Write-Ahead Logging for better concurrency
EnableForeignKeys bool
CacheSize int // Page cache size in KB
BusyTimeout int // Milliseconds to wait on locked database
FullTextEnabled bool // Enable FTS5 full-text search indexing
GraphEnabled bool // Enable graph edge table maintenance
TenantID uint16 // 0 = no tenant scoping
// Performance tuning (zero = use backend defaults)
// SQLite defaults: MaxOpenConns=1 (WAL single-writer),
// MaxIdleConns=1, ReadPoolSize=NumCPU.
MaxOpenConns int // Max open write connections (0 = backend default)
MaxIdleConns int // Max idle write connections (0 = backend default)
ReadPoolSize int // Max open read connections (0 = backend default)
ContentionThreshold int // Adaptive lock threshold 0-100 (default 95)
}
SQLiteConfig holds SQLite-specific configuration
type SQLiteStorageDialect ¶
type SQLiteStorageDialect struct{}
SQLiteStorageDialect implements StorageDialect for SQLite.
func (*SQLiteStorageDialect) ColumnType ¶
func (d *SQLiteStorageDialect) ColumnType(jsonType, format string, precision, scale int) string
func (*SQLiteStorageDialect) CreateIndexSQL ¶
func (d *SQLiteStorageDialect) CreateIndexSQL(spec *AdaptedTableSpec) []string
func (*SQLiteStorageDialect) CreateTableSQL ¶
func (d *SQLiteStorageDialect) CreateTableSQL(spec *AdaptedTableSpec) string
func (*SQLiteStorageDialect) DeleteSQL ¶
func (d *SQLiteStorageDialect) DeleteSQL(spec *AdaptedTableSpec) string
func (*SQLiteStorageDialect) DenormaliseDecimal ¶
func (d *SQLiteStorageDialect) DenormaliseDecimal(value string, precision, scale int) string
DenormaliseDecimal converts a scaled int64 string back to a client-facing decimal string by dividing by 10^scale.
Examples for scale=2: "1990" → "19.90"; "-1990" → "-19.90"; "0" → "0.00".
func (*SQLiteStorageDialect) ExistsSQL ¶
func (d *SQLiteStorageDialect) ExistsSQL(spec *AdaptedTableSpec) string
func (*SQLiteStorageDialect) InsertSQL ¶
func (d *SQLiteStorageDialect) InsertSQL(spec *AdaptedTableSpec, hasExtra bool) (string, []string)
func (*SQLiteStorageDialect) MetadataTableSQL ¶
func (d *SQLiteStorageDialect) MetadataTableSQL() string
func (*SQLiteStorageDialect) Name ¶
func (d *SQLiteStorageDialect) Name() string
func (*SQLiteStorageDialect) NormaliseDecimal ¶
func (d *SQLiteStorageDialect) NormaliseDecimal(value string, precision, scale int) (string, error)
NormaliseDecimal converts a decimal string to a scaled int64 string for SQLite INTEGER storage. The value is multiplied by 10^scale.
Examples for precision=6, scale=2:
"19.90" → "1990"; "-19.90" → "-1990"; "0" → "0"; "9999.99" → "999999"; "-0.01" → "-1".
The scaled integer fits in int64 for precision up to 18.
func (*SQLiteStorageDialect) Placeholder ¶
func (d *SQLiteStorageDialect) Placeholder(_ int) string
func (*SQLiteStorageDialect) SelectAllSQL ¶
func (d *SQLiteStorageDialect) SelectAllSQL(spec *AdaptedTableSpec) string
func (*SQLiteStorageDialect) SelectSQL ¶
func (d *SQLiteStorageDialect) SelectSQL(spec *AdaptedTableSpec) string
func (*SQLiteStorageDialect) SupportsNativeDecimalAggregation ¶
func (d *SQLiteStorageDialect) SupportsNativeDecimalAggregation() bool
SupportsNativeDecimalAggregation returns false for SQLite. Although SQLite can SUM integers correctly, the scaled representation requires division by the scale factor to produce correct decimal results. Go-side aggregation with shopspring/decimal avoids this complexity and handles AVG correctly.
func (*SQLiteStorageDialect) UpdateSQL ¶
func (d *SQLiteStorageDialect) UpdateSQL(spec *AdaptedTableSpec, versionCheck bool) string
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore implements Store interface using SQLite database.
It maintains two connection pools against the same WAL-mode database:
- db (writer): MaxOpenConns=1, serialises all writes.
- readDB (reader): MaxOpenConns=NumCPU, query_only=ON, parallel reads.
Under WAL mode, readers never block the writer and vice-versa.
func NewSQLiteStore ¶
func NewSQLiteStore(dbPath string, config SQLiteConfig) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLite-based storage with separate reader and writer connection pools. Under WAL mode the writer never blocks readers and vice-versa, so splitting pools maximises concurrency.
func (*SQLiteStore) AdaptedColumnInfo ¶
func (s *SQLiteStore) AdaptedColumnInfo(entity, jsonField string) (colName string, scale int, isDecimal bool, ok bool)
AdaptedColumnInfo returns column metadata for a JSON field in an adapted entity.
func (*SQLiteStore) AdaptedRegistry ¶
func (s *SQLiteStore) AdaptedRegistry() *AdaptedRegistry
AdaptedRegistry returns the store's adapted table registry. Returns nil only if the store was not properly initialized.
func (*SQLiteStore) AdaptedTableName ¶
func (s *SQLiteStore) AdaptedTableName(entity string) (string, bool)
AdaptedTableName returns the SQL table name for an adapted entity.
func (*SQLiteStore) AggregateQuery ¶
func (s *SQLiteStore) AggregateQuery(ctx context.Context, sql string, args []interface{}, aliases []string) ([]map[string]interface{}, error)
AggregateQuery executes an aggregate SQL query against native columns and returns results keyed by alias names.
func (*SQLiteStore) Capabilities ¶
func (s *SQLiteStore) Capabilities() QueryCapabilities
Capabilities reports that the SQLite backend can handle WHERE, ORDER BY, LIMIT, and COUNT natively via json_extract() push-down.
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the database connection
func (*SQLiteStore) Commit ¶
func (s *SQLiteStore) Commit(ctx context.Context, req CommitRequest) (CommitResult, error)
Commit performs an atomic upsert + one or more inserts in a single SQLite transaction. The upsert supports optional CAS via Update.Version. All operations share one BEGIN/COMMIT boundary; any failure rolls back the entire set.
func (*SQLiteStore) Config ¶
func (s *SQLiteStore) Config() StoreConfig
Config returns the store's StoreConfig.
func (*SQLiteStore) ContentionLock ¶
func (s *SQLiteStore) ContentionLock() *AdaptiveLock
ContentionLock returns the store's adaptive lock, allowing runtime configuration of the contention threshold via SetThreshold().
func (*SQLiteStore) CountEntities ¶
CountEntities returns the number of records for an entity type without fetching the data. This is a single indexed COUNT(*) — typically <10µs.
func (*SQLiteStore) Create ¶
func (s *SQLiteStore) Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)
Create inserts a new entity with auto-generated ID
func (*SQLiteStore) DB ¶
func (s *SQLiteStore) DB() *sql.DB
DB returns the underlying *sql.DB for advanced operations such as batch seeding or direct SQL execution. Use with care — callers must respect the store's locking and schema conventions.
func (*SQLiteStore) FullTextSearch ¶
func (s *SQLiteStore) FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)
FullTextSearch performs a full-text search across entities
func (*SQLiteStore) Get ¶
func (s *SQLiteStore) Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)
Get retrieves an entity by ID
func (*SQLiteStore) GraphTenantIDs ¶
func (s *SQLiteStore) GraphTenantIDs(ctx context.Context) ([]uint16, error)
GraphTenantIDs implements TenantIDLister. It returns all tenant IDs for which a graph_tXXXX edge table should be hydrated at startup. Tenant 0 is always included first (it is implicit and never appears in the tenants registry table). Registered non-zero tenants follow in ascending order.
func (*SQLiteStore) IsAdaptedEntity ¶
func (s *SQLiteStore) IsAdaptedEntity(entity string) bool
IsAdaptedEntity reports whether the entity uses an adapted table.
func (*SQLiteStore) ListEntities ¶
func (s *SQLiteStore) ListEntities(ctx context.Context) ([]string, error)
ListEntities returns all distinct entity types in the database
func (*SQLiteStore) ListPaged ¶
func (s *SQLiteStore) ListPaged(ctx context.Context, entity string, limit, offset int) (*PagedResult, error)
ListPaged returns a single page of entities plus total count, using SQL LIMIT/OFFSET so only the requested page is deserialised.
func (*SQLiteStore) ListWithFields ¶
func (s *SQLiteStore) ListWithFields(ctx context.Context, entity string, fields []string) ([]map[string]interface{}, error)
ListWithFields returns all records for an entity, extracting only the named fields from each JSON blob. For adapted entities this falls through to the regular List path (native columns are already efficient).
func (*SQLiteStore) ListWithFieldsAndFilter ¶
func (s *SQLiteStore) ListWithFieldsAndFilter(ctx context.Context, entity string, fields []string, preds *jsonic.PredicateSet) ([]map[string]interface{}, error)
ListWithFieldsAndFilter returns records for an entity, extracting only the named fields and evaluating predicates inline during tokenisation. Rows that fail the predicates are never materialised as maps. For adapted entities this falls through to the regular path.
func (*SQLiteStore) Patch ¶
func (s *SQLiteStore) Patch(ctx context.Context, entity string, id int, updates map[string]interface{}) error
Patch partially updates an entity
func (*SQLiteStore) PatchValidated ¶
func (s *SQLiteStore) PatchValidated(ctx context.Context, entity string, id int, updates map[string]interface{}, validate func(merged map[string]interface{}) error) error
PatchValidated applies a partial update inside a transaction and runs the validator against the merged data before committing.
func (*SQLiteStore) Ping ¶
func (s *SQLiteStore) Ping(ctx context.Context) error
Ping verifies that the database connection is alive.
func (*SQLiteStore) QueryWithFields ¶
func (s *SQLiteStore) QueryWithFields(ctx context.Context, sqlQuery string, args []interface{}, fields []string) ([]map[string]interface{}, error)
QueryWithFields executes a push-down SQL query and extracts only the named fields from each result's JSON blob.
func (*SQLiteStore) QueryWithPlan ¶
func (s *SQLiteStore) QueryWithPlan(ctx context.Context, sqlQuery string, args []interface{}) ([]map[string]interface{}, error)
QueryWithPlan executes a pre-built SQL query (generated by the OQL planner) and returns the results as maps, in the same format as List().
func (*SQLiteStore) ReaderDB ¶
func (s *SQLiteStore) ReaderDB() *sql.DB
ReaderDB returns the underlying reader connection pool. Used by the tenant persister for read-only queries.
func (*SQLiteStore) RebuildGraph ¶
func (s *SQLiteStore) RebuildGraph(ctx context.Context) error
RebuildGraph rebuilds the tenant edge table from stored entity JSON.
Correctness: uses models.ExtractEntityEdges for REF extraction so that @REFS ([]interface{} of REF maps) and TSREF exclusion are handled identically to the live syncGraphEdges path.
Performance: one PrepareContext call outside the row loop; edges are accumulated and flushed in batches of rebuildBatchSize rather than one ExecContext per edge.
func (*SQLiteStore) RegisterAdaptedEntity ¶
func (s *SQLiteStore) RegisterAdaptedEntity(ctx context.Context, entity string, schema map[string]interface{}) error
RegisterAdaptedEntity derives an adapted table for the given entity type from its JSON Schema and creates the table if it doesn't exist. This is called by the server layer when a schema is loaded or registered.
func (*SQLiteStore) Save ¶
func (s *SQLiteStore) Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)
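Save upserts an entity with the caller-specified ID, following the Store interface contract: it returns (true, nil) when a new record was created and (false, nil) when an existing record was replaced. NOTE(review): this comment previously read "creates an entity with a specific ID (fails if exists)", which conflicts with the Store.Save contract that Save never returns an error solely because the ID already exists — confirm against the implementation.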
func (*SQLiteStore) ScanGraphEdges ¶
func (s *SQLiteStore) ScanGraphEdges(ctx context.Context, tenantID uint16, fn func(GraphEdge) error) error
ScanGraphEdges implements GraphEdgeScanner. It streams every row from the tenant-scoped graph_tXXXX edge table, calling fn once per row. Iteration stops on the first non-nil error returned by fn. Rows are read via the reader pool (query_only, parallel-safe). All tenants, including tenant 0, use graph_tXXXX.
func (*SQLiteStore) Search ¶
func (s *SQLiteStore) Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)
Search implements field-based search using JSON extraction
func (*SQLiteStore) StorageDialectFor ¶
func (s *SQLiteStore) StorageDialectFor(entity string) StorageDialect
StorageDialectFor returns the StorageDialect for the given entity, or nil if the entity is not adapted.
func (*SQLiteStore) Update ¶
func (s *SQLiteStore) Update(ctx context.Context, entity string, id int, data map[string]interface{}) error
Update replaces an entity completely
func (*SQLiteStore) VerifyGraphIntegrity ¶
func (s *SQLiteStore) VerifyGraphIntegrity(ctx context.Context) error
VerifyGraphIntegrity checks whether the tenant edge table matches the REF fields in stored entity JSON. Returns a joined error listing every discrepancy found (missing edges + unexpected edges); does not stop at the first violation.
Both reads (entities and edge table) are issued inside a single read transaction so that concurrent writes cannot produce false violations.
Memory: only one map is materialised (expected edges derived from entity JSON). Actual edges from the edge table are streamed and checked against that map rather than accumulated into a second map.
func (*SQLiteStore) WithLogger ¶
func (s *SQLiteStore) WithLogger(logger zerolog.Logger) *SQLiteStore
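WithLogger attaches a zerolog.Logger to the store and returns the store so that further calls can be chained. Because NewSQLiteStore returns (*SQLiteStore, error), check the error before chaining: store, err := NewSQLiteStore(...); if err == nil { store = store.WithLogger(logger) }. Until this is called the store uses zerolog.Nop() and logs nothing.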
type SQLiteTenantPersister ¶
type SQLiteTenantPersister struct {
// contains filtered or unexported fields
}
SQLiteTenantPersister implements tenant.Persister using the tenants table in the shared SQLite database. It uses the writer pool for saves and the reader pool for loads.
func NewSQLiteTenantPersister ¶
func NewSQLiteTenantPersister(db, readDB *sql.DB) *SQLiteTenantPersister
NewSQLiteTenantPersister creates a persister backed by the given database connections. Both must point to the same SQLite database. The writer is used for Save; the reader for LoadAll.
type SchemaBrowser ¶
type SchemaBrowser struct {
// contains filtered or unexported fields
}
SchemaBrowser implements SchemaIntrospector over a queryfy ObjectSchema.
func (*SchemaBrowser) AllowsAdditional ¶
func (b *SchemaBrowser) AllowsAdditional() bool
func (*SchemaBrowser) FieldNames ¶
func (b *SchemaBrowser) FieldNames() []string
func (*SchemaBrowser) GetField ¶
func (b *SchemaBrowser) GetField(name string) FieldIntrospector
func (*SchemaBrowser) IsRequired ¶
func (b *SchemaBrowser) IsRequired(name string) bool
type SchemaDiff ¶
type SchemaDiff struct {
Added []ColumnDef // Columns in new but not in old
Dropped []ColumnDef // Columns in old but not in new
Changed []ColumnChange // Columns present in both but with incompatible type changes
IndexesAdded []IndexDef // Indexes in new but not in old
IndexesDropped []IndexDef // Indexes in old but not in new
HasExtraChanged bool // Whether _extra presence changed
NewHasExtra bool // The new HasExtra value (only meaningful if HasExtraChanged)
}
SchemaDiff describes the differences between an old and new AdaptedTableSpec. It is the migration plan for schema evolution.
func DiffAdaptedSpecs ¶
func DiffAdaptedSpecs(old, new *AdaptedTableSpec) *SchemaDiff
DiffAdaptedSpecs compares two AdaptedTableSpecs and produces a migration plan. The old spec represents the currently deployed table; the new spec represents the desired state from the updated schema.
func (*SchemaDiff) HasTypeConflicts ¶
func (d *SchemaDiff) HasTypeConflicts() bool
HasTypeConflicts reports whether there are incompatible type changes that prevent automatic migration.
func (*SchemaDiff) IsEmpty ¶
func (d *SchemaDiff) IsEmpty() bool
IsEmpty reports whether the diff contains no changes.
type SchemaIntrospector ¶
type SchemaIntrospector interface {
// FieldNames returns all declared field names, sorted alphabetically.
FieldNames() []string
// GetField returns the field descriptor for a named field.
// Returns nil if the field does not exist.
GetField(name string) FieldIntrospector
// IsRequired reports whether a field is required.
IsRequired(name string) bool
// AllowsAdditional reports whether the schema accepts fields not
// declared in its field list. Returns true if additionalProperties
// is absent or true; false if explicitly set to false.
AllowsAdditional() bool
}
SchemaIntrospector provides read access to an object schema's structure. This is the primary interface consumed by DeriveAdaptedTableSpec.
func NewJSONSchemaIntrospector ¶
func NewJSONSchemaIntrospector(schema map[string]interface{}) SchemaIntrospector
NewJSONSchemaIntrospector wraps a parsed JSON Schema document. Returns nil if the schema has no "properties" key.
func NewSchemaBrowser ¶
func NewSchemaBrowser(obj *builders.ObjectSchema) SchemaIntrospector
NewSchemaBrowser wraps a queryfy ObjectSchema for introspection. Returns nil if obj is nil.
type Searcher ¶
type Searcher interface {
Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)
}
Searcher defines optional search capabilities
type StmtCache ¶
type StmtCache struct {
// contains filtered or unexported fields
}
StmtCache is a concurrency-safe LRU cache of prepared statements.
The cache is keyed by the SQL string. Since the OQL planner produces parameterised queries with ? placeholders, two queries with different parameter values but the same shape share a single prepared statement.
Lifecycle:
- Created during SQLiteStore initialisation.
- Get() returns a cached *sql.Stmt or prepares and caches a new one.
- Close() closes all cached statements (called from SQLiteStore.Close).
- Invalidate() removes a single entry (for schema evolution).
- Reset() closes and removes all entries.
func NewStmtCache ¶
NewStmtCache creates a statement cache that prepares against db. Pass maxSize=0 to use DefaultStmtCacheSize.
func (*StmtCache) Close ¶
func (c *StmtCache) Close()
Close closes all cached statements. After Close, the cache must not be used. Called from SQLiteStore.Close().
func (*StmtCache) Get ¶
Get returns a prepared statement for the given SQL. If the statement is not cached, it is prepared and added to the cache, evicting the least-recently-used entry if the cache is full.
The returned *sql.Stmt must NOT be closed by the caller — the cache owns the statement's lifecycle.
func (*StmtCache) Invalidate ¶
Invalidate removes and closes the prepared statement for the given SQL, if cached. Used when a schema change invalidates a statement (e.g. adapted table ALTER TABLE).
type StorageDialect ¶
type StorageDialect interface {
// Name returns the dialect identifier ("sqlite", "postgres", etc.).
Name() string
// Placeholder returns a parameter placeholder for the n-th argument
// (1-based). SQLite: "?", PostgreSQL: "$1", "$2", etc.
Placeholder(n int) string
// ColumnType maps a JSON Schema type + format to the backend's native
// SQL column type. Examples:
// ("string", "") → "TEXT"
// ("integer", "") → "INTEGER" (SQLite) / "BIGINT" (PostgreSQL)
// ("number", "") → "REAL" (SQLite) / "DOUBLE PRECISION" (PostgreSQL)
// ("number", "decimal") → "INTEGER" (SQLite, scaled int64) / "NUMERIC(p,s)" (PostgreSQL)
// ("boolean", "") → "INTEGER" (SQLite) / "BOOLEAN" (PostgreSQL)
// ("array", "") → "TEXT" (SQLite) / "JSONB" (PostgreSQL)
// ("object", "") → "TEXT" (SQLite) / "JSONB" (PostgreSQL)
//
// For decimals, precision and scale are provided for backends that
// support native fixed-point (PostgreSQL NUMERIC). Backends that store
// decimals as scaled integers (SQLite) may ignore them.
ColumnType(jsonType, format string, precision, scale int) string
// CreateTableSQL generates the CREATE TABLE statement for an adapted
// table. Implementations must include system columns (id, tenant_id,
// _extra if hasExtra, _version, created_at, updated_at) and the
// primary key.
CreateTableSQL(spec *AdaptedTableSpec) string
// CreateIndexSQL generates CREATE INDEX statements for the adapted
// table's indexes.
CreateIndexSQL(spec *AdaptedTableSpec) []string
// InsertSQL generates an INSERT statement with the appropriate
// placeholders. Returns the SQL string and the expected argument
// order (column names). The caller provides the actual values.
InsertSQL(spec *AdaptedTableSpec, hasExtra bool) (sql string, columns []string)
// SelectSQL generates a SELECT statement for a single row by
// tenant_id and id.
SelectSQL(spec *AdaptedTableSpec) string
// SelectAllSQL generates a SELECT statement for all rows in a tenant,
// ordered by id.
SelectAllSQL(spec *AdaptedTableSpec) string
// UpdateSQL generates an UPDATE statement with the appropriate
// placeholders. The versionCheck parameter controls whether a
// _version = ? clause is appended to the WHERE.
UpdateSQL(spec *AdaptedTableSpec, versionCheck bool) string
// DeleteSQL generates a DELETE statement for a single row.
DeleteSQL(spec *AdaptedTableSpec) string
// ExistsSQL generates an EXISTS check for a single row.
ExistsSQL(spec *AdaptedTableSpec) string
// MetadataTableSQL generates the DDL for the adapted_table_schemas
// metadata table.
MetadataTableSQL() string
// NormaliseDecimal transforms a validated decimal string into the
// storage representation for this backend. SQLite scales to int64.
// PostgreSQL returns the value unchanged.
NormaliseDecimal(value string, precision, scale int) (string, error)
// DenormaliseDecimal transforms the stored representation back to
// a client-facing string. SQLite divides by 10^scale and formats.
// PostgreSQL returns the value unchanged.
DenormaliseDecimal(value string, precision, scale int) string
// SupportsNativeDecimalAggregation reports whether the backend
// handles SUM/AVG on decimal columns with exact arithmetic.
// If false, OQL aggregates in Go using shopspring/decimal.
SupportsNativeDecimalAggregation() bool
}
StorageDialect defines backend-specific SQL generation for adapted tables.
type Store ¶
type Store interface {
// Config returns the store's configuration.
Config() StoreConfig
// Entity CRUD operations
Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)
Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)
Update(ctx context.Context, entity string, id int, data map[string]interface{}) error
Patch(ctx context.Context, entity string, id int, data map[string]interface{}) error
// PatchValidated is like Patch but runs a validation function against the
// merged data inside the transaction. If the validator returns an error,
// the transaction is rolled back and the error is returned to the caller.
// This avoids TOCTOU races where a Get-merge-Update sequence can observe
// stale data between the Get and the Update.
PatchValidated(ctx context.Context, entity string, id int, data map[string]interface{}, validate func(merged map[string]interface{}) error) error
Delete(ctx context.Context, entity string, id int) error
// Save upserts an entity with the caller-specified ID: creates it if it
// does not exist, overwrites it if it does. Returns (true, nil) when a
// new record was created and (false, nil) when an existing record was
// replaced. Never returns an error solely because the ID already exists.
Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)
// Commit performs an atomic upsert + one or more inserts in a single
// storage transaction. The upsert (req.Update) supports optional
// optimistic concurrency via Version. Each entry in req.Append is an
// unconditional insert; a duplicate explicit ID returns ErrAlreadyExists
// and rolls back the entire commit. Returns ErrConflict when the Update
// version check fails.
Commit(ctx context.Context, req CommitRequest) (CommitResult, error)
// Query operations
List(ctx context.Context, entity string) ([]map[string]interface{}, error)
Exists(ctx context.Context, entity string, id int) bool
Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)
// Full-text search (optional - may return empty if not supported)
FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)
// Ping verifies that the storage backend is reachable. Returns nil on
// success. Used by health and readiness probes.
Ping(ctx context.Context) error
// Lifecycle
Close() error
}
Store defines the core interface for entity storage backends
func NewStoreFromConfig ¶
func NewStoreFromConfig(cfg StoreConfig) (Store, error)
NewStoreFromConfig creates a store directly from a StoreConfig. This is the preferred constructor for tenant-scoped stores.
type StoreConfig ¶
type StoreConfig struct {
Type string // "sqlite", "jsonfile"
DBPath string // SQLite database file path
BaseDir string // JSONFile base directory
Schema string // JSONFile schema subdirectory
FullTextEnabled bool // controls FTS indexing in backend
GraphEnabled bool // controls graph edge table maintenance
TenantID uint16 // 0 = no tenant scoping
// Performance tuning (SQLite-specific; zero = use defaults)
SQLiteCacheSize int // Page cache size in KB
SQLiteBusyTimeout int // Milliseconds to wait on locked database
SQLiteMaxOpenConns int // Max open database connections
SQLiteMaxIdleConns int // Max idle database connections
SQLiteReadPoolSize int // Max open read connections (0 = auto)
SQLiteContentionThreshold int // Adaptive lock threshold 0-100
}
StoreConfig is the canonical configuration for all store backends. A store is constructed with a StoreConfig and scoped to that config for its entire lifetime. TenantID 0 means no tenant scoping.
type StoreFactory ¶
StoreFactory is a function that creates a new Store instance
type StoreInfo ¶
type StoreInfo struct {
Type string // "jsonfile", "sqlite", "postgres", etc.
Version string
SupportsSearch bool
SupportsBatch bool
SupportsTransaction bool
}
StoreInfo provides metadata about the store implementation
type TenantIDLister ¶
TenantIDLister is an optional interface for storage backends that can enumerate all tenant IDs for which a graph_tXXXX edge table exists. The returned slice must always include tenant 0 (the implicit default). Used during startup graph hydration to restore graph state for all tenants.
Backends that do not implement this interface fall back to scanning only tenant 0 via a direct ScanGraphEdges call.
Source Files
¶
- adapted.go
- adapted_crud.go
- adapted_decimal.go
- adaptive_lock.go
- aggregate_queryable.go
- dialect.go
- dialect_sqlite.go
- factory.go
- field_queryable.go
- jsonfile.go
- queryable.go
- schema_browser.go
- schema_evolution.go
- schema_introspect.go
- schema_json_adapter.go
- sqlite.go
- sqlite_aggregate.go
- sqlite_field_query.go
- stmt_cache.go
- storage.go
- tenant_persist.go