Documentation
¶
Overview ¶
Package postgres is the durable PostgreSQL storage plugin for cyoda-go.
It ships in the stock binary alongside the memory plugin and serves as the reference example for the DescribablePlugin pattern (ConfigVars() drives --help output) and for the txID-to-physical-handle bridge (pgx.Tx lookup via the internal txRegistry).
Configuration ¶
Plugin-namespaced env vars, all read via the injected getenv:
CYODA_POSTGRES_URL (required) PostgreSQL connection string CYODA_POSTGRES_MAX_CONNS default 25 CYODA_POSTGRES_MIN_CONNS default 5 CYODA_POSTGRES_MAX_CONN_IDLE_TIME default 5m CYODA_POSTGRES_AUTO_MIGRATE default true (runs embedded SQL migrations at startup)
Migrations and context cancellation ¶
NewFactory receives a startup context with a deadline. The embedded SQL migrations run via golang-migrate/migrate/v4, whose m.Up() method does not accept a context. To honor the deadline, runMigrations runs m.Up() in a goroutine and signals m.GracefulStop on ctx.Done() to interrupt at the next migration-step boundary.
TransactionManager and RLS ¶
The plugin's TM is a lifecycle tracker over a thread-safe txRegistry mapping txID → pgx.Tx. TM.Begin starts a SERIALIZABLE transaction, runs SELECT set_config('app.current_tenant', $1, true) for row-level security (the set_config function accepts bound parameters where SET LOCAL does not under pgx's extended-query protocol), and records the handle in the registry. Stores hold a ctxQuerier that re-resolves the underlying pgx.Tx on every call from the passed-in context — so the active tx, discovered via spi.GetTransaction(ctx), is always used when one is present, and the pool is used otherwise.
Registration:
import _ "github.com/cyoda-platform/cyoda-go/plugins/postgres"
Index ¶
- func Migrate(pool *pgxpool.Pool) error
- func NewPool(ctx context.Context, cfg DBConfig) (*pgxpool.Pool, error)
- func RunMigrateWithDSN(ctx context.Context, dsn string) error
- func SetDebugMode(on bool)
- type ApplyFunc
- type DBConfig
- type Querier
- type StoreFactory
- func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)
- func (f *StoreFactory) Close() error
- func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)
- func (f *StoreFactory) InitTransactionManager(uuids spi.UUIDGenerator)
- func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)
- func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)
- func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)
- func (f *StoreFactory) Pool() *pgxpool.Pool
- func (f *StoreFactory) SetApplyFunc(fn func(base []byte, delta spi.SchemaDelta) ([]byte, error))
- func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)
- func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)
- func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)
- type TransactionManager
- func (tm *TransactionManager) Begin(ctx context.Context) (string, context.Context, error)
- func (tm *TransactionManager) Commit(ctx context.Context, txID string) error
- func (tm *TransactionManager) GetSubmitTime(_ context.Context, txID string) (time.Time, error)
- func (tm *TransactionManager) Join(ctx context.Context, txID string) (context.Context, error)
- func (tm *TransactionManager) LookupTx(txID string) (pgx.Tx, bool)
- func (tm *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
- func (tm *TransactionManager) Rollback(ctx context.Context, txID string) error
- func (tm *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
- func (tm *TransactionManager) Savepoint(ctx context.Context, txID string) (string, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RunMigrateWithDSN ¶ added in v0.7.1
RunMigrateWithDSN is the entry point for the `cyoda migrate` subcommand. It opens a connection pool from dsn, enforces the schema-compatibility contract (refuses when DB is newer than code), applies any pending migrations, and closes the pool before returning. The caller supplies a context for timeout/cancellation control.
Returns a descriptive error when:
- dsn is empty
- the pool cannot be opened or pinged
- the schema is newer than this binary's embedded migrations
- the migration state is dirty (manual intervention required)
- any migration step fails
func SetDebugMode ¶ added in v0.7.1
func SetDebugMode(on bool)
SetDebugMode toggles the dev-time operator-contract assertion flag. Public so cmd/cyoda/main.go can switch it at startup and so tests can opt-in to the stricter path.
Types ¶
type ApplyFunc ¶ added in v0.7.1
type ApplyFunc func(base []byte, delta spi.SchemaDelta) ([]byte, error)
ApplyFunc replays an opaque SchemaDelta onto a base schema represented in the plugin's canonical bytes. Callers (cmd/cyoda/main.go) pass schema.Apply wrapped in a codec round-trip.
type DBConfig ¶
type DBConfig struct {
URL string
MaxConns int32
MinConns int32
MaxConnIdleTime string
AutoMigrate bool
SchemaSavepointInterval int // 0 falls back to the internal default (64)
}
DBConfig is the exported config type retained for test-fixture callers. Production code in the plugin uses the internal config{} directly via parseConfig(getenv). Tests can construct a DBConfig, convert to config, and call NewPool as a thin wrapper.
type Querier ¶
type Querier interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
Querier abstracts pgxpool.Pool and pgx.Tx so that store implementations work both outside and inside transactions. Both pgxpool.Pool and pgx.Tx satisfy this interface natively — no adapter needed.
type StoreFactory ¶
type StoreFactory struct {
// contains filtered or unexported fields
}
StoreFactory implements spi.StoreFactory backed by PostgreSQL.
func NewStoreFactory ¶
func NewStoreFactory(pool *pgxpool.Pool) *StoreFactory
NewStoreFactory creates a new PostgreSQL-backed StoreFactory. The factory is configured with defaults equivalent to parseConfig on an empty environment — callers that need non-default config (e.g. a custom SchemaSavepointInterval) should use newStoreFactoryWithConfig.
func (*StoreFactory) AsyncSearchStore ¶
func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)
func (*StoreFactory) Close ¶
func (f *StoreFactory) Close() error
func (*StoreFactory) EntityStore ¶
func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)
func (*StoreFactory) InitTransactionManager ¶ added in v0.7.1
func (f *StoreFactory) InitTransactionManager(uuids spi.UUIDGenerator)
InitTransactionManager installs a TransactionManager on the factory using the given UUID generator. It must be called before the factory is used to manage transactions; until it is called, TransactionManager() returns an error. Calling it more than once overwrites the previous manager.
The internal alias initTransactionManager (same body, unexported) remains for use within the package; external callers — including test packages — should call this exported form.
func (*StoreFactory) KeyValueStore ¶
func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)
func (*StoreFactory) MessageStore ¶
func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)
func (*StoreFactory) ModelStore ¶
func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)
func (*StoreFactory) Pool ¶
func (f *StoreFactory) Pool() *pgxpool.Pool
Pool returns the underlying connection pool.
func (*StoreFactory) SetApplyFunc ¶ added in v0.7.1
func (f *StoreFactory) SetApplyFunc(fn func(base []byte, delta spi.SchemaDelta) ([]byte, error))
SetApplyFunc installs the replay function used by modelStore.Get to fold the extension log. It may be called at most once — typically at factory-construction time in app/app.go. Calling it twice is a programmer error (panic).
The parameter is the unnamed function type (not postgres.ApplyFunc) so that an interface type-assertion in app/app.go can satisfy the setter uniformly across plugins whose named ApplyFunc types differ. Values of postgres.ApplyFunc are assignable to this parameter because the underlying signatures are identical.
func (*StoreFactory) StateMachineAuditStore ¶
func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)
func (*StoreFactory) TransactionManager ¶
func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)
TransactionManager implements spi.StoreFactory. Returns the TM configured on the factory. Errors if none was set.
func (*StoreFactory) WorkflowStore ¶
func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager implements spi.TransactionManager backed by PostgreSQL with REPEATABLE READ isolation plus application-layer row-granular first-committer-wins validation. Each Begin() acquires a real pgx.Tx, registers it in the txRegistry, and allocates a *txState for read/write bookkeeping used by Commit.
func NewTransactionManager ¶
func NewTransactionManager(pool *pgxpool.Pool, uuids spi.UUIDGenerator) *TransactionManager
NewTransactionManager creates a new PostgreSQL-backed TransactionManager.
func (*TransactionManager) Begin ¶
Begin starts a new REPEATABLE READ transaction (snapshot isolation) and returns the transaction ID and a context carrying the TransactionState.
Row-granular first-committer-wins is enforced in application code via txState bookkeeping (readSet/writeSet) and commit-time validation — see Commit() and docs/superpowers/specs/2026-04-15-postgres-si-first-committer-wins-design.md.
func (*TransactionManager) Commit ¶
func (tm *TransactionManager) Commit(ctx context.Context, txID string) error
Commit commits the transaction and records its submit time. Returns spi.ErrConflict on serialization failure (PostgreSQL error 40001) or when the application-layer first-committer-wins validation detects a stale read or write set.
Tenant isolation (issue #199 PR-C2): rejects callers whose UserContext tenant does not match the transaction's tenant. RLS protects data-path access (every DML is row-level filtered) but does not extend to transaction-lifecycle commands (BEGIN/COMMIT/ROLLBACK/SAVEPOINT/etc.) — those operate on connections and don't trigger any policy. So the application-layer tenant gate is the only protection against a caller authenticated as tenant A committing tenant B's in-flight work.
func (*TransactionManager) GetSubmitTime ¶
GetSubmitTime returns the database timestamp recorded when the transaction was committed.
func (*TransactionManager) Join ¶
Join attaches to an existing transaction, returning a context carrying its TransactionState.
Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers. Returning a context for another tenant's tx would let the joining caller drive arbitrary lifecycle operations on that tx — see Commit's godoc.
func (*TransactionManager) LookupTx ¶
func (tm *TransactionManager) LookupTx(txID string) (pgx.Tx, bool)
LookupTx exposes the registry lookup for use in tests and by the store layer (resolveQuerier). Production code should prefer resolveQuerier.
func (*TransactionManager) ReleaseSavepoint ¶
func (tm *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
ReleaseSavepoint releases a savepoint, merging its work into the parent transaction. The txState snapshot for this savepoint is dropped; work done after the push is kept.
Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers.
func (*TransactionManager) Rollback ¶
func (tm *TransactionManager) Rollback(ctx context.Context, txID string) error
Rollback aborts the transaction.
Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers. See Commit's godoc for the design rationale.
func (*TransactionManager) RollbackToSavepoint ¶
func (tm *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
RollbackToSavepoint rolls back all work done since the named savepoint and restores the txState readSet/writeSet to the snapshot captured at that savepoint.
Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers — destructive on tx-state.
Source Files
¶
- build_mode.go
- classifying_querier.go
- commit_validator.go
- config.go
- doc.go
- entity_doc.go
- entity_store.go
- kv_store.go
- message_store.go
- migrate.go
- model_extensions.go
- model_store.go
- plugin.go
- querier.go
- search_store.go
- sm_audit_store.go
- store_factory.go
- transaction_manager.go
- tx_registry.go
- txstate.go
- uuid.go
- workflow_store.go