gala

package
v1.9.7
Published: Feb 16, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

Gala

Gala is a durable event dispatch system built on River. It replaces the in-memory pkg/soiree with PostgreSQL-backed job persistence, automatic retries, and multi-instance scaling.

Events emitted through Gala survive process restarts, pod evictions, and deployment rollouts. When a listener fails, River retries the job with backoff until it succeeds or exhausts configured attempts.

How It Works

sequenceDiagram
    participant Hook as Ent Mutation Hook
    participant Gala as Gala Runtime
    participant PG as PostgreSQL (river_job)
    participant Worker as River Worker
    participant Listener as Listener Handler

    Hook->>Gala: enqueueGalaMutation()
    Gala->>Gala: Encode payload + capture context
    Gala->>PG: INSERT river_job (gala_dispatch_v1)
    Note over PG: Job persisted durably

    Worker->>PG: Poll for available jobs
    PG-->>Worker: Dequeue job
    Worker->>Gala: DispatchEnvelope()
    Gala->>Gala: Decode payload + restore context
    Gala->>Listener: handler(ctx, payload)

    alt Listener succeeds
        Worker->>PG: Mark job complete
    else Listener fails
        Worker->>PG: Schedule retry with backoff
    end

The entire envelope (payload, headers, context snapshot) is JSON-serialized into a single river_job row. This means your event data lives in the same PostgreSQL database as your application data, simplifying operational concerns.

Defining Topics and Listeners

A topic is a named channel for events of a specific payload type. The generic type parameter enforces compile-time type safety between emitters and listeners.

// Define your payload type
type InvoiceCreated struct {
    InvoiceID  string `json:"invoice_id"`
    CustomerID string `json:"customer_id"`
    Amount     int64  `json:"amount_cents"`
}

// Define the topic with its payload type
var invoiceCreatedTopic = gala.Topic[InvoiceCreated]{
    Name: "billing.invoice.created",
}

Register listeners using RegisterListeners, which handles topic registration automatically:

func RegisterBillingListeners(registry *gala.Registry) ([]gala.ListenerID, error) {
    return gala.RegisterListeners(registry,
        gala.Definition[InvoiceCreated]{
            Topic: invoiceCreatedTopic,
            Name:  "billing.invoice.send-receipt",
            Handle: func(ctx gala.HandlerContext, payload InvoiceCreated) error {
                // Access dependencies via the injector
                mailer, err := do.Invoke[*email.Client](ctx.Injector)
                if err != nil {
                    return err
                }
                return mailer.SendReceipt(ctx.Context, payload.CustomerID, payload.InvoiceID)
            },
        },
        gala.Definition[InvoiceCreated]{
            Topic: invoiceCreatedTopic,
            Name:  "billing.invoice.update-metrics",
            Handle: func(ctx gala.HandlerContext, payload InvoiceCreated) error {
                metrics.InvoicesCreated.Inc()
                return nil
            },
        },
    )
}

Operation Filtering

For mutation-style payloads with an Operation field, listeners can filter by operation type:

gala.Definition[eventqueue.MutationGalaPayload]{
    Topic: gala.Topic[eventqueue.MutationGalaPayload]{
        Name: "Organization",
    },
    Name:       "entitlements.organization.create",
    Operations: []string{ent.OpCreate.String()},  // Only handle creates
    Handle:     handleOrganizationCreated,
}
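
The filtering rule can be sketched in isolation. The Definition docs state that an empty Operations list accepts every operation for the topic. The helper below is a hypothetical stand-in that only illustrates that rule; acceptsOperation is not part of gala's API, and the operation strings are illustrative values.

```go
package main

import "fmt"

// acceptsOperation is a hypothetical helper, not part of gala.
// An empty filter accepts every operation; otherwise the operation
// must appear in the filter list.
func acceptsOperation(filter []string, operation string) bool {
	if len(filter) == 0 {
		return true
	}
	for _, allowed := range filter {
		if allowed == operation {
			return true
		}
	}
	return false
}

func main() {
	createOnly := []string{"OpCreate"} // illustrative operation name

	fmt.Println(acceptsOperation(createOnly, "OpCreate"))
	fmt.Println(acceptsOperation(createOnly, "OpUpdate"))
	fmt.Println(acceptsOperation(nil, "OpDelete"))
}
```

A listener that omits Operations therefore sees every mutation on its topic, which is the behavior to expect from the two billing listeners above.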

Emitting Events

For direct emission (outside the mutation hook flow):

receipt := galaApp.EmitWithHeaders(ctx, invoiceCreatedTopic.Name, InvoiceCreated{
    InvoiceID:  "inv_123",
    CustomerID: "cus_456",
    Amount:     9900,
}, gala.Headers{
    IdempotencyKey: "inv_123", // Enables replay-safe consumption
})

if receipt.Err != nil {
    return receipt.Err
}

Most events in the codebase flow through the ent mutation hook (EmitGalaEventHook), which automatically builds and emits MutationGalaPayload envelopes after successful commits.

Codecs: Type-Safe Serialization

Codecs handle the serialization boundary between your Go types and the JSON stored in River jobs. Every topic registration requires a codec:

type Codec[T any] interface {
    Encode(T) ([]byte, error)
    Decode([]byte) (T, error)
}

The built-in JSONCodec[T] handles standard JSON marshaling and is the default for most use cases. Custom codecs are useful when you need:

  • Non-JSON formats (protobuf, msgpack)
  • Encryption at rest for sensitive payloads
  • Schema migration or backwards compatibility handling

gala.RegisterTopic(registry, gala.Registration[InvoiceCreated]{
    Topic: invoiceCreatedTopic,
    Codec: gala.JSONCodec[InvoiceCreated]{}, // Default JSON codec
})

When using RegisterListeners, topics are auto-registered with JSONCodec.
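
As a rough illustration of a custom codec, the sketch below compresses the JSON payload with gzip. Everything in it is an assumption for illustration: the Codec interface is re-declared locally so the example compiles without gala, and GzipJSONCodec is a hypothetical name. Because Envelope.Payload is a json.RawMessage, the sketch wraps the compressed bytes in a JSON string so the output stays valid JSON; whether gala requires this of custom codecs is not confirmed here.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// Codec mirrors the shape of gala's Codec[T] so this sketch stands alone.
type Codec[T any] interface {
	Encode(T) ([]byte, error)
	Decode([]byte) (T, error)
}

type InvoiceCreated struct {
	InvoiceID  string `json:"invoice_id"`
	CustomerID string `json:"customer_id"`
	Amount     int64  `json:"amount_cents"`
}

// GzipJSONCodec is a hypothetical codec: JSON, then gzip, then a JSON string.
type GzipJSONCodec[T any] struct{}

// Compile-time check that the sketch satisfies the interface.
var _ Codec[InvoiceCreated] = GzipJSONCodec[InvoiceCreated]{}

func (GzipJSONCodec[T]) Encode(payload T) ([]byte, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	// A []byte marshals to a base64 JSON string, so the result is valid JSON.
	return json.Marshal(buf.Bytes())
}

func (GzipJSONCodec[T]) Decode(data []byte) (T, error) {
	var out T
	var compressed []byte
	if err := json.Unmarshal(data, &compressed); err != nil {
		return out, err
	}
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return out, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return out, err
	}
	if err := json.Unmarshal(raw, &out); err != nil {
		return out, err
	}
	return out, nil
}

func main() {
	codec := GzipJSONCodec[InvoiceCreated]{}
	encoded, err := codec.Encode(InvoiceCreated{InvoiceID: "inv_123", CustomerID: "cus_456", Amount: 9900})
	if err != nil {
		panic(err)
	}
	decoded, err := codec.Decode(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Printf("valid JSON: %t, decoded: %+v\n", json.Valid(encoded), decoded)
}
```

The same shape would apply to an encrypting codec: transform the bytes after marshaling in Encode and reverse the transform before unmarshaling in Decode.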

Context Propagation

Gala snapshots context values at emit time and restores them when the listener executes. This happens transparently for authenticated user context (auth.AuthenticatedUser), so listeners can access the original caller's identity even when processing asynchronously.

flowchart LR
    subgraph "Emit Time"
        A[HTTP Request Context] --> B[ContextManager.Capture]
        B --> C[ContextSnapshot JSON]
    end

    subgraph "Stored in River Job"
        C --> D[(Envelope.ContextSnapshot)]
    end

    subgraph "Listener Execution"
        D --> E[ContextManager.Restore]
        E --> F[Listener receives original auth context]
    end

Context Flags

For boolean signals that need to propagate (like workflow bypass flags):

// At emit time
ctx = gala.WithFlag(ctx, gala.ContextFlagWorkflowBypass)

// In listener
if gala.HasFlag(ctx.Context, gala.ContextFlagWorkflowBypass) {
    // Skip workflow processing
}

Dependency Injection

Listeners receive a HandlerContext with a do.Injector for resolving dependencies. Dependencies are registered at startup:

// During server initialization
do.ProvideValue(runtime.Injector(), dbClient)                    // *generated.Client
do.ProvideValue(runtime.Injector(), dbClient.EntitlementManager) // you could access via the dbclient -> *entitlements.StripeClient
do.ProvideNamedValue(runtime.Injector(), "stripe_client", stripeClient) // you can also access the client directly
do.ProvideValue(runtime.Injector(), dbClient.TokenManager)       // *tokens.TokenManager
do.ProvideValue(runtime.Injector(), wfEngine)                    // *engine.WorkflowEngine

// In listener
func handle(ctx gala.HandlerContext, payload MyPayload) error {
	// now you can profit from the injected type
	stripeClient, err := do.InvokeNamed[*stripe.Client](ctx.Injector, "stripe_client")
	if err != nil {
		return err
	}

	client, err := do.Invoke[*generated.Client](ctx.Injector)
	if err != nil {
		return err
	}

	// use the clients

	return nil
}

This avoids global state and makes listeners testable with mock dependencies.

Durability and Retries

Gala's durability comes from River, which stores jobs in PostgreSQL. Key characteristics:

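┌──────────┬───────────────────────────────────────────────────────────────────────────────────────┐
│ Aspect   │ Behavior                                                                              │
├──────────┼───────────────────────────────────────────────────────────────────────────────────────┤
│ Storage  │ Jobs stored in river_job table alongside application data                             │
│ Delivery │ At-least-once (use IdempotencyKey for exactly-once semantics)                         │
│ Retries  │ Configurable via Config.MaxRetries, exponential backoff                               │
│ Scaling  │ Multiple workers poll the same queue; work distributed automatically                  │
│ Ordering │ No ordering guarantees across events; order preserved within single event's listeners │
└──────────┴───────────────────────────────────────────────────────────────────────────────────────┘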

When a listener returns an error, the job is rescheduled for retry. Panics are recovered and converted to errors, triggering the same retry behavior.
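
Because delivery is at-least-once, a listener with side effects may see the same event more than once. The sketch below shows one way to make a handler replay-safe by keying work on the idempotency key. The in-memory processedKeys store is an assumption for illustration only; across multiple pods the record would have to live somewhere shared, for example a table with a unique constraint. None of these names come from gala.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTransient = errors.New("transient failure")

// processedKeys is a hypothetical in-memory record of completed keys.
type processedKeys struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newProcessedKeys() *processedKeys {
	return &processedKeys{seen: make(map[string]struct{})}
}

// runOnce runs effect unless key has already completed. The key is recorded
// only after effect succeeds, so a failed attempt is still eligible for retry.
// The lock is held while effect runs, which serializes work in this sketch.
func (p *processedKeys) runOnce(key string, effect func() error) (ran bool, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, done := p.seen[key]; done {
		return false, nil
	}
	if err := effect(); err != nil {
		return true, err
	}
	p.seen[key] = struct{}{}
	return true, nil
}

func main() {
	store := newProcessedKeys()
	calls := 0
	sendReceipt := func() error {
		calls++
		if calls == 1 {
			return errTransient // first delivery fails and would be retried
		}
		return nil
	}

	for delivery := 1; delivery <= 3; delivery++ {
		ran, err := store.runOnce("inv_123", sendReceipt)
		fmt.Printf("delivery %d: ran=%t err=%v\n", delivery, ran, err)
	}
}
```

Recording the key after the side effect, rather than before, trades a possible duplicate for never losing work, which matches the retry behavior described above.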

Server Integration

The standard setup in serveropts:

galaApp, err := gala.NewGala(ctx, gala.Config{
    ConnectionURI: jobQueueConnectionURI,
    QueueName:     "events",     // River queue name
    WorkerCount:   10,           // Concurrent workers polling this queue
    MaxRetries:    5,            // Retry attempts before marking failed
})
if err != nil {
    return err
}

// Register dependencies for listeners
do.ProvideValue(galaApp.Injector(), dbClient)

// Register listeners
hooks.RegisterGalaSlackListeners(galaApp.Registry())

// Start workers
if err := galaApp.StartWorkers(ctx); err != nil {
    return err
}

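// On shutdown: deferred calls run last-in, first-out, so defer Close first
// to make sure workers stop before the queue client is closed
defer galaApp.Close()
defer galaApp.StopWorkers(ctx)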

Concurrency Model

Event A ──► Worker 1 ──► Listener executes
Event B ──► Worker 2 ──► Listener executes
Event C ──► Worker 3 ──► Listener executes
   ...         ...

With WorkerCount: 100, up to 100 events process concurrently. Each event's listener(s) execute sequentially within their worker, providing:

  • Predictable execution order within a single event
  • Simple error/retry semantics per event
  • No intra-event race conditions

For higher throughput, increase WorkerCount or scale horizontally (multiple server instances share the same queue).
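
The model above can be sketched with a plain worker pool: events are handled concurrently across workers, while the listeners for any one event run in order on a single worker. This is a simplified stand-in using goroutines and a channel, not River's implementation; dispatchAll, traceDispatch, and the listener type are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type listener func(event string)

// dispatchAll feeds events to workerCount workers. Each worker takes one
// event at a time and runs that event's listeners sequentially.
func dispatchAll(events []string, listeners []listener, workerCount int) {
	if workerCount < 1 {
		workerCount = 1
	}
	jobs := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for event := range jobs {
				for _, l := range listeners {
					l(event) // sequential within a single event
				}
			}
		}()
	}
	for _, event := range events {
		jobs <- event
	}
	close(jobs)
	wg.Wait()
}

// traceDispatch records, per event, the order in which listeners ran.
func traceDispatch(events []string, workerCount int) map[string][]string {
	var mu sync.Mutex
	trace := make(map[string][]string)
	record := func(name string) listener {
		return func(event string) {
			mu.Lock()
			defer mu.Unlock()
			trace[event] = append(trace[event], name)
		}
	}
	dispatchAll(events, []listener{record("first"), record("second")}, workerCount)
	return trace
}

func main() {
	trace := traceDispatch([]string{"A", "B", "C"}, 3)
	fmt.Println(trace)
}
```

Events may finish in any order relative to each other, but each event's trace always lists its listeners in registration order.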

The Envelope

Every emitted event becomes an Envelope:

type Envelope struct {
    ID              EventID         // ULID for tracing and idempotency
    Topic           TopicName       // Routing key for listener dispatch
    OccurredAt      time.Time       // Emit timestamp (UTC)
    Headers         Headers         // IdempotencyKey + arbitrary properties
    Payload         json.RawMessage // Codec-encoded payload bytes
    ContextSnapshot ContextSnapshot // Captured auth context + flags
}

The entire envelope is JSON-serialized into RiverDispatchArgs.Envelope and stored as the job's arguments. On worker pickup, it's deserialized and dispatched to matching listeners.
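
To see what ends up in the job row, the sketch below round-trips an envelope through encoding/json. The structs are local mirrors built from the JSON tags in the type documentation, with plain strings standing in for EventID, TopicName, ContextKey, and ContextFlag; sampleEnvelope and roundTrip are hypothetical helpers, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local mirrors of the documented gala types, reduced to stdlib types.
type Headers struct {
	IdempotencyKey string            `json:"idempotency_key,omitempty"`
	Properties     map[string]string `json:"properties,omitempty"`
}

type ContextSnapshot struct {
	Values map[string]json.RawMessage `json:"values,omitempty"`
	Flags  map[string]bool            `json:"flags,omitempty"`
}

type Envelope struct {
	ID              string          `json:"id"`
	Topic           string          `json:"topic"`
	OccurredAt      time.Time       `json:"occurred_at"`
	Headers         Headers         `json:"headers"`
	Payload         json.RawMessage `json:"payload"`
	ContextSnapshot ContextSnapshot `json:"context_snapshot"`
}

// sampleEnvelope builds an illustrative envelope with made-up values.
func sampleEnvelope() Envelope {
	return Envelope{
		ID:         "evt_replay_123",
		Topic:      "billing.invoice.created",
		OccurredAt: time.Date(2026, time.February, 16, 12, 0, 0, 0, time.UTC),
		Headers:    Headers{IdempotencyKey: "inv_123"},
		Payload:    json.RawMessage(`{"invoice_id":"inv_123"}`),
		ContextSnapshot: ContextSnapshot{
			Flags: map[string]bool{"workflow_bypass": true},
		},
	}
}

// roundTrip marshals and unmarshals an envelope, as a durable hop would.
func roundTrip(in Envelope) (Envelope, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return Envelope{}, err
	}
	var out Envelope
	if err := json.Unmarshal(raw, &out); err != nil {
		return Envelope{}, err
	}
	return out, nil
}

func main() {
	raw, err := json.MarshalIndent(sampleEnvelope(), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}
```

The payload stays opaque bytes at this layer; only the topic's codec turns it back into a typed value.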

Pre-built Envelopes

For migration adapters or replay scenarios where you already have a fully constructed envelope:

err := galaApp.EmitEnvelope(ctx, gala.Envelope{
    ID:              gala.EventID("evt_replay_123"),
    Topic:           invoiceCreatedTopic.Name,
    Payload:         preEncodedPayload,
    Headers:         gala.Headers{IdempotencyKey: "evt_replay_123"},
    ContextSnapshot: capturedSnapshot,
})

Error Handling

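┌────────────────────────┬──────────────────────────────────────────────────────┐
│ Scenario               │ Behavior                                             │
├────────────────────────┼──────────────────────────────────────────────────────┤
│ Unregistered topic     │ EmitWithHeaders returns error immediately            │
│ Codec encode failure   │ EmitWithHeaders returns error immediately            │
│ River insert failure   │ EmitWithHeaders returns error (event not queued)     │
│ Listener returns error │ Job scheduled for retry                              │
│ Listener panics        │ Recovered, wrapped as error, job scheduled for retry │
│ Max retries exhausted  │ Job marked as discarded in River                     │
└────────────────────────┴──────────────────────────────────────────────────────┘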

Listener errors are wrapped in ListenerError which includes the listener name and whether a panic occurred, useful for debugging and metrics.

Full Representative Example

package main

import (
	"context"
	"log"

	"github.com/theopenlane/core/pkg/gala"
)

type UserCreated struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
}

// Define a stable topic name bound to the payload type; Gala wraps the encoded payload in an envelope
var userCreatedTopic = gala.Topic[UserCreated]{
	Name: gala.TopicName("user.created"),
}

func main() {
	// Initialize Gala application and configure worker pool
	app, err := gala.NewGala(context.Background(), gala.Config{
		ConnectionURI: jobQueueURI,
		QueueName:     gala.DefaultQueueName,
		WorkerCount:   10,
		MaxRetries:    5,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer app.Close()

	// Start Gala workers
	if err := app.StartWorkers(context.Background()); err != nil {
		log.Fatal(err)
	}

	// Register event topic and payload codec
	err = gala.RegisterTopic(app.Registry(), gala.Registration[UserCreated]{
		Topic: userCreatedTopic,
		Codec: gala.JSONCodec[UserCreated]{},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Attach event listener (handler) for the topic
	_, err = gala.AttachListener(app.Registry(), gala.Definition[UserCreated]{
		Topic: userCreatedTopic,
		Name:  "welcome-email",
		Handle: func(ctx gala.HandlerContext, payload UserCreated) error {
			log.Printf("send welcome email to %s (%s)", payload.UserID, payload.Email)
			return nil
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Emit event to the topic (triggers durable dispatch)
	receipt := app.EmitWithHeaders(context.Background(), userCreatedTopic.Name, UserCreated{
		UserID: "usr_123",
		Email:  "user@example.com",
	}, gala.Headers{})
	if receipt.Err != nil {
		log.Fatal(receipt.Err)
	}
	log.Printf("event accepted: id=%s", receipt.EventID)
}

Performance Comparisons

┌──────────────────────────┬────────┬─────────┬────────┬──────────┬───────────────────┬─────────────────────┬───────┐
│ Scenario                 │ Events │ Workers │ Topics │ Emitters │ Gala (events/sec) │ Soiree (events/sec) │ Ratio │
├──────────────────────────┼────────┼─────────┼────────┼──────────┼───────────────────┼─────────────────────┼───────┤
│ Sequential, Multi-topic  │ 500    │ 20      │ 5      │ 1        │ 966               │ 414,350             │ 1:429 │
├──────────────────────────┼────────┼─────────┼────────┼──────────┼───────────────────┼─────────────────────┼───────┤
│ Sequential, Single-topic │ 500    │ 20      │ 1      │ 1        │ 915               │ 268,372             │ 1:293 │
├──────────────────────────┼────────┼─────────┼────────┼──────────┼───────────────────┼─────────────────────┼───────┤
│ Concurrent, Multi-topic  │ 1000   │ 50      │ 5      │ 50       │ 2,100             │ 295,610             │ 1:141 │
├──────────────────────────┼────────┼─────────┼────────┼──────────┼───────────────────┼─────────────────────┼───────┤
│ Concurrent, Single-topic │ 1000   │ 50      │ 1      │ 50       │ 3,351             │ 430,339             │ 1:128 │
└──────────────────────────┴────────┴─────────┴────────┴──────────┴───────────────────┴─────────────────────┴───────┘

Observations:

  1. Emission pattern matters for Gala: Concurrent emitters (50 goroutines) more than double throughput (966 → 2,100 for multi-topic, 915 → 3,351 for single-topic) because PostgreSQL INSERTs parallelize.
  2. Topic count has minimal impact on Gala in sequential runs: multi-topic vs single-topic shows ~5% variance (966 vs 915). The bottleneck is River's job fetch cycle, not topic dispatch. The concurrent runs differ more (2,100 multi-topic vs 3,351 single-topic).
  3. Soiree's single-topic variance: Soiree actually performs better with a single topic in concurrent scenarios (430k vs 296k). There is less topic lookup overhead, and the pond pool distributes listener work efficiently.
  4. Gap narrows with concurrency: The Gala:Soiree ratio improves from 1:429 (sequential) to 1:128 (concurrent) because Gala's parallel INSERT/fetch paths scale with load while Soiree's in-memory dispatch has near-zero overhead regardless.
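
The Ratio column can be reproduced from the two throughput columns. The short program below recomputes it as Soiree events/sec divided by Gala events/sec, rounded to the nearest integer; benchRow and soireePerGala are names made up for this check, and the figures are copied from the table.

```go
package main

import (
	"fmt"
	"math"
)

// benchRow holds one line of the comparison table (events/sec).
type benchRow struct {
	scenario string
	gala     float64
	soiree   float64
}

var rows = []benchRow{
	{"Sequential, Multi-topic", 966, 414350},
	{"Sequential, Single-topic", 915, 268372},
	{"Concurrent, Multi-topic", 2100, 295610},
	{"Concurrent, Single-topic", 3351, 430339},
}

// soireePerGala returns N for a "1:N" ratio, rounded to the nearest integer.
func soireePerGala(r benchRow) int {
	return int(math.Round(r.soiree / r.gala))
}

func main() {
	for _, r := range rows {
		fmt.Printf("%-26s 1:%d\n", r.scenario, soireePerGala(r))
	}
}
```

The computed values are 1:429, 1:293, 1:141, and 1:128, matching the table.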

Misc Background

Gala is a "v2" of the original pkg/soiree, an in-memory event emitter and listener-definition library that wrapped the alitto/pond library.

Known deficiencies / rationale:

  • Current Soiree path is in-memory by default and not sufficient for durable multi-pod processing.
  • Redis-backed Soiree durability is not viable for our current contracts.
  • Workflow engine depends on event semantics but is not yet broadly active; this is the best point to establish final-state architecture.
  • Existing listeners remain valuable and should be migrated topic-by-topic with controlled risk.
  • We can dual-emit during migration (soiree + gala) while keeping ownership clear per topic.

Core Goals & Directives

Applying lessons learned with Soiree:

  • Listener registration: defining a listener should be enough to make registration straightforward, without repeated wrappers
  • The new package should not require call sites to manually drain channels for enqueue outcomes
  • Handlers / Listeners should receive typed dependency accessors to avoid repetitive pointer extraction and type assertions
  • Consistent durability and patterns: use River per-queue concurrency and retries rather than bespoke goroutine orchestration
  • Data access: prefer typed enum parsing helpers over local switch/case parsers
  • Keep context restore centralized; avoid per-listener ad-hoc context mutation
  • First-class context reconstruction (auth + context flags + operational metadata) is priority

This information is provided purely for context on the "why the code exists" and "why it works this way".

Documentation

Overview

Package gala provides durable, typed eventing primitives intended to replace ad-hoc in-memory dispatch patterns with a River-native foundation. It's a black tie affair for your events, ensuring they arrive in style and on time.

Constants

const DefaultQueueName = "events"

DefaultQueueName is the default queue used for gala durable dispatch jobs

const RiverDispatchJobKind = "gala_dispatch_v1"

RiverDispatchJobKind is the River job kind used for durable gala dispatch

Variables

var (
	// ErrGalaRequired is returned when a nil gala runtime is used
	ErrGalaRequired = errors.New("gala: gala is required")
	// ErrRegistryRequired is returned when a nil topic registry is used
	ErrRegistryRequired = errors.New("gala: registry is required")
	// ErrTopicNameRequired is returned when a topic name is empty
	ErrTopicNameRequired = errors.New("gala: topic name is required")
	// ErrTopicAlreadyRegistered is returned when a topic is registered more than once
	ErrTopicAlreadyRegistered = errors.New("gala: topic already registered")
	// ErrTopicNotRegistered is returned when topic metadata cannot be found
	ErrTopicNotRegistered = errors.New("gala: topic not registered")
	// ErrCodecRequired is returned when a registration is missing a codec
	ErrCodecRequired = errors.New("gala: codec is required")
	// ErrListenerNameRequired is returned when a listener name is empty
	ErrListenerNameRequired = errors.New("gala: listener name is required")
	// ErrListenerHandlerRequired is returned when a listener callback is missing
	ErrListenerHandlerRequired = errors.New("gala: listener handler is required")
	// ErrListenerTopicNotRegistered is returned when a listener is attached before topic registration
	ErrListenerTopicNotRegistered = errors.New("gala: listener topic not registered")
	// ErrPayloadTypeMismatch is returned when payload casting fails for a topic or listener
	ErrPayloadTypeMismatch = errors.New("gala: payload type mismatch")
	// ErrPayloadEncodeFailed is returned when payload serialization fails
	ErrPayloadEncodeFailed = errors.New("gala: payload encode failed")
	// ErrPayloadDecodeFailed is returned when payload deserialization fails
	ErrPayloadDecodeFailed = errors.New("gala: payload decode failed")
	// ErrEnvelopePayloadRequired is returned when an envelope has an empty payload
	ErrEnvelopePayloadRequired = errors.New("gala: envelope payload is required")
	// ErrDispatcherRequired is returned when emit is attempted without a dispatcher
	ErrDispatcherRequired = errors.New("gala: dispatcher is required")
	// ErrDispatchFailed is returned when dispatch fails
	ErrDispatchFailed = errors.New("gala: dispatch failed")
	// ErrContextCodecRequired is returned when context codec registration receives nil
	ErrContextCodecRequired = errors.New("gala: context codec is required")
	// ErrContextCodecKeyRequired is returned when a context codec key is empty
	ErrContextCodecKeyRequired = errors.New("gala: context codec key is required")
	// ErrContextCodecAlreadyRegistered is returned when a context codec key is duplicated
	ErrContextCodecAlreadyRegistered = errors.New("gala: context codec already registered")
	// ErrContextSnapshotCaptureFailed is returned when snapshot capture fails
	ErrContextSnapshotCaptureFailed = errors.New("gala: context snapshot capture failed")
	// ErrContextSnapshotRestoreFailed is returned when snapshot restore fails
	ErrContextSnapshotRestoreFailed = errors.New("gala: context snapshot restore failed")
	// ErrRiverJobClientRequired is returned when a river dispatcher is built without a job client
	ErrRiverJobClientRequired = errors.New("gala: river job client is required")
	// ErrRiverGalaProviderRequired is returned when a river worker is built without a gala provider
	ErrRiverGalaProviderRequired = errors.New("gala: river gala provider is required")
	// ErrRiverDispatchJobEnvelopeRequired is returned when a river dispatch job has no envelope payload
	ErrRiverDispatchJobEnvelopeRequired = errors.New("gala: river dispatch job envelope is required")
	// ErrRiverEnvelopeEncodeFailed is returned when encoding a river envelope payload fails
	ErrRiverEnvelopeEncodeFailed = errors.New("gala: river envelope encode failed")
	// ErrRiverEnvelopeDecodeFailed is returned when decoding a river envelope payload fails
	ErrRiverEnvelopeDecodeFailed = errors.New("gala: river envelope decode failed")
	// ErrRiverDispatchInsertFailed is returned when inserting a durable river dispatch job fails
	ErrRiverDispatchInsertFailed = errors.New("gala: river dispatch insert failed")
	// ErrRiverConnectionURIRequired is returned when river runtime setup is missing a connection URI
	ErrRiverConnectionURIRequired = errors.New("gala: river connection URI is required")
	// ErrRiverClientInitializationFailed is returned when building the river queue client fails
	ErrRiverClientInitializationFailed = errors.New("gala: river client initialization failed")
	// ErrRiverWorkerStartFailed is returned when starting gala river workers fails
	ErrRiverWorkerStartFailed = errors.New("gala: river worker start failed")
	// ErrRiverWorkerStopFailed is returned when stopping gala river workers fails
	ErrRiverWorkerStopFailed = errors.New("gala: river worker stop failed")
	// ErrRiverClientCloseFailed is returned when closing the gala river queue client fails
	ErrRiverClientCloseFailed = errors.New("gala: river client close failed")
	// ErrAuthContextEncodeFailed is returned when auth context snapshot encoding fails
	ErrAuthContextEncodeFailed = errors.New("gala: auth context encode failed")
	// ErrAuthContextDecodeFailed is returned when auth context snapshot decoding fails
	ErrAuthContextDecodeFailed = errors.New("gala: auth context decode failed")
	// ErrListenerPanicked is returned when a listener panics during execution
	ErrListenerPanicked = errors.New("gala: listener panicked")
)

Functions

func HasFlag

func HasFlag(ctx context.Context, flag ContextFlag) bool

HasFlag reports whether a typed context flag is set

func RegisterTopic

func RegisterTopic[T any](registry *Registry, registration Registration[T]) error

RegisterTopic registers one typed topic in the registry

func WithFlag

func WithFlag(ctx context.Context, flag ContextFlag) context.Context

WithFlag sets a typed context flag

Types

type AuthSnapshot

type AuthSnapshot struct {
	// SubjectID is the authenticated principal identifier
	SubjectID string `json:"subject_id,omitempty"`
	// SubjectName is the authenticated principal display name
	SubjectName string `json:"subject_name,omitempty"`
	// SubjectEmail is the authenticated principal email
	SubjectEmail string `json:"subject_email,omitempty"`
	// OrganizationID is the active organization scope
	OrganizationID string `json:"organization_id,omitempty"`
	// OrganizationName is the active organization display name
	OrganizationName string `json:"organization_name,omitempty"`
	// OrganizationIDs contains organizations available in caller scope
	OrganizationIDs []string `json:"organization_ids,omitempty"`
	// AuthenticationType identifies the authentication method used by the caller
	AuthenticationType string `json:"authentication_type,omitempty"`
	// OrganizationRole captures the caller role within the active organization
	OrganizationRole string `json:"organization_role,omitempty"`
	// IsSystemAdmin reports whether the caller has system-admin privileges
	IsSystemAdmin bool `json:"is_system_admin,omitempty"`
}

AuthSnapshot is a JSON-safe snapshot of authenticated user context values

func (AuthSnapshot) ToAuthenticatedUser

func (s AuthSnapshot) ToAuthenticatedUser() *auth.AuthenticatedUser

ToAuthenticatedUser converts a snapshot into an auth.AuthenticatedUser payload

type Codec

type Codec[T any] interface {
	// Encode serializes the typed payload
	Encode(T) ([]byte, error)
	// Decode deserializes payload bytes into the typed payload
	Decode([]byte) (T, error)
}

Codec encodes and decodes a topic payload type

type Config

type Config struct {
	// Enabled toggles Gala worker startup and dispatch support when true
	Enabled bool
	// ConnectionURI is the database connection URI used for the dedicated gala river client
	ConnectionURI string
	// QueueName is the gala queue used for durable dispatch jobs
	QueueName string
	// WorkerCount is the max worker concurrency for the gala queue
	WorkerCount int
	// MaxRetries sets max attempts for gala dispatch jobs when greater than zero
	MaxRetries int
	// RunMigrations enables River schema migrations on startup (use for tests only)
	RunMigrations bool
	// FetchCooldown is the minimum time between job fetches per worker (default 100ms, min 1ms)
	// Lower values increase throughput but also database load. River enforces 1ms minimum.
	FetchCooldown time.Duration
	// FetchPollInterval is the fallback polling interval when LISTEN/NOTIFY misses events (default 1s)
	// This is only used when LISTEN/NOTIFY fails to deliver notifications.
	FetchPollInterval time.Duration
}

Config configures cohesive Gala startup

type ContextCodec

type ContextCodec interface {
	// Key returns the stable snapshot key
	Key() ContextKey
	// Capture extracts and encodes context data
	Capture(context.Context) (json.RawMessage, bool, error)
	// Restore decodes and re-attaches context data
	Restore(context.Context, json.RawMessage) (context.Context, error)
}

ContextCodec captures and restores one typed context value

type ContextFlag

type ContextFlag string

ContextFlag identifies a boolean context flag

const (
	// ContextFlagWorkflowBypass marks workflow bypass behavior
	ContextFlagWorkflowBypass ContextFlag = "workflow_bypass"
	// ContextFlagWorkflowAllowEventEmission allows workflow listener execution while bypass is set
	ContextFlagWorkflowAllowEventEmission ContextFlag = "workflow_allow_event_emission"
)

type ContextKey

type ContextKey = string

ContextKey identifies a restorable context value key. It is a string alias, for readability and to avoid collisions with other context keys; it has to be a string so it can be used as a JSON key for durability, rather than a strict type + contextx.

type ContextManager

type ContextManager struct {
	// contains filtered or unexported fields
}

ContextManager manages context codecs and snapshot round-trips

func NewContextManager

func NewContextManager(codecs ...ContextCodec) (*ContextManager, error)

NewContextManager creates a context manager and registers any initial codecs

func (*ContextManager) Capture

func (m *ContextManager) Capture(ctx context.Context) (ContextSnapshot, error)

Capture captures all registered context codec values and current context flags

func (*ContextManager) Register

func (m *ContextManager) Register(codec ContextCodec) error

Register registers a context codec by key

func (*ContextManager) Restore

func (m *ContextManager) Restore(ctx context.Context, snapshot ContextSnapshot) (context.Context, error)

Restore restores snapshot values into a new context

type ContextSnapshot

type ContextSnapshot struct {
	// Values contains codec-managed context values
	Values map[ContextKey]json.RawMessage `json:"values,omitempty"`
	// Flags contains boolean context flags
	Flags map[ContextFlag]bool `json:"flags,omitempty"`
}

ContextSnapshot captures context data that can be restored after durable hops

type Definition

type Definition[T any] struct {
	// Topic is the topic handled by this listener
	Topic Topic[T]
	// Name is the stable listener name
	Name string
	// Operations optionally scopes listener interest to specific mutation operations
	// Empty means the listener accepts all operations for the topic
	Operations []string
	// Handle is the callback invoked for this listener
	Handle Handler[T]
}

Definition defines one listener binding

type Dispatcher

type Dispatcher interface {
	// Dispatch dispatches an envelope to the configured transport
	Dispatch(context.Context, Envelope) error
}

Dispatcher dispatches envelopes to the configured transport

type DurableContextCodec

type DurableContextCodec struct{}

DurableContextCodec captures and restores durable context values including auth and logger fields

func NewContextCodec

func NewContextCodec() DurableContextCodec

NewContextCodec creates a context codec for durable context capture

func (DurableContextCodec) Capture

Capture extracts durable context values and encodes them as JSON

func (DurableContextCodec) Key

Key returns the stable snapshot key used by the context codec

func (DurableContextCodec) Restore

Restore decodes durable context values and restores them on the supplied context

type DurableContextSnapshot

type DurableContextSnapshot struct {
	// Auth contains authenticated user context when present
	Auth *AuthSnapshot `json:"auth,omitempty"`
	// LogFields contains logger context fields for correlation and tracing
	LogFields map[string]any `json:"log_fields,omitempty"`
}

DurableContextSnapshot captures context values that should persist across durable event processing

type EmitReceipt

type EmitReceipt struct {
	// EventID is the emitted event identifier
	EventID EventID
	// Accepted reports whether the event was accepted for processing
	Accepted bool
	// Err contains any terminal emit error
	Err error
}

EmitReceipt captures synchronous dispatch results

type Envelope

type Envelope struct {
	// ID is the unique event identifier
	ID EventID `json:"id"`
	// Topic is the destination topic
	Topic TopicName `json:"topic"`
	// OccurredAt is the emit timestamp in UTC
	OccurredAt time.Time `json:"occurred_at"`
	// Headers holds operational metadata
	Headers Headers `json:"headers"`
	// Payload is encoded topic payload data
	Payload json.RawMessage `json:"payload"`
	// ContextSnapshot holds restorable context metadata
	ContextSnapshot ContextSnapshot `json:"context_snapshot"`
}

Envelope is the durable event envelope

type EventID

type EventID string

EventID is a stable identifier used for idempotency and traceability

func NewEventID

func NewEventID() EventID

NewEventID creates a new event identifier

type Gala

type Gala struct {
	// contains filtered or unexported fields
}

Gala provides cohesive event dispatch + worker lifecycle management. No black tie required, but a riverboat and some confetti wouldn't hurt.

func NewGala

func NewGala(ctx context.Context, config Config) (app *Gala, err error)

NewGala initializes your gala, initializes dependencies, and starts workers

func (*Gala) Close

func (g *Gala) Close() error

Close closes the dedicated Gala queue client

func (*Gala) ContextManager

func (g *Gala) ContextManager() *ContextManager

ContextManager returns the Gala context manager

func (*Gala) DispatchEnvelope

func (g *Gala) DispatchEnvelope(ctx context.Context, envelope Envelope) error

DispatchEnvelope dispatches one envelope to all listeners on the topic

func (*Gala) EmitEnvelope

func (g *Gala) EmitEnvelope(ctx context.Context, envelope Envelope) error

EmitEnvelope dispatches a pre-built envelope using its topic registration

func (*Gala) EmitWithHeaders

func (g *Gala) EmitWithHeaders(ctx context.Context, topic TopicName, payload any, headers Headers) EmitReceipt

EmitWithHeaders emits a payload with explicit headers

func (*Gala) Injector

func (g *Gala) Injector() do.Injector

Injector returns the Gala dependency injector

func (*Gala) Registry

func (g *Gala) Registry() *Registry

Registry returns the Gala topic/listener registry

func (*Gala) StartWorkers

func (g *Gala) StartWorkers(ctx context.Context) error

StartWorkers starts Gala workers

func (*Gala) StopWorkers

func (g *Gala) StopWorkers(ctx context.Context) error

StopWorkers stops Gala workers

type Handler

type Handler[T any] func(HandlerContext, T) error

Handler processes a typed event payload

type HandlerContext

type HandlerContext struct {
	// Context is the restored event context used for listener execution
	Context context.Context
	// Envelope is the envelope being processed
	Envelope Envelope
	// Injector provides typed dependency lookup via samber/do
	Injector do.Injector
}

HandlerContext provides event context and dependency resolution scope for listeners
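A minimal sketch of the handler shape, written with local stand-ins so it runs on its own. `handlerContext` and `handler` mirror only the `Context` field and the function signature documented above; the `Envelope` and `Injector` fields are left out. `errEmptyInvoice`, `onInvoiceCreated`, and `runOnce` are hypothetical names used for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handlerContext is a local stand-in for HandlerContext. Only the Context
// field is mirrored; Envelope and Injector are omitted in this sketch.
type handlerContext struct {
	Context context.Context
}

// handler mirrors the documented shape: func(HandlerContext, T) error.
type handler[T any] func(handlerContext, T) error

// InvoiceCreated is a reduced form of the README's example payload.
type InvoiceCreated struct {
	InvoiceID string `json:"invoice_id"`
}

// errEmptyInvoice is a hypothetical validation error.
var errEmptyInvoice = errors.New("invoice id is empty")

// onInvoiceCreated returns an error when it cannot process the payload.
// Per the README, a failed listener leads River to retry the job.
var onInvoiceCreated handler[InvoiceCreated] = func(hctx handlerContext, p InvoiceCreated) error {
	// Respect cancellation carried by the restored context.
	if err := hctx.Context.Err(); err != nil {
		return err
	}
	if p.InvoiceID == "" {
		return errEmptyInvoice
	}
	return nil
}

// runOnce invokes the handler with a background context (illustration only).
func runOnce(p InvoiceCreated) error {
	return onInvoiceCreated(handlerContext{Context: context.Background()}, p)
}

func main() {
	fmt.Println(runOnce(InvoiceCreated{InvoiceID: "inv_1"}))
	fmt.Println(runOnce(InvoiceCreated{}))
}
```

Returning the error instead of logging and swallowing it keeps the retry decision with the worker rather than the listener.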

type Headers

type Headers struct {
	// IdempotencyKey identifies duplicate-safe processing scope
	IdempotencyKey string `json:"idempotency_key,omitempty"`
	// Properties stores additional typed metadata projected to string values
	Properties map[string]string `json:"properties,omitempty"`
}

Headers defines operational metadata for an envelope
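Because both fields carry `omitempty`, empty headers add almost nothing to the serialized envelope. The sketch below copies the documented fields and JSON tags into a local `headers` type so it runs without the package; `encodeHeaders` and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// headers copies the documented Headers fields and JSON tags.
type headers struct {
	IdempotencyKey string            `json:"idempotency_key,omitempty"`
	Properties     map[string]string `json:"properties,omitempty"`
}

// encodeHeaders returns the JSON form of h as a string.
func encodeHeaders(h headers) (string, error) {
	raw, err := json.Marshal(h)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	// Zero-value headers: both fields are omitted.
	empty, err := encodeHeaders(headers{})
	if err != nil {
		panic(err)
	}
	fmt.Println(empty)

	// Populated headers: both fields appear under their tagged names.
	full, err := encodeHeaders(headers{
		IdempotencyKey: "invoice-inv_1",
		Properties:     map[string]string{"source": "billing"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(full)
}
```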

type JSONCodec

type JSONCodec[T any] struct{}

JSONCodec is the default JSON implementation of Codec

func (JSONCodec[T]) Decode

func (JSONCodec[T]) Decode(payload []byte) (T, error)

Decode deserializes payload data using JSON

func (JSONCodec[T]) Encode

func (JSONCodec[T]) Encode(payload T) ([]byte, error)

Encode serializes payload data using JSON
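The following sketch shows what a JSON codec with this method set could look like. `jsonCodec` is a local stand-in built on `encoding/json`, assumed to behave like the documented type rather than copied from the package source; `roundTrip` is a hypothetical helper, and `InvoiceCreated` is the README's example payload.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonCodec is a local stand-in with the same method set as the documented
// JSONCodec. Its behavior is an assumption, not the package's source.
type jsonCodec[T any] struct{}

// Encode serializes payload using encoding/json.
func (jsonCodec[T]) Encode(payload T) ([]byte, error) {
	return json.Marshal(payload)
}

// Decode deserializes payload bytes into a fresh T.
func (jsonCodec[T]) Decode(payload []byte) (T, error) {
	var out T
	err := json.Unmarshal(payload, &out)
	return out, err
}

// InvoiceCreated is the example payload from the README.
type InvoiceCreated struct {
	InvoiceID  string `json:"invoice_id"`
	CustomerID string `json:"customer_id"`
	Amount     int64  `json:"amount_cents"`
}

// roundTrip encodes an invoice and decodes it again, returning the wire form too.
func roundTrip(in InvoiceCreated) (InvoiceCreated, string, error) {
	codec := jsonCodec[InvoiceCreated]{}
	raw, err := codec.Encode(in)
	if err != nil {
		return InvoiceCreated{}, "", err
	}
	out, err := codec.Decode(raw)
	return out, string(raw), err
}

func main() {
	out, wire, err := roundTrip(InvoiceCreated{InvoiceID: "inv_1", CustomerID: "cus_1", Amount: 1250})
	if err != nil {
		panic(err)
	}
	fmt.Println(wire)
	fmt.Printf("%+v\n", out)
}
```

The type parameter is what lets `Decode` return a concrete `T` rather than `any`, so callers need no type assertion.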

type ListenerError

type ListenerError struct {
	// ListenerName is the name of the listener that failed
	ListenerName string
	// Cause is the underlying error from the listener
	Cause error
	// Panicked indicates whether the listener panicked
	Panicked bool
}

ListenerError captures a listener execution failure with context

func (ListenerError) Error

func (e ListenerError) Error() string

Error returns a static error message for listener execution failures

func (ListenerError) Unwrap

func (e ListenerError) Unwrap() error

Unwrap returns the underlying cause for use with errors.Is and errors.As
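Since `Unwrap` exposes the cause, callers can inspect a failure with the standard `errors` helpers. The sketch below uses a local `listenerError` with the documented fields; the static message text, `errDownstream`, and `classify` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errDownstream is a hypothetical sentinel returned by a listener.
var errDownstream = errors.New("downstream unavailable")

// listenerError copies the documented ListenerError fields.
type listenerError struct {
	ListenerName string
	Cause        error
	Panicked     bool
}

// Error returns a static message; the exact text here is an assumption.
func (e listenerError) Error() string { return "listener execution failed" }

// Unwrap exposes the cause to errors.Is and errors.As.
func (e listenerError) Unwrap() error { return e.Cause }

// classify describes an error using errors.As for the wrapper and
// errors.Is for the wrapped cause.
func classify(err error) string {
	var le listenerError
	if !errors.As(err, &le) {
		return "not a listener error"
	}
	switch {
	case le.Panicked:
		return le.ListenerName + ": panicked"
	case errors.Is(err, errDownstream):
		return le.ListenerName + ": downstream unavailable"
	default:
		return le.ListenerName + ": failed"
	}
}

func main() {
	err := fmt.Errorf("dispatch: %w", listenerError{ListenerName: "send-receipt", Cause: errDownstream})
	fmt.Println(classify(err))
	fmt.Println(classify(listenerError{ListenerName: "audit", Panicked: true}))
	fmt.Println(classify(errDownstream))
}
```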

type ListenerID

type ListenerID string

ListenerID identifies a registered listener

func AttachListener

func AttachListener[T any](registry *Registry, definition Definition[T]) (ListenerID, error)

AttachListener registers one typed listener in the registry

func RegisterListeners

func RegisterListeners[T any](registry *Registry, definitions ...Definition[T]) ([]ListenerID, error)

RegisterListeners registers listeners and ensures their topic contracts are configured

type Provider

type Provider func() *Gala

Provider resolves the gala instance used by River workers

type Registration

type Registration[T any] struct {
	// Topic defines the typed topic contract
	Topic Topic[T]
	// Codec serializes and deserializes payloads for the topic
	Codec Codec[T]
}

Registration ties a typed topic to its codec

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry stores topic codecs, policies, and listeners

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates an empty topic/listener registry

func (*Registry) DecodePayload

func (r *Registry) DecodePayload(topic TopicName, payload []byte) (any, error)

DecodePayload decodes payload bytes for a registered topic

func (*Registry) EncodePayload

func (r *Registry) EncodePayload(topic TopicName, payload any) ([]byte, error)

EncodePayload encodes a payload for a registered topic

func (*Registry) InterestedIn

func (r *Registry) InterestedIn(topic TopicName, operation string) bool

InterestedIn reports whether any listener is registered for topic+operation. An empty operation means topic-level interest only.

type RiverDispatchArgs

type RiverDispatchArgs struct {
	// Envelope is the encoded gala envelope payload
	Envelope []byte `json:"envelope"`
}

RiverDispatchArgs stores a JSON-encoded gala envelope for durable dispatch

func NewRiverDispatchArgs

func NewRiverDispatchArgs(envelope Envelope) (RiverDispatchArgs, error)

NewRiverDispatchArgs builds River dispatch args from an envelope

func (RiverDispatchArgs) DecodeEnvelope

func (a RiverDispatchArgs) DecodeEnvelope() (Envelope, error)

DecodeEnvelope decodes the gala envelope from dispatch args
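The args type wraps an already-encoded envelope in a `[]byte` field, so the envelope is serialized once on its own and then carried inside the job arguments. Here is a rough sketch of that two-step encoding using local stand-ins: `envelope` is a hypothetical, reduced shape (the real `Envelope` has its own fields), `dispatchArgs` copies the documented field and tag, and `roundTripArgs` is an illustrative helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical, reduced stand-in for Envelope.
type envelope struct {
	Topic   string          `json:"topic"`
	Payload json.RawMessage `json:"payload"`
}

// dispatchArgs copies the documented RiverDispatchArgs field and JSON tag.
type dispatchArgs struct {
	Envelope []byte `json:"envelope"`
}

// newDispatchArgs JSON-encodes the envelope into the args (assumed behavior).
func newDispatchArgs(env envelope) (dispatchArgs, error) {
	raw, err := json.Marshal(env)
	if err != nil {
		return dispatchArgs{}, err
	}
	return dispatchArgs{Envelope: raw}, nil
}

// decodeEnvelope reverses newDispatchArgs.
func (a dispatchArgs) decodeEnvelope() (envelope, error) {
	var env envelope
	err := json.Unmarshal(a.Envelope, &env)
	return env, err
}

// roundTripArgs builds args, serializes them as job arguments, then
// reverses both steps and returns the decoded envelope and the wire form.
func roundTripArgs(topic, payload string) (envelope, string, error) {
	args, err := newDispatchArgs(envelope{Topic: topic, Payload: json.RawMessage(payload)})
	if err != nil {
		return envelope{}, "", err
	}
	wire, err := json.Marshal(args)
	if err != nil {
		return envelope{}, "", err
	}
	var stored dispatchArgs
	if err := json.Unmarshal(wire, &stored); err != nil {
		return envelope{}, "", err
	}
	env, err := stored.decodeEnvelope()
	return env, string(wire), err
}

func main() {
	env, wire, err := roundTripArgs("invoice.created", `{"invoice_id":"inv_1"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(wire)
	fmt.Println(env.Topic, string(env.Payload))
}
```

Note that `encoding/json` writes a `[]byte` field as a base64 string, so the envelope is not human-readable when the serialized args are inspected directly.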

func (RiverDispatchArgs) Kind

func (RiverDispatchArgs) Kind() string

Kind satisfies river.JobArgs

type RiverDispatchWorker

type RiverDispatchWorker struct {
	river.WorkerDefaults[RiverDispatchArgs]
	// contains filtered or unexported fields
}

RiverDispatchWorker processes durable gala dispatch jobs from River

func NewRiverDispatchWorker

func NewRiverDispatchWorker(galaProvider Provider) *RiverDispatchWorker

NewRiverDispatchWorker creates a RiverDispatchWorker

func (*RiverDispatchWorker) Work

Work processes one River dispatch job and invokes Gala dispatch

type RiverDispatcher

type RiverDispatcher struct {
	// contains filtered or unexported fields
}

RiverDispatcher dispatches envelopes to River

func NewRiverDispatcher

func NewRiverDispatcher(jobClient RiverInsertClient, defaultQueue string) (*RiverDispatcher, error)

NewRiverDispatcher creates a River-backed durable dispatcher

func (*RiverDispatcher) Dispatch

func (d *RiverDispatcher) Dispatch(ctx context.Context, envelope Envelope) error

Dispatch dispatches an envelope to River for processing by a Worker

type RiverInsertClient

type RiverInsertClient interface {
	// Insert inserts a River job with optional insert options
	Insert(context.Context, river.JobArgs, *river.InsertOpts) (*rivertype.JobInsertResult, error)
}

RiverInsertClient represents the minimal insert capability required for durable dispatch

type Topic

type Topic[T any] struct {
	// Name is the stable topic identifier
	Name TopicName
}

Topic defines a strongly typed topic contract

type TopicName

type TopicName string

TopicName is the stable string identifier for a topic

type TypedContextCodec

type TypedContextCodec[T any] struct {
	// contains filtered or unexported fields
}

TypedContextCodec captures/restores context values stored via contextx.With

func NewTypedContextCodec

func NewTypedContextCodec[T any](key ContextKey) TypedContextCodec[T]

NewTypedContextCodec creates a typed context codec for a specific snapshot key

func (TypedContextCodec[T]) Capture

func (c TypedContextCodec[T]) Capture(ctx context.Context) (json.RawMessage, bool, error)

Capture extracts a typed context value and JSON encodes it

func (TypedContextCodec[T]) Key

func (c TypedContextCodec[T]) Key() ContextKey

Key returns the codec snapshot key

func (TypedContextCodec[T]) Restore

Restore JSON decodes a typed context value and re-attaches it
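To illustrate the capture/restore idea, here is a self-contained sketch built only on the standard library. Everything in it is an assumption made for illustration: `typeKey`, `with`, and `from` stand in for a type-keyed context helper such as `contextx.With`, the `restore` signature is a guess (the listing above does not show one), and the boolean from `Capture` is taken to mean "a value was present".

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// typeKey is a per-type context key (stand-in for a type-keyed helper).
type typeKey[T any] struct{}

// with stores v in ctx under its own type.
func with[T any](ctx context.Context, v T) context.Context {
	return context.WithValue(ctx, typeKey[T]{}, v)
}

// from reads a value of type T back out of ctx.
func from[T any](ctx context.Context) (T, bool) {
	v, ok := ctx.Value(typeKey[T]{}).(T)
	return v, ok
}

// typedCodec is a local stand-in for TypedContextCodec.
type typedCodec[T any] struct{}

// Capture JSON-encodes the T stored in ctx; the bool is false when none is set.
func (typedCodec[T]) Capture(ctx context.Context) (json.RawMessage, bool, error) {
	v, ok := from[T](ctx)
	if !ok {
		return nil, false, nil
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, false, err
	}
	return raw, true, nil
}

// restore decodes raw into a T and re-attaches it (hypothetical signature).
func (typedCodec[T]) restore(ctx context.Context, raw json.RawMessage) (context.Context, error) {
	var v T
	if err := json.Unmarshal(raw, &v); err != nil {
		return ctx, err
	}
	return with(ctx, v), nil
}

// tenant is a hypothetical context value.
type tenant struct {
	ID string `json:"id"`
}

// roundTripTenant captures t from one context and restores it into another,
// as an emitter and a worker would do on either side of the job queue.
func roundTripTenant(t tenant) (tenant, bool, error) {
	codec := typedCodec[tenant]{}
	raw, ok, err := codec.Capture(with(context.Background(), t))
	if err != nil || !ok {
		return tenant{}, ok, err
	}
	restored, err := codec.restore(context.Background(), raw)
	if err != nil {
		return tenant{}, false, err
	}
	got, ok := from[tenant](restored)
	return got, ok, nil
}

// captureMissing reports what Capture returns when no value is stored.
func captureMissing() (bool, error) {
	_, ok, err := typedCodec[tenant]{}.Capture(context.Background())
	return ok, err
}

func main() {
	got, ok, err := roundTripTenant(tenant{ID: "t_1"})
	fmt.Println(got.ID, ok, err)
}
```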
