postgres

v0.1.0
Published: Apr 15, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package postgres is the durable PostgreSQL storage plugin for cyoda-go.

It ships in the stock binary alongside the memory plugin. It also serves as the reference example for two patterns: DescribablePlugin (ConfigVars() drives the --help output) and the txID-to-physical-handle bridge (pgx.Tx lookup via the internal txRegistry).

Configuration

Plugin-namespaced env vars, all read via the injected getenv:

CYODA_POSTGRES_URL                (required) PostgreSQL connection string
CYODA_POSTGRES_MAX_CONNS          default 25
CYODA_POSTGRES_MIN_CONNS          default 5
CYODA_POSTGRES_MAX_CONN_IDLE_TIME default 5m
CYODA_POSTGRES_AUTO_MIGRATE       default true  (runs embedded SQL migrations at startup)
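
As an illustration of the injected-getenv approach, the following is a minimal sketch of how the variables and defaults listed above could be parsed. The names pgSettings, parseSettings, and int32Var are hypothetical and are not the plugin's internal parseConfig; only the variable names and default values come from the list above.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"time"
)

// pgSettings is a hypothetical stand-in for the plugin's internal config type.
type pgSettings struct {
	URL             string
	MaxConns        int32
	MinConns        int32
	MaxConnIdleTime time.Duration
	AutoMigrate     bool
}

// int32Var reads name through getenv and falls back to def when it is unset.
func int32Var(getenv func(string) string, name string, def int32) (int32, error) {
	raw := getenv(name)
	if raw == "" {
		return def, nil
	}
	n, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", name, err)
	}
	return int32(n), nil
}

// parseSettings applies the documented defaults and rejects a missing URL.
// It never touches os.Getenv directly, so tests can pass a map-backed getenv.
func parseSettings(getenv func(string) string) (pgSettings, error) {
	var s pgSettings
	var err error
	if s.URL = getenv("CYODA_POSTGRES_URL"); s.URL == "" {
		return s, errors.New("CYODA_POSTGRES_URL is required")
	}
	if s.MaxConns, err = int32Var(getenv, "CYODA_POSTGRES_MAX_CONNS", 25); err != nil {
		return s, err
	}
	if s.MinConns, err = int32Var(getenv, "CYODA_POSTGRES_MIN_CONNS", 5); err != nil {
		return s, err
	}
	s.MaxConnIdleTime = 5 * time.Minute
	if raw := getenv("CYODA_POSTGRES_MAX_CONN_IDLE_TIME"); raw != "" {
		if s.MaxConnIdleTime, err = time.ParseDuration(raw); err != nil {
			return s, fmt.Errorf("CYODA_POSTGRES_MAX_CONN_IDLE_TIME: %w", err)
		}
	}
	s.AutoMigrate = true
	if raw := getenv("CYODA_POSTGRES_AUTO_MIGRATE"); raw != "" {
		if s.AutoMigrate, err = strconv.ParseBool(raw); err != nil {
			return s, fmt.Errorf("CYODA_POSTGRES_AUTO_MIGRATE: %w", err)
		}
	}
	return s, nil
}
```

In a real process, parseSettings(os.Getenv) would read the environment; a test can pass a closure over a map instead.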

Migrations and context cancellation

NewFactory receives a startup context with a deadline. The embedded SQL migrations run via golang-migrate/migrate/v4, whose m.Up() method does not accept a context. To honor the deadline, runMigrations runs m.Up() in a goroutine and signals m.GracefulStop on ctx.Done() to interrupt at the next migration-step boundary.
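
The goroutine-plus-GracefulStop technique described above can be sketched with the standard library alone. This is not the plugin's runMigrations: stepMigrator is a hypothetical stand-in for a migrator whose Up() takes no context and which checks a buffered GracefulStop channel between steps, and upWithContext and demoStop are illustrative names.

```go
package main

import (
	"context"
	"errors"
	"time"
)

var errStopped = errors.New("migration stopped between steps")

// stepMigrator is a hypothetical stand-in for a migrator with a
// context-free Up method and a stop channel polled at step boundaries.
type stepMigrator struct {
	GracefulStop chan bool
	steps        int
	stepTime     time.Duration
	applied      int
}

// Up applies each step in turn and stops early if a stop was requested.
func (m *stepMigrator) Up() error {
	for i := 0; i < m.steps; i++ {
		select {
		case <-m.GracefulStop:
			return errStopped
		default:
		}
		time.Sleep(m.stepTime) // simulated work for one migration step
		m.applied++
	}
	return nil
}

// upWithContext runs Up in a goroutine and requests a graceful stop when
// ctx ends. It always waits for Up to return so no goroutine is leaked.
func upWithContext(ctx context.Context, m *stepMigrator) error {
	done := make(chan error, 1)
	go func() { done <- m.Up() }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		m.GracefulStop <- true // buffered, so this send does not block
		if err := <-done; err == nil {
			return nil // Up finished before it saw the stop request
		}
		return ctx.Err()
	}
}

// demoStop runs 50 slow steps under a short deadline and reports how many
// steps were applied before the stop took effect.
func demoStop() (int, error) {
	m := &stepMigrator{GracefulStop: make(chan bool, 1), steps: 50, stepTime: 10 * time.Millisecond}
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()
	err := upWithContext(ctx, m)
	return m.applied, err
}
```

The step that is already in flight when the deadline fires still completes; the stop only takes effect at the next boundary, which matches the behavior described above.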

TransactionManager and RLS

The plugin's TM is a lifecycle tracker over a thread-safe txRegistry mapping txID → pgx.Tx. TM.Begin starts a SERIALIZABLE transaction, runs SELECT set_config('app.current_tenant', $1, true) for row-level security, and records the handle in the registry. set_config is used because it accepts bound parameters, which SET LOCAL does not under pgx's extended-query protocol. Stores hold a ctxQuerier that re-resolves the underlying pgx.Tx from the passed-in context on every call. When a transaction is active (discovered via spi.GetTransaction(ctx)), its pgx.Tx is used; otherwise the pool is used.
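
The per-call resolution can be illustrated with a small standard-library sketch. Everything here is a hypothetical stand-in: handle replaces pgx.Tx and the pool, registry replaces txRegistry, and withTxID replaces the context plumbing that spi provides. Falling back to the pool when a txID is no longer registered is an assumption of this sketch, not a statement about the plugin's behavior.

```go
package main

import (
	"context"
	"sync"
)

// handle is a hypothetical stand-in for anything that can run SQL
// (the pool or a transaction in the real plugin).
type handle struct{ name string }

type txKey struct{}

// withTxID returns a context that carries the transaction ID.
func withTxID(ctx context.Context, txID string) context.Context {
	return context.WithValue(ctx, txKey{}, txID)
}

// registry is a thread-safe map from transaction ID to its handle.
type registry struct {
	mu  sync.RWMutex
	txs map[string]*handle
}

func (r *registry) register(txID string, h *handle) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.txs[txID] = h
}

func (r *registry) remove(txID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.txs, txID)
}

func (r *registry) lookup(txID string) (*handle, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	h, ok := r.txs[txID]
	return h, ok
}

// resolve is evaluated on every call: it returns the transaction handle
// bound to ctx when one is registered, and the pool otherwise.
func resolve(ctx context.Context, r *registry, pool *handle) *handle {
	if txID, ok := ctx.Value(txKey{}).(string); ok {
		if h, found := r.lookup(txID); found {
			return h
		}
	}
	return pool
}

// demoResolve shows the handle chosen outside a transaction, inside one,
// and after the transaction has been removed from the registry.
func demoResolve() (outside, inside, after string) {
	pool := &handle{name: "pool"}
	reg := &registry{txs: map[string]*handle{}}
	ctx := context.Background()
	outside = resolve(ctx, reg, pool).name
	reg.register("tx-1", &handle{name: "tx-1"})
	txCtx := withTxID(ctx, "tx-1")
	inside = resolve(txCtx, reg, pool).name
	reg.remove("tx-1")
	after = resolve(txCtx, reg, pool).name
	return outside, inside, after
}
```

Because resolution happens per call rather than at store construction, the same store value can serve both transactional and non-transactional requests.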

Registration:

import _ "github.com/cyoda-platform/cyoda-go/plugins/postgres"

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(pool *pgxpool.Pool) error

Migrate preserves the existing exported API for test fixtures.

func MigrateDown

func MigrateDown(pool *pgxpool.Pool) error

MigrateDown is preserved for test cleanup.

func NewPool

func NewPool(ctx context.Context, cfg DBConfig) (*pgxpool.Pool, error)

NewPool is a test-fixture entry point that wraps the internal newPool.

Types

type DBConfig

type DBConfig struct {
	URL             string
	MaxConns        int32
	MinConns        int32
	MaxConnIdleTime string
	AutoMigrate     bool
}

DBConfig is the exported config type retained for test-fixture callers. Production code in the plugin uses the internal config{} directly via parseConfig(getenv). Tests can construct a DBConfig and pass it to NewPool, a thin wrapper that converts it to the internal config.

type Querier

type Querier interface {
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

Querier abstracts pgxpool.Pool and pgx.Tx so that store implementations work both outside and inside transactions. Both pgxpool.Pool and pgx.Tx satisfy this interface natively, so no adapter is needed.

type StoreFactory

type StoreFactory struct {
	// contains filtered or unexported fields
}

StoreFactory implements spi.StoreFactory backed by PostgreSQL.

func NewStoreFactory

func NewStoreFactory(pool *pgxpool.Pool) *StoreFactory

NewStoreFactory creates a new PostgreSQL-backed StoreFactory.

func (*StoreFactory) AsyncSearchStore

func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)

func (*StoreFactory) Close

func (f *StoreFactory) Close() error

func (*StoreFactory) EntityStore

func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)

func (*StoreFactory) KeyValueStore

func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)

func (*StoreFactory) MessageStore

func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)

func (*StoreFactory) ModelStore

func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)

func (*StoreFactory) Pool

func (f *StoreFactory) Pool() *pgxpool.Pool

Pool returns the underlying connection pool.

func (*StoreFactory) StateMachineAuditStore

func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)

func (*StoreFactory) TransactionManager

func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)

TransactionManager implements spi.StoreFactory. Returns the TM configured on the factory. Errors if none was set.

func (*StoreFactory) WorkflowStore

func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager implements spi.TransactionManager backed by PostgreSQL with SERIALIZABLE isolation. Each Begin() acquires a real pgx.Tx and registers it in the txRegistry so that stores can look it up by ID.

func NewTransactionManager

func NewTransactionManager(pool *pgxpool.Pool, uuids spi.UUIDGenerator) *TransactionManager

NewTransactionManager creates a new PostgreSQL-backed TransactionManager.

func (*TransactionManager) Begin

Begin starts a new SERIALIZABLE transaction and returns the transaction ID and a context carrying the TransactionState.

func (*TransactionManager) Commit

func (tm *TransactionManager) Commit(ctx context.Context, txID string) error

Commit commits the transaction and records its submit time. Returns spi.ErrConflict on serialization failure (PostgreSQL error 40001).
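
Since a serialization failure surfaces as a sentinel error, callers can retry the whole unit of work. The following is a minimal sketch of such a loop, assuming the caller re-runs Begin, the work, and Commit inside attempt. errConflict is a local stand-in for spi.ErrConflict, and retryOnConflict is a hypothetical helper, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a local stand-in for spi.ErrConflict.
var errConflict = errors.New("transaction conflict")

// retryOnConflict calls attempt up to maxAttempts times. It retries only
// when the error matches errConflict; success and any other error are
// returned immediately.
func retryOnConflict(maxAttempts int, attempt func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for i := 0; i < maxAttempts; i++ {
		err = attempt()
		if !errors.Is(err, errConflict) {
			return err
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}
```

Each attempt should start a fresh transaction, because a transaction that failed with a serialization error cannot be committed again. A production version would likely add backoff between attempts.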

func (*TransactionManager) GetSubmitTime

func (tm *TransactionManager) GetSubmitTime(_ context.Context, txID string) (time.Time, error)

GetSubmitTime returns the database timestamp recorded when the transaction was committed.

func (*TransactionManager) Join

func (tm *TransactionManager) Join(ctx context.Context, txID string) (context.Context, error)

Join attaches to an existing transaction, returning a context carrying its TransactionState.

func (*TransactionManager) LookupTx

func (tm *TransactionManager) LookupTx(txID string) (pgx.Tx, bool)

LookupTx exposes the registry lookup for use in tests and by the store layer (resolveQuerier). Production code should prefer resolveQuerier.

func (*TransactionManager) ReleaseSavepoint

func (tm *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error

ReleaseSavepoint releases a savepoint, merging its work into the parent transaction.

func (*TransactionManager) Rollback

func (tm *TransactionManager) Rollback(ctx context.Context, txID string) error

Rollback aborts the transaction.

func (*TransactionManager) RollbackToSavepoint

func (tm *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error

RollbackToSavepoint rolls back all work done since the named savepoint.

func (*TransactionManager) Savepoint

func (tm *TransactionManager) Savepoint(ctx context.Context, txID string) (string, error)

Savepoint creates a named savepoint within the given PostgreSQL transaction.
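
The three savepoint methods are typically used together. The sketch below wraps a step in a savepoint, rolling back to it on failure and releasing it on success. The savepointer interface copies the three method signatures documented above; withSavepoint, recordingTM, and demoSavepoint are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// savepointer lists the savepoint methods with the signatures shown above.
type savepointer interface {
	Savepoint(ctx context.Context, txID string) (string, error)
	RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
	ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
}

// withSavepoint runs fn under a savepoint. If fn fails, the work done since
// the savepoint is rolled back and fn's error is returned; otherwise the
// savepoint is released into the parent transaction.
func withSavepoint(ctx context.Context, tm savepointer, txID string, fn func(context.Context) error) error {
	sp, err := tm.Savepoint(ctx, txID)
	if err != nil {
		return err
	}
	if err := fn(ctx); err != nil {
		if rbErr := tm.RollbackToSavepoint(ctx, txID, sp); rbErr != nil {
			return fmt.Errorf("%w (rollback to savepoint failed: %v)", err, rbErr)
		}
		return err
	}
	return tm.ReleaseSavepoint(ctx, txID, sp)
}

// recordingTM is a fake that records calls instead of talking to a database.
type recordingTM struct{ calls []string }

func (r *recordingTM) Savepoint(_ context.Context, _ string) (string, error) {
	r.calls = append(r.calls, "savepoint")
	return "sp-1", nil
}

func (r *recordingTM) RollbackToSavepoint(_ context.Context, _ string, sp string) error {
	r.calls = append(r.calls, "rollback-to:"+sp)
	return nil
}

func (r *recordingTM) ReleaseSavepoint(_ context.Context, _ string, sp string) error {
	r.calls = append(r.calls, "release:"+sp)
	return nil
}

// demoSavepoint returns the calls made for a failing or succeeding step.
func demoSavepoint(fail bool) []string {
	tm := &recordingTM{}
	_ = withSavepoint(context.Background(), tm, "tx-1", func(context.Context) error {
		if fail {
			return errors.New("step failed")
		}
		return nil
	})
	return tm.calls
}
```

With the real TransactionManager, ctx would be the context returned by Begin or Join so that stores called inside fn resolve the same transaction.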
