storage

package
v0.9.7-patched74 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 12, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultStmtCacheSize = 256

DefaultStmtCacheSize is the maximum number of prepared statements kept in the cache before LRU eviction. Each entry holds a *sql.Stmt against the reader pool. For a typical olu workload the number of distinct SQL shapes is bounded (tens, not thousands), so 256 is generous while keeping memory bounded.
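The eviction policy can be sketched as a small LRU map keyed by SQL text. This is an illustrative stand-in (string values instead of *sql.Stmt), not the package's implementation:

```go
package main

import (
	"container/list"
	"fmt"
)

// lruCache is a toy LRU keyed by SQL text. The front of the list is
// the most recently used entry; eviction removes the back.
type lruCache struct {
	cap   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // SQL text -> list element
}

type entry struct {
	key string
	val string // stands in for *sql.Stmt
}

func newLRU(capacity int) *lruCache {
	return &lruCache{cap: capacity, order: list.New(), items: map[string]*list.Element{}}
}

func (c *lruCache) Get(key string) (string, bool) {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el)
		return el.Value.(*entry).val, true
	}
	return "", false
}

func (c *lruCache) Put(key, val string) {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el)
		el.Value.(*entry).val = val
		return
	}
	if c.order.Len() >= c.cap {
		oldest := c.order.Back() // least recently used
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	c.items[key] = c.order.PushFront(&entry{key, val})
}

func main() {
	c := newLRU(2)
	c.Put("SELECT 1", "stmt1")
	c.Put("SELECT 2", "stmt2")
	c.Get("SELECT 1")          // touch: SELECT 1 is now most recent
	c.Put("SELECT 3", "stmt3") // evicts SELECT 2, the least recently used
	_, ok := c.Get("SELECT 2")
	fmt.Println(ok) // false: evicted
}
```

Because the number of distinct SQL shapes is bounded, a cache of 256 rarely evicts in practice; the LRU bound exists to keep memory finite if shapes proliferate.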

Variables

View Source
var (
	// ErrNotFound is returned when an entity is not found.
	ErrNotFound = errors.New("entity not found")
	// ErrAlreadyExists is returned when an entity already exists.
	ErrAlreadyExists = errors.New("entity already exists")
	// ErrInvalidEntity is returned when an entity name is invalid.
	ErrInvalidEntity = errors.New("invalid entity name")
	// ErrInvalidID is returned when an ID is invalid.
	ErrInvalidID = errors.New("invalid ID")
	// ErrConflict is returned when an optimistic concurrency check fails.
	ErrConflict = errors.New("version conflict")
	// ErrNotSupported is returned when a storage backend does not implement
	// a particular operation. Handlers that receive this error should map it
	// to an appropriate HTTP 501 Not Implemented response.
	ErrNotSupported = errors.New("operation not supported by this storage backend")
)

Functions

func DenormaliseDecimalColumns

func DenormaliseDecimalColumns(spec *AdaptedTableSpec, dialect StorageDialect, colVals []interface{})

DenormaliseDecimalColumns applies dialect-specific decimal denormalisation to column values read from the database. This converts scaled integers back to client-facing decimal strings. Called on the read path (get, list) after SQL scan.

func GenerateAdaptedSchemasTableSQL

func GenerateAdaptedSchemasTableSQL(dialect StorageDialect) string

GenerateAdaptedSchemasTableSQL returns the DDL for the metadata table that tracks adapted table schemas, using the given dialect.

func GenerateCreateTableSQL

func GenerateCreateTableSQL(spec *AdaptedTableSpec, dialect StorageDialect) string

GenerateCreateTableSQL produces the CREATE TABLE statement for an adapted table using the given dialect.

func GenerateIndexSQL

func GenerateIndexSQL(spec *AdaptedTableSpec, dialect StorageDialect) []string

GenerateIndexSQL produces CREATE INDEX statements for the adapted table using the given dialect.

func ListStores

func ListStores() []string

ListStores returns all registered store types.

func MigrateAdaptedTable

func MigrateAdaptedTable(
	ctx context.Context,
	db *sql.DB,
	registry *AdaptedRegistry,
	entity string,
	newSchema map[string]interface{},
	dialect StorageDialect,
) error

MigrateAdaptedTable applies schema changes to an existing adapted table. It computes the diff between the stored spec and the new schema, then:

  1. Rejects type changes (incompatible, require manual intervention)
  2. Adds new columns via ALTER TABLE ADD COLUMN
  3. Drops removed columns via ALTER TABLE DROP COLUMN (SQLite 3.35+), after migrating any existing data to the _extra overflow column
  4. Updates indexes (drop old, create new)
  5. Updates the metadata row in adapted_table_schemas
  6. Updates the in-memory registry

The entire migration runs in a single transaction.
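The first step — diffing the stored spec against the new schema — can be sketched in pure Go. This is illustrative only (a cut-down column struct, not the package's ColumnDef or its actual diff code):

```go
package main

import "fmt"

// column is a minimal stand-in for ColumnDef (name and SQL type only).
type column struct{ Name, SQLType string }

// diffColumns splits the schema change into the three categories the
// migration handles: columns to ADD, columns to DROP, and type changes
// (which the migration rejects as requiring manual intervention).
func diffColumns(oldCols, newCols []column) (added, dropped, changed []string) {
	oldByName := map[string]string{}
	for _, c := range oldCols {
		oldByName[c.Name] = c.SQLType
	}
	newByName := map[string]string{}
	for _, c := range newCols {
		newByName[c.Name] = c.SQLType
		if oldType, ok := oldByName[c.Name]; !ok {
			added = append(added, c.Name) // ALTER TABLE ADD COLUMN
		} else if oldType != c.SQLType {
			changed = append(changed, c.Name) // rejected: incompatible type change
		}
	}
	for _, c := range oldCols {
		if _, ok := newByName[c.Name]; !ok {
			dropped = append(dropped, c.Name) // ALTER TABLE DROP COLUMN
		}
	}
	return added, dropped, changed
}

func main() {
	oldCols := []column{{"age", "INTEGER"}, {"name", "TEXT"}}
	newCols := []column{{"name", "TEXT"}, {"email", "TEXT"}, {"age", "TEXT"}}
	added, dropped, changed := diffColumns(oldCols, newCols)
	fmt.Println(added, dropped, changed) // [email] [] [age]
}
```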

func NormaliseDecimalColumns

func NormaliseDecimalColumns(spec *AdaptedTableSpec, dialect StorageDialect, colVals []interface{}) error

NormaliseDecimalColumns applies dialect-specific decimal normalisation to column values produced by PartitionData. This transforms validated decimal strings into scaled integers for storage. Called on the write path (create, update) before SQL execution.

func PartitionData

func PartitionData(spec *AdaptedTableSpec, data map[string]interface{}) (columnValues []interface{}, extra map[string]interface{})

PartitionData separates a data map into schema-column values and overflow. Returns:

  • columnValues: ordered values matching spec.Columns, ready for INSERT
  • extra: map of fields not in the schema (nil if none or !hasExtra)

REF fields are decomposed: {"type":"REF","entity":"users","id":42} becomes two column values: "users" (for REF_{field}_entity) and 42 (for REF_{field}_id).
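The REF decomposition can be sketched as follows. This is an illustrative helper showing only the naming convention described above, not the package's PartitionData:

```go
package main

import "fmt"

// partitionRef decomposes a REF-typed value into its two column values,
// following the REF_{field}_entity / REF_{field}_id convention.
func partitionRef(field string, value map[string]interface{}) (map[string]interface{}, bool) {
	if value["type"] != "REF" {
		return nil, false // not a REF value; caller handles it normally
	}
	return map[string]interface{}{
		"REF_" + field + "_entity": value["entity"],
		"REF_" + field + "_id":     value["id"],
	}, true
}

func main() {
	cols, _ := partitionRef("author", map[string]interface{}{
		"type": "REF", "entity": "users", "id": 42,
	})
	fmt.Println(cols["REF_author_entity"], cols["REF_author_id"]) // users 42
}
```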

func ReassembleData

func ReassembleData(spec *AdaptedTableSpec, columnValues []interface{}, extra map[string]interface{}, id int, version int) map[string]interface{}

ReassembleData reconstructs a map[string]interface{} from column values and an optional overflow map. This is the inverse of PartitionData.

func RegisterAdaptedTable

func RegisterAdaptedTable(ctx context.Context, db *sql.DB, registry *AdaptedRegistry, entity string, schema map[string]interface{}, dialect StorageDialect) error

RegisterAdaptedTable derives a table spec from a JSON Schema, creates the table and indexes in the database, and records the spec in the adapted_table_schemas metadata table.

If the table already exists with the same schema hash, this is a no-op. If the schema has changed, the caller must handle migration separately (Phase 4 of the design).

func RegisterStore

func RegisterStore(name string, factory StoreFactory)

RegisterStore registers a new store implementation.

Types

type AdaptedRegistry

type AdaptedRegistry struct {
	// contains filtered or unexported fields
}

AdaptedRegistry tracks which entity types have adapted tables.

func LoadAdaptedRegistry

func LoadAdaptedRegistry(ctx context.Context, db *sql.DB) (*AdaptedRegistry, error)

LoadAdaptedRegistry reads the adapted_table_schemas metadata table and populates the registry. Called at store startup.

func NewAdaptedRegistry

func NewAdaptedRegistry() *AdaptedRegistry

NewAdaptedRegistry creates an empty registry.

func (*AdaptedRegistry) Entities

func (r *AdaptedRegistry) Entities() []string

Entities returns a sorted list of all adapted entity types.

func (*AdaptedRegistry) Get

func (r *AdaptedRegistry) Get(entity string) *AdaptedTableSpec

Get returns the adapted table spec for an entity, or nil if the entity uses blob storage.

func (*AdaptedRegistry) IsAdapted

func (r *AdaptedRegistry) IsAdapted(entity string) bool

IsAdapted reports whether an entity type has an adapted table.

func (*AdaptedRegistry) Set

func (r *AdaptedRegistry) Set(entity string, spec *AdaptedTableSpec)

Set registers an adapted table spec for an entity.

type AdaptedTableSpec

type AdaptedTableSpec struct {
	Entity     string      `json:"entity"`      // Entity type name
	Columns    []ColumnDef `json:"columns"`     // Ordered column definitions
	SchemaHash string      `json:"schema_hash"` // SHA-256 of canonical schema JSON
	HasExtra   bool        `json:"has_extra"`   // Whether _extra overflow column is present
	Indexes    []IndexDef  `json:"indexes"`     // Indexes to create
}

AdaptedTableSpec describes the full column layout of an adapted table.

func DeriveAdaptedTableSpec

func DeriveAdaptedTableSpec(entity string, schema map[string]interface{}, dialect StorageDialect) (*AdaptedTableSpec, error)

DeriveAdaptedTableSpec examines a JSON Schema document and produces a complete AdaptedTableSpec describing the adapted table layout.

The dialect parameter determines backend-specific column types.

This is a convenience wrapper that creates a SchemaIntrospector from the raw JSON Schema map. For direct use with queryfy (future), call DeriveAdaptedTableSpecFrom with a queryfy-backed introspector.

func DeriveAdaptedTableSpecFrom

func DeriveAdaptedTableSpecFrom(entity string, schema SchemaIntrospector, dialect StorageDialect, schemaHash string) (*AdaptedTableSpec, error)

DeriveAdaptedTableSpecFrom derives an AdaptedTableSpec from a SchemaIntrospector. This is the backend-agnostic core that works with any schema representation (JSON Schema maps, queryfy objects, or anything else that implements SchemaIntrospector).

func (*AdaptedTableSpec) ColumnByName

func (s *AdaptedTableSpec) ColumnByName(name string) (ColumnDef, bool)

ColumnByName returns the ColumnDef for a given SQL column name.

func (*AdaptedTableSpec) ColumnNames

func (s *AdaptedTableSpec) ColumnNames() []string

ColumnNames returns all column names in order (excluding system columns).

func (*AdaptedTableSpec) FieldToColumn

func (s *AdaptedTableSpec) FieldToColumn(jsonField string) []string

FieldToColumn maps a JSON field name to its column name(s). For REF fields, this returns two names: REF_{field}_entity, REF_{field}_id. For all other fields, it returns a single name equal to the field name.

func (*AdaptedTableSpec) IsSchemaField

func (s *AdaptedTableSpec) IsSchemaField(jsonField string) bool

IsSchemaField reports whether a JSON field name is a declared schema field.

func (*AdaptedTableSpec) TableName

func (s *AdaptedTableSpec) TableName() string

TableName returns the SQL table name for this adapted table.

type AdaptiveLock

type AdaptiveLock struct {
	// contains filtered or unexported fields
}

AdaptiveLock provides a mutex that automatically engages under contention.

Under normal load, operations proceed without any Go-side locking, relying on SQLite's WAL mode and busy_timeout for concurrency control. When the write success rate drops below a configurable threshold (indicating heavy contention), the lock engages a sync.RWMutex to serialise writes and eliminate SQLITE_BUSY errors entirely. When contention subsides, the lock disengages and returns to lock-free operation.

This gives the best of both worlds: full WAL concurrency under normal load, and guaranteed zero errors under burst load.
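The engage/disengage mechanism can be sketched with an atomic flag gating a sync.RWMutex, so the mutex costs nothing while disengaged. This is a simplified illustration of the pattern (no success-rate window or monitor goroutine), not the package's implementation:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// adaptiveLock gates a mutex behind an atomic flag: while disengaged,
// Lock is a single atomic load and no mutex is touched.
type adaptiveLock struct {
	engaged atomic.Bool
	mu      sync.RWMutex
}

// Lock acquires the write lock only when engaged. The returned bool
// tells the caller whether Unlock must be called.
func (al *adaptiveLock) Lock() bool {
	if !al.engaged.Load() {
		return false
	}
	al.mu.Lock()
	return true
}

func (al *adaptiveLock) Unlock() { al.mu.Unlock() }

// RecordFailure engages the lock immediately, mirroring the documented
// behaviour: the burst is happening now, so don't wait for a tick.
func (al *adaptiveLock) RecordFailure() { al.engaged.Store(true) }

func main() {
	var al adaptiveLock
	fmt.Println(al.Lock()) // false: disengaged, lock-free path
	al.RecordFailure()     // a SQLITE_BUSY burst engages the mutex
	if al.Lock() {
		fmt.Println("engaged")
		al.Unlock()
	}
}
```

The boolean return is the key API shape: callers pair every `if al.Lock() { defer al.Unlock() }` so the same call site works in both modes.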

func NewAdaptiveLock

func NewAdaptiveLock(threshold int) *AdaptiveLock

NewAdaptiveLock creates an adaptive lock with the given success-rate threshold. threshold is expressed as a percentage (e.g. 95 means engage the mutex when the success rate drops below 95%). Valid range: 0-100. 0 disables the lock entirely; 100 keeps it permanently engaged (equivalent to a plain mutex).

func (*AdaptiveLock) Engaged

func (al *AdaptiveLock) Engaged() bool

Engaged returns whether the mutex is currently engaged.

func (*AdaptiveLock) Lock

func (al *AdaptiveLock) Lock() bool

Lock acquires a write lock if the adaptive lock is engaged. Returns true if the lock was acquired (caller must call Unlock).

func (*AdaptiveLock) RLock

func (al *AdaptiveLock) RLock() bool

RLock acquires a read lock if the adaptive lock is engaged. Returns true if the lock was acquired (caller must call RUnlock).

func (*AdaptiveLock) RUnlock

func (al *AdaptiveLock) RUnlock()

RUnlock releases the read lock. Only call if RLock returned true.

func (*AdaptiveLock) RecordFailure

func (al *AdaptiveLock) RecordFailure()

RecordFailure records a failed (SQLITE_BUSY) operation. If the threshold is non-zero, this immediately engages the lock — the burst is happening now and waiting for the monitor tick would let more failures through. The monitor goroutine handles disengagement once the window is clean.

func (*AdaptiveLock) RecordSuccess

func (al *AdaptiveLock) RecordSuccess()

RecordSuccess records a successful operation.

func (*AdaptiveLock) SetThreshold

func (al *AdaptiveLock) SetThreshold(threshold int)

SetThreshold dynamically updates the success-rate threshold at runtime. threshold is a percentage (0-100). Takes effect on the next monitor tick.

func (*AdaptiveLock) Stop

func (al *AdaptiveLock) Stop()

Stop terminates the background monitor goroutine.

func (*AdaptiveLock) Threshold

func (al *AdaptiveLock) Threshold() int

Threshold returns the current threshold as a percentage (0-100).

func (*AdaptiveLock) Unlock

func (al *AdaptiveLock) Unlock()

Unlock releases the write lock. Only call if Lock returned true.

type AggregateQueryable

type AggregateQueryable interface {
	// AggregateQuery executes a pre-built aggregate SQL query and returns
	// the grouped results as maps. Unlike QueryWithPlan (which scans
	// data+_version blobs), this scans arbitrary columns/expressions
	// and returns them by alias.
	AggregateQuery(ctx context.Context, sql string, args []interface{}, aliases []string) ([]map[string]interface{}, error)

	// IsAdaptedEntity reports whether the given entity uses an adapted
	// table (column-per-field storage). The OQL planner uses this to
	// decide whether aggregate push-down is possible.
	IsAdaptedEntity(entity string) bool

	// AdaptedColumnInfo returns the SQL column name for a JSON field in
	// an adapted entity. Returns ("", false) if the entity is not adapted
	// or the field is not a known column. For decimal columns, also returns
	// the scale so the caller can denormalise aggregated values.
	AdaptedColumnInfo(entity, jsonField string) (colName string, scale int, isDecimal bool, ok bool)

	// AdaptedTableName returns the SQL table name for an adapted entity.
	// Returns ("", false) if the entity is not adapted.
	AdaptedTableName(entity string) (string, bool)

	// StorageDialectFor returns the StorageDialect for the given entity,
	// or nil if the entity is not adapted. This allows SQL generators to
	// access dialect methods without importing backend-specific types.
	StorageDialectFor(entity string) StorageDialect
}

AggregateQueryable is an optional interface for storage backends that support native GROUP BY + aggregate push-down for adapted tables.

When an entity has an adapted table, aggregate queries can run entirely in SQL against native columns instead of fetching all rows into Go. The OQL executor checks for this interface via type assertion.
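The type-assertion probe looks like this. The interfaces here are cut-down stand-ins (a one-method Store and a one-method aggregate interface), illustrating only the capability-check pattern:

```go
package main

import "fmt"

// Store is a minimal stand-in for the package's Store interface.
type Store interface {
	Name() string
}

// aggregateQueryable is a cut-down stand-in for AggregateQueryable.
type aggregateQueryable interface {
	IsAdaptedEntity(entity string) bool
}

type blobStore struct{}

func (blobStore) Name() string { return "blob" }

type adaptedStore struct{}

func (adaptedStore) Name() string                       { return "adapted" }
func (adaptedStore) IsAdaptedEntity(entity string) bool { return entity == "orders" }

// canPushDown mirrors the executor's probe: a type assertion against
// the optional interface, falling back to Go-side execution if it fails.
func canPushDown(s Store, entity string) bool {
	aq, ok := s.(aggregateQueryable)
	return ok && aq.IsAdaptedEntity(entity)
}

func main() {
	fmt.Println(canPushDown(blobStore{}, "orders"))    // false: interface not implemented
	fmt.Println(canPushDown(adaptedStore{}, "orders")) // true: aggregate push-down possible
}
```

The same probe shape applies to the other optional interfaces below (Queryable, FieldQueryable, FilterableStore, GraphEdgeScanner): backends opt in by implementing the methods, with no registration step.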

type Batcher

type Batcher interface {
	BatchCreate(ctx context.Context, entity string, items []map[string]interface{}) ([]int, error)
	BatchDelete(ctx context.Context, entity string, ids []int) error
}

Batcher defines optional batch operation support.

type ColumnChange

type ColumnChange struct {
	Name       string
	OldSQLType string
	NewSQLType string
	OldType    string
	NewType    string
}

ColumnChange records an incompatible type change for a column.

type ColumnDef

type ColumnDef struct {
	Name      string `json:"name"`       // Column name (e.g., "age", "REF_author_entity")
	JSONField string `json:"json_field"` // Original JSON field name (e.g., "age", "author")
	Type      string `json:"type"`       // JSON Schema type: string, integer, number, boolean, array, object
	Format    string `json:"format"`     // JSON Schema format: "", "decimal", "ref", "email", etc.
	SQLType   string `json:"sql_type"`   // Backend-specific SQL type (e.g., "TEXT", "INTEGER", "REAL")
	Required  bool   `json:"required"`   // Whether the field is in the schema's required array
	Precision int    `json:"precision"`  // For decimal: total significant digits
	Scale     int    `json:"scale"`      // For decimal: digits after decimal point
	IsREF     bool   `json:"is_ref"`     // True if this column is part of a REF decomposition
}

ColumnDef describes a single column in an adapted table.

type CommitAppend

type CommitAppend struct {
	Entity string                 `json:"entity"`
	ID     *int                   `json:"id,omitempty"`
	Data   map[string]interface{} `json:"data"`
}

CommitAppend describes one record to insert in a Commit operation. If ID is nil, the backend auto-generates an ID. If ID is non-nil and a record with that ID already exists in the entity type, ErrAlreadyExists is returned and the entire commit is rolled back.

type CommitAppendResult

type CommitAppendResult struct {
	Entity string `json:"entity"`
	ID     int    `json:"id"`
}

CommitAppendResult describes one inserted record in a Commit response. ID is always set; for auto-generated IDs it contains the assigned value.

type CommitRequest

type CommitRequest struct {
	Update CommitUpdate   `json:"update"`
	Append []CommitAppend `json:"append"`
}

CommitRequest is the payload for the atomic commit endpoint. It performs one conditional upsert (Update) and one or more unconditional inserts (Append) in a single storage transaction.

type CommitResult

type CommitResult struct {
	Update   CommitUpdateResult   `json:"update"`
	Appended []CommitAppendResult `json:"appended"`
}

CommitResult is returned on a successful Commit.

type CommitUpdate

type CommitUpdate struct {
	Entity  string                 `json:"entity"`
	ID      int                    `json:"id"`
	Version *int                   `json:"version,omitempty"`
	Data    map[string]interface{} `json:"data"`
}

CommitUpdate describes the entity to upsert in a Commit operation. If Version is non-nil, the write is conditional: it proceeds only if the stored _version equals *Version. A mismatch returns ErrConflict.

type CommitUpdateResult

type CommitUpdateResult struct {
	Entity  string `json:"entity"`
	ID      int    `json:"id"`
	Created bool   `json:"created"`
	Version int    `json:"version"`
}

CommitUpdateResult describes the outcome of the upsert in a Commit. Created is true when a new record was inserted; false when an existing record was overwritten. Version is the _version value after the commit.

type EntityLister

type EntityLister interface {
	ListEntities(ctx context.Context) ([]string, error)
}

EntityLister defines optional entity type listing support.

type FieldIntrospector

type FieldIntrospector interface {
	// JSONType returns the JSON Schema type string:
	// "string", "integer", "number", "boolean", "array", "object"
	JSONType() string

	// Format returns the declared format ("email", "decimal", "ref", "")
	Format() string

	// EnumValues returns the declared enum values, or nil if none.
	EnumValues() []string

	// Meta returns a metadata value by key (e.g., "decimalPrecision").
	// Returns (nil, false) if the key is not set.
	Meta(key string) (interface{}, bool)
}

FieldIntrospector provides read access to a single field's type and constraints. This maps to queryfy's per-type schema introspection (StringSchema.FormatType, NumberSchema.RangeConstraints, etc.).

type FieldQueryable

type FieldQueryable interface {
	// ListWithFields returns all records for an entity type, but each
	// record contains only the specified fields plus _version. Fields
	// not present in a particular record's JSON are omitted from the
	// returned map (no null padding).
	//
	// For adapted entities this falls through to the regular List path
	// (adapted tables already select native columns efficiently).
	ListWithFields(ctx context.Context, entity string, fields []string) ([]map[string]interface{}, error)

	// QueryWithFields executes a pre-built SQL query (WHERE push-down)
	// and returns results with only the specified fields extracted from
	// the data blob. Like QueryWithPlan but avoids full deserialisation.
	QueryWithFields(ctx context.Context, sqlQuery string, args []interface{}, fields []string) ([]map[string]interface{}, error)
}

FieldQueryable is an optional interface for storage backends that support selective field extraction from blob entities.

Instead of deserialising every JSON blob into a full map, a FieldQueryable backend can extract only the requested fields during the scan loop. For blob entities this avoids allocating maps with dozens of unused keys.

The OQL executor checks for this interface via type assertion. If the store satisfies FieldQueryable and the query's SELECT list names specific fields (not SELECT *), the executor may call ListWithFields instead of List.

type FilterableStore

type FilterableStore interface {
	FieldQueryable

	// ListWithFieldsAndFilter returns records for an entity type,
	// extracting only the specified fields and applying the predicate
	// set during tokenisation. Rows that fail the predicates are never
	// materialised as maps.
	//
	// The caller must ensure that fields includes all columns needed
	// for the SELECT list. Predicate fields may or may not overlap
	// with output fields.
	ListWithFieldsAndFilter(ctx context.Context, entity string, fields []string, preds *jsonic.PredicateSet) ([]map[string]interface{}, error)
}

FilterableStore is an optional extension of FieldQueryable that supports predicate evaluation during JSON tokenisation (B4 push-down).

Instead of extracting all rows and filtering in Go afterward, a FilterableStore evaluates simple predicates inline during the token walk, skipping map allocation for rows that don't match.

The OQL executor checks for this interface via type assertion. If the store satisfies FilterableStore and the WHERE clause can be expressed as a jsonic.PredicateSet, the executor passes predicates down.

type GraphEdge

type GraphEdge struct {
	SourceEntity string
	SourceID     int
	TargetEntity string
	TargetID     int
	Relationship string
}

GraphEdge holds the five columns of one row from the graph edges table.

type GraphEdgeScanner

type GraphEdgeScanner interface {
	ScanGraphEdges(ctx context.Context, tenantID uint16, fn func(GraphEdge) error) error
}

GraphEdgeScanner is an optional interface for storage backends that can stream graph edges directly from their edge table without deserialising full entity JSON. Implementing this interface enables O(edges) startup graph hydration instead of O(entities × JSON size).

ScanGraphEdges calls fn once per edge row. Iteration stops on the first non-nil error returned by fn. A nil error from ScanGraphEdges means all rows were scanned (or fn stopped iteration early with a sentinel — callers must define their own sentinel if needed).

tenantID scopes the scan to a specific tenant's edge table. Pass 0 for the default (tenant-0) table. Future SQL backends may extend this to scan all tenant tables in a single call; the current SQLite implementation scans one tenant at a time, matching the existing startup scope.

type GraphIntegrity

type GraphIntegrity interface {
	VerifyGraphIntegrity(ctx context.Context) error
	RebuildGraph(ctx context.Context) error
}

GraphIntegrity defines optional graph integrity checking.

type GraphNeighbors

type GraphNeighbors interface {
}

GraphNeighbors defines optional graph neighbor queries.

type IDGenerator

type IDGenerator interface {
	NextID(ctx context.Context, entity string) (int, error)
}

IDGenerator defines the interface for ID generation strategies.

type IndexDef

type IndexDef struct {
	Name    string   `json:"name"`    // Index name
	Columns []string `json:"columns"` // Column names
	Unique  bool     `json:"unique"`  // Whether the index is unique
}

IndexDef describes an index on an adapted table.

type InfoProvider

type InfoProvider interface {
	Info() StoreInfo
}

InfoProvider allows stores to provide metadata about their capabilities.

type JSONFileStore

type JSONFileStore struct {
	// contains filtered or unexported fields
}

JSONFileStore implements the Store interface using JSON files.

func NewJSONFileStore

func NewJSONFileStore(baseDir, schema string) (*JSONFileStore, error)

NewJSONFileStore creates a new JSON file-based store.

func (*JSONFileStore) Close

func (s *JSONFileStore) Close() error

Close closes the storage (cleanup if needed).

func (*JSONFileStore) Commit

Commit is not supported by the jsonfile backend. The jsonfile backend does not provide true transactional atomicity and has been deprecated for production use. This method exists solely to satisfy the Store interface; callers will receive ErrNotSupported, which the HTTP handler maps to 501 Not Implemented (OLU-CM009).

func (*JSONFileStore) Config

func (s *JSONFileStore) Config() StoreConfig

Config returns the store's StoreConfig.

func (*JSONFileStore) Create

func (s *JSONFileStore) Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)

Create creates a new entity with an auto-generated ID.

func (*JSONFileStore) Delete

func (s *JSONFileStore) Delete(ctx context.Context, entity string, id int) error

Delete removes an entity.

func (*JSONFileStore) Exists

func (s *JSONFileStore) Exists(ctx context.Context, entity string, id int) bool

Exists reports whether an entity exists.

func (*JSONFileStore) FullTextSearch

func (s *JSONFileStore) FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)

FullTextSearch is not supported by JSONFileStore; it returns empty results.

func (*JSONFileStore) Get

func (s *JSONFileStore) Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)

Get retrieves an entity by ID.

func (*JSONFileStore) GetEntityDir

func (s *JSONFileStore) GetEntityDir(entity string) string

GetEntityDir returns the directory path for an entity.

func (*JSONFileStore) Info

func (s *JSONFileStore) Info() StoreInfo

Info returns store information.

func (*JSONFileStore) List

func (s *JSONFileStore) List(ctx context.Context, entity string) ([]map[string]interface{}, error)

List returns all entities of a given type.

func (*JSONFileStore) ListEntities

func (s *JSONFileStore) ListEntities(ctx context.Context) ([]string, error)

ListEntities returns all entity types in the schema.

func (*JSONFileStore) NextID

func (s *JSONFileStore) NextID(ctx context.Context, entity string) (int, error)

NextID returns the next available ID for an entity.

func (*JSONFileStore) Patch

func (s *JSONFileStore) Patch(ctx context.Context, entity string, id int, patchData map[string]interface{}) error

Patch partially updates an entity.

func (*JSONFileStore) PatchValidated

func (s *JSONFileStore) PatchValidated(ctx context.Context, entity string, id int, patchData map[string]interface{}, validate func(merged map[string]interface{}) error) error

PatchValidated merges patch data and optionally validates the merged result. Note: JSONFileStore does not provide transactional isolation for Patch — concurrent patches may still race. For production multi-tenant use, use SQLite.

func (*JSONFileStore) Ping

func (s *JSONFileStore) Ping(ctx context.Context) error

Ping verifies that the storage directory is accessible.

func (*JSONFileStore) Save

func (s *JSONFileStore) Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)

Save upserts an entity with the caller-specified ID. Returns (true, nil) when a new file was created, (false, nil) when an existing file was overwritten.

Optimistic concurrency: if data contains "_version", the overwrite path is conditional. If the stored _version differs from the expected value, Save returns ErrConflict without writing. On a successful overwrite, _version is incremented. The create path always starts at _version = 1.

func (*JSONFileStore) Search

func (s *JSONFileStore) Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)

Search implements field-based search.

func (*JSONFileStore) Update

func (s *JSONFileStore) Update(ctx context.Context, entity string, id int, data map[string]interface{}) error

Update replaces an entity completely. Optimistic concurrency: if data contains "_version", the write is conditional on the stored version matching. Returns ErrConflict on mismatch, ErrNotFound if the entity does not exist.

type Migrator

type Migrator interface {
	Migrate(ctx context.Context) error
	Version(ctx context.Context) (int, error)
}

Migrator defines optional schema migration support, useful for database backends.

type PagedLister

type PagedLister interface {
	// ListPaged returns a single page of entities, plus the total count.
	// limit and offset are applied at the storage layer (SQL LIMIT/OFFSET).
	ListPaged(ctx context.Context, entity string, limit, offset int) (*PagedResult, error)
}

PagedLister is an optional interface for storage backends that support server-side pagination. Backends that implement this avoid loading every record into memory for paginated list requests.

type PagedResult

type PagedResult struct {
	Data       []map[string]interface{}
	TotalItems int
}

PagedResult holds a page of results plus the total count.

type QueryCapabilities

type QueryCapabilities struct {
	Where   bool // Can filter with json_extract predicates
	OrderBy bool // Can sort by json_extract fields
	Limit   bool // Can apply TOP/LIMIT
	Count   bool // Can return entity count without full scan
}

QueryCapabilities reports what a storage backend can handle natively via predicate push-down. Used by the OQL planner to decide which operations to delegate to the storage engine versus executing in Go.
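A planner decision gated on these flags might look like the sketch below. It is illustrative (the real planner also weighs cardinality, per the Queryable docs); the interaction rule between ORDER BY and LIMIT is a general SQL correctness constraint, not something this package documents:

```go
package main

import "fmt"

type QueryCapabilities struct {
	Where, OrderBy, Limit, Count bool
}

// pushDownPlan routes each requested clause to SQL or to Go depending
// on the backend's capabilities.
func pushDownPlan(caps QueryCapabilities, hasWhere, hasOrderBy, hasLimit bool) (sqlOps, goOps []string) {
	pushWhere := hasWhere && caps.Where
	pushOrder := hasOrderBy && caps.OrderBy
	// LIMIT may only be delegated when any ORDER BY was also delegated:
	// limiting before sorting would return the wrong rows.
	pushLimit := hasLimit && caps.Limit && (pushOrder || !hasOrderBy)

	route := func(name string, wanted, pushed bool) {
		if !wanted {
			return
		}
		if pushed {
			sqlOps = append(sqlOps, name)
		} else {
			goOps = append(goOps, name)
		}
	}
	route("WHERE", hasWhere, pushWhere)
	route("ORDER BY", hasOrderBy, pushOrder)
	route("LIMIT", hasLimit, pushLimit)
	return sqlOps, goOps
}

func main() {
	caps := QueryCapabilities{Where: true, OrderBy: false, Limit: true}
	sqlOps, goOps := pushDownPlan(caps, true, true, true)
	fmt.Println(sqlOps, goOps) // [WHERE] [ORDER BY LIMIT]
}
```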

type Queryable

type Queryable interface {
	// Capabilities reports which query operations this backend handles
	// natively. The planner will not attempt to push down an operation
	// unless the corresponding capability is true.
	Capabilities() QueryCapabilities

	// CountEntities returns the number of records for an entity type
	// without fetching the records themselves. Used by the planner to
	// determine whether push-down is worthwhile (the fixed overhead of
	// a push-down query exceeds Go-side cost for small datasets).
	CountEntities(ctx context.Context, entity string) (int, error)

	// QueryWithPlan executes a pre-built SQL query with parameterised
	// arguments and returns the results as maps, in the same format as
	// List(). The SQL is generated by the OQL planner's SQL generator
	// and always selects from the data column with json_extract().
	QueryWithPlan(ctx context.Context, sql string, args []interface{}) ([]map[string]interface{}, error)
}

Queryable is an optional interface for storage backends that support predicate push-down. Backends that do not implement it receive the full Go-side execution path for all operations.

The OQL planner checks for this interface via type assertion. If the store satisfies Queryable and the entity cardinality exceeds the push-down threshold, the planner may generate SQL to delegate WHERE, ORDER BY, and LIMIT operations to the storage engine.

type SQLiteConfig

type SQLiteConfig struct {
	DBPath            string
	EnableWAL         bool // Write-Ahead Logging for better concurrency
	EnableForeignKeys bool
	CacheSize         int    // Page cache size in KB
	BusyTimeout       int    // Milliseconds to wait on locked database
	FullTextEnabled   bool   // Enable FTS5 full-text search indexing
	GraphEnabled      bool   // Enable graph edge table maintenance
	TenantID          uint16 // 0 = no tenant scoping

	// Performance tuning (zero = use backend defaults)
	//   SQLite defaults: MaxOpenConns=1 (WAL single-writer),
	//   MaxIdleConns=1, ReadPoolSize=NumCPU.
	MaxOpenConns        int // Max open write connections (0 = backend default)
	MaxIdleConns        int // Max idle write connections (0 = backend default)
	ReadPoolSize        int // Max open read connections (0 = backend default)
	ContentionThreshold int // Adaptive lock threshold 0-100 (default 95)
}

SQLiteConfig holds SQLite-specific configuration.

type SQLiteStorageDialect

type SQLiteStorageDialect struct{}

SQLiteStorageDialect implements StorageDialect for SQLite.

func (*SQLiteStorageDialect) ColumnType

func (d *SQLiteStorageDialect) ColumnType(jsonType, format string, precision, scale int) string

func (*SQLiteStorageDialect) CreateIndexSQL

func (d *SQLiteStorageDialect) CreateIndexSQL(spec *AdaptedTableSpec) []string

func (*SQLiteStorageDialect) CreateTableSQL

func (d *SQLiteStorageDialect) CreateTableSQL(spec *AdaptedTableSpec) string

func (*SQLiteStorageDialect) DeleteSQL

func (d *SQLiteStorageDialect) DeleteSQL(spec *AdaptedTableSpec) string

func (*SQLiteStorageDialect) DenormaliseDecimal

func (d *SQLiteStorageDialect) DenormaliseDecimal(value string, precision, scale int) string

DenormaliseDecimal converts a scaled int64 string back to a client-facing decimal string by dividing by 10^scale.

"1990"  with scale=2 → "19.90"
"-1990" with scale=2 → "-19.90"
"0"     with scale=2 → "0.00"

func (*SQLiteStorageDialect) ExistsSQL

func (d *SQLiteStorageDialect) ExistsSQL(spec *AdaptedTableSpec) string

func (*SQLiteStorageDialect) InsertSQL

func (d *SQLiteStorageDialect) InsertSQL(spec *AdaptedTableSpec, hasExtra bool) (string, []string)

func (*SQLiteStorageDialect) MetadataTableSQL

func (d *SQLiteStorageDialect) MetadataTableSQL() string

func (*SQLiteStorageDialect) Name

func (d *SQLiteStorageDialect) Name() string

func (*SQLiteStorageDialect) NormaliseDecimal

func (d *SQLiteStorageDialect) NormaliseDecimal(value string, precision, scale int) (string, error)

NormaliseDecimal converts a decimal string to a scaled int64 string for SQLite INTEGER storage. The value is multiplied by 10^scale.

Examples for precision=6, scale=2:

"19.90"    → "1990"
"-19.90"   → "-1990"
"0"        → "0"
"9999.99"  → "999999"
"-0.01"    → "-1"

The scaled integer fits in int64 for precision up to 18.
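The scaling in both directions can be sketched in pure Go. This is a simplified, hypothetical reimplementation, not the package's code: it assumes the input has already been validated, has at most `scale` fraction digits, and scale ≥ 1; the real methods also enforce precision and report errors.

```go
package main

import (
	"fmt"
	"strings"
)

// normaliseDecimal scales a decimal string to an integer string by 10^scale,
// mirroring the SQLite behaviour documented above.
func normaliseDecimal(value string, scale int) string {
	neg := strings.HasPrefix(value, "-")
	v := strings.TrimPrefix(value, "-")
	intPart, fracPart, _ := strings.Cut(v, ".")
	for len(fracPart) < scale { // right-pad the fraction to exactly scale digits
		fracPart += "0"
	}
	digits := strings.TrimLeft(intPart+fracPart, "0")
	if digits == "" {
		digits = "0"
	}
	if neg && digits != "0" {
		digits = "-" + digits
	}
	return digits
}

// denormaliseDecimal reverses the scaling: insert the decimal point
// scale digits from the right, zero-padding as needed.
func denormaliseDecimal(value string, scale int) string {
	neg := strings.HasPrefix(value, "-")
	v := strings.TrimPrefix(value, "-")
	for len(v) <= scale {
		v = "0" + v
	}
	out := v[:len(v)-scale] + "." + v[len(v)-scale:]
	if neg {
		out = "-" + out
	}
	return out
}

func main() {
	fmt.Println(normaliseDecimal("19.90", 2))  // 1990
	fmt.Println(normaliseDecimal("-0.01", 2))  // -1
	fmt.Println(denormaliseDecimal("1990", 2)) // 19.90
	fmt.Println(denormaliseDecimal("0", 2))    // 0.00
}
```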

func (*SQLiteStorageDialect) Placeholder

func (d *SQLiteStorageDialect) Placeholder(_ int) string

func (*SQLiteStorageDialect) SelectAllSQL

func (d *SQLiteStorageDialect) SelectAllSQL(spec *AdaptedTableSpec) string

func (*SQLiteStorageDialect) SelectSQL

func (d *SQLiteStorageDialect) SelectSQL(spec *AdaptedTableSpec) string

func (*SQLiteStorageDialect) SupportsNativeDecimalAggregation

func (d *SQLiteStorageDialect) SupportsNativeDecimalAggregation() bool

SupportsNativeDecimalAggregation returns false for SQLite. Although SQLite can SUM integers correctly, the scaled representation requires division by the scale factor to produce correct decimal results. Go-side aggregation with shopspring/decimal avoids this complexity and handles AVG correctly.

func (*SQLiteStorageDialect) UpdateSQL

func (d *SQLiteStorageDialect) UpdateSQL(spec *AdaptedTableSpec, versionCheck bool) string

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore implements Store interface using SQLite database.

It maintains two connection pools against the same WAL-mode database:

  • db (writer): MaxOpenConns=1, serialises all writes.
  • readDB (reader): MaxOpenConns=NumCPU, query_only=ON, parallel reads.

Under WAL mode, readers never block the writer and vice-versa.

func NewSQLiteStore

func NewSQLiteStore(dbPath string, config SQLiteConfig) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite-based storage with separate reader and writer connection pools. Under WAL mode the writer never blocks readers and vice-versa, so splitting pools maximises concurrency.

func (*SQLiteStore) AdaptedColumnInfo

func (s *SQLiteStore) AdaptedColumnInfo(entity, jsonField string) (colName string, scale int, isDecimal bool, ok bool)

AdaptedColumnInfo returns column metadata for a JSON field in an adapted entity.

func (*SQLiteStore) AdaptedRegistry

func (s *SQLiteStore) AdaptedRegistry() *AdaptedRegistry

AdaptedRegistry returns the store's adapted table registry. Returns nil only if the store was not properly initialized.

func (*SQLiteStore) AdaptedTableName

func (s *SQLiteStore) AdaptedTableName(entity string) (string, bool)

AdaptedTableName returns the SQL table name for an adapted entity.

func (*SQLiteStore) AggregateQuery

func (s *SQLiteStore) AggregateQuery(ctx context.Context, sql string, args []interface{}, aliases []string) ([]map[string]interface{}, error)

AggregateQuery executes an aggregate SQL query against native columns and returns results keyed by alias names.

func (*SQLiteStore) Capabilities

func (s *SQLiteStore) Capabilities() QueryCapabilities

Capabilities reports that the SQLite backend can handle WHERE, ORDER BY, LIMIT, and COUNT natively via json_extract() push-down.

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes both the writer and reader connection pools.

func (*SQLiteStore) Commit

func (s *SQLiteStore) Commit(ctx context.Context, req CommitRequest) (CommitResult, error)

Commit performs an atomic upsert + one or more inserts in a single SQLite transaction. The upsert supports optional CAS via Update.Version. All operations share one BEGIN/COMMIT boundary; any failure rolls back the entire set.
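The all-or-nothing contract can be illustrated with a toy model: a map snapshot stands in for BEGIN/COMMIT, an integer field for _version, and strings for entity JSON. This is a hedged sketch of the semantics only, not the SQLite implementation.

```go
package main

import (
	"errors"
	"fmt"
)

type record struct {
	version int
	data    string
}

var (
	errConflict      = errors.New("version conflict")
	errAlreadyExists = errors.New("entity already exists")
)

// commit applies one upsert (with optional CAS when expectVersion > 0)
// plus unconditional appends. All work happens on a copy; any failure
// returns early and leaves the store untouched, like a rollback.
func commit(store map[int]record, updateID, expectVersion int, updated string, appends map[int]string) error {
	tx := make(map[int]record, len(store))
	for k, v := range store {
		tx[k] = v
	}
	cur := tx[updateID]
	if expectVersion > 0 && cur.version != expectVersion {
		return errConflict
	}
	tx[updateID] = record{version: cur.version + 1, data: updated}
	for id, data := range appends {
		if _, exists := tx[id]; exists {
			return errAlreadyExists
		}
		tx[id] = record{version: 1, data: data}
	}
	for k, v := range tx { // "COMMIT": publish the whole set
		store[k] = v
	}
	return nil
}

func main() {
	store := map[int]record{1: {version: 3, data: "a"}}
	fmt.Println(commit(store, 1, 2, "b", nil))                      // version conflict
	fmt.Println(commit(store, 1, 3, "b", map[int]string{2: "new"})) // <nil>
	fmt.Println(store[1].version, store[1].data)                    // 4 b
}
```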

func (*SQLiteStore) Config

func (s *SQLiteStore) Config() StoreConfig

Config returns the store's StoreConfig.

func (*SQLiteStore) ContentionLock

func (s *SQLiteStore) ContentionLock() *AdaptiveLock

ContentionLock returns the store's adaptive lock, allowing runtime configuration of the contention threshold via SetThreshold().

func (*SQLiteStore) CountEntities

func (s *SQLiteStore) CountEntities(ctx context.Context, entity string) (int, error)

CountEntities returns the number of records for an entity type without fetching the data. This is a single indexed COUNT(*) — typically <10µs.

func (*SQLiteStore) Create

func (s *SQLiteStore) Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)

Create inserts a new entity with auto-generated ID

func (*SQLiteStore) DB

func (s *SQLiteStore) DB() *sql.DB

DB returns the underlying *sql.DB for advanced operations such as batch seeding or direct SQL execution. Use with care — callers must respect the store's locking and schema conventions.

func (*SQLiteStore) Delete

func (s *SQLiteStore) Delete(ctx context.Context, entity string, id int) error

Delete removes an entity

func (*SQLiteStore) Exists

func (s *SQLiteStore) Exists(ctx context.Context, entity string, id int) bool

Exists checks if an entity exists

func (*SQLiteStore) FullTextSearch

func (s *SQLiteStore) FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)

FullTextSearch performs a full-text search across entities

func (*SQLiteStore) Get

func (s *SQLiteStore) Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)

Get retrieves an entity by ID

func (*SQLiteStore) GraphTenantIDs

func (s *SQLiteStore) GraphTenantIDs(ctx context.Context) ([]uint16, error)

GraphTenantIDs implements TenantIDLister. It returns all tenant IDs for which a graph_tXXXX edge table should be hydrated at startup. Tenant 0 is always included first (it is implicit and never appears in the tenants registry table). Registered non-zero tenants follow in ascending order.
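The ordering contract can be sketched as follows; `registered` is a hypothetical stand-in for the contents of the tenants registry table.

```go
package main

import (
	"fmt"
	"sort"
)

// orderTenantIDs mirrors the documented ordering: tenant 0 first (implicit,
// never stored in the registry), then registered non-zero tenants ascending.
func orderTenantIDs(registered []uint16) []uint16 {
	out := []uint16{0}
	rest := append([]uint16(nil), registered...)
	sort.Slice(rest, func(i, j int) bool { return rest[i] < rest[j] })
	for _, id := range rest {
		if id != 0 { // tenant 0 is already first
			out = append(out, id)
		}
	}
	return out
}

func main() {
	fmt.Println(orderTenantIDs([]uint16{7, 3, 12})) // [0 3 7 12]
}
```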

func (*SQLiteStore) Info

func (s *SQLiteStore) Info() StoreInfo

Info returns store information

func (*SQLiteStore) IsAdaptedEntity

func (s *SQLiteStore) IsAdaptedEntity(entity string) bool

IsAdaptedEntity reports whether the entity uses an adapted table.

func (*SQLiteStore) List

func (s *SQLiteStore) List(ctx context.Context, entity string) ([]map[string]interface{}, error)

List returns all entities of a given type

func (*SQLiteStore) ListEntities

func (s *SQLiteStore) ListEntities(ctx context.Context) ([]string, error)

ListEntities returns all distinct entity types in the database

func (*SQLiteStore) ListPaged

func (s *SQLiteStore) ListPaged(ctx context.Context, entity string, limit, offset int) (*PagedResult, error)

ListPaged returns a single page of entities plus total count, using SQL LIMIT/OFFSET so only the requested page is deserialised.

func (*SQLiteStore) ListWithFields

func (s *SQLiteStore) ListWithFields(ctx context.Context, entity string, fields []string) ([]map[string]interface{}, error)

ListWithFields returns all records for an entity, extracting only the named fields from each JSON blob. For adapted entities this falls through to the regular List path (native columns are already efficient).

func (*SQLiteStore) ListWithFieldsAndFilter

func (s *SQLiteStore) ListWithFieldsAndFilter(ctx context.Context, entity string, fields []string, preds *jsonic.PredicateSet) ([]map[string]interface{}, error)

ListWithFieldsAndFilter returns records for an entity, extracting only the named fields and evaluating predicates inline during tokenisation. Rows that fail the predicates are never materialised as maps. For adapted entities this falls through to the regular path.

func (*SQLiteStore) Patch

func (s *SQLiteStore) Patch(ctx context.Context, entity string, id int, updates map[string]interface{}) error

Patch partially updates an entity

func (*SQLiteStore) PatchValidated

func (s *SQLiteStore) PatchValidated(ctx context.Context, entity string, id int, updates map[string]interface{}, validate func(merged map[string]interface{}) error) error

PatchValidated applies a partial update inside a transaction and runs the validator against the merged data before committing.

func (*SQLiteStore) Ping

func (s *SQLiteStore) Ping(ctx context.Context) error

Ping verifies that the database connection is alive.

func (*SQLiteStore) QueryWithFields

func (s *SQLiteStore) QueryWithFields(ctx context.Context, sqlQuery string, args []interface{}, fields []string) ([]map[string]interface{}, error)

QueryWithFields executes a push-down SQL query and extracts only the named fields from each result's JSON blob.

func (*SQLiteStore) QueryWithPlan

func (s *SQLiteStore) QueryWithPlan(ctx context.Context, sqlQuery string, args []interface{}) ([]map[string]interface{}, error)

QueryWithPlan executes a pre-built SQL query (generated by the OQL planner) and returns the results as maps, in the same format as List().

func (*SQLiteStore) ReaderDB

func (s *SQLiteStore) ReaderDB() *sql.DB

ReaderDB returns the underlying reader connection pool. Used by the tenant persister for read-only queries.

func (*SQLiteStore) RebuildGraph

func (s *SQLiteStore) RebuildGraph(ctx context.Context) error

RebuildGraph rebuilds the tenant edge table from stored entity JSON.

Correctness: uses models.ExtractEntityEdges for REF extraction so that @REFS ([]interface{} of REF maps) and TSREF exclusion are handled identically to the live syncGraphEdges path.

Performance: one PrepareContext call outside the row loop; edges are accumulated and flushed in batches of rebuildBatchSize rather than one ExecContext per edge.

func (*SQLiteStore) RegisterAdaptedEntity

func (s *SQLiteStore) RegisterAdaptedEntity(ctx context.Context, entity string, schema map[string]interface{}) error

RegisterAdaptedEntity derives an adapted table for the given entity type from its JSON Schema and creates the table if it doesn't exist. This is called by the server layer when a schema is loaded or registered.

func (*SQLiteStore) Save

func (s *SQLiteStore) Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)

Save upserts an entity with the caller-specified ID, reporting whether a new record was created. See the Store interface for the full contract.

func (*SQLiteStore) ScanGraphEdges

func (s *SQLiteStore) ScanGraphEdges(ctx context.Context, tenantID uint16, fn func(GraphEdge) error) error

ScanGraphEdges implements GraphEdgeScanner. It streams every row from the tenant-scoped graph_tXXXX edge table, calling fn once per row. Iteration stops on the first non-nil error returned by fn. Rows are read via the reader pool (query_only, parallel-safe). All tenants, including tenant 0, use graph_tXXXX.

func (*SQLiteStore) Search

func (s *SQLiteStore) Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)

Search implements field-based search using JSON extraction

func (*SQLiteStore) StorageDialectFor

func (s *SQLiteStore) StorageDialectFor(entity string) StorageDialect

StorageDialectFor returns the StorageDialect for the given entity, or nil if the entity is not adapted.

func (*SQLiteStore) Update

func (s *SQLiteStore) Update(ctx context.Context, entity string, id int, data map[string]interface{}) error

Update replaces an entity completely

func (*SQLiteStore) VerifyGraphIntegrity

func (s *SQLiteStore) VerifyGraphIntegrity(ctx context.Context) error

VerifyGraphIntegrity checks whether the tenant edge table matches the REF fields in stored entity JSON. Returns a joined error listing every discrepancy found (missing edges + unexpected edges); does not stop at the first violation.

Both reads (entities and edge table) are issued inside a single read transaction so that concurrent writes cannot produce false violations.

Memory: only one map is materialised (expected edges derived from entity JSON). Actual edges from the edge table are streamed and checked against that map rather than accumulated into a second map.

func (*SQLiteStore) WithLogger

func (s *SQLiteStore) WithLogger(logger zerolog.Logger) *SQLiteStore

WithLogger attaches a zerolog.Logger to the store. Returns the store so the call can be chained after construction: st, err := NewSQLiteStore(...); st = st.WithLogger(logger). Until this is called the store uses zerolog.Nop() and logs nothing.

type SQLiteTenantPersister

type SQLiteTenantPersister struct {
	// contains filtered or unexported fields
}

SQLiteTenantPersister implements tenant.Persister using the tenants table in the shared SQLite database. It uses the writer pool for saves and the reader pool for loads.

func NewSQLiteTenantPersister

func NewSQLiteTenantPersister(db, readDB *sql.DB) *SQLiteTenantPersister

NewSQLiteTenantPersister creates a persister backed by the given database connections. Both must point to the same SQLite database. The writer is used for Save; the reader for LoadAll.

func (*SQLiteTenantPersister) LoadAll

func (p *SQLiteTenantPersister) LoadAll(ctx context.Context) (map[string]uint16, error)

LoadAll returns all persisted tenant name-to-ID mappings.

func (*SQLiteTenantPersister) Save

func (p *SQLiteTenantPersister) Save(ctx context.Context, name string, id uint16) error

Save persists a tenant mapping. Idempotent: re-saving the same (name, id) pair is not an error. Conflicts (same ID with different name, or same name with different ID) return an error.
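The conflict rules can be sketched against an in-memory map standing in for the tenants table (a hypothetical model of the semantics, not the SQL implementation):

```go
package main

import (
	"errors"
	"fmt"
)

// saveTenant models Save: idempotent for an existing (name, id) pair,
// error when either half of the mapping conflicts.
func saveTenant(byName map[string]uint16, name string, id uint16) error {
	for n, existing := range byName {
		if n == name && existing != id {
			return errors.New("name already mapped to a different ID")
		}
		if existing == id && n != name {
			return errors.New("ID already mapped to a different name")
		}
	}
	byName[name] = id // no-op when (name, id) is already present
	return nil
}

func main() {
	m := map[string]uint16{}
	fmt.Println(saveTenant(m, "acme", 1))        // <nil>
	fmt.Println(saveTenant(m, "acme", 1))        // <nil> (idempotent re-save)
	fmt.Println(saveTenant(m, "acme", 2) != nil) // true (ID conflict)
}
```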

type SchemaBrowser

type SchemaBrowser struct {
	// contains filtered or unexported fields
}

SchemaBrowser implements SchemaIntrospector over a queryfy ObjectSchema.

func (*SchemaBrowser) AllowsAdditional

func (b *SchemaBrowser) AllowsAdditional() bool

func (*SchemaBrowser) FieldNames

func (b *SchemaBrowser) FieldNames() []string

func (*SchemaBrowser) GetField

func (b *SchemaBrowser) GetField(name string) FieldIntrospector

func (*SchemaBrowser) IsRequired

func (b *SchemaBrowser) IsRequired(name string) bool

type SchemaDiff

type SchemaDiff struct {
	Added   []ColumnDef    // Columns in new but not in old
	Dropped []ColumnDef    // Columns in old but not in new
	Changed []ColumnChange // Columns present in both but with incompatible type changes

	IndexesAdded   []IndexDef // Indexes in new but not in old
	IndexesDropped []IndexDef // Indexes in old but not in new

	HasExtraChanged bool // Whether _extra presence changed
	NewHasExtra     bool // The new HasExtra value (only meaningful if HasExtraChanged)
}

SchemaDiff describes the differences between an old and new AdaptedTableSpec. It is the migration plan for schema evolution.

func DiffAdaptedSpecs

func DiffAdaptedSpecs(old, new *AdaptedTableSpec) *SchemaDiff

DiffAdaptedSpecs compares two AdaptedTableSpecs and produces a migration plan. The old spec represents the currently deployed table; the new spec represents the desired state from the updated schema.
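The column-level part of the diff can be sketched with a pared-down stand-in for ColumnDef (name and SQL type only). The real diff also covers indexes and the _extra column; this is an illustration of the Added/Dropped/Changed split, not the package's implementation.

```go
package main

import "fmt"

type columnDef struct{ Name, Type string }

// diffColumns splits columns into added (new only), dropped (old only),
// and changed (present in both with a different type).
func diffColumns(old, new []columnDef) (added, dropped, changed []columnDef) {
	oldBy := map[string]columnDef{}
	for _, c := range old {
		oldBy[c.Name] = c
	}
	seen := map[string]bool{}
	for _, c := range new {
		seen[c.Name] = true
		prev, ok := oldBy[c.Name]
		switch {
		case !ok:
			added = append(added, c)
		case prev.Type != c.Type:
			changed = append(changed, c) // incompatible type change
		}
	}
	for _, c := range old {
		if !seen[c.Name] {
			dropped = append(dropped, c)
		}
	}
	return added, dropped, changed
}

func main() {
	old := []columnDef{{"name", "TEXT"}, {"qty", "INTEGER"}, {"note", "TEXT"}}
	new := []columnDef{{"name", "TEXT"}, {"qty", "TEXT"}, {"price", "INTEGER"}}
	a, d, ch := diffColumns(old, new)
	fmt.Println(len(a), len(d), len(ch)) // 1 1 1
}
```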

func (*SchemaDiff) HasTypeConflicts

func (d *SchemaDiff) HasTypeConflicts() bool

HasTypeConflicts reports whether there are incompatible type changes that prevent automatic migration.

func (*SchemaDiff) IsEmpty

func (d *SchemaDiff) IsEmpty() bool

IsEmpty reports whether the diff contains no changes.

type SchemaIntrospector

type SchemaIntrospector interface {
	// FieldNames returns all declared field names, sorted alphabetically.
	FieldNames() []string

	// GetField returns the field descriptor for a named field.
	// Returns nil if the field does not exist.
	GetField(name string) FieldIntrospector

	// IsRequired reports whether a field is required.
	IsRequired(name string) bool

	// AllowsAdditional reports whether the schema accepts fields not
	// declared in its field list. Returns true if additionalProperties
	// is absent or true; false if explicitly set to false.
	AllowsAdditional() bool
}

SchemaIntrospector provides read access to an object schema's structure. This is the primary interface consumed by DeriveAdaptedTableSpec.

func NewJSONSchemaIntrospector

func NewJSONSchemaIntrospector(schema map[string]interface{}) SchemaIntrospector

NewJSONSchemaIntrospector wraps a parsed JSON Schema document. Returns nil if the schema has no "properties" key.

func NewSchemaBrowser

func NewSchemaBrowser(obj *builders.ObjectSchema) SchemaIntrospector

NewSchemaBrowser wraps a queryfy ObjectSchema for introspection. Returns nil if obj is nil.

type Searcher

type Searcher interface {
	Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)
}

Searcher defines optional search capabilities

type StmtCache

type StmtCache struct {
	// contains filtered or unexported fields
}

StmtCache is a concurrency-safe LRU cache of prepared statements.

The cache is keyed by the SQL string. Since the OQL planner produces parameterised queries with ? placeholders, two queries with different parameter values but the same shape share a single prepared statement.

Lifecycle:

  • Created during SQLiteStore initialisation.
  • Get() returns a cached *sql.Stmt or prepares and caches a new one.
  • Close() closes all cached statements (called from SQLiteStore.Close).
  • Invalidate() removes a single entry (for schema evolution).
  • Reset() closes and removes all entries.
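The cache pattern — an LRU keyed by the SQL string — can be sketched in a few lines. This toy version caches the "prepared" SQL itself in place of a *sql.Stmt (no live database here); the real cache would close the evicted statement where noted.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// lruCache: front of the list = most recently used entry.
type lruCache struct {
	mu      sync.Mutex
	maxSize int
	order   *list.List               // element values are the SQL keys
	entries map[string]*list.Element // SQL → list element
}

func newLRUCache(maxSize int) *lruCache {
	return &lruCache{maxSize: maxSize, order: list.New(), entries: map[string]*list.Element{}}
}

// get returns the cached value for query, "preparing" it on a miss and
// evicting the least-recently-used entry when the cache is full.
func (c *lruCache) get(query string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.entries[query]; ok {
		c.order.MoveToFront(el)
		return el.Value.(string)
	}
	if c.order.Len() >= c.maxSize {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.entries, oldest.Value.(string))
		// Real implementation: close the evicted *sql.Stmt here.
	}
	c.entries[query] = c.order.PushFront(query)
	return query
}

func main() {
	c := newLRUCache(2)
	c.get("SELECT 1")
	c.get("SELECT 2")
	c.get("SELECT 1") // refresh "SELECT 1"
	c.get("SELECT 3") // evicts "SELECT 2", the least recently used
	_, ok := c.entries["SELECT 2"]
	fmt.Println(len(c.entries), ok) // 2 false
}
```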

func NewStmtCache

func NewStmtCache(db *sql.DB, maxSize int) *StmtCache

NewStmtCache creates a statement cache that prepares against db. Pass maxSize=0 to use DefaultStmtCacheSize.

func (*StmtCache) Close

func (c *StmtCache) Close()

Close closes all cached statements. After Close, the cache must not be used. Called from SQLiteStore.Close().

func (*StmtCache) Get

func (c *StmtCache) Get(query string) (*sql.Stmt, error)

Get returns a prepared statement for the given SQL. If the statement is not cached, it is prepared and added to the cache, evicting the least-recently-used entry if the cache is full.

The returned *sql.Stmt must NOT be closed by the caller — the cache owns the statement's lifecycle.

func (*StmtCache) Invalidate

func (c *StmtCache) Invalidate(query string)

Invalidate removes and closes the prepared statement for the given SQL, if cached. Used when a schema change invalidates a statement (e.g. adapted table ALTER TABLE).

func (*StmtCache) Len

func (c *StmtCache) Len() int

Len returns the number of cached statements. Primarily for testing.

func (*StmtCache) Reset

func (c *StmtCache) Reset()

Reset closes all cached statements and empties the cache. The cache remains usable — new statements will be prepared on demand.

type StorageDialect

type StorageDialect interface {
	// Name returns the dialect identifier ("sqlite", "postgres", etc.).
	Name() string

	// Placeholder returns a parameter placeholder for the n-th argument
	// (1-based). SQLite: "?", PostgreSQL: "$1", "$2", etc.
	Placeholder(n int) string

	// ColumnType maps a JSON Schema type + format to the backend's native
	// SQL column type. Examples:
	//   ("string", "")        → "TEXT"
	//   ("integer", "")       → "INTEGER" (SQLite) / "BIGINT" (PostgreSQL)
	//   ("number", "")        → "REAL" (SQLite) / "DOUBLE PRECISION" (PostgreSQL)
	//   ("number", "decimal") → "INTEGER" (SQLite, scaled) / "NUMERIC(p,s)" (PostgreSQL)
	//   ("boolean", "")       → "INTEGER" (SQLite) / "BOOLEAN" (PostgreSQL)
	//   ("array", "")         → "TEXT" (SQLite) / "JSONB" (PostgreSQL)
	//   ("object", "")        → "TEXT" (SQLite) / "JSONB" (PostgreSQL)
	//
	// For decimals, precision and scale are provided for backends that
	// support native fixed-point (PostgreSQL NUMERIC). Backends that store
	// decimals as scaled integers (SQLite) may ignore them here and apply
	// them in NormaliseDecimal/DenormaliseDecimal instead.
	ColumnType(jsonType, format string, precision, scale int) string

	// CreateTableSQL generates the CREATE TABLE statement for an adapted
	// table. Implementations must include system columns (id, tenant_id,
	// _extra if hasExtra, _version, created_at, updated_at) and the
	// primary key.
	CreateTableSQL(spec *AdaptedTableSpec) string

	// CreateIndexSQL generates CREATE INDEX statements for the adapted
	// table's indexes.
	CreateIndexSQL(spec *AdaptedTableSpec) []string

	// InsertSQL generates an INSERT statement with the appropriate
	// placeholders. Returns the SQL string and the expected argument
	// order (column names). The caller provides the actual values.
	InsertSQL(spec *AdaptedTableSpec, hasExtra bool) (sql string, columns []string)

	// SelectSQL generates a SELECT statement for a single row by
	// tenant_id and id.
	SelectSQL(spec *AdaptedTableSpec) string

	// SelectAllSQL generates a SELECT statement for all rows in a tenant,
	// ordered by id.
	SelectAllSQL(spec *AdaptedTableSpec) string

	// UpdateSQL generates an UPDATE statement with the appropriate
	// placeholders. The versionCheck parameter controls whether a
	// _version = ? clause is appended to the WHERE.
	UpdateSQL(spec *AdaptedTableSpec, versionCheck bool) string

	// DeleteSQL generates a DELETE statement for a single row.
	DeleteSQL(spec *AdaptedTableSpec) string

	// ExistsSQL generates an EXISTS check for a single row.
	ExistsSQL(spec *AdaptedTableSpec) string

	// MetadataTableSQL generates the DDL for the adapted_table_schemas
	// metadata table.
	MetadataTableSQL() string

	// NormaliseDecimal transforms a validated decimal string into the
	// storage representation for this backend. SQLite scales to int64.
	// PostgreSQL returns the value unchanged.
	NormaliseDecimal(value string, precision, scale int) (string, error)

	// DenormaliseDecimal transforms the stored representation back to
	// a client-facing string. SQLite divides by 10^scale and formats.
	// PostgreSQL returns the value unchanged.
	DenormaliseDecimal(value string, precision, scale int) string

	// SupportsNativeDecimalAggregation reports whether the backend
	// handles SUM/AVG on decimal columns with exact arithmetic.
	// If false, OQL aggregates in Go using shopspring/decimal.
	SupportsNativeDecimalAggregation() bool
}

StorageDialect defines backend-specific SQL generation for adapted tables.

type Store

type Store interface {
	// Config returns the store's configuration.
	Config() StoreConfig

	// Entity CRUD operations
	Create(ctx context.Context, entity string, data map[string]interface{}) (int, error)
	Get(ctx context.Context, entity string, id int) (map[string]interface{}, error)
	Update(ctx context.Context, entity string, id int, data map[string]interface{}) error
	Patch(ctx context.Context, entity string, id int, data map[string]interface{}) error
	// PatchValidated is like Patch but runs a validation function against the
	// merged data inside the transaction. If the validator returns an error,
	// the transaction is rolled back and the error is returned to the caller.
	// This avoids TOCTOU races where a Get-merge-Update sequence can observe
	// stale data between the Get and the Update.
	PatchValidated(ctx context.Context, entity string, id int, data map[string]interface{}, validate func(merged map[string]interface{}) error) error
	Delete(ctx context.Context, entity string, id int) error
	// Save upserts an entity with the caller-specified ID: creates it if it
	// does not exist, overwrites it if it does. Returns (true, nil) when a
	// new record was created and (false, nil) when an existing record was
	// replaced. Never returns an error solely because the ID already exists.
	Save(ctx context.Context, entity string, id int, data map[string]interface{}) (bool, error)

	// Commit performs an atomic upsert + one or more inserts in a single
	// storage transaction. The upsert (req.Update) supports optional
	// optimistic concurrency via Version. Each entry in req.Append is an
	// unconditional insert; a duplicate explicit ID returns ErrAlreadyExists
	// and rolls back the entire commit. Returns ErrConflict when the Update
	// version check fails.
	Commit(ctx context.Context, req CommitRequest) (CommitResult, error)

	// Query operations
	List(ctx context.Context, entity string) ([]map[string]interface{}, error)
	Exists(ctx context.Context, entity string, id int) bool
	Search(ctx context.Context, entity string, field string, query string, matchType string) ([]map[string]interface{}, error)

	// Full-text search (optional - may return empty if not supported)
	FullTextSearch(ctx context.Context, query string, entity string) ([]map[string]interface{}, error)

	// Ping verifies that the storage backend is reachable. Returns nil on
	// success. Used by health and readiness probes.
	Ping(ctx context.Context) error

	// Lifecycle
	Close() error
}

Store defines the core interface for entity storage backends

func NewStore

func NewStore(name string, config map[string]interface{}) (Store, error)

NewStore creates a new store instance by name

func NewStoreFromConfig

func NewStoreFromConfig(cfg StoreConfig) (Store, error)

NewStoreFromConfig creates a store directly from a StoreConfig. This is the preferred constructor for tenant-scoped stores.

type StoreConfig

type StoreConfig struct {
	Type            string // "sqlite", "jsonfile"
	DBPath          string // SQLite database file path
	BaseDir         string // JSONFile base directory
	Schema          string // JSONFile schema subdirectory
	FullTextEnabled bool   // controls FTS indexing in backend
	GraphEnabled    bool   // controls graph edge table maintenance
	TenantID        uint16 // 0 = no tenant scoping

	// Performance tuning (SQLite-specific; zero = use defaults)
	SQLiteCacheSize           int // Page cache size in KB
	SQLiteBusyTimeout         int // Milliseconds to wait on locked database
	SQLiteMaxOpenConns        int // Max open database connections
	SQLiteMaxIdleConns        int // Max idle database connections
	SQLiteReadPoolSize        int // Max open read connections (0 = auto)
	SQLiteContentionThreshold int // Adaptive lock threshold 0-100
}

StoreConfig is the canonical configuration for all store backends. A store is constructed with a StoreConfig and scoped to that config for its entire lifetime. TenantID 0 means no tenant scoping.

type StoreFactory

type StoreFactory func(config map[string]interface{}) (Store, error)

StoreFactory is a function that creates a new Store instance

type StoreInfo

type StoreInfo struct {
	Type                string // "jsonfile", "sqlite", "postgres", etc.
	Version             string
	SupportsSearch      bool
	SupportsBatch       bool
	SupportsTransaction bool
}

StoreInfo provides metadata about the store implementation

type TenantIDLister

type TenantIDLister interface {
	GraphTenantIDs(ctx context.Context) ([]uint16, error)
}

TenantIDLister is an optional interface for storage backends that can enumerate all tenant IDs for which a graph_tXXXX edge table exists. The returned slice must always include tenant 0 (the implicit default). Used during startup graph hydration to restore graph state for all tenants.

Backends that do not implement this interface fall back to scanning only tenant 0 via a direct ScanGraphEdges call.
