command

package
v0.0.3
Published: Jun 22, 2026 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Overview

Package command is fabriq's only write path for KindAggregate entities.

Exec runs spec-driven validation, then — inside one tenant-stamped transaction — writes the aggregate row and appends exactly one versioned event to the transactional outbox. The Store/Tx ports are implemented by adapters/postgres (grove) in production and by fabriqtest fakes in unit tests; no engine types appear here.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Change

type Change struct {
	Entity   *registry.Entity
	Op       Op // resolved write op: OpCreate / OpUpdate / OpDelete
	Envelope event.Envelope
}

Change is the unit a LifecycleHook receives. Envelope is the canonical record: the exact event written to the outbox (tenant, aggregate id, version, type, after-image payload, commit time, traceparent, event id). Entity and Op are conveniences so the hook need not re-parse the type string.

type Command

type Command struct {
	// Entity is the registry name, e.g. "asset".
	Entity string

	// Op selects create/update/delete/upsert.
	Op Op

	// AggID identifies the aggregate. Required for update/delete/upsert;
	// optional for create (a ULID is minted when empty).
	AggID string

	// Payload is the entity's grove model instance for create/update. The
	// structural columns (id, tenant_id, version) are stamped by the
	// executor — caller-provided values are ignored for id/version and
	// rejected for a foreign tenant_id.
	Payload any

	// ExpectedVersion enables optimistic concurrency: when set, the stored
	// version must match or the command fails with a VersionConflictError.
	ExpectedVersion *int64
}

Command describes one write.
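The ExpectedVersion rule can be illustrated with a small, self-contained sketch. The types below are local stand-ins that mirror the documented shapes rather than the real package types, and the fields of VersionConflictError are an assumption, since the documentation names the error without showing its definition.

```go
package main

import (
	"errors"
	"fmt"
)

// Op and Command are local stand-ins mirroring the documented shapes;
// they are not the real fabriq types.
type Op int

const (
	OpCreate Op = iota
	OpUpdate
	OpDelete
	OpUpsert
)

type Command struct {
	Entity          string
	Op              Op
	AggID           string
	Payload         any
	ExpectedVersion *int64
}

// VersionConflictError is a hypothetical shape: the doc names the error
// but does not show its fields.
type VersionConflictError struct {
	Expected, Actual int64
}

func (e *VersionConflictError) Error() string {
	return fmt.Sprintf("version conflict: expected %d, stored %d", e.Expected, e.Actual)
}

// checkVersion applies the documented rule: when ExpectedVersion is set the
// stored version must match; when it is nil no check is made.
func checkVersion(stored int64, cmd Command) error {
	if cmd.ExpectedVersion == nil {
		return nil
	}
	if *cmd.ExpectedVersion != stored {
		return &VersionConflictError{Expected: *cmd.ExpectedVersion, Actual: stored}
	}
	return nil
}

func main() {
	want := int64(3)
	cmd := Command{Entity: "asset", Op: OpUpdate, AggID: "a-1", ExpectedVersion: &want}

	// Another writer has already moved the row to version 4.
	err := checkVersion(4, cmd)
	var conflict *VersionConflictError
	fmt.Println(errors.As(err, &conflict)) // true

	// The stored version still matches what the caller read.
	fmt.Println(checkVersion(3, cmd) == nil) // true
}
```

A caller that receives the conflict would typically re-read the aggregate and retry with the fresh version.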

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor implements Exec/ExecBatch over a Store.

func NewExecutor

func NewExecutor(reg *registry.Registry, store Store, opts ...ExecutorOption) (*Executor, error)

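NewExecutor wires the command plane: it builds an Executor from the entity registry, the Store that opens tenant-scoped transactions, and any options.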

func (*Executor) Exec

func (x *Executor) Exec(ctx context.Context, cmd Command) (Result, error)

Exec runs one command in its own transaction.

func (*Executor) ExecBatch

func (x *Executor) ExecBatch(ctx context.Context, cmds []Command) ([]Result, error)

ExecBatch runs N commands in ONE transaction: ordered, all-or-nothing.
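The ordered, all-or-nothing behaviour can be sketched with an in-memory stand-in. Everything here (memStore, inTx, execBatch, errInvalid) is hypothetical and only imitates commit and rollback by copying a map; it is not how the real executor or its Postgres adapter is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// memStore is a hypothetical in-memory stand-in for a transactional store:
// it maps aggregate ids to their current version.
type memStore struct {
	versions map[string]int64
}

var errInvalid = errors.New("invalid command")

// inTx runs fn against a working copy and publishes the copy only when fn
// returns nil, which imitates commit and rollback.
func (s *memStore) inTx(fn func(work map[string]int64) error) error {
	work := make(map[string]int64, len(s.versions))
	for k, v := range s.versions {
		work[k] = v
	}
	if err := fn(work); err != nil {
		return err // rollback: the working copy is discarded
	}
	s.versions = work // commit
	return nil
}

// execBatch bumps each id in order inside one transaction. An empty id
// stands in for any command that fails validation.
func execBatch(s *memStore, ids []string) ([]int64, error) {
	var out []int64
	err := s.inTx(func(work map[string]int64) error {
		for i, id := range ids {
			if id == "" {
				return fmt.Errorf("command %d: %w", i, errInvalid)
			}
			work[id]++
			out = append(out, work[id])
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	s := &memStore{versions: map[string]int64{}}

	// The second command fails, so the first one is rolled back too.
	_, err := execBatch(s, []string{"a", "", "b"})
	fmt.Println(err != nil, len(s.versions)) // true 0

	// Later commands in a batch see the effects of earlier ones.
	res, _ := execBatch(s, []string{"a", "a", "b"})
	fmt.Println(res) // [1 2 1]
}
```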

type ExecutorOption

type ExecutorOption func(*Executor)

ExecutorOption customizes an Executor.

func WithClock

func WithClock(now func() time.Time) ExecutorOption

WithClock overrides the source of the envelope timestamp. It is intended for tests.

func WithHooks

func WithHooks(hooks ...LifecycleHook) ExecutorOption

WithHooks appends lifecycle hooks to the executor's ordered chain. Each hook runs inside the write transaction, after a change's aggregate row and outbox event are staged and before commit. Hooks fire in registration order, and the first error aborts the command (and any batch it belongs to).

func WithPostCommitHooks

func WithPostCommitHooks(hooks ...PostCommitHook) ExecutorOption

WithPostCommitHooks appends hooks that run after the transaction commits successfully, receiving every Change produced. They never run on rollback.

func WithTraceparent

func WithTraceparent(fn func(context.Context) string) ExecutorOption

WithTraceparent supplies the W3C traceparent extractor used to stamp envelopes; internal/otel provides the production implementation.

type HookFunc

type HookFunc func(ctx context.Context, tx Tx, change Change) error

HookFunc adapts a function to LifecycleHook.

func (HookFunc) OnChange

func (f HookFunc) OnChange(ctx context.Context, tx Tx, change Change) error

OnChange implements LifecycleHook.

type LifecycleHook

type LifecycleHook interface {
	// OnChange runs INSIDE the write transaction, after the aggregate row and
	// its outbox event are staged and before commit. Returning an error aborts
	// the whole command (and any batch it belongs to): the write, the outbox
	// event, and anything the hook wrote via tx all roll back together. Use tx
	// to write additional rows atomically with the change.
	OnChange(ctx context.Context, tx Tx, change Change) error
}

LifecycleHook observes, and may veto or augment, every change before it commits. It is the in-transaction, write-side, cross-cutting seam (distinct from the per-entity EntitySpec.Validate input check and the post-commit projection appliers). The canonical use is an auditing/chronicle extension that records every change atomically with the write. It mirrors the Applier/ApplierFunc pattern used by projections.

type Op

type Op int

Op is the command operation.

const (
	// OpCreate inserts a new aggregate (version 1).
	OpCreate Op = iota
	// OpUpdate replaces an existing aggregate's row (version+1).
	OpUpdate
	// OpDelete removes the row; the deletion is still a versioned event.
	OpDelete
	// OpUpsert inserts when the aggregate is absent (version 1, "created")
	// or updates when present (version+1, "updated"); idempotent by AggID.
	OpUpsert
)

func (Op) Verb

func (o Op) Verb() string

Verb returns the event verb for the operation.

type PostCommitFunc

type PostCommitFunc func(ctx context.Context, changes []Change)

PostCommitFunc adapts a function to PostCommitHook.

func (PostCommitFunc) AfterCommit

func (f PostCommitFunc) AfterCommit(ctx context.Context, changes []Change)

AfterCommit implements PostCommitHook.

type PostCommitHook

type PostCommitHook interface {
	AfterCommit(ctx context.Context, changes []Change)
}

PostCommitHook runs AFTER a command (or batch) transaction commits successfully. It receives every Change the transaction produced. Unlike LifecycleHook it cannot veto or write transactionally — the data is already durable — so it returns nothing and must handle its own errors. The canonical use is cache invalidation: bump generations / evict entries once the write is known-committed (read-your-writes, no before-commit race).

type Result

type Result struct {
	AggID   string
	Version int64
	EventID string
}

Result reports the outcome of one command.

type Store

type Store interface {
	InTenantTx(ctx context.Context, fn func(ctx context.Context, tx Tx) error) error
}

Store opens tenant-scoped units of work. Implementations must reject unstamped contexts and stamp the tenant into the transaction (SET LOCAL app.tenant_id for Postgres + RLS).

type Tx

type Tx interface {
	// CurrentVersion returns the aggregate's stored version, 0 if absent.
	CurrentVersion(ctx context.Context, ent *registry.Entity, aggID string) (int64, error)

	// ApplyChange writes the row: insert/update with the column-keyed
	// values (already structurally stamped), or delete for OpDelete.
	ApplyChange(ctx context.Context, ent *registry.Entity, op Op, aggID string, version int64, vals map[string]any) error

	// AppendOutbox appends the event envelope to the transactional outbox.
	AppendOutbox(ctx context.Context, env event.Envelope) error

	// Exec runs a raw statement inside the command transaction (the tenant is
	// already stamped via SET LOCAL app.tenant_id). It is the escape hatch a
	// LifecycleHook uses to write its own rows atomically with the aggregate
	// change. SQL here is control-plane-trusted — the same level as a migration
	// or the relational raw Query escape hatch; the command path is Postgres-
	// bound (the source of truth), so this introduces no engine type into core.
	Exec(ctx context.Context, sql string, args ...any) error
}

Tx is the unit-of-work surface a store exposes to the executor. All methods run inside the same database transaction, already scoped to the calling tenant.
