postgres

package module
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package postgres is the durable PostgreSQL storage plugin for cyoda-go.

It ships in the stock binary alongside the memory plugin and serves as the reference example for the DescribablePlugin pattern (ConfigVars() drives --help output) and for the txID-to-physical-handle bridge (pgx.Tx lookup via the internal txRegistry).

Configuration

Plugin-namespaced env vars, all read via the injected getenv:

CYODA_POSTGRES_URL                (required) PostgreSQL connection string
CYODA_POSTGRES_MAX_CONNS          default 25
CYODA_POSTGRES_MIN_CONNS          default 5
CYODA_POSTGRES_MAX_CONN_IDLE_TIME default 5m
CYODA_POSTGRES_AUTO_MIGRATE       default true  (runs embedded SQL migrations at startup)

Migrations and context cancellation

NewFactory receives a startup context with a deadline. The embedded SQL migrations run via golang-migrate/migrate/v4, whose m.Up() method does not accept a context. To honor the deadline, runMigrations runs m.Up() in a goroutine and signals m.GracefulStop on ctx.Done() to interrupt at the next migration-step boundary.

TransactionManager and RLS

The plugin's TM is a lifecycle tracker over a thread-safe txRegistry mapping txID → pgx.Tx. TM.Begin starts a SERIALIZABLE transaction, runs SELECT set_config('app.current_tenant', $1, true) for row-level security (the set_config function accepts bound parameters where SET LOCAL does not under pgx's extended-query protocol), and records the handle in the registry. Stores hold a ctxQuerier that re-resolves the underlying pgx.Tx on every call from the passed-in context — so the active tx, discovered via spi.GetTransaction(ctx), is always used when one is present, and the pool is used otherwise.

Registration:

import _ "github.com/cyoda-platform/cyoda-go/plugins/postgres"

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(pool *pgxpool.Pool) error

Migrate preserves the existing exported API for test fixtures.

func NewPool

func NewPool(ctx context.Context, cfg DBConfig) (*pgxpool.Pool, error)

NewPool is a test-fixture entry point that wraps the internal newPool.

func RunMigrateWithDSN added in v0.7.1

func RunMigrateWithDSN(ctx context.Context, dsn string) error

RunMigrateWithDSN is the entry point for the `cyoda migrate` subcommand. It opens a connection pool from dsn, enforces the schema-compatibility contract (refuses when DB is newer than code), applies any pending migrations, and closes the pool before returning. The caller supplies a context for timeout/cancellation control.

Returns a descriptive error when:

  • dsn is empty
  • the pool cannot be opened or pinged
  • the schema is newer than this binary's embedded migrations
  • the migration state is dirty (manual intervention required)
  • any migration step fails

func SetDebugMode added in v0.7.1

func SetDebugMode(on bool)

SetDebugMode toggles the dev-time operator-contract assertion flag. Public so cmd/cyoda/main.go can switch it at startup and so tests can opt-in to the stricter path.

Types

type ApplyFunc added in v0.7.1

type ApplyFunc func(base []byte, delta spi.SchemaDelta) ([]byte, error)

ApplyFunc replays an opaque SchemaDelta onto a base schema represented in the plugin's canonical bytes. Callers (cmd/cyoda/main.go) pass schema.Apply wrapped in a codec round-trip.

type DBConfig

type DBConfig struct {
	URL                     string
	MaxConns                int32
	MinConns                int32
	MaxConnIdleTime         string
	AutoMigrate             bool
	SchemaSavepointInterval int // 0 falls back to the internal default (64)
}

DBConfig is the exported config type retained for test-fixture callers. Production code in the plugin uses the internal config{} directly via parseConfig(getenv). Tests can construct a DBConfig, convert to config, and call NewPool as a thin wrapper.

type Querier

type Querier interface {
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

Querier abstracts pgxpool.Pool and pgx.Tx so that store implementations work both outside and inside transactions. Both pgxpool.Pool and pgx.Tx satisfy this interface natively — no adapter needed.

type StoreFactory

type StoreFactory struct {
	// contains filtered or unexported fields
}

StoreFactory implements spi.StoreFactory backed by PostgreSQL.

func NewStoreFactory

func NewStoreFactory(pool *pgxpool.Pool) *StoreFactory

NewStoreFactory creates a new PostgreSQL-backed StoreFactory. The factory is configured with defaults equivalent to parseConfig on an empty environment — callers that need non-default config (e.g. a custom SchemaSavepointInterval) should use newStoreFactoryWithConfig.

func (*StoreFactory) AsyncSearchStore

func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)

func (*StoreFactory) Close

func (f *StoreFactory) Close() error

func (*StoreFactory) EntityStore

func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)

func (*StoreFactory) InitTransactionManager added in v0.7.1

func (f *StoreFactory) InitTransactionManager(uuids spi.UUIDGenerator)

InitTransactionManager installs a TransactionManager on the factory using the given UUID generator. It must be called before the factory is used to manage transactions; until it is called, TransactionManager() returns an error. Calling it more than once overwrites the previous manager.

The internal alias initTransactionManager (same body, unexported) remains for use within the package; external callers — including test packages — should call this exported form.

func (*StoreFactory) KeyValueStore

func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)

func (*StoreFactory) MessageStore

func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)

func (*StoreFactory) ModelStore

func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)

func (*StoreFactory) Pool

func (f *StoreFactory) Pool() *pgxpool.Pool

Pool returns the underlying connection pool.

func (*StoreFactory) SetApplyFunc added in v0.7.1

func (f *StoreFactory) SetApplyFunc(fn func(base []byte, delta spi.SchemaDelta) ([]byte, error))

SetApplyFunc installs the replay function used by modelStore.Get to fold the extension log. It may be called at most once — typically at factory-construction time in app/app.go. Calling it twice is a programmer error (panic).

The parameter is the unnamed function type (not postgres.ApplyFunc) so that an interface type-assertion in app/app.go can satisfy the setter uniformly across plugins whose named ApplyFunc types differ. Values of postgres.ApplyFunc are assignable to this parameter because the underlying signatures are identical.

func (*StoreFactory) StateMachineAuditStore

func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)

func (*StoreFactory) TransactionManager

func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)

TransactionManager implements spi.StoreFactory. Returns the TM configured on the factory. Errors if none was set.

func (*StoreFactory) WorkflowStore

func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager implements spi.TransactionManager backed by PostgreSQL with REPEATABLE READ isolation plus application-layer row-granular first-committer-wins validation. Each Begin() acquires a real pgx.Tx, registers it in the txRegistry, and allocates a *txState for read/write bookkeeping used by Commit.

func NewTransactionManager

func NewTransactionManager(pool *pgxpool.Pool, uuids spi.UUIDGenerator) *TransactionManager

NewTransactionManager creates a new PostgreSQL-backed TransactionManager.

func (*TransactionManager) Begin

Begin starts a new REPEATABLE READ transaction (snapshot isolation) and returns the transaction ID and a context carrying the TransactionState.

Row-granular first-committer-wins is enforced in application code via txState bookkeeping (readSet/writeSet) and commit-time validation — see Commit() and docs/superpowers/specs/2026-04-15-postgres-si-first-committer-wins-design.md.

func (*TransactionManager) Commit

func (tm *TransactionManager) Commit(ctx context.Context, txID string) error

Commit commits the transaction and records its submit time. Returns spi.ErrConflict on serialization failure (PostgreSQL error 40001) or when the application-layer first-committer-wins validation detects a stale read or write set.

Tenant isolation (issue #199 PR-C2): rejects callers whose UserContext tenant does not match the transaction's tenant. RLS protects data-path access (every DML is row-level filtered) but does not extend to transaction-lifecycle commands (BEGIN/COMMIT/ROLLBACK/SAVEPOINT/etc.) — those operate on connections and don't trigger any policy. So the application-layer tenant gate is the only protection against a caller authenticated as tenant A committing tenant B's in-flight work.

func (*TransactionManager) GetSubmitTime

func (tm *TransactionManager) GetSubmitTime(_ context.Context, txID string) (time.Time, error)

GetSubmitTime returns the database timestamp recorded when the transaction was committed.

func (*TransactionManager) Join

func (tm *TransactionManager) Join(ctx context.Context, txID string) (context.Context, error)

Join attaches to an existing transaction, returning a context carrying its TransactionState.

Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers. Returning a context for another tenant's tx would let the joining caller drive arbitrary lifecycle operations on that tx — see Commit's godoc.

func (*TransactionManager) LookupTx

func (tm *TransactionManager) LookupTx(txID string) (pgx.Tx, bool)

LookupTx exposes the registry lookup for use in tests and by the store layer (resolveQuerier). Production code should prefer resolveQuerier.

func (*TransactionManager) ReleaseSavepoint

func (tm *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error

ReleaseSavepoint releases a savepoint, merging its work into the parent transaction. The txState snapshot for this savepoint is dropped; work done after the push is kept.

Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers.

func (*TransactionManager) Rollback

func (tm *TransactionManager) Rollback(ctx context.Context, txID string) error

Rollback aborts the transaction.

Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers. See Commit's godoc for the design rationale.

func (*TransactionManager) RollbackToSavepoint

func (tm *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error

RollbackToSavepoint rolls back all work done since the named savepoint and restores the txState readSet/writeSet to the snapshot captured at that savepoint.

Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers — destructive on tx-state.

func (*TransactionManager) Savepoint

func (tm *TransactionManager) Savepoint(ctx context.Context, txID string) (string, error)

Savepoint creates a named savepoint within the given PostgreSQL transaction and pushes a snapshot of the current readSet/writeSet onto the txState stack.

Tenant isolation (issue #199 PR-C2): rejects mismatched-tenant callers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL