Documentation ¶
Overview ¶
Package datatable is the workflow-facing shared-table store. The reference implementation is the in-memory MockService below; production wires a Postgres backend that persists schemas into `wick_data_tables` and row data into `wick_data_table_rows`.
Schema is owned by the Data Tables tool (`internal/tools/data-tables/`) and consumed here for row validation, indexed-column hints, and access enforcement. See `internal/docs/workflow/12-data-tables.md`.
The package also provides name↔id translation between caller-facing column names and storage-layer column ids.
Storage uses immutable `cN` ids as JSONB keys so renames touch only the schema row, never the data. Every read translates ids back to names before returning to the caller, and every write translates names to ids before persistence. Both happen through a single IDMap built once per request from the live Schema.
The Postgres backend uses a shared JSONB rows model.
Two physical tables hold everything:
- `wick_data_tables`: one row per logical table, with the schema stored as JSONB
- `wick_data_table_rows`: one row per data row, keyed by `table_slug`; the data column is JSONB keyed by column id
Renames touch only the schema row, never the data; column ids are monotonic per table (Schema.NextColID) so dropped column ids never reappear and orphan keys in legacy data are safely ignored on read.
Index ¶
- Constants
- Variables
- func IsSystemColumn(name string) bool
- func NextColumnID(sc *Schema) string
- type Access
- type Column
- type Condition
- type IDMap
- type MockService
- func (s *MockService) Count(slug string, where map[string]any) (int, error)
- func (s *MockService) CountConditions(slug string, conds []Condition) (int, error)
- func (s *MockService) CreateTable(sc Schema) error
- func (s *MockService) Delete(slug string, where map[string]any) (int, error)
- func (s *MockService) DeleteConditions(slug string, conds []Condition) (int, error)
- func (s *MockService) DropColumn(slug, name string) error
- func (s *MockService) DropTable(slug string) error
- func (s *MockService) Exists(slug string, where map[string]any) (bool, error)
- func (s *MockService) Get(slug string, key map[string]any) (map[string]any, bool, error)
- func (s *MockService) Insert(slug string, row map[string]any) error
- func (s *MockService) ListTables() []string
- func (s *MockService) LoadSchema(slug string) (Schema, error)
- func (s *MockService) Query(slug string, where map[string]any, order []workflow.DataTableOrder, ...) ([]map[string]any, error)
- func (s *MockService) QueryConditions(slug string, conds []Condition, order []workflow.DataTableOrder, ...) ([]map[string]any, error)
- func (s *MockService) RenameColumn(slug, from, to string) error
- func (s *MockService) SaveSchema(sc Schema) error
- func (s *MockService) Upsert(slug string, row map[string]any) (string, error)
- type PgService
- func (s *PgService) Count(slug string, where map[string]any) (int, error)
- func (s *PgService) CountConditions(slug string, conds []Condition) (int, error)
- func (s *PgService) CreateTable(sc Schema) error
- func (s *PgService) Delete(slug string, where map[string]any) (int, error)
- func (s *PgService) DeleteConditions(slug string, conds []Condition) (int, error)
- func (s *PgService) DropColumn(slug, name string) error
- func (s *PgService) DropTable(slug string) error
- func (s *PgService) Exists(slug string, where map[string]any) (bool, error)
- func (s *PgService) Get(slug string, key map[string]any) (map[string]any, bool, error)
- func (s *PgService) Insert(slug string, row map[string]any) error
- func (s *PgService) ListTables() []string
- func (s *PgService) LoadSchema(slug string) (Schema, error)
- func (s *PgService) Query(slug string, where map[string]any, order []workflow.DataTableOrder, ...) ([]map[string]any, error)
- func (s *PgService) QueryConditions(slug string, conds []Condition, order []workflow.DataTableOrder, ...) ([]map[string]any, error)
- func (s *PgService) RenameColumn(slug, from, to string) error
- func (s *PgService) SaveSchema(sc Schema) error
- func (s *PgService) Upsert(slug string, row map[string]any) (string, error)
- type Schema
- type Service
Constants ¶
const (
	ModeStrict = "strict"
	ModeLax    = "lax"
)
Mode constants.
const (
	ColID        = "id"
	ColCreatedAt = "created_at"
	ColUpdatedAt = "updated_at"
)
Reserved column names. Always present on every data table, always system-managed: `id` is the auto-incremented int PK, `created_at` and `updated_at` stamp themselves on row writes. Users cannot rename, drop, or hand-write these.
const (
	OpEquals     = "equals"
	OpNotEquals  = "not_equals"
	OpGT         = "gt"
	OpGTE        = "gte"
	OpLT         = "lt"
	OpLTE        = "lte"
	OpContains   = "contains"
	OpIn         = "in"
	OpIsEmpty    = "is_empty"
	OpIsNotEmpty = "is_not_empty"
)
Supported condition op constants (n8n parity).
Variables ¶
var ErrSchemaMismatch = errors.New("data table row violates schema (strict mode)")
ErrSchemaMismatch indicates the row violates the schema in strict mode.
Functions ¶
func IsSystemColumn ¶
func IsSystemColumn(name string) bool
IsSystemColumn reports whether the name belongs to the reserved trio (`id`, `created_at`, `updated_at`).
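As a rough illustration of the check described above, the following sketch re-implements it against the three reserved names. The lowercase `isSystemColumn` is a hypothetical stand-in, not the package's source, and it assumes the match is exact and case-sensitive.

```go
package main

import "fmt"

// Reserved column names, copied from the package constants.
const (
	ColID        = "id"
	ColCreatedAt = "created_at"
	ColUpdatedAt = "updated_at"
)

// isSystemColumn is a hypothetical stand-in for IsSystemColumn.
// Assumption: names are compared exactly, with no case folding.
func isSystemColumn(name string) bool {
	switch name {
	case ColID, ColCreatedAt, ColUpdatedAt:
		return true
	}
	return false
}

func main() {
	for _, n := range []string{"id", "priority", "updated_at"} {
		fmt.Printf("%s: %v\n", n, isSystemColumn(n))
	}
}
```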
func NextColumnID ¶
func NextColumnID(sc *Schema) string
NextColumnID bumps the schema's monotonic counter and returns the freshly allocated id. The caller is expected to wrap the surrounding read-modify-write in a transaction so concurrent column adds never collide.
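A minimal sketch of such an allocator follows. The trimmed `Schema` type and the lowercase `nextColumnID` are stand-ins for illustration, and the "increment first, then format" order (so an empty schema yields `c1`) is an assumption; the real function may number differently.

```go
package main

import "fmt"

// Schema is a trimmed stand-in holding only the counter this sketch needs.
type Schema struct {
	NextColID int
}

// nextColumnID is a hypothetical stand-in for NextColumnID.
// Assumption: the counter is incremented first and then formatted as "cN".
func nextColumnID(sc *Schema) string {
	sc.NextColID++
	return fmt.Sprintf("c%d", sc.NextColID)
}

func main() {
	sc := &Schema{}
	first := nextColumnID(sc)
	second := nextColumnID(sc)
	// Dropping the column behind `second` would not rewind the counter,
	// so the next allocation is still a fresh id.
	third := nextColumnID(sc)
	fmt.Println(first, second, third)
}
```

The sketch performs no locking of its own; as the doc comment says, serialising concurrent callers is left to the surrounding transaction.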
Types ¶
type Access ¶
type Access struct {
	Workflows []string `json:"workflows,omitempty"`
	RowFilter string   `json:"row_filter,omitempty"` // by_creator | none
}
Access limits which workflows can write the data table.
type Column ¶
type Column struct {
	ID       string   `json:"id,omitempty"` // empty for system columns
	Name     string   `json:"name"`
	Type     string   `json:"type"` // string | int | float | bool | timestamp | json | enum
	Required bool     `json:"required,omitempty"`
	Enum     []string `json:"enum,omitempty"`
	Indexed  bool     `json:"indexed,omitempty"`
	Default  any      `json:"default,omitempty"`
	System   bool     `json:"system,omitempty"`
}
Column is one column declaration.
ID is the immutable storage key — values for this column live in JSONB under data[ID]. Renaming the column updates Name only; ID stays the same, so existing rows do not need to be rewritten. New IDs are minted by NextColumnID(&schema) from the per-table monotonic counter (Schema.NextColID).
System columns (id, created_at, updated_at) are pseudo-columns: they do not live inside Data; the storage layer fills them from the composite primary key and timestamp columns. Their Column entries stay in Schema.Columns only so the UI can render the trio alongside user columns. ID is empty for system columns.
type Condition ¶
type Condition struct {
	Column string `json:"column"`
	Op     string `json:"op"` // equals | not_equals | gt | gte | lt | lte | contains | in | is_empty | is_not_empty
	Value  any    `json:"value,omitempty"`
}
Condition is one where-clause predicate. It mirrors the n8n Data Table node conditions (equals/not_equals/gt/gte/lt/lte/contains/in/is_empty/is_not_empty). Plain `where map[string]any` still works for the common equality case; Condition is layered on top for the richer surface.
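To make the operator list concrete, here is one possible in-memory evaluator for a list of conditions with AND semantics. Everything in it is an illustrative assumption rather than the package's behaviour: the helper names (`matches`, `matchAll`, `equal`, `toFloat`, `isEmpty`), numeric-only ordering comparisons, string-only `contains`, and "empty" meaning nil or the empty string.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Condition is a trimmed copy of the package type (JSON tags omitted).
type Condition struct {
	Column string
	Op     string
	Value  any
}

// toFloat converts the numeric types this sketch chooses to support.
func toFloat(v any) (float64, bool) {
	switch n := v.(type) {
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	case float64:
		return n, true
	}
	return 0, false
}

// equal compares numerically when both sides are numbers, else structurally.
func equal(a, b any) bool {
	if x, ok := toFloat(a); ok {
		if y, ok := toFloat(b); ok {
			return x == y
		}
	}
	return reflect.DeepEqual(a, b)
}

// isEmpty treats a missing key, nil, and "" as empty (an assumption).
func isEmpty(v any) bool { return v == nil || v == "" }

// matches evaluates one condition against one row keyed by column name.
func matches(row map[string]any, c Condition) bool {
	got := row[c.Column]
	switch c.Op {
	case "equals":
		return equal(got, c.Value)
	case "not_equals":
		return !equal(got, c.Value)
	case "gt", "gte", "lt", "lte":
		x, okX := toFloat(got)
		y, okY := toFloat(c.Value)
		if !okX || !okY {
			return false
		}
		switch c.Op {
		case "gt":
			return x > y
		case "gte":
			return x >= y
		case "lt":
			return x < y
		default:
			return x <= y
		}
	case "contains":
		s, okS := got.(string)
		sub, okSub := c.Value.(string)
		return okS && okSub && strings.Contains(s, sub)
	case "in":
		list, _ := c.Value.([]any)
		for _, v := range list {
			if equal(got, v) {
				return true
			}
		}
		return false
	case "is_empty":
		return isEmpty(got)
	case "is_not_empty":
		return !isEmpty(got)
	}
	return false // unknown operators never match in this sketch
}

// matchAll applies AND semantics: every condition must hold.
func matchAll(row map[string]any, conds []Condition) bool {
	for _, c := range conds {
		if !matches(row, c) {
			return false
		}
	}
	return true
}

func main() {
	row := map[string]any{"status": "open", "priority": 3}
	conds := []Condition{
		{Column: "status", Op: "in", Value: []any{"open", "pending"}},
		{Column: "priority", Op: "gte", Value: 2},
	}
	fmt.Println(matchAll(row, conds))
}
```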
type IDMap ¶
type IDMap struct {
	NameToID map[string]string // "priority" → "c2"
	IDToName map[string]string // "c2" → "priority"
	Cols     map[string]Column // "c2" → {ID:"c2", Name:"priority", Type:"int", ...}
}
IDMap is the materialised translation layer for one Schema.
Construct once per request with BuildIDMap and reuse for every row Encode/Decode in that request. Cheap to build (single pass over Schema.Columns), so callers should not bother caching across requests — the cache invalidation on schema change is trickier than the rebuild itself.
func BuildIDMap ¶
BuildIDMap walks the schema once and produces the bidirectional translation map. System columns are skipped — they live outside Data (id from row PK, timestamps from row columns).
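The single pass described above might look roughly like the sketch below. The exported signature of BuildIDMap is not shown on this page, so `buildIDMap` taking a column slice is a hypothetical shape; the types are trimmed copies of the package's own.

```go
package main

import "fmt"

// Column is a trimmed copy of the package type.
type Column struct {
	ID     string
	Name   string
	Type   string
	System bool
}

// IDMap mirrors the three lookup tables of the package type.
type IDMap struct {
	NameToID map[string]string
	IDToName map[string]string
	Cols     map[string]Column
}

// buildIDMap is a hypothetical stand-in for BuildIDMap.
// System columns (and, as an extra guard, columns with no id) are skipped
// because their values live outside the JSONB data.
func buildIDMap(cols []Column) IDMap {
	m := IDMap{
		NameToID: make(map[string]string),
		IDToName: make(map[string]string),
		Cols:     make(map[string]Column),
	}
	for _, c := range cols {
		if c.System || c.ID == "" {
			continue
		}
		m.NameToID[c.Name] = c.ID
		m.IDToName[c.ID] = c.Name
		m.Cols[c.ID] = c
	}
	return m
}

func main() {
	m := buildIDMap([]Column{
		{Name: "id", Type: "int", System: true},
		{ID: "c1", Name: "title", Type: "string"},
		{ID: "c2", Name: "priority", Type: "int"},
	})
	fmt.Println(m.NameToID["priority"], m.IDToName["c1"], len(m.Cols))
}
```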
func (IDMap) Decode ¶
Decode translates a storage row (keyed by column id) back into the caller-facing shape (keyed by user-facing name). Orphan keys — ids from columns that have since been dropped — are skipped silently; any non-cN keys (lax-mode extras) are passed through.
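The three cases in that description (known id, orphan id, non-cN extra) can be sketched as follows. The free function `decode` and the `looksLikeColID` helper are hypothetical; in particular, recognising an orphan as "the letter c followed only by digits" is an assumption about what counts as a cN key.

```go
package main

import "fmt"

// looksLikeColID reports whether key has the form "c" + decimal digits.
// Assumption: this is how storage ids are told apart from lax-mode extras.
func looksLikeColID(key string) bool {
	if len(key) < 2 || key[0] != 'c' {
		return false
	}
	for _, r := range key[1:] {
		if r < '0' || r > '9' {
			return false
		}
	}
	return true
}

// decode is a hypothetical stand-in for IDMap.Decode.
func decode(idToName map[string]string, stored map[string]any) map[string]any {
	out := make(map[string]any, len(stored))
	for key, val := range stored {
		if name, ok := idToName[key]; ok {
			out[name] = val // live column: translate id back to name
			continue
		}
		if looksLikeColID(key) {
			continue // orphan id from a dropped column: skip silently
		}
		out[key] = val // lax-mode extra: pass through unchanged
	}
	return out
}

func main() {
	idToName := map[string]string{"c1": "title", "c3": "priority"}
	stored := map[string]any{
		"c1":   "Fix login",
		"c2":   "value of a dropped column",
		"c3":   2,
		"note": "ad hoc",
	}
	fmt.Println(decode(idToName, stored))
}
```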
func (IDMap) Encode ¶
Encode translates a caller-supplied row (keyed by user-facing name) into the storage shape (keyed by column id).
Strict mode rejects unknown names with ErrSchemaMismatch. Lax mode passes them through under their original name so ad-hoc data survives until the user formalises a schema for it. System columns (id/created_at/updated_at) are dropped — the storage layer owns them.
type MockService ¶
type MockService struct {
	// contains filtered or unexported fields
}
MockService keeps data tables in memory.
func NewMock ¶
func NewMock() *MockService
NewMock constructs an empty in-memory service. Test-only — production wires NewPg(db) via Manager.WithDataTablesDB so data survives restart.
func (*MockService) CountConditions ¶
func (s *MockService) CountConditions(slug string, conds []Condition) (int, error)
CountConditions counts rows that satisfy every condition.
func (*MockService) CreateTable ¶
func (s *MockService) CreateTable(sc Schema) error
CreateTable registers a brand-new table. The reserved trio (`id` int PK auto-increment, `created_at`, `updated_at`) is appended automatically; user columns slot in between. New user columns without a Column.ID are minted from the monotonic counter (Schema.NextColID) so storage rows are id-keyed.
Errors if the slug already exists.
func (*MockService) Delete ¶
func (s *MockService) Delete(slug string, where map[string]any) (int, error)
Delete removes all rows matching where (caller uses user-facing names).
func (*MockService) DeleteConditions ¶
func (s *MockService) DeleteConditions(slug string, conds []Condition) (int, error)
DeleteConditions removes rows that match every Condition (AND semantics).
func (*MockService) DropColumn ¶
func (s *MockService) DropColumn(slug, name string) error
DropColumn removes one column from the schema and strips the matching id-keyed value from every stored row.
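For an in-memory store, the two steps could be sketched like this. The `table` type and `dropColumn` method are hypothetical and say nothing about how MockService is actually laid out; refusing columns with an empty id is an assumption based on system columns having no storage id.

```go
package main

import "fmt"

// Column is a trimmed copy of the package type.
type Column struct {
	ID   string
	Name string
}

// table is a hypothetical in-memory layout: a schema plus id-keyed rows.
type table struct {
	columns []Column
	rows    []map[string]any
}

// dropColumn removes the schema entry, then strips its id from every row.
func (t *table) dropColumn(name string) error {
	for i, c := range t.columns {
		if c.Name != name {
			continue
		}
		if c.ID == "" {
			return fmt.Errorf("column %q is system-managed and cannot be dropped", name)
		}
		t.columns = append(t.columns[:i], t.columns[i+1:]...)
		for _, row := range t.rows {
			delete(row, c.ID) // c is a copy, so its ID survives the append above
		}
		return nil
	}
	return fmt.Errorf("column %q not found", name)
}

func main() {
	t := &table{
		columns: []Column{{Name: "id"}, {ID: "c1", Name: "title"}, {ID: "c2", Name: "priority"}},
		rows: []map[string]any{
			{"c1": "Fix login", "c2": 2},
			{"c1": "Ship docs", "c2": 1},
		},
	}
	if err := t.dropColumn("priority"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(t.columns), t.rows)
}
```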
func (*MockService) DropTable ¶
func (s *MockService) DropTable(slug string) error
DropTable removes a table and all its rows.
func (*MockService) Get ¶
func (s *MockService) Get(slug string, key map[string]any) (map[string]any, bool, error)
Get returns the first row matching key (given in user-facing names), decoded back to user-facing names.
func (*MockService) Insert ¶
func (s *MockService) Insert(slug string, row map[string]any) error
Insert appends a new row. `id` is auto-assigned (any user value is dropped); `created_at` + `updated_at` are stamped automatically. Row payload is encoded to column ids before storage so renames later don't touch data.
func (*MockService) ListTables ¶
func (s *MockService) ListTables() []string
ListTables returns every registered slug.
func (*MockService) LoadSchema ¶
func (s *MockService) LoadSchema(slug string) (Schema, error)
LoadSchema returns the registered schema.
func (*MockService) Query ¶
func (s *MockService) Query(slug string, where map[string]any, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)
Query returns matching rows with order + pagination (user-facing names).
func (*MockService) QueryConditions ¶
func (s *MockService) QueryConditions(slug string, conds []Condition, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)
QueryConditions returns matching rows for the richer condition list.
func (*MockService) RenameColumn ¶
func (s *MockService) RenameColumn(slug, from, to string) error
RenameColumn renames a user column. The storage id stays the same so no row is touched — only the Name field on the schema column changes.
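The point that a rename leaves rows alone can be shown with a small sketch. `renameColumn` here is a hypothetical helper over a bare column slice, and rejecting a target name that already exists is an assumption, not documented behaviour.

```go
package main

import "fmt"

// Column is a trimmed copy of the package type.
type Column struct {
	ID   string
	Name string
}

// renameColumn changes only the Name of the matching schema entry.
func renameColumn(cols []Column, from, to string) error {
	idx := -1
	for i, c := range cols {
		if c.Name == to {
			return fmt.Errorf("column %q already exists", to)
		}
		if c.Name == from {
			idx = i
		}
	}
	if idx < 0 {
		return fmt.Errorf("column %q not found", from)
	}
	cols[idx].Name = to
	return nil
}

func main() {
	cols := []Column{{ID: "c1", Name: "title"}, {ID: "c2", Name: "priority"}}
	stored := map[string]any{"c1": "Fix login", "c2": 2}

	if err := renameColumn(cols, "priority", "urgency"); err != nil {
		fmt.Println("error:", err)
		return
	}
	// The stored row still holds its value under c2; only the label
	// resolved through the schema has changed.
	fmt.Println(cols[1].ID, cols[1].Name, stored["c2"])
}
```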
func (*MockService) SaveSchema ¶
func (s *MockService) SaveSchema(sc Schema) error
SaveSchema registers or replaces a schema (upsert semantics). Columns dropped relative to the previous schema also have their matching JSONB key stripped from every row, so storage and metadata stay consistent.
type PgService ¶
type PgService struct {
	// contains filtered or unexported fields
}
PgService is the GORM-backed Service. Write-through, no in-memory cache — every read hits Postgres so multi-instance deploys and process restarts see the same state.
func NewPg ¶
NewPg wires a Postgres-backed service. Caller is responsible for running AutoMigrate on entity.DataTable + entity.DataTableRow.
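func (*PgService) CountConditions ¶
func (s *PgService) CountConditions(slug string, conds []Condition) (int, error)
CountConditions counts rows matching every Condition.
func (*PgService) CreateTable ¶
func (s *PgService) CreateTable(sc Schema) error
CreateTable persists a new table's metadata. No DDL runs, because rows live in the shared wick_data_table_rows table.
func (*PgService) DeleteConditions ¶
func (s *PgService) DeleteConditions(slug string, conds []Condition) (int, error)
DeleteConditions removes rows matching every Condition.
func (*PgService) DropColumn ¶
func (s *PgService) DropColumn(slug, name string) error
DropColumn removes a user column. The schema entry goes first; the matching JSONB key is stripped from every row in the same transaction, and the partial functional index (if any) is dropped.
func (*PgService) DropTable ¶
func (s *PgService) DropTable(slug string) error
DropTable atomically removes all data rows, the metadata row, and every per-table index.
func (*PgService) Get ¶
func (s *PgService) Get(slug string, key map[string]any) (map[string]any, bool, error)
Get returns the first row matching the caller-supplied filter (translated to id keys). Rows are decoded back to name keys before return.
func (*PgService) Insert ¶
func (s *PgService) Insert(slug string, row map[string]any) error
Insert appends a new row. The id is allocated from the per-slug monotonic counter inside the transaction so concurrent inserts never collide.
func (*PgService) ListTables ¶
func (s *PgService) ListTables() []string
ListTables returns every registered slug, ordered by creation time.
func (*PgService) LoadSchema ¶
func (s *PgService) LoadSchema(slug string) (Schema, error)
LoadSchema returns the registered schema for a slug.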
func (*PgService) Query ¶
func (s *PgService) Query(slug string, where map[string]any, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)
Query returns rows matching the where filter, sorted by order.
func (*PgService) QueryConditions ¶
func (s *PgService) QueryConditions(slug string, conds []Condition, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)
QueryConditions returns rows matching every Condition.
func (*PgService) RenameColumn ¶
func (s *PgService) RenameColumn(slug, from, to string) error
RenameColumn changes the human-facing label of a user column. The storage id stays the same so no data row is touched.
func (*PgService) SaveSchema ¶
func (s *PgService) SaveSchema(sc Schema) error
SaveSchema replaces the schema. Adds and drops are diffed against the previous schema, so each dropped column also has its JSONB key stripped from every row (Postgres `data - 'cN'`) and its partial functional index dropped when one exists. Renames are NOT detected here; callers should use RenameColumn instead, since SaveSchema cannot tell a rename apart from a drop+add.
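The drop detection can be sketched as a set difference over column ids. `droppedIDs` is a hypothetical helper, and `stripKeySQL` is only an assumed statement shape built from the table and column names mentioned in the overview (`wick_data_table_rows`, `table_slug`, `data`); the real query and its placeholder style may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// Column is a trimmed copy of the package type.
type Column struct {
	ID   string
	Name string
}

// stripKeySQL is an assumed shape for removing one JSONB key per table.
// Postgres's `jsonb - text` operator deletes a top-level key.
const stripKeySQL = `UPDATE wick_data_table_rows SET data = data - $1::text WHERE table_slug = $2`

// droppedIDs returns the ids present in prev but absent from next, sorted.
// Columns without an id (system columns) are ignored.
func droppedIDs(prev, next []Column) []string {
	keep := make(map[string]bool, len(next))
	for _, c := range next {
		if c.ID != "" {
			keep[c.ID] = true
		}
	}
	var out []string
	for _, c := range prev {
		if c.ID != "" && !keep[c.ID] {
			out = append(out, c.ID)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	prev := []Column{{ID: "c1", Name: "title"}, {ID: "c2", Name: "priority"}}
	// A "rename" submitted as a fresh column carries a new id, so the diff
	// can only see c2 disappearing and c3 appearing.
	next := []Column{{ID: "c1", Name: "title"}, {ID: "c3", Name: "urgency"}}

	for _, id := range droppedIDs(prev, next) {
		fmt.Printf("would run %q with args (%s, <slug>)\n", stripKeySQL, id)
	}
}
```

The example input also shows why renames belong in RenameColumn: a rename expressed as a new column is indistinguishable from a drop plus an add, and the old values would be stripped.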
type Schema ¶
type Schema struct {
	Slug        string    `json:"-"`
	Name        string    `json:"name,omitempty"`
	Description string    `json:"description,omitempty"`
	Mode        string    `json:"mode,omitempty"` // strict | lax
	PrimaryKey  []string  `json:"primary_key,omitempty"`
	Columns     []Column  `json:"columns"`
	NextColID   int       `json:"next_col_id,omitempty"`
	Access      Access    `json:"access,omitempty"`
	CreatedAt   time.Time `json:"created_at,omitempty"`
	UpdatedAt   time.Time `json:"updated_at,omitempty"`
}
Schema is the per-data-table shape stored at `wick_data_tables.schema_json`.
NextColID is the monotonic column-id allocator. Every Column.ID is `c<NextColID>` and NextColID only ever increments — even when a column is dropped, its id is "burned" and never reused. This keeps historical row data correctly orphaned (the old id maps to nothing, Decode ignores it) instead of accidentally being reinterpreted as a freshly-added column.
type Service ¶
type Service interface {
	// Table-level ops (n8n parity: create / list / update / delete table).
	CreateTable(schema Schema) error
	DropTable(slug string) error
	ListTables() []string
	LoadSchema(slug string) (Schema, error)
	SaveSchema(schema Schema) error

	// Column-level ops (spreadsheet UX parity).
	RenameColumn(slug, from, to string) error
	DropColumn(slug, name string) error

	// Row ops: equality where for backward compat.
	Insert(slug string, row map[string]any) error
	Upsert(slug string, row map[string]any) (action string, err error)
	Delete(slug string, where map[string]any) (int, error)
	Get(slug string, key map[string]any) (map[string]any, bool, error)
	Exists(slug string, where map[string]any) (bool, error)
	Count(slug string, where map[string]any) (int, error)
	Query(slug string, where map[string]any, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)

	// Row ops: Condition list (n8n parity richer ops).
	QueryConditions(slug string, conditions []Condition, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)
	DeleteConditions(slug string, conditions []Condition) (int, error)
	CountConditions(slug string, conditions []Condition) (int, error)
}
Service is the workflow-facing data store contract.