Documentation
¶
Overview ¶
Package command is fabriq's only write path for KindAggregate entities.
Exec runs spec-driven validation, then — inside one tenant-stamped transaction — writes the aggregate row and appends exactly one versioned event to the transactional outbox. The Store/Tx ports are implemented by adapters/postgres (grove) in production and by fabriqtest fakes in unit tests; no engine types appear here.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Change ¶
type Change struct {
Entity *registry.Entity
Op Op // resolved write op: OpCreate / OpUpdate / OpDelete
Envelope event.Envelope
}
Change is the unit a LifecycleHook receives. Envelope is the canonical record — the exact event written to the outbox (tenant, agg id, version, type, after-image payload, commit time, traceparent, event id). Entity and Op are conveniences so the hook need not re-parse the type string.
type Command ¶
type Command struct {
// Entity is the registry name, e.g. "asset".
Entity string
// Op selects create/update/delete.
Op Op
// AggID identifies the aggregate. Required for update/delete/upsert;
// optional for create (a ULID is minted when empty).
AggID string
// Payload is the entity's grove model instance for create/update. The
// structural columns (id, tenant_id, version) are stamped by the
// executor — caller-provided values are ignored for id/version and
// rejected for a foreign tenant_id.
Payload any
// ExpectedVersion enables optimistic concurrency: when set, the stored
// version must match or the command fails with a VersionConflictError.
ExpectedVersion *int64
}
Command describes one write.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor implements Exec/ExecBatch over a Store.
func NewExecutor ¶
NewExecutor wires the command plane.
type ExecutorOption ¶
type ExecutorOption func(*Executor)
ExecutorOption customizes an Executor.
func WithClock ¶
func WithClock(now func() time.Time) ExecutorOption
WithClock overrides the envelope timestamp source (tests).
func WithHooks ¶
func WithHooks(hooks ...LifecycleHook) ExecutorOption
WithHooks appends lifecycle hooks to the executor's ordered chain. Each runs inside the write transaction after every change is staged; they fire in registration order and the first error aborts the command (and any batch).
func WithPostCommitHooks ¶
func WithPostCommitHooks(hooks ...PostCommitHook) ExecutorOption
WithPostCommitHooks appends hooks that run after the transaction commits successfully, receiving every Change produced. They never run on rollback.
func WithTraceparent ¶
func WithTraceparent(fn func(context.Context) string) ExecutorOption
WithTraceparent supplies the W3C traceparent extractor used to stamp envelopes; internal/otel provides the production implementation.
type LifecycleHook ¶
type LifecycleHook interface {
// OnChange runs INSIDE the write transaction, after the aggregate row and
// its outbox event are staged and before commit. Returning an error aborts
// the whole command (and any batch it belongs to): the write, the outbox
// event, and anything the hook wrote via tx all roll back together. Use tx
// to write additional rows atomically with the change.
OnChange(ctx context.Context, tx Tx, change Change) error
}
LifecycleHook observes — and may veto or augment — every committed change. It is the in-transaction, write-side, cross-cutting seam (distinct from the per-entity EntitySpec.Validate input check and the post-commit projection appliers). The canonical use is an auditing/chronicle extension that records every change atomically with the write. Mirrors the Applier/ApplierFunc pattern used by projections.
type Op ¶
type Op int
Op is the command operation.
const ( // OpCreate inserts a new aggregate (version 1). OpCreate Op = iota // OpUpdate replaces an existing aggregate's row (version+1). OpUpdate // OpDelete removes the row; the deletion is still a versioned event. OpDelete // OpUpsert inserts when the aggregate is absent (version 1, "created") // or updates when present (version+1, "updated"); idempotent by AggID. OpUpsert )
type PostCommitFunc ¶
PostCommitFunc adapts a function to PostCommitHook.
func (PostCommitFunc) AfterCommit ¶
func (f PostCommitFunc) AfterCommit(ctx context.Context, changes []Change)
AfterCommit implements PostCommitHook.
type PostCommitHook ¶
PostCommitHook runs AFTER a command (or batch) transaction commits successfully. It receives every Change the transaction produced. Unlike LifecycleHook it cannot veto or write transactionally — the data is already durable — so it returns nothing and must handle its own errors. The canonical use is cache invalidation: bump generations / evict entries once the write is known-committed (read-your-writes, no before-commit race).
type Store ¶
type Store interface {
InTenantTx(ctx context.Context, fn func(ctx context.Context, tx Tx) error) error
}
Store opens tenant-scoped units of work. Implementations must reject unstamped contexts and stamp the tenant into the transaction (SET LOCAL app.tenant_id for Postgres + RLS).
type Tx ¶
type Tx interface {
// CurrentVersion returns the aggregate's stored version, 0 if absent.
CurrentVersion(ctx context.Context, ent *registry.Entity, aggID string) (int64, error)
// ApplyChange writes the row: insert/update with the column-keyed
// values (already structurally stamped), or delete for OpDelete.
ApplyChange(ctx context.Context, ent *registry.Entity, op Op, aggID string, version int64, vals map[string]any) error
// AppendOutbox appends the event envelope to the transactional outbox.
AppendOutbox(ctx context.Context, env event.Envelope) error
// Exec runs a raw statement inside the command transaction (the tenant is
// already stamped via SET LOCAL app.tenant_id). It is the escape hatch a
// LifecycleHook uses to write its own rows atomically with the aggregate
// change. SQL here is control-plane-trusted — the same level as a migration
// or the relational raw Query escape hatch; the command path is Postgres-
// bound (the source of truth), so this introduces no engine type into core.
Exec(ctx context.Context, sql string, args ...any) error
}
Tx is the unit-of-work surface a store exposes to the executor. All methods run inside the same database transaction, already scoped to the calling tenant.