rabbit

package
v0.26.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package rabbit is the production RabbitMQ adapter for nexus's pubsub primitive. It plugs in via pubsub.Transport so user code stays unchanged when swapping from the in-memory transport to a real broker:

// production
nexus.Run(cfg, rabbit.Use(rabbit.Config{URL: "amqp://..."}), modules...)

// tests
nexus.Run(cfg, pubsub.UseInMemory(), modules...)

Topology mapping:

  • Each Topic[T] = one durable fanout exchange named after the topic. Fanout (not topic) because nexus's typed topics already carry their own routing dimension via T — RMQ-side wildcards would only confuse the contract.
  • Each Subscribe = one durable queue named "<topic>.<sub>", bound to the topic's exchange. Multiple processes binding the same queue compete for messages (work-queue semantics); each distinct queue receives a copy (fan-out semantics).
  • Each Subscribe also gets a DLQ queue "<topic>.<sub>.dlq" for poison messages and retry-exhausted deliveries. Operators drain it manually.

Retry strategy: app-level. On Disposition=Retry, the consume loop acks the original delivery, sleeps for the configured backoff, then re-publishes the body directly to the subscription's queue with an incremented x-delivery-attempt header. After MaxRetries, the next failure routes to DLQ. This mirrors the in-memory transport's behavior 1:1 — the same ConsumeConfig produces the same retry envelope across both transports.

Connection lifecycle: New dials once and reuses the connection for one publish channel + one consume channel per subscription. Reconnection is the caller's responsibility for v1 — a future iteration may add automatic dial-with-backoff once the framework has a generic supervisor pattern for fx components.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Use

func Use(cfg Config) nexus.Option

Use returns a nexus.Option that dials the broker at boot, exposes the resulting *Transport as pubsub.Transport in the fx graph, runs pubsub.BindTopics so every registered topic gets its publisher pointed at this transport, AND auto-registers a resource.NewQueue on the app's topology graph so the dashboard surfaces broker health alongside the rest of the architecture.

Wiring sequence at boot:

  1. fx Provide constructs *Transport via New(cfg). A failed dial surfaces as an fx graph error → app refuses to start. Better a fast crash at boot than a silently-broken publisher.
  2. The same provide returns the *Transport as pubsub.Transport so subscriptions resolve their dep.
  3. An Invoke calls pubsub.BindTopics(t), wiring every topic's publisher.
  4. A second Invoke calls app.Register on a resource.NewQueue whose health probe is t.Healthy. The dashboard's topology view shows the broker as a queue node turning red when the AMQP connection drops. Skipped when Config.SkipResource is true, e.g. when the caller wants to register a custom resource with richer health metadata.
  5. fx OnStop closes the transport — channels first, then connection. Calling Close on an already-closed transport is a no-op so re-running tests don't leak the assertion.

Typical use:

nexus.Run(cfg,
    rabbit.Use(rabbit.Config{URL: os.Getenv("RABBITMQ_URL")}),
    moduleA, moduleB, ...,
)

Types

type Config

type Config struct {
	// URL is the AMQP connection string. Required. Format:
	// "amqp://user:pass@host:port/vhost".
	URL string

	// Prefetch is the per-channel QoS prefetch count. Caps the
	// number of unacked messages the broker delivers before
	// throttling. Default: 16.
	Prefetch int

	// Confirms enables RabbitMQ publisher confirms — Publish blocks
	// until the broker acks the message. Slower but durable;
	// recommend on for production publishers that absolutely cannot
	// drop. Default: off (best-effort, faster).
	Confirms bool

	// ResourceName is the dashboard label for the auto-registered
	// queue resource. Default "rabbit". Set to a more specific name
	// when multiple brokers coexist in one app (rare).
	ResourceName string

	// SkipResource disables the auto-registration of a
	// resource.NewQueue on the app's topology graph. Set to true
	// when the caller wants to register their own resource (custom
	// health probe, richer details, multiple bindings) and avoid
	// the generic auto-registered one stepping on it.
	SkipResource bool
}

Config carries connection + tuning knobs. Zero-value Prefetch picks a framework default (16) — large enough to keep a single consumer busy, small enough that a slow handler doesn't pin memory under bursty publish loads.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the adapter implementing pubsub.Transport against a live RabbitMQ broker.

func New

func New(cfg Config) (*Transport, error)

New dials the broker and prepares the transport. The returned transport is ready for Publish; Consume calls lazily declare per-subscription queues + bindings on first use.

func (*Transport) Close

func (t *Transport) Close() error

Close tears down channels + connection. Idempotent so fx OnStop can call it without guarding the second-time error.

func (*Transport) Consume

func (t *Transport) Consume(ctx context.Context, topic, subscription string, cfg pubsub.ConsumeConfig, deliver pubsub.Deliver) error

Consume opens a dedicated channel for this subscription, declares the queue + DLQ, binds the queue to the topic exchange, and dispatches deliveries to the deliver func. Blocks until ctx is cancelled or the channel errors.

Why one channel per subscription: amqp091 channels carry one consumer's delivery stream; multiplexing several subscriptions on one channel forces the dispatcher to demultiplex tag-by-tag and produces flow-control surprises on noisy subscriptions. Channels are cheap.

func (*Transport) Healthy added in v0.24.2

func (t *Transport) Healthy() bool

Healthy reports whether the broker connection is alive. Returns false when the AMQP connection's I/O loop has terminated (network drop, broker crash) OR after Close() has been called. Used as the health probe for the auto-registered resource.NewQueue so the dashboard's topology view turns red the moment the broker becomes unreachable.

Cheap — IsClosed reads a single mutex-guarded bool inside amqp091. Safe to call from the dashboard's snapshot loop on every poll.

func (*Transport) Publish

func (t *Transport) Publish(ctx context.Context, topic string, payload []byte, attrs map[string]string) error

Publish serializes payload onto the topic's fanout exchange. The optional attrs map is encoded as AMQP headers, including the W3C traceparent the pubsub layer injects for trace stitching.

Messages are persistent (DeliveryMode=2) so a broker restart doesn't drop in-flight publishes from durable queues. This costs a disk fsync per message; if a workload demands raw throughput over durability, a future Config.Persistent=false toggle can be added without API churn.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL