Documentation
¶
Overview ¶
Package event defines fabriq's versioned event envelope, its codec, and the upcaster chain that migrates old payload schemas at decode time.
Exactly one envelope is appended to the transactional outbox per command; the relay publishes envelopes to Redis Streams; projections and the subscription hub consume them. The traceparent field carries the W3C trace context across the async hop so one trace spans command → outbox → relay → projection apply.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Envelope ¶
type Envelope struct {
// ID is a ULID: lexically sortable, globally unique, minted once at
// command time.
ID string `json:"id"`
// TenantID scopes the event; consumers must never re-derive it from
// the payload.
TenantID string `json:"tenant_id"`
// ScopeID is the optional secondary scope (sub-tenant partition). Empty when
// unscoped. Carried so projections can stamp scope_id on derived rows.
ScopeID string `json:"scopeId,omitempty"`
// Aggregate is the registry entity name, e.g. "asset".
Aggregate string `json:"aggregate"`
// AggID identifies the aggregate instance.
AggID string `json:"agg_id"`
// Version is the aggregate's monotonic version after this event.
Version int64 `json:"version"`
// Type is the derived event type, e.g. "asset.updated".
Type string `json:"type"`
// At is the commit-side timestamp.
At time.Time `json:"at"`
// PayloadSchemaVersion versions the payload shape; upcasters migrate
// old shapes forward at decode.
PayloadSchemaVersion int `json:"payload_schema_version"`
// Payload is the column-keyed JSON of the aggregate after the change
// (empty object for deletes).
Payload json.RawMessage `json:"payload"`
// Traceparent is the W3C traceparent active when the command executed.
Traceparent string `json:"traceparent,omitempty"`
}
Envelope is the wire shape of one domain event.
type Publisher ¶
type Publisher interface {
Publish(ctx context.Context, env Envelope, channels []string) (streamID string, err error)
}
Publisher fans one committed envelope out to the event stream and its derived change channels. Implemented by the Redis adapter; consumed by the outbox relay. Returns the event-stream entry ID (the relay stores it back onto the outbox row).
type Upcaster ¶
type Upcaster struct {
Type string
FromVersion int
Fn func(json.RawMessage) (json.RawMessage, error)
}
Upcaster migrates one event type's payload from FromVersion to FromVersion+1. Upcasters are pure functions; appliers only ever see the latest shape.
type UpcasterChain ¶
type UpcasterChain struct {
// contains filtered or unexported fields
}
UpcasterChain holds ordered upcasters per event type and applies them at decode until no upcaster matches the current version.
func NewUpcasterChain ¶
func NewUpcasterChain() *UpcasterChain
NewUpcasterChain returns an empty chain (Apply is then a passthrough).
func (*UpcasterChain) Apply ¶
func (c *UpcasterChain) Apply(env Envelope) (Envelope, error)
Apply walks the chain from the envelope's PayloadSchemaVersion upward, returning the envelope at the latest known shape.
func (*UpcasterChain) MustRegister ¶
func (c *UpcasterChain) MustRegister(u Upcaster)
MustRegister is Register that panics; for static wiring.
func (*UpcasterChain) Register ¶
func (c *UpcasterChain) Register(u Upcaster) error
Register adds an upcaster; one per (type, fromVersion).