broker

package
v0.5.0 Latest
Published: Jun 23, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package broker abstracts an event broker behind transport-agnostic Publisher and Consumer contracts.

Messages travel the wire as CloudEvents v1.0 structured-mode JSON; this package owns that envelope so every transport (e.g. broker/rabbitmq) speaks the same shape and consumers can dedupe on the CloudEvents id. The interfaces here have no transport dependencies, so the same producer/consumer code can ship over RabbitMQ, Kafka, NATS, or a WebSocket gateway by swapping the adapter behind Publisher.

The abstraction

Message carries CloudEvents context attributes (ID/Type/Source/Subject/Time/DataContentType) plus the business payload (Data) and transport-neutral routing: Topic (the logical destination) and an optional Key (routing/ordering key within the topic). Each transport maps Topic/Key to its own concept: a RabbitMQ exchange + routing key, a Kafka topic + partition key, a NATS subject. Headers carry transport headers such as W3C trace context.

  • Publisher delivers a Message; implementations should provide at-least-once semantics, so callers can treat a nil error as meaning the broker durably accepted it.
  • Handler processes one received Message and returns an Action.
  • Action is the consumer's verdict: Ack (processed successfully; the broker drops the message), Requeue (redeliver later), or Discard (reject without requeue, so the broker drops or dead-letters it).

CloudEvents envelope

MarshalCloudEvent serializes a Message into a CloudEvents v1.0 structured-mode JSON envelope (SpecVersion is stamped automatically, ContentTypeCloudEvents is the wire content type); the business payload lands under "data". Transport fields (Topic, Key, Headers) are NOT part of the envelope — the transport applies them at publish time. UnmarshalCloudEvent reverses it, leaving the transport fields zero for the adapter to fill from the delivery.

body, err := broker.MarshalCloudEvent(broker.Message{
    Type:            "widget.created",
    DataContentType: "application/json",
    Data:            []byte(`{"id":"w-1"}`),
    Topic:           "widgets", // routing only; not written into the envelope
})
if err != nil {
    return err
}

m, err := broker.UnmarshalCloudEvent(body) // m.Topic/Key/Headers are zero
if err != nil {
    return err
}

How a transport plugs in

A concrete adapter implements Publisher (and, for consumers, worker.Runnable) in a sub-package. broker/rabbitmq is the reference adapter: its Publisher maps Message.Topic/Key to a RabbitMQ exchange/routing key and publishes the CloudEvents body with publisher confirms; its Consumer decodes deliveries back into Messages and dispatches them to a broker.Handler. Application and domain code depend only on broker.Publisher / broker.Handler, never on the adapter.

Constants

const (
	// SpecVersion is the CloudEvents spec version stamped on every envelope.
	SpecVersion = "1.0"
	// ContentTypeCloudEvents is the wire content type for the structured-mode
	// JSON envelope produced by MarshalCloudEvent.
	ContentTypeCloudEvents = "application/cloudevents+json"
)

Functions

func MarshalCloudEvent

func MarshalCloudEvent(m Message) ([]byte, error)

MarshalCloudEvent serializes m as a CloudEvents v1.0 structured-mode JSON envelope: context attributes (id/type/source/...) live at the top level and the business payload goes under "data". Transport fields (Topic, Key, Headers) are NOT part of the envelope — they are applied by the transport at publish time.

Types

type Action

type Action int

Action is a consumer's verdict on a delivery, returned by a Handler.

const (
	// Ack acknowledges successful processing; the broker drops the message.
	Ack Action = iota
	// Requeue negatively acknowledges and asks the broker to redeliver later.
	Requeue
	// Discard negatively acknowledges without requeue (drop / dead-letter).
	Discard
)
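A transport adapter has to translate each verdict into its broker's settlement calls. The sketch below shows one plausible mapping in AMQP-like terms. The acknowledger interface, settle, and recorder are hypothetical names for this example and are not part of the package or of any RabbitMQ client API.

```go
package main

import "fmt"

// Stand-in for the documented Action type.
type Action int

const (
	Ack Action = iota
	Requeue
	Discard
)

// acknowledger is a hypothetical, AMQP-flavoured view of one delivery.
type acknowledger interface {
	Ack() error
	Nack(requeue bool) error
}

// settle applies the handler's verdict to the delivery.
func settle(d acknowledger, a Action) error {
	switch a {
	case Ack:
		return d.Ack()
	case Requeue:
		return d.Nack(true) // ask the broker to redeliver
	case Discard:
		return d.Nack(false) // drop, or dead-letter if the queue is configured for it
	default:
		return fmt.Errorf("unknown action %d", a)
	}
}

// recorder is a fake delivery that remembers the last settlement call.
type recorder struct{ last string }

func (r *recorder) Ack() error { r.last = "ack"; return nil }

func (r *recorder) Nack(requeue bool) error {
	r.last = fmt.Sprintf("nack requeue=%t", requeue)
	return nil
}

func main() {
	r := &recorder{}
	for _, a := range []Action{Ack, Requeue, Discard} {
		if err := settle(r, a); err != nil {
			fmt.Println("settle failed:", err)
			return
		}
		fmt.Println(a, "->", r.last)
	}
}
```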

type Handler

type Handler func(ctx context.Context, m Message) Action

Handler processes one received message and returns the broker Action. It must be safe for concurrent use when the consumer runs with concurrency > 1.

type Message

type Message struct {

	// ID is the unique event id; consumers dedupe on it. Defaults to a fresh
	// UUID at publish time when empty.
	ID string
	// Type is the CloudEvents type, e.g. "widget.created".
	Type string
	// Source identifies the producer; a publisher fills it from its configured
	// source when empty.
	Source string
	// Subject is the optional CloudEvents subject.
	Subject string
	// Time is the event timestamp; defaults to now at publish time when zero.
	Time time.Time
	// DataContentType is the MIME type of Data, e.g. "application/json".
	DataContentType string
	// Data is the business payload (NOT the envelope).
	Data []byte

	// Topic is the logical destination, mapped by each transport to its own
	// concept: a RabbitMQ exchange, a Kafka topic, a NATS subject, a WebSocket
	// channel.
	Topic string
	// Key is an optional routing/ordering key within the topic: a RabbitMQ
	// routing key, a Kafka partition key, a NATS subject suffix. May be empty
	// for transports that don't use one.
	Key string
	// Headers are transport headers (e.g. W3C trace context).
	Headers map[string]string
}

Message is a broker message in CloudEvents terms plus the transport routing needed to deliver it. Data is the business payload only — the CloudEvents envelope is built around it by MarshalCloudEvent.

func UnmarshalCloudEvent

func UnmarshalCloudEvent(body []byte) (Message, error)

UnmarshalCloudEvent parses a structured-mode JSON envelope back into a Message. Transport fields are left zero; the caller fills Topic/Key/Headers from the delivery if needed.
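On the consuming side, an adapter would decode the body and then copy routing details from the delivery into the transport fields (Topic, Key, Headers). The sketch below illustrates that step only. The delivery struct, unmarshalEnvelope (a minimal stand-in for UnmarshalCloudEvent), and fromDelivery are hypothetical, and dropping non-string header values is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the documented Message type (assumption: trimmed).
type Message struct {
	ID      string
	Type    string
	Data    []byte
	Topic   string
	Key     string
	Headers map[string]string
}

// delivery is a hypothetical, AMQP-flavoured received message.
type delivery struct {
	Exchange   string
	RoutingKey string
	Headers    map[string]any
	Body       []byte
}

// unmarshalEnvelope is a minimal stand-in for UnmarshalCloudEvent: it reads
// a few context attributes and leaves the transport fields zero.
func unmarshalEnvelope(body []byte) (Message, error) {
	var env struct {
		ID   string          `json:"id"`
		Type string          `json:"type"`
		Data json.RawMessage `json:"data"`
	}
	if err := json.Unmarshal(body, &env); err != nil {
		return Message{}, err
	}
	return Message{ID: env.ID, Type: env.Type, Data: env.Data}, nil
}

// fromDelivery decodes the envelope, then fills Topic/Key/Headers.
func fromDelivery(d delivery) (Message, error) {
	m, err := unmarshalEnvelope(d.Body)
	if err != nil {
		return Message{}, fmt.Errorf("decode envelope: %w", err)
	}
	m.Topic = d.Exchange
	m.Key = d.RoutingKey
	m.Headers = make(map[string]string, len(d.Headers))
	for k, v := range d.Headers {
		if s, ok := v.(string); ok { // assumption: keep string values only
			m.Headers[k] = s
		}
	}
	return m, nil
}

func main() {
	m, err := fromDelivery(delivery{
		Exchange:   "widgets",
		RoutingKey: "w-1",
		Headers:    map[string]any{"traceparent": "00-abc-def-01", "x-retries": 2},
		Body:       []byte(`{"specversion":"1.0","id":"e-1","type":"widget.created","data":{"id":"w-1"}}`),
	})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(m.ID, m.Topic, m.Key, m.Headers["traceparent"], string(m.Data))
}
```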

type Publisher

type Publisher interface {
	// Publish delivers m. Implementations should provide at-least-once
	// semantics (e.g. AMQP publisher confirms) so callers can treat a nil error
	// as durably accepted by the broker.
	Publish(ctx context.Context, m Message) error
	// Close releases publisher resources.
	Close() error
}

Publisher publishes messages to the broker.

Directories

Path Synopsis
Package rabbitmq implements the broker abstraction over RabbitMQ.
