event

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package event defines fabriq's versioned event envelope, its codec, and the upcaster chain that migrates old payload schemas at decode time.

Exactly one envelope is appended to the transactional outbox per command; the relay publishes envelopes to Redis Streams; projections and the subscription hub consume them. The traceparent field carries the W3C trace context across the async hop so one trace spans command → outbox → relay → projection apply.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Encode

func Encode(env Envelope) ([]byte, error)

Encode serializes an envelope after validating it.

func NewID

func NewID() string

NewID mints a ULID string.

Types

type Envelope

type Envelope struct {
	// ID is a ULID: lexically sortable, globally unique, minted once at
	// command time.
	ID string `json:"id"`

	// TenantID scopes the event; consumers must never re-derive it from
	// the payload.
	TenantID string `json:"tenant_id"`

	// ScopeID is the optional secondary scope (sub-tenant partition). Empty when
	// unscoped. Carried so projections can stamp scope_id on derived rows.
	ScopeID string `json:"scopeId,omitempty"`

	// Aggregate is the registry entity name, e.g. "asset".
	Aggregate string `json:"aggregate"`

	// AggID identifies the aggregate instance.
	AggID string `json:"agg_id"`

	// Version is the aggregate's monotonic version after this event.
	Version int64 `json:"version"`

	// Type is the derived event type, e.g. "asset.updated".
	Type string `json:"type"`

	// At is the commit-side timestamp.
	At time.Time `json:"at"`

	// PayloadSchemaVersion versions the payload shape; upcasters migrate
	// old shapes forward at decode.
	PayloadSchemaVersion int `json:"payload_schema_version"`

	// Payload is the column-keyed JSON of the aggregate after the change
	// (empty object for deletes).
	Payload json.RawMessage `json:"payload"`

	// Traceparent is the W3C traceparent active when the command executed.
	Traceparent string `json:"traceparent,omitempty"`
}

Envelope is the wire shape of one domain event.

func Decode

func Decode(raw []byte) (Envelope, error)

Decode parses and validates an envelope. Callers that own an upcaster chain should pass the result through UpcasterChain.Apply before handing the payload to appliers.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, env Envelope, channels []string) (streamID string, err error)
}

Publisher fans one committed envelope out to the event stream and its derived change channels. Implemented by the Redis adapter; consumed by the outbox relay. Returns the event-stream entry ID (the relay stores it back onto the outbox row).

type Upcaster

type Upcaster struct {
	Type        string
	FromVersion int
	Fn          func(json.RawMessage) (json.RawMessage, error)
}

Upcaster migrates one event type's payload from FromVersion to FromVersion+1. Upcasters are pure functions; appliers only ever see the latest shape.

type UpcasterChain

type UpcasterChain struct {
	// contains filtered or unexported fields
}

UpcasterChain holds ordered upcasters per event type and applies them at decode until no upcaster matches the current version.

func NewUpcasterChain

func NewUpcasterChain() *UpcasterChain

NewUpcasterChain returns an empty chain (Apply is then a passthrough).

func (*UpcasterChain) Apply

func (c *UpcasterChain) Apply(env Envelope) (Envelope, error)

Apply walks the chain from the envelope's PayloadSchemaVersion upward, returning the envelope at the latest known shape.

func (*UpcasterChain) MustRegister

func (c *UpcasterChain) MustRegister(u Upcaster)

MustRegister is Register that panics; for static wiring.

func (*UpcasterChain) Register

func (c *UpcasterChain) Register(u Upcaster) error

Register adds an upcaster; one per (type, fromVersion).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL