datatable

package
v0.13.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package datatable is the workflow-facing shared-table store. The reference impl is the in-memory MockService below; production wires a Postgres backend that persists into `wick_data_tables` (schema) + `wick_data_table_rows` (data).

Schema is owned by the Data Tables tool (`internal/tools/data-tables/`) and consumed here for row validation, indexed-column hints, and access enforcement. See `internal/docs/workflow/12-data-tables.md`.

Package datatable — name↔id translation between caller-facing column names and storage-layer column ids.

Storage uses immutable `cN` ids as JSONB keys so renames touch only the schema row, never the data. Every read translates ids back to names before returning to the caller, and every write translates names to ids before persistence. Both happen through a single IDMap built once per request from the live Schema.

Package datatable — Postgres backend, shared JSONB rows model.

Two physical tables hold everything:

  • wick_data_tables : one row per logical table, schema in JSONB
  • wick_data_table_rows : one row per data row, keyed by table_slug, data column is JSONB keyed by column id

Renames touch only the schema row, never the data; column ids are monotonic per table (Schema.NextColID) so dropped column ids never reappear and orphan keys in legacy data are safely ignored on read.

Index

Constants

View Source
const (
	ModeStrict = "strict"
	ModeLax    = "lax"
)

Mode constants.

View Source
const (
	ColID        = "id"
	ColCreatedAt = "created_at"
	ColUpdatedAt = "updated_at"
)

Reserved column names. Always present on every data table, always system-managed: `id` is the auto-incremented int PK, `created_at` and `updated_at` stamp themselves on row writes. Users cannot rename, drop, or hand-write these.

View Source
const (
	OpEquals     = "equals"
	OpNotEquals  = "not_equals"
	OpGT         = "gt"
	OpGTE        = "gte"
	OpLT         = "lt"
	OpLTE        = "lte"
	OpContains   = "contains"
	OpIn         = "in"
	OpIsEmpty    = "is_empty"
	OpIsNotEmpty = "is_not_empty"
)

Supported condition op constants (n8n parity).

Variables

View Source
var ErrSchemaMismatch = errors.New("data table row violates schema (strict mode)")

ErrSchemaMismatch indicates the row violates the schema in strict mode.

Functions

func IsSystemColumn

func IsSystemColumn(name string) bool

IsSystemColumn reports whether the name belongs to the reserved trio.

func NextColumnID

func NextColumnID(sc *Schema) string

NextColumnID bumps the schema's monotonic counter and returns the freshly-allocated id. Caller wraps the surrounding read-modify-write in a TX so concurrent column adds never collide.

Types

type Access

type Access struct {
	Workflows []string `json:"workflows,omitempty"`
	RowFilter string   `json:"row_filter,omitempty"` // by_creator | none
}

Access limits which workflows can write the data table.

type Column

type Column struct {
	ID       string   `json:"id,omitempty"` // empty for system columns
	Name     string   `json:"name"`
	Type     string   `json:"type"` // string | int | float | bool | timestamp | json | enum
	Required bool     `json:"required,omitempty"`
	Enum     []string `json:"enum,omitempty"`
	Indexed  bool     `json:"indexed,omitempty"`
	Default  any      `json:"default,omitempty"`
	System   bool     `json:"system,omitempty"`
}

Column is one column declaration.

ID is the immutable storage key — values for this column live in JSONB under data[ID]. Renaming the column updates Name only; ID stays the same, so existing rows do not need to be rewritten. New IDs are minted by NextColumnID(&schema) from the per-table monotonic counter (Schema.NextColID).

System columns (id, created_at, updated_at) are pseudo-columns: they do not live inside Data; the storage layer fills them from the composite primary key and timestamp columns. Their Column entries stay in Schema.Columns only so the UI can render the trio alongside user columns. ID is empty for system columns.

type Condition

type Condition struct {
	Column string `json:"column"`
	Op     string `json:"op"` // equals | not_equals | gt | gte | lt | lte | contains | in | is_empty | is_not_empty
	Value  any    `json:"value,omitempty"`
}

Condition is one where-clause predicate. Mirrors the n8n Data Table node conditions (equals/not_equals/gt/gte/lt/lte/contains/in/is_empty/ is_not_empty). Plain `where map[string]any` still works for the common equality case — Conditions is layered on top for the richer surface.

type IDMap

type IDMap struct {
	NameToID map[string]string // "priority" → "c2"
	IDToName map[string]string // "c2" → "priority"
	Cols     map[string]Column // "c2" → {ID:"c2", Name:"priority", Type:"int", ...}
}

IDMap is the materialised translation layer for one Schema.

Construct once per request with BuildIDMap and reuse for every row Encode/Decode in that request. Cheap to build (single pass over Schema.Columns), so callers should not bother caching across requests — the cache invalidation on schema change is trickier than the rebuild itself.

func BuildIDMap

func BuildIDMap(sc Schema) IDMap

BuildIDMap walks the schema once and produces the bidirectional translation map. System columns are skipped — they live outside Data (id from row PK, timestamps from row columns).

func (IDMap) Decode

func (m IDMap) Decode(stored map[string]any) map[string]any

Decode translates a storage row (keyed by column id) back into the caller-facing shape (keyed by user-facing name). Orphan keys — ids from columns that have since been dropped — are skipped silently; any non-cN keys (lax-mode extras) are passed through.

func (IDMap) Encode

func (m IDMap) Encode(row map[string]any, mode string) (map[string]any, error)

Encode translates a caller-supplied row (keyed by user-facing name) into the storage shape (keyed by column id).

Strict mode rejects unknown names with ErrSchemaMismatch. Lax mode passes them through under their original name so ad-hoc data survives until the user formalises a schema for it. System columns (id/created_at/updated_at) are dropped — the storage layer owns them.

type MockService

type MockService struct {
	// contains filtered or unexported fields
}

MockService keeps data tables in memory.

func NewMock

func NewMock() *MockService

NewMock constructs an empty in-memory service. Test-only — production wires NewPg(db) via Manager.WithDataTablesDB so data survives restart.

func (*MockService) Count

func (s *MockService) Count(slug string, where map[string]any) (int, error)

Count returns rows matching where.

func (*MockService) CountConditions

func (s *MockService) CountConditions(slug string, conds []Condition) (int, error)

CountConditions counts rows that satisfy every condition.

func (*MockService) CreateTable

func (s *MockService) CreateTable(sc Schema) error

CreateTable registers a brand-new table. The reserved trio (`id` int PK auto-increment, `created_at`, `updated_at`) is appended automatically; user columns slot in between. New user columns without a Column.ID are minted from the monotonic counter (Schema.NextColID) so storage rows are id-keyed.

Errors if the slug already exists.

func (*MockService) Delete

func (s *MockService) Delete(slug string, where map[string]any) (int, error)

Delete removes all rows matching where (caller uses user-facing names).

func (*MockService) DeleteConditions

func (s *MockService) DeleteConditions(slug string, conds []Condition) (int, error)

DeleteConditions removes rows that match every Condition (AND semantics).

func (*MockService) DropColumn

func (s *MockService) DropColumn(slug, name string) error

DropColumn removes one column from the schema and strips the matching id-keyed value from every stored row.

func (*MockService) DropTable

func (s *MockService) DropTable(slug string) error

DropTable removes a table and all its rows.

func (*MockService) Exists

func (s *MockService) Exists(slug string, where map[string]any) (bool, error)

Exists reports whether any row matches where.

func (*MockService) Get

func (s *MockService) Get(slug string, key map[string]any) (map[string]any, bool, error)

Get returns the first row matching key (user-facing names) decoded back to user-facing names.

func (*MockService) Insert

func (s *MockService) Insert(slug string, row map[string]any) error

Insert appends a new row. `id` is auto-assigned (any user value is dropped); `created_at` + `updated_at` are stamped automatically. Row payload is encoded to column ids before storage so renames later don't touch data.

func (*MockService) ListTables

func (s *MockService) ListTables() []string

ListTables returns every registered slug.

func (*MockService) LoadSchema

func (s *MockService) LoadSchema(slug string) (Schema, error)

LoadSchema returns the registered schema.

func (*MockService) Query

func (s *MockService) Query(slug string, where map[string]any, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)

Query returns matching rows with order + pagination (user-facing names).

func (*MockService) QueryConditions

func (s *MockService) QueryConditions(slug string, conds []Condition, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)

QueryConditions returns matching rows for the richer condition list.

func (*MockService) RenameColumn

func (s *MockService) RenameColumn(slug, from, to string) error

RenameColumn renames a user column. The storage id stays the same so no row is touched — only the Name field on the schema column changes.

func (*MockService) SaveSchema

func (s *MockService) SaveSchema(sc Schema) error

SaveSchema registers or replaces a schema (upsert semantics). Columns dropped relative to the previous schema also have their matching JSONB key stripped from every row, so storage and metadata stay consistent.

func (*MockService) Upsert

func (s *MockService) Upsert(slug string, row map[string]any) (string, error)

Upsert insert-or-updates by primary key. The caller may pass an `id` to target an existing row (legacy compat with MCP `datatable_upsert`). When the id is missing or unknown, behaves like Insert.

type PgService

type PgService struct {
	// contains filtered or unexported fields
}

PgService is the GORM-backed Service. Write-through, no in-memory cache — every read hits Postgres so multi-instance deploys and process restarts see the same state.

func NewPg

func NewPg(db *gorm.DB) *PgService

NewPg wires a Postgres-backed service. Caller is responsible for running AutoMigrate on entity.DataTable + entity.DataTableRow.

func (*PgService) Count

func (s *PgService) Count(slug string, where map[string]any) (int, error)

Count returns the number of rows matching the where map.

func (*PgService) CountConditions

func (s *PgService) CountConditions(slug string, conds []Condition) (int, error)

CountConditions counts rows matching every Condition.

func (*PgService) CreateTable

func (s *PgService) CreateTable(sc Schema) error

CreateTable persists a new table's metadata. No DDL — rows live in the shared wick_data_table_rows table.

func (*PgService) Delete

func (s *PgService) Delete(slug string, where map[string]any) (int, error)

Delete removes rows matching the where filter.

func (*PgService) DeleteConditions

func (s *PgService) DeleteConditions(slug string, conds []Condition) (int, error)

DeleteConditions removes rows matching every Condition.

func (*PgService) DropColumn

func (s *PgService) DropColumn(slug, name string) error

DropColumn removes a user column. Schema entry goes first; the matching JSONB key is stripped from every row in the same TX, and the partial functional index (if any) is dropped.

func (*PgService) DropTable

func (s *PgService) DropTable(slug string) error

DropTable removes all data rows + the metadata row + every per-table index. Atomic.

func (*PgService) Exists

func (s *PgService) Exists(slug string, where map[string]any) (bool, error)

Exists is Get with a boolean view.

func (*PgService) Get

func (s *PgService) Get(slug string, key map[string]any) (map[string]any, bool, error)

Get returns the first row matching the caller-supplied filter (translated to id keys). Rows are decoded back to name keys before return.

func (*PgService) Insert

func (s *PgService) Insert(slug string, row map[string]any) error

Insert appends a new row. id is allocated from the per-slug monotonic counter inside the TX so concurrent inserts never collide.

func (*PgService) ListTables

func (s *PgService) ListTables() []string

ListTables returns every registered slug, ordered by creation time.

func (*PgService) LoadSchema

func (s *PgService) LoadSchema(slug string) (Schema, error)

LoadSchema returns the registered schema for a slug.

func (*PgService) Query

func (s *PgService) Query(slug string, where map[string]any, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)

Query returns rows matching the where filter, sorted by order.

func (*PgService) QueryConditions

func (s *PgService) QueryConditions(slug string, conds []Condition, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)

QueryConditions returns rows matching every Condition.

func (*PgService) RenameColumn

func (s *PgService) RenameColumn(slug, from, to string) error

RenameColumn changes the human-facing label of a user column. The storage id stays the same so no data row is touched.

func (*PgService) SaveSchema

func (s *PgService) SaveSchema(sc Schema) error

SaveSchema replaces the schema. Adds + drops are diffed against the previous schema so dropped columns also strip the matching JSONB key from every row (Postgres `data - 'cN'`) and drop the partial functional index when one exists. Renames are NOT detected here — callers should use RenameColumn for that, since SaveSchema cannot tell a rename apart from a drop+add.

func (*PgService) Upsert

func (s *PgService) Upsert(slug string, row map[string]any) (string, error)

Upsert insert-or-updates by id. When row["id"] hits an existing row, fields are merged via JSONB concat; otherwise a fresh id is minted and the row is inserted.

type Schema

type Schema struct {
	Slug        string    `json:"-"`
	Name        string    `json:"name,omitempty"`
	Description string    `json:"description,omitempty"`
	Mode        string    `json:"mode,omitempty"` // strict | lax
	PrimaryKey  []string  `json:"primary_key,omitempty"`
	Columns     []Column  `json:"columns"`
	NextColID   int       `json:"next_col_id,omitempty"`
	Access      Access    `json:"access,omitempty"`
	CreatedAt   time.Time `json:"created_at,omitempty"`
	UpdatedAt   time.Time `json:"updated_at,omitempty"`
}

Schema is the per-data-table shape stored at `wick_data_tables.schema_json`.

NextColID is the monotonic column-id allocator. Every Column.ID is `c<NextColID>` and NextColID only ever increments — even when a column is dropped, its id is "burned" and never reused. This keeps historical row data correctly orphaned (the old id maps to nothing, Decode ignores it) instead of accidentally being reinterpreted as a freshly-added column.

type Service

type Service interface {
	// Table-level ops (n8n parity: create / list / update / delete table).
	CreateTable(schema Schema) error
	DropTable(slug string) error
	ListTables() []string

	LoadSchema(slug string) (Schema, error)
	SaveSchema(schema Schema) error

	// Column-level ops (spreadsheet UX parity).
	RenameColumn(slug, from, to string) error
	DropColumn(slug, name string) error

	// Row ops — equality where for backward compat.
	Insert(slug string, row map[string]any) error
	Upsert(slug string, row map[string]any) (action string, err error)
	Delete(slug string, where map[string]any) (int, error)
	Get(slug string, key map[string]any) (map[string]any, bool, error)
	Exists(slug string, where map[string]any) (bool, error)
	Count(slug string, where map[string]any) (int, error)
	Query(slug string, where map[string]any, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)

	// Row ops — Condition list (n8n parity richer ops).
	QueryConditions(slug string, conditions []Condition, order []workflow.DataTableOrder, limit, offset int) ([]map[string]any, error)
	DeleteConditions(slug string, conditions []Condition) (int, error)
	CountConditions(slug string, conditions []Condition) (int, error)
}

Service is the workflow-facing data store contract.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL