Documentation
¶
Overview ¶
Package gala provides durable, typed eventing primitives intended to replace ad-hoc in-memory dispatch patterns with a River-native foundation. It's a black tie affair for your events, ensuring they arrive in style and on time.
Index ¶
- Constants
- Variables
- func HasFlag(ctx context.Context, flag ContextFlag) bool
- func RegisterTopic[T any](registry *Registry, registration Registration[T]) error
- func WithFlag(ctx context.Context, flag ContextFlag) context.Context
- type AuthSnapshot
- type Codec
- type Config
- type ContextCodec
- type ContextFlag
- type ContextKey
- type ContextManager
- type ContextSnapshot
- type Definition
- type Dispatcher
- type DurableContextCodec
- type DurableContextSnapshot
- type EmitReceipt
- type Envelope
- type EventID
- type Gala
- func (g *Gala) Close() error
- func (g *Gala) ContextManager() *ContextManager
- func (g *Gala) DispatchEnvelope(ctx context.Context, envelope Envelope) error
- func (g *Gala) EmitEnvelope(ctx context.Context, envelope Envelope) error
- func (g *Gala) EmitWithHeaders(ctx context.Context, topic TopicName, payload any, headers Headers) EmitReceipt
- func (g *Gala) Injector() do.Injector
- func (g *Gala) Registry() *Registry
- func (g *Gala) StartWorkers(ctx context.Context) error
- func (g *Gala) StopWorkers(ctx context.Context) error
- type Handler
- type HandlerContext
- type Headers
- type JSONCodec
- type ListenerError
- type ListenerID
- type Provider
- type Registration
- type Registry
- type RiverDispatchArgs
- type RiverDispatchWorker
- type RiverDispatcher
- type RiverInsertClient
- type Topic
- type TopicName
- type TypedContextCodec
Constants ¶
const DefaultQueueName = "events"
DefaultQueueName is the default queue used for gala durable dispatch jobs
const RiverDispatchJobKind = "gala_dispatch_v1"
RiverDispatchJobKind is the River job kind used for durable gala dispatch
Variables ¶
var (
// ErrGalaRequired is returned when a nil gala runtime is used
ErrGalaRequired = errors.New("gala: gala is required")
// ErrRegistryRequired is returned when a nil topic registry is used
ErrRegistryRequired = errors.New("gala: registry is required")
// ErrTopicNameRequired is returned when a topic name is empty
ErrTopicNameRequired = errors.New("gala: topic name is required")
// ErrTopicAlreadyRegistered is returned when a topic is registered more than once
ErrTopicAlreadyRegistered = errors.New("gala: topic already registered")
// ErrTopicNotRegistered is returned when topic metadata cannot be found
ErrTopicNotRegistered = errors.New("gala: topic not registered")
// ErrCodecRequired is returned when a registration is missing a codec
ErrCodecRequired = errors.New("gala: codec is required")
// ErrListenerNameRequired is returned when a listener name is empty
ErrListenerNameRequired = errors.New("gala: listener name is required")
// ErrListenerHandlerRequired is returned when a listener callback is missing
ErrListenerHandlerRequired = errors.New("gala: listener handler is required")
// ErrListenerTopicNotRegistered is returned when a listener is attached before topic registration
ErrListenerTopicNotRegistered = errors.New("gala: listener topic not registered")
// ErrPayloadTypeMismatch is returned when payload casting fails for a topic or listener
ErrPayloadTypeMismatch = errors.New("gala: payload type mismatch")
// ErrPayloadEncodeFailed is returned when payload serialization fails
ErrPayloadEncodeFailed = errors.New("gala: payload encode failed")
// ErrPayloadDecodeFailed is returned when payload deserialization fails
ErrPayloadDecodeFailed = errors.New("gala: payload decode failed")
// ErrEnvelopePayloadRequired is returned when an envelope has an empty payload
ErrEnvelopePayloadRequired = errors.New("gala: envelope payload is required")
// ErrDispatcherRequired is returned when emit is attempted without a dispatcher
ErrDispatcherRequired = errors.New("gala: dispatcher is required")
// ErrDispatchFailed is returned when dispatch fails
ErrDispatchFailed = errors.New("gala: dispatch failed")
// ErrContextCodecRequired is returned when context codec registration receives nil
ErrContextCodecRequired = errors.New("gala: context codec is required")
// ErrContextCodecKeyRequired is returned when a context codec key is empty
ErrContextCodecKeyRequired = errors.New("gala: context codec key is required")
// ErrContextCodecAlreadyRegistered is returned when a context codec key is duplicated
ErrContextCodecAlreadyRegistered = errors.New("gala: context codec already registered")
// ErrContextSnapshotCaptureFailed is returned when snapshot capture fails
ErrContextSnapshotCaptureFailed = errors.New("gala: context snapshot capture failed")
// ErrContextSnapshotRestoreFailed is returned when snapshot restore fails
ErrContextSnapshotRestoreFailed = errors.New("gala: context snapshot restore failed")
// ErrRiverJobClientRequired is returned when a river dispatcher is built without a job client
ErrRiverJobClientRequired = errors.New("gala: river job client is required")
// ErrRiverGalaProviderRequired is returned when a river worker is built without a gala provider
ErrRiverGalaProviderRequired = errors.New("gala: river gala provider is required")
// ErrRiverDispatchJobEnvelopeRequired is returned when a river dispatch job has no envelope payload
ErrRiverDispatchJobEnvelopeRequired = errors.New("gala: river dispatch job envelope is required")
// ErrRiverEnvelopeEncodeFailed is returned when encoding a river envelope payload fails
ErrRiverEnvelopeEncodeFailed = errors.New("gala: river envelope encode failed")
// ErrRiverEnvelopeDecodeFailed is returned when decoding a river envelope payload fails
ErrRiverEnvelopeDecodeFailed = errors.New("gala: river envelope decode failed")
// ErrRiverDispatchInsertFailed is returned when inserting a durable river dispatch job fails
ErrRiverDispatchInsertFailed = errors.New("gala: river dispatch insert failed")
// ErrRiverConnectionURIRequired is returned when river runtime setup is missing a connection URI
ErrRiverConnectionURIRequired = errors.New("gala: river connection URI is required")
// ErrRiverClientInitializationFailed is returned when building the river queue client fails
ErrRiverClientInitializationFailed = errors.New("gala: river client initialization failed")
// ErrRiverWorkerStartFailed is returned when starting gala river workers fails
ErrRiverWorkerStartFailed = errors.New("gala: river worker start failed")
// ErrRiverWorkerStopFailed is returned when stopping gala river workers fails
ErrRiverWorkerStopFailed = errors.New("gala: river worker stop failed")
// ErrRiverClientCloseFailed is returned when closing the gala river queue client fails
ErrRiverClientCloseFailed = errors.New("gala: river client close failed")
// ErrAuthContextEncodeFailed is returned when auth context snapshot encoding fails
ErrAuthContextEncodeFailed = errors.New("gala: auth context encode failed")
// ErrAuthContextDecodeFailed is returned when auth context snapshot decoding fails
ErrAuthContextDecodeFailed = errors.New("gala: auth context decode failed")
// ErrListenerPanicked is returned when a listener panics during execution
ErrListenerPanicked = errors.New("gala: listener panicked")
)
Functions ¶
func HasFlag ¶
func HasFlag(ctx context.Context, flag ContextFlag) bool
HasFlag reports whether a typed context flag is set
func RegisterTopic ¶
func RegisterTopic[T any](registry *Registry, registration Registration[T]) error
RegisterTopic registers one typed topic in the registry
Types ¶
type AuthSnapshot ¶
type AuthSnapshot struct {
// SubjectID is the authenticated principal identifier
SubjectID string `json:"subject_id,omitempty"`
// SubjectName is the authenticated principal display name
SubjectName string `json:"subject_name,omitempty"`
// SubjectEmail is the authenticated principal email
SubjectEmail string `json:"subject_email,omitempty"`
// OrganizationID is the active organization scope
OrganizationID string `json:"organization_id,omitempty"`
// OrganizationName is the active organization display name
OrganizationName string `json:"organization_name,omitempty"`
// OrganizationIDs contains organizations available in caller scope
OrganizationIDs []string `json:"organization_ids,omitempty"`
// AuthenticationType identifies the authentication method used by the caller
AuthenticationType string `json:"authentication_type,omitempty"`
// OrganizationRole captures the caller role within the active organization
OrganizationRole string `json:"organization_role,omitempty"`
// IsSystemAdmin reports whether the caller has system-admin privileges
IsSystemAdmin bool `json:"is_system_admin,omitempty"`
}
AuthSnapshot is a JSON-safe snapshot of authenticated user context values
func (AuthSnapshot) ToAuthenticatedUser ¶
func (s AuthSnapshot) ToAuthenticatedUser() *auth.AuthenticatedUser
ToAuthenticatedUser converts a snapshot into an auth.AuthenticatedUser payload
type Codec ¶
type Codec[T any] interface {
// Encode serializes the typed payload
Encode(T) ([]byte, error)
// Decode deserializes payload bytes into the typed payload
Decode([]byte) (T, error)
}
Codec encodes and decodes a topic payload type
type Config ¶
type Config struct {
// Enabled toggles Gala worker startup and dispatch support when true
Enabled bool
// ConnectionURI is the database connection URI used for the dedicated gala river client
ConnectionURI string
// QueueName is the gala queue used for durable dispatch jobs
QueueName string
// WorkerCount is the max worker concurrency for the gala queue
WorkerCount int
// MaxRetries sets max attempts for gala dispatch jobs when greater than zero
MaxRetries int
// RunMigrations enables River schema migrations on startup (use for tests only)
RunMigrations bool
// FetchCooldown is the minimum time between job fetches per worker (default 100ms, min 1ms)
// Lower values increase throughput but also database load. River enforces 1ms minimum.
FetchCooldown time.Duration
// FetchPollInterval is the fallback polling interval when LISTEN/NOTIFY misses events (default 1s)
// This is only used when LISTEN/NOTIFY fails to deliver notifications.
FetchPollInterval time.Duration
}
Config configures cohesive Gala startup
type ContextCodec ¶
type ContextCodec interface {
// Key returns the stable snapshot key
Key() ContextKey
// Capture extracts and encodes context data
Capture(context.Context) (json.RawMessage, bool, error)
// Restore decodes and re-attaches context data
Restore(context.Context, json.RawMessage) (context.Context, error)
}
ContextCodec captures and restores one typed context value
type ContextFlag ¶
type ContextFlag string
ContextFlag identifies a boolean context flag
const (
// ContextFlagWorkflowBypass marks workflow bypass behavior
ContextFlagWorkflowBypass ContextFlag = "workflow_bypass"
// ContextFlagWorkflowAllowEventEmission allows workflow listener execution while bypass is set
ContextFlagWorkflowAllowEventEmission ContextFlag = "workflow_allow_event_emission"
)
type ContextKey ¶
type ContextKey = string
ContextKey identifies a restorable context value key. It is a string alias, for better readability and to avoid collisions with other context keys; it has to be a string, rather than a strict type + contextx, so that it can be used as a JSON key for durability.
type ContextManager ¶
type ContextManager struct {
// contains filtered or unexported fields
}
ContextManager manages context codecs and snapshot round-trips
func NewContextManager ¶
func NewContextManager(codecs ...ContextCodec) (*ContextManager, error)
NewContextManager creates a context manager and registers any initial codecs
func (*ContextManager) Capture ¶
func (m *ContextManager) Capture(ctx context.Context) (ContextSnapshot, error)
Capture captures all registered context codec values and current context flags
func (*ContextManager) Register ¶
func (m *ContextManager) Register(codec ContextCodec) error
Register registers a context codec by key
func (*ContextManager) Restore ¶
func (m *ContextManager) Restore(ctx context.Context, snapshot ContextSnapshot) (context.Context, error)
Restore restores snapshot values into a new context
type ContextSnapshot ¶
type ContextSnapshot struct {
// Values contains codec-managed context values
Values map[ContextKey]json.RawMessage `json:"values,omitempty"`
// Flags contains boolean context flags
Flags map[ContextFlag]bool `json:"flags,omitempty"`
}
ContextSnapshot captures context data that can be restored after durable hops
type Definition ¶
type Definition[T any] struct {
// Topic is the topic handled by this listener
Topic Topic[T]
// Name is the stable listener name
Name string
// Operations optionally scopes listener interest to specific mutation operations
// Empty means the listener accepts all operations for the topic
Operations []string
// Handle is the callback invoked for this listener
Handle Handler[T]
}
Definition defines one listener binding
type Dispatcher ¶
type Dispatcher interface {
// Dispatch dispatches an envelope to the configured transport
Dispatch(context.Context, Envelope) error
}
Dispatcher dispatches envelopes to the configured transport
type DurableContextCodec ¶
type DurableContextCodec struct{}
DurableContextCodec captures and restores durable context values including auth and logger fields
func NewContextCodec ¶
func NewContextCodec() DurableContextCodec
NewContextCodec creates a context codec for durable context capture
func (DurableContextCodec) Capture ¶
func (DurableContextCodec) Capture(ctx context.Context) (json.RawMessage, bool, error)
Capture extracts durable context values and encodes them as JSON
func (DurableContextCodec) Key ¶
func (DurableContextCodec) Key() ContextKey
Key returns the stable snapshot key used by the context codec
func (DurableContextCodec) Restore ¶
func (DurableContextCodec) Restore(ctx context.Context, raw json.RawMessage) (context.Context, error)
Restore decodes durable context values and restores them on the supplied context
type DurableContextSnapshot ¶
type DurableContextSnapshot struct {
// Auth contains authenticated user context when present
Auth *AuthSnapshot `json:"auth,omitempty"`
// LogFields contains logger context fields for correlation and tracing
LogFields map[string]any `json:"log_fields,omitempty"`
}
DurableContextSnapshot captures context values that should persist across durable event processing
type EmitReceipt ¶
type EmitReceipt struct {
// EventID is the emitted event identifier
EventID EventID
// Accepted reports whether the event was accepted for processing
Accepted bool
// Err contains any terminal emit error
Err error
}
EmitReceipt captures synchronous dispatch results
type Envelope ¶
type Envelope struct {
// ID is the unique event identifier
ID EventID `json:"id"`
// Topic is the destination topic
Topic TopicName `json:"topic"`
// OccurredAt is the emit timestamp in UTC
OccurredAt time.Time `json:"occurred_at"`
// Headers holds operational metadata
Headers Headers `json:"headers"`
// Payload is encoded topic payload data
Payload json.RawMessage `json:"payload"`
// ContextSnapshot holds restorable context metadata
ContextSnapshot ContextSnapshot `json:"context_snapshot"`
}
Envelope is the durable event envelope
type EventID ¶
type EventID string
EventID is a stable identifier used for idempotency and traceability
type Gala ¶
type Gala struct {
// contains filtered or unexported fields
}
Gala provides cohesive event dispatch + worker lifecycle management. No black tie required, but a riverboat and some confetti wouldn't hurt.
func (*Gala) ContextManager ¶
func (g *Gala) ContextManager() *ContextManager
ContextManager returns the Gala context manager
func (*Gala) DispatchEnvelope ¶
func (g *Gala) DispatchEnvelope(ctx context.Context, envelope Envelope) error
DispatchEnvelope dispatches one envelope to all listeners on the topic
func (*Gala) EmitEnvelope ¶
func (g *Gala) EmitEnvelope(ctx context.Context, envelope Envelope) error
EmitEnvelope dispatches a pre-built envelope using its topic registration
func (*Gala) EmitWithHeaders ¶
func (g *Gala) EmitWithHeaders(ctx context.Context, topic TopicName, payload any, headers Headers) EmitReceipt
EmitWithHeaders emits a payload with explicit headers
func (*Gala) StartWorkers ¶
func (g *Gala) StartWorkers(ctx context.Context) error
StartWorkers starts Gala workers
type Handler ¶
type Handler[T any] func(HandlerContext, T) error
Handler processes a typed event payload
type HandlerContext ¶
type HandlerContext struct {
// Context is the restored event context used for listener execution
Context context.Context
// Envelope is the envelope being processed
Envelope Envelope
// Injector provides typed dependency lookup via samber/do
Injector do.Injector
}
HandlerContext provides event context and dependency resolution scope for listeners
type Headers ¶
type Headers struct {
// IdempotencyKey identifies duplicate-safe processing scope
IdempotencyKey string `json:"idempotency_key,omitempty"`
// Properties stores additional typed metadata projected to string values
Properties map[string]string `json:"properties,omitempty"`
}
Headers defines operational metadata for an envelope
type JSONCodec ¶
type JSONCodec[T any] struct{}
JSONCodec is the default JSON implementation of Codec
type ListenerError ¶
type ListenerError struct {
// ListenerName is the name of the listener that failed
ListenerName string
// Cause is the underlying error from the listener
Cause error
// Panicked indicates whether the listener panicked
Panicked bool
}
ListenerError captures a listener execution failure with context
func (ListenerError) Error ¶
func (e ListenerError) Error() string
Error returns a static error message for listener execution failures
func (ListenerError) Unwrap ¶
func (e ListenerError) Unwrap() error
Unwrap returns the underlying cause for use with errors.Is and errors.As
type ListenerID ¶
type ListenerID string
ListenerID identifies a registered listener
func AttachListener ¶
func AttachListener[T any](registry *Registry, definition Definition[T]) (ListenerID, error)
AttachListener registers one typed listener in the registry
func RegisterListeners ¶
func RegisterListeners[T any](registry *Registry, definitions ...Definition[T]) ([]ListenerID, error)
RegisterListeners registers listeners and ensures their topic contracts are configured
type Provider ¶
type Provider func() *Gala
Provider resolves the gala instance used by River workers
type Registration ¶
type Registration[T any] struct {
// Topic defines the typed topic contract
Topic Topic[T]
// Codec serializes and deserializes payloads for the topic
Codec Codec[T]
}
Registration ties a typed topic to its codec
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry stores topic codecs, policies, and listeners
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry creates an empty topic/listener registry
func (*Registry) DecodePayload ¶
DecodePayload decodes payload bytes for a registered topic
func (*Registry) EncodePayload ¶
EncodePayload encodes a payload for a registered topic
type RiverDispatchArgs ¶
type RiverDispatchArgs struct {
// Envelope is the encoded gala envelope payload
Envelope []byte `json:"envelope"`
}
RiverDispatchArgs stores a JSON-encoded gala envelope for durable dispatch
func NewRiverDispatchArgs ¶
func NewRiverDispatchArgs(envelope Envelope) (RiverDispatchArgs, error)
NewRiverDispatchArgs builds River dispatch args from an envelope
func (RiverDispatchArgs) DecodeEnvelope ¶
func (a RiverDispatchArgs) DecodeEnvelope() (Envelope, error)
DecodeEnvelope decodes the gala envelope from dispatch args
type RiverDispatchWorker ¶
type RiverDispatchWorker struct {
river.WorkerDefaults[RiverDispatchArgs]
// contains filtered or unexported fields
}
RiverDispatchWorker processes durable gala dispatch jobs from River
func NewRiverDispatchWorker ¶
func NewRiverDispatchWorker(galaProvider Provider) *RiverDispatchWorker
NewRiverDispatchWorker creates a RiverDispatchWorker
func (*RiverDispatchWorker) Work ¶
func (w *RiverDispatchWorker) Work(ctx context.Context, job *river.Job[RiverDispatchArgs]) error
Work processes one River dispatch job and invokes Gala dispatch
type RiverDispatcher ¶
type RiverDispatcher struct {
// contains filtered or unexported fields
}
RiverDispatcher dispatches envelopes to River
func NewRiverDispatcher ¶
func NewRiverDispatcher(jobClient RiverInsertClient, defaultQueue string) (*RiverDispatcher, error)
NewRiverDispatcher creates a River-backed durable dispatcher
type RiverInsertClient ¶
type RiverInsertClient interface {
// Insert inserts a River job with optional insert options
Insert(context.Context, river.JobArgs, *river.InsertOpts) (*rivertype.JobInsertResult, error)
}
RiverInsertClient represents the minimal insert capability required for durable dispatch
type TypedContextCodec ¶
type TypedContextCodec[T any] struct {
// contains filtered or unexported fields
}
TypedContextCodec captures/restores context values stored via contextx.With
func NewTypedContextCodec ¶
func NewTypedContextCodec[T any](key ContextKey) TypedContextCodec[T]
NewTypedContextCodec creates a typed context codec for a specific snapshot key
func (TypedContextCodec[T]) Capture ¶
func (c TypedContextCodec[T]) Capture(ctx context.Context) (json.RawMessage, bool, error)
Capture extracts a typed context value and JSON encodes it
func (TypedContextCodec[T]) Key ¶
func (c TypedContextCodec[T]) Key() ContextKey
Key returns the codec snapshot key
func (TypedContextCodec[T]) Restore ¶
func (c TypedContextCodec[T]) Restore(ctx context.Context, raw json.RawMessage) (context.Context, error)
Restore JSON decodes a typed context value and re-attaches it