Documentation
¶
Overview ¶
Package postgres is the durable PostgreSQL storage plugin for cyoda-go.
It ships in the stock binary alongside the memory plugin and serves as the reference example for the DescribablePlugin pattern (ConfigVars() drives --help output) and for the txID-to-physical-handle bridge (pgx.Tx lookup via the internal txRegistry).
Configuration ¶
Plugin-namespaced env vars, all read via the injected getenv:
CYODA_POSTGRES_URL (required) PostgreSQL connection string CYODA_POSTGRES_MAX_CONNS default 25 CYODA_POSTGRES_MIN_CONNS default 5 CYODA_POSTGRES_MAX_CONN_IDLE_TIME default 5m CYODA_POSTGRES_AUTO_MIGRATE default true (runs embedded SQL migrations at startup)
Migrations and context cancellation ¶
NewFactory receives a startup context with a deadline. The embedded SQL migrations run via golang-migrate/migrate/v4, whose m.Up() method does not accept a context. To honor the deadline, runMigrations runs m.Up() in a goroutine and signals m.GracefulStop on ctx.Done() to interrupt at the next migration-step boundary.
TransactionManager and RLS ¶
The plugin's TM is a lifecycle tracker over a thread-safe txRegistry mapping txID → pgx.Tx. TM.Begin starts a SERIALIZABLE transaction, runs SELECT set_config('app.current_tenant', $1, true) for row-level security (the set_config function accepts bound parameters where SET LOCAL does not under pgx's extended-query protocol), and records the handle in the registry. Stores hold a ctxQuerier that re-resolves the underlying pgx.Tx on every call from the passed-in context — so the active tx, discovered via spi.GetTransaction(ctx), is always used when one is present, and the pool is used otherwise.
Registration:
import _ "github.com/cyoda-platform/cyoda-go/plugins/postgres"
Index ¶
- func Migrate(pool *pgxpool.Pool) error
- func MigrateDown(pool *pgxpool.Pool) error
- func NewPool(ctx context.Context, cfg DBConfig) (*pgxpool.Pool, error)
- type DBConfig
- type Querier
- type StoreFactory
- func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)
- func (f *StoreFactory) Close() error
- func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)
- func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)
- func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)
- func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)
- func (f *StoreFactory) Pool() *pgxpool.Pool
- func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)
- func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)
- func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)
- type TransactionManager
- func (tm *TransactionManager) Begin(ctx context.Context) (string, context.Context, error)
- func (tm *TransactionManager) Commit(ctx context.Context, txID string) error
- func (tm *TransactionManager) GetSubmitTime(_ context.Context, txID string) (time.Time, error)
- func (tm *TransactionManager) Join(ctx context.Context, txID string) (context.Context, error)
- func (tm *TransactionManager) LookupTx(txID string) (pgx.Tx, bool)
- func (tm *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
- func (tm *TransactionManager) Rollback(ctx context.Context, txID string) error
- func (tm *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
- func (tm *TransactionManager) Savepoint(ctx context.Context, txID string) (string, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MigrateDown ¶
MigrateDown is preserved for test cleanup.
Types ¶
type DBConfig ¶
type DBConfig struct {
URL string
MaxConns int32
MinConns int32
MaxConnIdleTime string
AutoMigrate bool
}
DBConfig is the exported config type retained for test-fixture callers. Production code in the plugin uses the internal config{} directly via parseConfig(getenv). Tests can construct a DBConfig, convert to config, and call NewPool as a thin wrapper.
type Querier ¶
type Querier interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
Querier abstracts pgxpool.Pool and pgx.Tx so that store implementations work both outside and inside transactions. Both pgxpool.Pool and pgx.Tx satisfy this interface natively — no adapter needed.
type StoreFactory ¶
type StoreFactory struct {
// contains filtered or unexported fields
}
StoreFactory implements spi.StoreFactory backed by PostgreSQL.
func NewStoreFactory ¶
func NewStoreFactory(pool *pgxpool.Pool) *StoreFactory
NewStoreFactory creates a new PostgreSQL-backed StoreFactory.
func (*StoreFactory) AsyncSearchStore ¶
func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)
func (*StoreFactory) Close ¶
func (f *StoreFactory) Close() error
func (*StoreFactory) EntityStore ¶
func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)
func (*StoreFactory) KeyValueStore ¶
func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)
func (*StoreFactory) MessageStore ¶
func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)
func (*StoreFactory) ModelStore ¶
func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)
func (*StoreFactory) Pool ¶
func (f *StoreFactory) Pool() *pgxpool.Pool
Pool returns the underlying connection pool.
func (*StoreFactory) StateMachineAuditStore ¶
func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)
func (*StoreFactory) TransactionManager ¶
func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)
TransactionManager implements spi.StoreFactory. Returns the TM configured on the factory. Errors if none was set.
func (*StoreFactory) WorkflowStore ¶
func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager implements spi.TransactionManager backed by PostgreSQL with SERIALIZABLE isolation. Each Begin() acquires a real pgx.Tx and registers it in the txRegistry so that stores can look it up by ID.
func NewTransactionManager ¶
func NewTransactionManager(pool *pgxpool.Pool, uuids spi.UUIDGenerator) *TransactionManager
NewTransactionManager creates a new PostgreSQL-backed TransactionManager.
func (*TransactionManager) Begin ¶
Begin starts a new SERIALIZABLE transaction and returns the transaction ID and a context carrying the TransactionState.
func (*TransactionManager) Commit ¶
func (tm *TransactionManager) Commit(ctx context.Context, txID string) error
Commit commits the transaction and records its submit time. Returns spi.ErrConflict on serialization failure (PostgreSQL error 40001).
func (*TransactionManager) GetSubmitTime ¶
GetSubmitTime returns the database timestamp recorded when the transaction was committed.
func (*TransactionManager) Join ¶
Join attaches to an existing transaction, returning a context carrying its TransactionState.
func (*TransactionManager) LookupTx ¶
func (tm *TransactionManager) LookupTx(txID string) (pgx.Tx, bool)
LookupTx exposes the registry lookup for use in tests and by the store layer (resolveQuerier). Production code should prefer resolveQuerier.
func (*TransactionManager) ReleaseSavepoint ¶
func (tm *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
ReleaseSavepoint releases a savepoint, merging its work into the parent transaction.
func (*TransactionManager) Rollback ¶
func (tm *TransactionManager) Rollback(ctx context.Context, txID string) error
Rollback aborts the transaction.
func (*TransactionManager) RollbackToSavepoint ¶
func (tm *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
RollbackToSavepoint rolls back all work done since the named savepoint.