pubsub

package
v0.26.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package pubsub is nexus's typed pub/sub primitive. Topics are declared as package-level variables — the same Encore-style shape as nexus.Cron and (proposed) nexus.Secret — so a publisher can call `topic.Publish(ctx, payload)` from anywhere without needing the transport injected.

The package is transport-agnostic. Topics register into a process- global registry at package-init time; an option chosen at app boot (pubsub.UseInMemory for tests, pubsub.UseRabbit for production) binds the live transport to every registered topic. Subscribers are declared via pubsub.Subscribe(...) inside a nexus.Module — the option wraps nexus.AsWorker so subscriptions inherit the same lifecycle, panic recovery, and dashboard/registry surfacing as any other long-running worker.

Default semantics:

  • At-least-once delivery. Handler returns nil → Ack; returns error → Retry with exponential backoff up to MaxRetries (default 3), then DLQ. JSON-decode failures go straight to DLQ as poison.
  • Per-topic isolation. Each Topic[T] maps to its own RabbitMQ exchange (or in-memory queue); subscriptions do not cross between topics.
  • JSON codec. The payload type T is serialized via encoding/json. A future Codec[T] hook can switch to protobuf/msgpack without changing the API.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BindTopics added in v0.24.1

func BindTopics(t Transport)

BindTopics is the boot-time hook that walks the topic registry and points every Topic[T].publisher at the live transport. Called automatically by UseInMemory / UseTransport so most user code never invokes this directly — but transport adapter packages (pubsub/rabbit, future pubsub/nats) call it from their own Use option after constructing the transport.

Also caches t as the registry's active transport so any topic registered after this point (lazy module init) gets bound on registration via register(). Without that cache, a topic declared in a package not imported until a later module load would Publish into the void.

Idempotent: calling twice (e.g. test that toggles UseInMemory across subtests) replaces the binding atomically. The previous transport's Close is the caller's responsibility.

func Subscribe

func Subscribe[T any](topic *Topic[T], name string, handler func(ctx context.Context, payload T) error, cfg SubscriptionConfig) nexus.Option

Subscribe registers a subscription against topic. Returns a nexus.Option intended to live inside a nexus.Module (or directly in nexus.Run's option list). The option resolves to an fx.Invoke that starts a long-running consumer once the app boots — same lifecycle shape as nexus.AsWorker.

Handler is called once per delivered message. On nil return → Ack. On error return → Retry with exponential backoff up to cfg.MaxRetries, then DLQ. JSON-decode failures (poison payloads) skip retries and go straight to DLQ.

var module = nexus.Module("email",
    nexus.Provide(NewEmailService),
    pubsub.Subscribe(UserCreated, "send-welcome",
        func(ctx context.Context, e UserCreatedEvent) error {
            return mailer.Send(ctx, e.UserID, welcomeTemplate)
        },
        pubsub.SubscriptionConfig{MaxRetries: 5}),
)

The subscription's worker name is `pubsub:<topic>:<subscription>`, which the dashboard surfaces in its Workers panel — operators see status (running / stopped / failed) and last error without any extra wiring.

func UseInMemory

func UseInMemory() nexus.Option

UseInMemory binds an InMemoryTransport to every registered topic. Returned option is intended for tests and `nexus dev` runs that don't have a broker available. Production code should use pubsub.UseRabbit (or similar) instead.

Wiring sequence at boot:

  1. fx constructs *InMemoryTransport (Provide).
  2. fx Provide also exposes it as Transport (the interface Subscribe's worker depends on).
  3. An Invoke walks the topic registry and calls bindTransport on each — so Topic[T].Publish has a live transport ref by the time fx.Start unblocks.
  4. fx.Stop closes the transport, which closes every queue and lets blocked Consume goroutines exit.

func UseTransport

func UseTransport(t Transport) nexus.Option

UseTransport is the escape hatch for adapters defined outside this package. The returned option provides the user-supplied transport to fx and runs BindTopics. The pubsub/rabbit subpackage uses this internally so adapter code doesn't need to duplicate the binding boilerplate.

The transport's lifecycle is the caller's responsibility — they own the fx.Lifecycle hook for OnStart/OnStop.

Types

type ConsumeConfig

type ConsumeConfig struct {
	// MaxRetries is the maximum number of redelivery attempts before
	// the dispatcher routes the message to the DLQ. Counted across
	// the whole subscription lifetime; broker-level redelivery counts
	// are not consulted (different adapters track them differently).
	MaxRetries int

	// AckDeadline is the per-message handler timeout. The dispatcher
	// derives a context from the parent that cancels at this deadline,
	// so a stuck handler doesn't tie up the consume loop forever.
	AckDeadline time.Duration

	// BackoffMin / BackoffMax bracket the exponential backoff between
	// retry attempts. The dispatcher computes
	//     delay = min(BackoffMax, BackoffMin * 2^(attempt-1))
	// before re-presenting the message. Zero values pick framework
	// defaults (100ms / 30s).
	BackoffMin time.Duration
	BackoffMax time.Duration
}

ConsumeConfig is the per-subscription tuning passed from the Subscribe option to Transport.Consume. Defaults are filled in by the Subscribe wrapper before this struct reaches the adapter.

type Deliver

type Deliver func(msg Message) Disposition

Deliver is the function the dispatcher hands to Transport.Consume. Adapters call it once per received message and react to the returned Disposition: Ack → broker-acknowledge, Retry → re-enqueue with backoff, DLQ → route to dead-letter.

type Disposition

type Disposition int

Disposition is the dispatcher's verdict for a delivered message. Adapters MUST honor the verdict — there is no "soft" Ack that the adapter can downgrade. Misbehaving adapters are caught by the in-memory test transport which asserts on every disposition.

const (
	// DispositionAck — handler succeeded, broker can drop the message.
	DispositionAck Disposition = iota
	// DispositionRetry — handler failed transiently; re-enqueue after
	// backoff and increment DeliveryAttempt. Becomes DLQ once attempts
	// reach MaxRetries.
	DispositionRetry
	// DispositionDLQ — handler indicated the message is unprocessable
	// (poison payload, business-rule rejection, attempts exhausted).
	// Adapter routes to dead-letter and does NOT redeliver.
	DispositionDLQ
)

type InMemoryTransport

type InMemoryTransport struct {
	// contains filtered or unexported fields
}

InMemoryTransport is the zero-dep test/dev transport. Behaves like a real broker for the dispatcher's purposes: durable across the process's lifetime, fan-out to multiple subscriptions on the same topic, retry with exponential backoff up to MaxRetries, then DLQ.

Not safe to share across processes — there is no broker, no socket, no persistence to disk. Useful surfaces:

  • Unit tests: NewInMemoryTransport(), publish, assert on transport.DLQ(topic, sub).
  • `nexus dev` without RabbitMQ available locally: the framework falls back to in-memory if no UseRabbit option was selected, so a developer can iterate on subscriber code without standing up a broker.

func NewInMemoryTransport

func NewInMemoryTransport() *InMemoryTransport

NewInMemoryTransport returns a fresh in-memory broker. Multiple instances are isolated — one transport's queues do not see another's publishes. The fx-provided instance is created once per app boot.

func (*InMemoryTransport) Close

func (m *InMemoryTransport) Close() error

Close marks the transport closed and closes every queue channel so any blocked Consume goroutines exit cleanly. Idempotent.

func (*InMemoryTransport) Consume

func (m *InMemoryTransport) Consume(ctx context.Context, topic, subscription string, cfg ConsumeConfig, deliver Deliver) error

Consume registers (topic, subscription) — creating the queue + DLQ slot if this is the first subscriber — and runs the deliver loop until ctx is cancelled.

On Disposition results:

  • Ack: drop the message (tests can inspect via Acked() if needed).
  • Retry: sleep for the computed backoff, increment DeliveryAttempt, re-push onto the queue. After MaxRetries hits, the next failure promotes to DLQ.
  • DLQ: record onto m.dlq[key]. Do not redeliver.

Backoff sleeps are interrupted by ctx cancellation so an app shutdown doesn't wait the full retry delay before exiting.

func (*InMemoryTransport) DLQ

func (m *InMemoryTransport) DLQ(topic, subscription string) []Message

DLQ returns a copy of the dead-letter slice for a (topic, subscription) pair. Tests use this to assert that retry-exhausted or poison messages landed where expected.

func (*InMemoryTransport) Publish

func (m *InMemoryTransport) Publish(_ context.Context, topic string, payload []byte, attrs map[string]string) error

Publish writes one copy of payload into each subscription queue bound to topic. Returns an error only if the transport is closed or if a queue is full (signaling backpressure that tests should surface rather than mask).

type Message

type Message struct {
	Topic        string
	Subscription string

	// Payload is the raw bytes from the broker. The Subscribe wrapper
	// runs Topic[T].decode before invoking the user's handler — so
	// handlers see typed T, not bytes.
	Payload []byte

	// Attrs is the optional metadata sent by the publisher (headers,
	// content-type, idempotency key). nil when none were set.
	Attrs map[string]string

	// DeliveryAttempt is 1 on first delivery, incremented on each
	// retry. The dispatcher uses this together with cfg.MaxRetries to
	// decide retry-vs-DLQ.
	DeliveryAttempt int
}

Message is one delivered envelope handed to the handler dispatcher. Adapters fill it from their wire format; the dispatcher does not care which broker produced it.

type SubscriptionConfig

type SubscriptionConfig struct {
	// MaxRetries is the number of redeliveries the dispatcher
	// attempts before promoting to DLQ. Default: 3.
	MaxRetries int

	// AckDeadline bounds how long a single handler call may run
	// before the dispatcher cancels its context. The handler can
	// still take longer — Go doesn't preempt — but the cancellation
	// signals downstream calls to abort. Default: 30s.
	AckDeadline time.Duration

	// BackoffMin / BackoffMax are the exponential-backoff envelope
	// between retry attempts. Defaults: 100ms / 30s.
	BackoffMin time.Duration
	BackoffMax time.Duration
}

SubscriptionConfig configures a subscription's retry + ack semantics. Zero values pick framework defaults — typical callers pass `pubsub.SubscriptionConfig{}` and only override when they need different behavior (e.g. high-cost handlers want fewer retries).

type SubscriptionInfo

type SubscriptionInfo struct {
	Topic         string `json:"topic"`
	Name          string `json:"name"`
	MaxRetries    int    `json:"maxRetries,omitempty"`
	AckDeadlineMs int64  `json:"ackDeadlineMs,omitempty"`
}

SubscriptionInfo is a snapshot of one registered subscription. The dispatch tuning is included so an operator can answer "why did this message DLQ" without reading source.

func Subscriptions

func Subscriptions() []SubscriptionInfo

Subscriptions returns every subscription registered via Subscribe, sorted by (topic, subscription name).

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic[T] is a typed pub/sub channel. Constructed at package-init via NewTopic; Publish is safe to call from any goroutine after the app has booted.

The zero value is not usable — callers must go through NewTopic so the registry has a record. Calling Publish on a zero Topic returns an explicit error rather than panicking, so a misconfigured app fails loudly at the first publish rather than silently dropping.

func NewTopic

func NewTopic[T any](name string, cfg TopicConfig) *Topic[T]

NewTopic registers a new topic by name and returns a typed handle. Call this at package level so the topic exists in the registry by the time nexus.Run walks it:

var UserCreated = pubsub.NewTopic[UserCreatedEvent]("user-created",
    pubsub.TopicConfig{Description: "Emitted when a user signs up"})

Duplicate names panic at init — earlier the loud failure, the less surprising the eventual production outage. The registry is process-global because topic identity is the *name*, not any per-app instance: two modules referring to "user-created" must produce the same Topic[T] value.

func (*Topic[T]) Description

func (t *Topic[T]) Description() string

func (*Topic[T]) Durable

func (t *Topic[T]) Durable() bool

func (*Topic[T]) Name

func (t *Topic[T]) Name() string

Name is the registered topic name (the same string passed to NewTopic). Stable across the lifetime of the process.

func (*Topic[T]) PayloadType

func (t *Topic[T]) PayloadType() reflect.Type

func (*Topic[T]) Publish

func (t *Topic[T]) Publish(ctx context.Context, payload T) error

Publish encodes payload and hands it to the active transport. Returns an error if no transport has been bound — typically because the app forgot to include pubsub.UseInMemory() or pubsub.UseRabbit() in its option chain. The error names the topic so the operator can trace the missing wiring without grepping.

The function is goroutine-safe and does not allocate beyond the codec's own buffer. Hot-path callers (fan-out from a single request) are expected to call this directly without batching; transport-level batching belongs inside the Transport, not here.

Trace integration: when ctx carries a trace span (typical inside a request handler), Publish emits a `pubsub.publish:<topic>` span so the publish appears as a bar on the dashboard's trace waterfall. The current span's W3C traceparent is also injected into Message.Attrs so downstream consumers can stitch their own spans into the publisher's trace once consumer-side trace propagation lands.

type TopicConfig

type TopicConfig struct {
	// Description shows up in the manifest + dashboard so operators
	// know what the topic carries without reading source.
	Description string

	// Durable, when true, instructs the transport to persist messages
	// across broker restarts. RabbitMQ: durable exchange + persistent
	// publish. In-memory: ignored (everything is in-process anyway).
	Durable bool
}

TopicConfig is the per-topic knobs. Empty values pick framework defaults at boot, so callers can pass the zero value for the common case (`pubsub.NewTopic[T]("name", pubsub.TopicConfig{})`).

type TopicInfo

type TopicInfo struct {
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`
	Durable     bool   `json:"durable,omitempty"`
	PayloadType string `json:"payloadType,omitempty"`
}

TopicInfo is a snapshot of one registered topic. Built from the internal topicRecord; PayloadType is the Go type name (e.g. "main.UserCreatedEvent") so a JSON consumer can read it without reflection.

func Topics

func Topics() []TopicInfo

Topics returns every topic registered via NewTopic, sorted by name. Safe to call from any goroutine; returns a fresh slice the caller may mutate.

type Transport

type Transport interface {
	// Publish sends payload to topic. Implementations must encode the
	// optional attrs onto the wire (RabbitMQ headers, NATS Header) so
	// subscribers see them on Message.Attrs. Returning an error fails
	// the publishing call directly — at-least-once durability is the
	// caller's responsibility (typically: retry on transient errors,
	// log + drop on permanent ones).
	Publish(ctx context.Context, topic string, payload []byte, attrs map[string]string) error

	// Consume blocks until ctx is cancelled, calling deliver for each
	// received message. The dispatcher wraps deliver with the retry +
	// DLQ logic, so adapters only need to implement the lower-level
	// "next message → ack/nack/requeue" loop.
	//
	// cfg carries the per-subscription tuning (max retries, ack
	// deadline). Adapters that don't natively support a feature
	// (e.g. in-memory has no real ack deadline) MAY ignore the
	// matching field; the dispatcher does not depend on broker-side
	// enforcement.
	Consume(ctx context.Context, topic, subscription string, cfg ConsumeConfig, deliver Deliver) error

	// Close releases adapter resources (RabbitMQ connection, in-mem
	// channels). Called from fx.Stop. Idempotent — calling on an
	// already-closed transport must not panic.
	Close() error
}

Transport is the broker-agnostic interface every adapter implements. The core pubsub package depends only on this; pubsub/rabbit and the in-memory transport satisfy it. New backends (NATS, Kafka) plug in the same way — implement Transport, expose a UseFoo() option that fx.Provide's it, and bindTopics handles the rest.

Directories

Path Synopsis
Package rabbit is the production RabbitMQ adapter for nexus's pubsub primitive.
Package rabbit is the production RabbitMQ adapter for nexus's pubsub primitive.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL