Documentation
Overview
Package provisioning implements a durable, crash-recoverable state machine for dynamic per-tenant provisioning under the multi-tenant migration model defined in issue #379. Each transition is persisted through a StateStore before the next step runs. If a worker crashes mid-flow, the job can be resumed by reloading its persisted state and continuing from the last completed transition.
PostgreSQL is the only vendor with a built-in StateStore in v1. The StateStore interface is vendor-pluggable, so Oracle (#385) and other backends can be added without changing the executor or step callbacks.
Reuse decision: this package mirrors the *patterns* of the outbox/ package (status enum on a row, retry counter, vendor-pluggable Store, bundled DDL with idempotent auto-create, in-memory mock for tests) but diverges on the data model. Outbox is a fire-and-forget event queue with idempotency-token deduplication; provisioning needs finite-state graph semantics with blocking transitions and per-tenant scope. See ADR-021 for the full rationale.
Index
- Constants
- Variables
- func ValidateTransition(from, to State) error
- type Executor
- type Job
- type MemoryStore
- func (*MemoryStore) CreateTable(_ context.Context) error
- func (m *MemoryStore) Get(_ context.Context, jobID string) (*Job, error)
- func (m *MemoryStore) Transition(_ context.Context, jobID string, from, to State, metadata map[string]string, ...) error
- func (m *MemoryStore) Upsert(_ context.Context, job *Job) (*Job, error)
- func (m *MemoryStore) WithClock(now func() time.Time) *MemoryStore
- type PostgresStore
- func (s *PostgresStore) CreateTable(ctx context.Context) error
- func (s *PostgresStore) Get(ctx context.Context, jobID string) (*Job, error)
- func (s *PostgresStore) Transition(ctx context.Context, jobID string, from, to State, metadata map[string]string, ...) error
- func (s *PostgresStore) Upsert(ctx context.Context, job *Job) (*Job, error)
- func (s *PostgresStore) WithClock(now func() time.Time) *PostgresStore
- type State
- type StateStore
- type Steps
Constants
const DefaultPostgresTable = "provisioning_jobs"
DefaultPostgresTable is the table name used when NewPostgresStore is called with an empty tableName argument. Operators with strict naming conventions can override it by passing a non-empty tableName to NewPostgresStore.
const PostgresStateTableDDL = `` /* 437-byte string literal not displayed */
PostgresStateTableDDL is the CREATE TABLE statement used by CreateTable. It is exported so operators who manage schema externally can run it through their own migration tooling. Its %s placeholder is replaced with the validated, quoted table identifier; when applying it yourself, use the same table name the PostgresStore is constructed with, for both this statement and PostgresStateTableIndexes.
The metadata column is JSONB so consumers can record step-specific state (applied migration version range, seeded fixture IDs, etc.) without schema churn.
Variables
var (
	// ErrJobNotFound is returned by Get when no record exists for the
	// supplied jobID.
	ErrJobNotFound = errors.New("provisioning: job not found")
	// ErrStaleRead is returned by Transition when the persisted state
	// doesn't match the caller's expected `from`. Indicates a concurrent
	// writer changed the job between the caller's read and transition.
	ErrStaleRead = errors.New("provisioning: stale read; concurrent transition detected")
	// ErrInvalidJob is returned by Upsert when the supplied Job is nil
	// or has an empty ID/TenantID. Wrapped with the failing field name.
	ErrInvalidJob = errors.New("provisioning: invalid job")
)
Sentinel errors returned by StateStore implementations and the package's validation helpers. Use errors.Is to test.
var ErrIllegalTransition = errors.New("provisioning: illegal state transition")
ErrIllegalTransition is returned by ValidateTransition and Executor when the caller attempts an edge not in the state graph.
var ErrInvalidTableName = errors.New("provisioning: invalid PostgreSQL table name")
ErrInvalidTableName is returned by NewPostgresStore when the supplied table name fails the safe-identifier check.
var PostgresStateTableIndexes = []string{
`CREATE INDEX IF NOT EXISTS idx_%[2]s_tenant ON %[1]s (tenant_id)`,
`CREATE INDEX IF NOT EXISTS idx_%[2]s_state ON %[1]s (state)`,
}
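PostgresStateTableIndexes are the supporting indexes for tenant- and state-scoped lookups. In each template, %[1]s receives the quoted table identifier and %[2]s a validated base name used to build the index name. The dispatcher (out of scope for #379) will scan rows by state to find work; ad-hoc operator queries usually filter by tenant. Both statements use IF NOT EXISTS, so they are idempotent.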
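The index templates use explicit argument indexes, so a single fmt.Sprintf call per template fills both slots. The sketch below copies the two templates verbatim; the hypothetical renderIndexes helper and the example values (a double-quoted table name, with the bare name as the index base) are assumptions about how the store supplies its identifiers, not the package's own code.

```go
package main

import "fmt"

// Copied from PostgresStateTableIndexes: %[1]s is the quoted table
// identifier, %[2]s the bare base name used inside the index name.
var indexTemplates = []string{
	`CREATE INDEX IF NOT EXISTS idx_%[2]s_tenant ON %[1]s (tenant_id)`,
	`CREATE INDEX IF NOT EXISTS idx_%[2]s_state ON %[1]s (state)`,
}

// renderIndexes substitutes both identifiers into every template.
// Both arguments must already have passed identifier validation.
func renderIndexes(quotedTable, indexBase string) []string {
	out := make([]string, 0, len(indexTemplates))
	for _, tmpl := range indexTemplates {
		out = append(out, fmt.Sprintf(tmpl, quotedTable, indexBase))
	}
	return out
}

func main() {
	for _, stmt := range renderIndexes(`"provisioning_jobs"`, "provisioning_jobs") {
		fmt.Println(stmt)
	}
}
```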
Functions
func ValidateTransition
func ValidateTransition(from, to State) error
ValidateTransition reports whether moving from -> to is permitted by the state graph. Returns nil when the transition is valid, ErrIllegalTransition (wrapped with the offending edge) otherwise.
Types
type Executor
type Executor struct {
Store StateStore
Steps Steps
Logger logger.Logger
}
Executor advances a persisted Job through the state machine, invoking the consumer-supplied Steps at each forward transition. Run is safe to call against a partially-completed job — the Executor loads the persisted state and resumes from there.
Crash recovery contract: each step MUST be idempotent. If the worker crashes after a step's side effect succeeds but before the Transition is persisted, the next Run call re-invokes the same step. Schema and role creation are idempotent by construction (see migration/roles.go); Migrate is idempotent because Flyway tracks applied versions in flyway_schema_history; Seed is the consumer's responsibility to design idempotently (typical pattern: INSERT ... ON CONFLICT DO NOTHING).
func NewExecutor
NewExecutor constructs an Executor and validates its dependencies. Returns an error if store is nil, logger is nil, or any required step is nil.
func (*Executor) Run
Run advances jobID from its persisted state to StateReady, or to StateFailed if any forward step returns an error. It is safe to call repeatedly with the same jobID: if the job is already in a terminal state, Run does no work and returns nil for StateReady, or, for StateFailed, an error that wraps a sentinel and carries the last persisted LastError.
The Executor calls each forward step at most once per Run for a given state; ctx is propagated to each step so callers can use context cancellation to bound total runtime.
type Job
type Job struct {
ID string // Idempotency key — re-running with the same ID converges on the same end state.
TenantID string // Logical tenant identifier the job provisions.
State State // Current state. Updated by every Transition.
Attempts int // Number of forward-step attempts on the current state, used for retry bookkeeping.
LastError string // Most recent step error, surfaced in audit events. Empty on success paths.
Metadata map[string]string // Step-specific data that must survive a crash. Consumer-owned shape; see the SECURITY note on Job.
CreatedAt time.Time // First persistence timestamp; set by Upsert.
UpdatedAt time.Time // Most recent persistence timestamp; updated on every Transition.
}
Job is the persisted record of a single per-tenant provisioning attempt. All fields except Metadata are owned by the StateStore; Metadata is opaque to the framework and gives consumers a place to record step-specific data that must survive a crash (e.g., the applied Flyway version range).
SECURITY: Metadata is persisted in plaintext (JSONB on PostgreSQL). Do not store credentials, secret tokens, or other sensitive material in it — use a secret manager and record only opaque references (e.g. the secret name) when continuity across crashes is needed.
type MemoryStore
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is an in-process StateStore implementation backed by a map. Used by unit tests and as a reference implementation for vendor authors adding new backends; not intended for production. All operations are goroutine-safe; transitions are serialized by a per-store mutex (not per-job) which is fine for test workloads.
func NewMemoryStore
func NewMemoryStore() *MemoryStore
NewMemoryStore returns an empty MemoryStore ready for use.
func (*MemoryStore) CreateTable
func (*MemoryStore) CreateTable(_ context.Context) error
CreateTable is a no-op for MemoryStore — the in-memory map needs no schema. Implemented to satisfy the StateStore interface so MemoryStore is a drop-in substitute for backend stores in unit tests.
func (*MemoryStore) Get
func (m *MemoryStore) Get(_ context.Context, jobID string) (*Job, error)
Get returns a deep copy of the persisted job to prevent callers from mutating the store's internal state by holding the returned pointer.
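A deep copy is needed because copying a Job struct copies only the Metadata map header, not its entries, so a shallow copy would still let callers mutate the store. A sketch with a trimmed-down, hypothetical Job type and cloneJob helper:

```go
package main

import "fmt"

// Job is trimmed to the fields that matter for copy semantics.
type Job struct {
	ID       string
	State    string
	Metadata map[string]string
}

// cloneJob copies the struct and allocates a fresh Metadata map. A plain
// struct copy (c := *j) would still share the map with the original.
func cloneJob(j *Job) *Job {
	c := *j
	if j.Metadata != nil {
		c.Metadata = make(map[string]string, len(j.Metadata))
		for k, v := range j.Metadata {
			c.Metadata[k] = v
		}
	}
	return &c
}

func main() {
	stored := &Job{ID: "job-1", State: "migrated", Metadata: map[string]string{"flyway_to": "V12"}}
	got := cloneJob(stored)
	got.Metadata["flyway_to"] = "V99"         // caller mutates its copy
	fmt.Println(stored.Metadata["flyway_to"]) // V12
}
```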
func (*MemoryStore) Transition
func (m *MemoryStore) Transition(
	_ context.Context,
	jobID string,
	from, to State,
	metadata map[string]string,
	lastError string,
) error
Transition advances jobID from -> to with optimistic-concurrency check against the persisted state. Returns ErrJobNotFound, ErrIllegalTransition, or ErrStaleRead per the StateStore contract.
func (*MemoryStore) Upsert
func (m *MemoryStore) Upsert(_ context.Context, job *Job) (*Job, error)
Upsert inserts job at StatePending if no record exists, or returns the existing record unchanged. The CreatedAt and UpdatedAt fields on the returned record are populated by the store. Rejects nil jobs and jobs missing ID or TenantID via ErrInvalidJob.
func (*MemoryStore) WithClock
func (m *MemoryStore) WithClock(now func() time.Time) *MemoryStore
WithClock returns m with the supplied clock function installed. Useful for deterministic timestamps in tests. Passing nil restores time.Now.
type PostgresStore
type PostgresStore struct {
// contains filtered or unexported fields
}
PostgresStore is the PostgreSQL-backed reference implementation of StateStore. Construct via NewPostgresStore so the table name is validated.
func NewPostgresStore
func NewPostgresStore(db *sql.DB, tableName string) (*PostgresStore, error)
NewPostgresStore returns a PostgresStore backed by db with the supplied table name. Empty tableName resolves to DefaultPostgresTable. Returns ErrInvalidTableName if the table name fails the safe-identifier check.
func (*PostgresStore) CreateTable
func (s *PostgresStore) CreateTable(ctx context.Context) error
CreateTable creates the provisioning table and its supporting indexes idempotently. Safe to call on a database where the table already exists.
SQL composition note: PostgresStateTableDDL and PostgresStateTableIndexes are package-defined SQL templates, and the only values substituted into them are s.quotedTable (plus s.indexBase for the index templates). Both are identifiers validated against the safePGTableIdent regular expression when the store is constructed, so no user-controlled value reaches the formatted SQL. This is why the implementation carries per-line NOSONAR suppressions on those statements.
func (*PostgresStore) Transition
func (s *PostgresStore) Transition(
	ctx context.Context,
	jobID string,
	from, to State,
	metadata map[string]string,
	lastError string,
) error
Transition applies the from -> to edge atomically. ValidateTransition gates the graph; the SQL UPDATE ... WHERE state = $from gates concurrent writers.
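A sketch of the compare-and-set UPDATE described above, assuming column names (state, metadata, last_error, updated_at, id) that this documentation does not confirm. The metadata argument would need to be JSON-encoded before binding, and a real implementation must also tell a missing job (ErrJobNotFound) apart from a stale one, since both affect zero rows.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

var ErrStaleRead = errors.New("provisioning: stale read; concurrent transition detected")

// transitionSQL builds the compare-and-set UPDATE. Column names are ASSUMED;
// quotedTable must be the identifier validated at construction. $6 binds the
// caller's expected `from` state, which is what gates concurrent writers.
func transitionSQL(quotedTable string) string {
	return fmt.Sprintf(`UPDATE %s
SET state = $1, metadata = $2, last_error = $3, updated_at = $4
WHERE id = $5 AND state = $6`, quotedTable)
}

// applyTransition executes the UPDATE and treats "no row matched" as a stale
// read. Distinguishing a missing job would need a follow-up existence check.
func applyTransition(ctx context.Context, db *sql.DB, query string, args ...any) error {
	res, err := db.ExecContext(ctx, query, args...)
	if err != nil {
		return err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if n == 0 {
		return ErrStaleRead
	}
	return nil
}

func main() {
	fmt.Println(transitionSQL(`"provisioning_jobs"`))
}
```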
func (*PostgresStore) Upsert
func (s *PostgresStore) Upsert(ctx context.Context, job *Job) (*Job, error)
Upsert inserts job at StatePending when no row exists for job.ID, or returns the existing row unchanged. Matches the StateStore contract. Rejects nil jobs and jobs missing ID or TenantID via ErrInvalidJob.
func (*PostgresStore) WithClock
func (s *PostgresStore) WithClock(now func() time.Time) *PostgresStore
WithClock returns s with the supplied clock function installed. Useful for deterministic timestamps in tests.
type State
type State string
State enumerates the per-tenant provisioning states defined in issue #379. Transitions are validated by the package's static graph (see allowedNext); illegal transitions return ErrIllegalTransition and never silently succeed.
const (
	// StatePending is the initial state when a job is first persisted.
	// The worker has not started any side-effecting work.
	StatePending State = "pending"
	// StateSchemaCreated marks the successful CREATE SCHEMA owned by the
	// migrator role (idempotent via CREATE SCHEMA IF NOT EXISTS).
	StateSchemaCreated State = "schema_created"
	// StateRoleCreated marks the successful creation of the per-tenant
	// runtime role with the locked-down privilege model from #378.
	StateRoleCreated State = "role_created"
	// StateMigrated marks the successful application of all pending Flyway
	// migrations against the tenant schema.
	StateMigrated State = "migrated"
	// StateSeeded marks the successful completion of reference-data seeding
	// and any external-system registrations the consumer wires in.
	StateSeeded State = "seeded"
	// StateReady is the terminal success state. The tenant is fully usable
	// by the runtime service. No further transitions are allowed.
	StateReady State = "ready"
	// StateCleanup is the rollback state entered when any forward step
	// returns an error. The cleanup step drops partially-provisioned
	// resources (schema, role).
	StateCleanup State = "cleanup"
	// StateFailed is the terminal failure state. Retry creates a new job
	// ID rather than reusing the existing one; this keeps the failure
	// trace intact for audit purposes.
	StateFailed State = "failed"
)
func (State) IsTerminal
func (s State) IsTerminal() bool
IsTerminal reports whether s is a terminal state (no further transitions).
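A minimal sketch of IsTerminal consistent with the State documentation, where StateReady and StateFailed are the only states that allow no further transitions. StateCleanup is a rollback state, not a terminal one. Only the two constants the sketch needs are redeclared.

```go
package main

import "fmt"

type State string

const (
	StateReady  State = "ready"
	StateFailed State = "failed"
)

// IsTerminal: ready and failed are the only states with no outgoing edges.
func (s State) IsTerminal() bool {
	return s == StateReady || s == StateFailed
}

func main() {
	for _, s := range []State{"pending", "cleanup", StateReady, StateFailed} {
		fmt.Println(s, s.IsTerminal())
	}
}
```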
type StateStore
type StateStore interface {
// Get loads the job by ID. Returns ErrJobNotFound when no record exists.
Get(ctx context.Context, jobID string) (*Job, error)
// Upsert writes a new job at StatePending if no record exists for jobID,
// or returns the existing record unchanged if one does. This is the
// idempotency contract referenced in #379: re-running with the same
// jobID converges on the same end state without duplicate side effects,
// because the executor will see the persisted state and resume (or
// no-op if terminal).
//
// Implementations must populate Job.CreatedAt and Job.UpdatedAt on the
// returned record. State, Attempts, LastError, and Metadata on the
// passed-in job are used only when the record is newly inserted.
Upsert(ctx context.Context, job *Job) (*Job, error)
// Transition advances jobID's State from -> to atomically. Optimistic
// concurrency: implementations MUST verify the row's current state is
// `from` before applying the update; if it isn't, return ErrStaleRead
// without mutating the row.
//
// metadata replaces the persisted Metadata in full (not merged) so
// callers controlling the in-memory job can decide whether to carry
// step-specific fields forward. lastError replaces the persisted
// LastError (empty string clears a prior error).
//
// The transition is also validated against the static state graph;
// illegal edges return ErrIllegalTransition.
Transition(ctx context.Context, jobID string, from, to State, metadata map[string]string, lastError string) error
// CreateTable creates the persistence table(s) and any required indexes,
// idempotently. Safe to call on a database where the table already
// exists. Operators who prefer to manage schema externally can skip
// this and run the equivalent DDL via their migration tooling — see
// the implementation's exported DDL constants.
CreateTable(ctx context.Context) error
}
StateStore persists Job records and enforces optimistic-concurrency transitions. Implementations are typically backed by the same database as the tenant's business data; the framework ships a PostgreSQL reference implementation, constructed via NewPostgresStore.
Per ADR-021, this interface mirrors the *patterns* of outbox.Store (idempotent CreateTable, vendor-pluggable, in-memory mock for tests) but the method shape is finite-state-machine-oriented rather than queue-oriented: there is no FetchPending / MarkPublished pair — the dispatcher feeding the executor is out of scope for #379.
type Steps
type Steps struct {
CreateSchema func(ctx context.Context, job *Job) error
CreateRole func(ctx context.Context, job *Job) error
Migrate func(ctx context.Context, job *Job) error
Seed func(ctx context.Context, job *Job) error
Cleanup func(ctx context.Context, job *Job) error
}
Steps groups the consumer-supplied callbacks the Executor invokes at each forward transition: CreateSchema runs to leave StatePending, CreateRole to leave StateSchemaCreated, Migrate to leave StateRoleCreated, and Seed to leave StateMigrated. Cleanup runs when any forward step returns an error: the Executor moves the job to StateCleanup, invokes Cleanup, then transitions to StateFailed regardless of the cleanup outcome (cleanup errors are logged but do not block the failure transition).
Each step receives the current Job and may update Job.Metadata in place; the updated Metadata is persisted by the Executor as part of the subsequent Transition call.