Documentation ¶
Overview ¶
Package broker abstracts an event broker behind transport-agnostic Publisher and Consumer contracts.
Messages travel the wire as CloudEvents v1.0 structured-mode JSON; this package owns that envelope so every transport (e.g. broker/rabbitmq) speaks the same shape and consumers can dedupe on the CloudEvents id. The interfaces here have no transport dependencies, so the same producer/consumer code can ship over RabbitMQ, Kafka, NATS, or a WebSocket gateway by swapping the adapter behind Publisher.
The abstraction ¶
Message carries CloudEvents context attributes (ID/Type/Source/Subject/Time/DataContentType) plus the business payload (Data) and transport-neutral routing: Topic (the logical destination) and an optional Key (routing/ordering key within the topic). Each transport maps Topic/Key to its own concept: a RabbitMQ exchange + routing key, a Kafka topic + partition key, a NATS subject. Headers carry transport headers such as W3C trace context.
- Publisher delivers a Message; implementations should provide at-least-once semantics, so callers can treat a nil error as meaning the broker durably accepted it.
- Handler processes one received Message and returns an Action.
- Action is the consumer's verdict: Ack (processed; the broker can remove the message), Requeue (redeliver), or Discard (reject without requeue, so the message is dropped or dead-lettered).
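The three verdicts can be sketched as a small decision function inside a handler. The package's Action definition is not shown on this page, so the Action type below is a local stand-in, and the error values and the decide helper are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Action is a local stand-in for the package's Action type; the real
// definition is not shown in this documentation.
type Action int

const (
	Ack     Action = iota // processed; the broker can forget the message
	Requeue               // transient failure; ask for redelivery
	Discard               // permanent failure; drop or dead-letter
)

// Hypothetical error classes a handler might distinguish.
var (
	errTransient = errors.New("transient failure")
	errMalformed = errors.New("malformed payload")
)

// decide maps the result of processing one message to a verdict.
func decide(err error) Action {
	switch {
	case err == nil:
		return Ack
	case errors.Is(err, errTransient):
		return Requeue
	default:
		// Retrying a message that can never succeed only creates a loop.
		return Discard
	}
}

func main() {
	fmt.Println(decide(nil) == Ack)
	fmt.Println(decide(fmt.Errorf("db timeout: %w", errTransient)) == Requeue)
	fmt.Println(decide(errMalformed) == Discard)
}
```

Keeping the classification in one place makes it easier to review which failures are retried and which are dropped.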
CloudEvents envelope ¶
MarshalCloudEvent serializes a Message into a CloudEvents v1.0 structured-mode JSON envelope (SpecVersion is stamped automatically, ContentTypeCloudEvents is the wire content type); the business payload lands under "data". Transport fields (Topic, Key, Headers) are NOT part of the envelope — the transport applies them at publish time. UnmarshalCloudEvent reverses it, leaving the transport fields zero for the adapter to fill from the delivery.
	body, err := broker.MarshalCloudEvent(broker.Message{
		Type:            "widget.created",
		DataContentType: "application/json",
		Data:            []byte(`{"id":"w-1"}`),
		Topic:           "widgets",
	})
	m, err := broker.UnmarshalCloudEvent(body) // m.Topic/Key/Headers are zero
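To make the envelope shape concrete, here is a minimal sketch that builds a structured-mode JSON body with encoding/json. It is not this package's implementation: the envelope struct and the marshalEnvelope, sample, and decodeAttrs helpers are hypothetical, and the sketch assumes Data already holds valid JSON. The attribute names are the ones defined by the CloudEvents v1.0 JSON format.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// envelope is a hypothetical struct mirroring the CloudEvents v1.0 JSON
// attribute names. It deliberately has no topic, key, or headers field.
type envelope struct {
	SpecVersion     string          `json:"specversion"`
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	Subject         string          `json:"subject,omitempty"`
	Time            time.Time       `json:"time"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	Data            json.RawMessage `json:"data,omitempty"`
}

// marshalEnvelope wraps a JSON payload in the envelope. Assumption: data is
// valid JSON; json.Marshal returns an error otherwise.
func marshalEnvelope(id, source, typ string, at time.Time, data []byte) ([]byte, error) {
	return json.Marshal(envelope{
		SpecVersion:     "1.0",
		ID:              id,
		Source:          source,
		Type:            typ,
		Time:            at,
		DataContentType: "application/json",
		Data:            json.RawMessage(data),
	})
}

// sample builds one envelope with fixed, made-up attribute values.
func sample() ([]byte, error) {
	at := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
	return marshalEnvelope("evt-1", "/svc/widgets", "widget.created", at, []byte(`{"id":"w-1"}`))
}

// decodeAttrs returns the top-level attributes of an envelope as raw JSON.
func decodeAttrs(body []byte) (map[string]json.RawMessage, error) {
	attrs := make(map[string]json.RawMessage)
	err := json.Unmarshal(body, &attrs)
	return attrs, err
}

func main() {
	body, err := sample()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Because the routing fields never enter the body, the same bytes can be handed to any transport, which then applies Topic, Key, and Headers in its own terms.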
How a transport plugs in ¶
A concrete adapter implements Publisher (and, for consumers, worker.Runnable) in a sub-package. broker/rabbitmq is the reference adapter: its Publisher maps Message.Topic/Key to a RabbitMQ exchange/routing key and publishes the CloudEvents body with publisher confirms; its Consumer decodes deliveries back into Messages and dispatches them to a broker.Handler. Application and domain code depend only on broker.Publisher / broker.Handler, never on the adapter.
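The plug-in point can be illustrated with a toy adapter. The sketch below is not part of this package: memPublisher, announce, and demo are hypothetical names, and Message is trimmed to the fields the example needs. The Publisher interface is copied from the definition documented on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message is a trimmed local copy of the documented struct.
type Message struct {
	ID, Type   string
	Data       []byte
	Topic, Key string
}

// Publisher matches the interface documented on this page.
type Publisher interface {
	Publish(ctx context.Context, m Message) error
	Close() error
}

// memPublisher is a hypothetical in-memory adapter, handy in tests.
type memPublisher struct {
	mu      sync.Mutex
	closed  bool
	byTopic map[string][]Message
}

func newMemPublisher() *memPublisher {
	return &memPublisher{byTopic: make(map[string][]Message)}
}

func (p *memPublisher) Publish(ctx context.Context, m Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("publisher closed")
	}
	p.byTopic[m.Topic] = append(p.byTopic[m.Topic], m)
	return nil
}

func (p *memPublisher) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	return nil
}

// count reports how many messages were stored for a topic.
func (p *memPublisher) count(topic string) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.byTopic[topic])
}

// announce is application code: it depends on Publisher, not on an adapter.
func announce(ctx context.Context, pub Publisher, widgetID string) error {
	return pub.Publish(ctx, Message{
		Type:  "widget.created",
		Topic: "widgets",
		Key:   widgetID,
		Data:  []byte(fmt.Sprintf(`{"id":%q}`, widgetID)),
	})
}

// demo publishes once, closes the adapter, then publishes again. It returns
// the stored message count and the error from the publish after Close.
func demo() (int, error) {
	ctx := context.Background()
	pub := newMemPublisher()
	if err := announce(ctx, pub, "w-1"); err != nil {
		panic(err)
	}
	if err := pub.Close(); err != nil {
		panic(err)
	}
	return pub.count("widgets"), announce(ctx, pub, "w-2")
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```

Swapping memPublisher for a real adapter changes only the constructor call; announce stays untouched.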
Constants ¶
	const (
		// SpecVersion is the CloudEvents spec version stamped on every envelope.
		SpecVersion = "1.0"
		// ContentTypeCloudEvents is the wire content type for the structured-mode
		// JSON envelope produced by MarshalCloudEvent.
		ContentTypeCloudEvents = "application/cloudevents+json"
	)
Functions ¶
func MarshalCloudEvent ¶
MarshalCloudEvent serializes m as a CloudEvents v1.0 structured-mode JSON envelope: context attributes (id/type/source/...) live at the top level and the business payload goes under "data". Transport fields (Topic, Key, Headers) are NOT part of the envelope — they are applied by the transport at publish time.
Types ¶
type Handler ¶
Handler processes one received message and returns the broker Action. It must be safe for concurrent use when the consumer runs with concurrency > 1.
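A handler that dedupes on the event id shows what the concurrency requirement means in practice. This is a sketch under stated assumptions: the Handler signature is not shown on this page, so the function type below is a guess, Action and Message are local stand-ins, and dedupe, newDedupe, and deliverConcurrently are hypothetical names. The in-memory set only covers a single process; a real consumer would likely need a durable store.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local stand-ins; the real Action and Handler definitions are not shown.
type Action int

const (
	Ack Action = iota
	Requeue
	Discard
)

type Message struct {
	ID   string
	Data []byte
}

// Handler is an assumed shape for the package's Handler type.
type Handler func(ctx context.Context, m Message) Action

// dedupe applies each event id at most once. The mutex guards both the
// seen set and the counter, so Handle is safe to call from many goroutines.
type dedupe struct {
	mu      sync.Mutex
	seen    map[string]struct{}
	applied int
}

func newDedupe() *dedupe {
	return &dedupe{seen: make(map[string]struct{})}
}

func (d *dedupe) Handle(ctx context.Context, m Message) Action {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[m.ID]; dup {
		return Ack // redelivery of an event already applied
	}
	d.seen[m.ID] = struct{}{}
	d.applied++ // stands in for the real side effect
	return Ack
}

// deliverConcurrently simulates n simultaneous deliveries of one message.
func deliverConcurrently(h Handler, m Message, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h(context.Background(), m)
		}()
	}
	wg.Wait()
}

func main() {
	d := newDedupe()
	deliverConcurrently(d.Handle, Message{ID: "evt-1"}, 8)
	fmt.Println("applied:", d.applied)
}
```

Acking a duplicate rather than requeueing it is what stops an at-least-once transport from redelivering the same event indefinitely.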
type Message ¶
	type Message struct {
		// ID is the unique event id; consumers dedupe on it. Defaults to a fresh
		// UUID at publish time when empty.
		ID string
		// Type is the CloudEvents type, e.g. "widget.created".
		Type string
		// Source identifies the producer; a publisher fills it from its configured
		// source when empty.
		Source string
		// Subject is the optional CloudEvents subject.
		Subject string
		// Time is the event timestamp; defaults to now at publish time when zero.
		Time time.Time
		// DataContentType is the MIME type of Data, e.g. "application/json".
		DataContentType string
		// Data is the business payload (NOT the envelope).
		Data []byte
		// Topic is the logical destination, mapped by each transport to its own
		// concept: a RabbitMQ exchange, a Kafka/NATS topic, a WebSocket channel.
		Topic string
		// Key is an optional routing/ordering key within the topic: a RabbitMQ
		// routing key, a Kafka partition key, a NATS subject suffix. May be empty
		// for transports that don't use one.
		Key string
		// Headers are transport headers (e.g. W3C trace context).
		Headers map[string]string
	}
Message is a broker message in CloudEvents terms plus the transport routing needed to deliver it. Data is the business payload only — the CloudEvents envelope is built around it by MarshalCloudEvent.
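The field comments describe three publish-time defaults: ID, Source, and Time. A publisher could apply them along the lines of the sketch below. How this package actually generates ids is not shown here, so newUUID (a random version 4 UUID built from the standard library) and withDefaults are hypothetical, and Message is trimmed to the relevant fields.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"time"
)

// Message is a trimmed local copy of the documented struct.
type Message struct {
	ID, Type, Source string
	Time             time.Time
}

// newUUID returns a random (version 4) UUID using only the standard library.
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// withDefaults fills ID, Source, and Time only when the caller left them
// empty, so explicitly set values always win.
func withDefaults(m Message, configuredSource string) (Message, error) {
	if m.ID == "" {
		id, err := newUUID()
		if err != nil {
			return m, err
		}
		m.ID = id
	}
	if m.Source == "" {
		m.Source = configuredSource
	}
	if m.Time.IsZero() {
		m.Time = time.Now().UTC()
	}
	return m, nil
}

func main() {
	m, err := withDefaults(Message{Type: "widget.created"}, "/svc/widgets")
	if err != nil {
		panic(err)
	}
	fmt.Println(m.ID, m.Source, m.Time.Format(time.RFC3339))
}
```

A producer that retries a publish should set ID itself, otherwise each attempt would get a fresh id and consumers could not dedupe the retries.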
func UnmarshalCloudEvent ¶
UnmarshalCloudEvent parses a structured-mode JSON envelope back into a Message. Transport fields are left zero; the caller fills Topic/Key/Headers from the delivery if needed.
type Publisher ¶
	type Publisher interface {
		// Publish delivers m. Implementations should provide at-least-once
		// semantics (e.g. AMQP publisher confirms) so callers can treat a nil error
		// as durably accepted by the broker.
		Publish(ctx context.Context, m Message) error
		// Close releases publisher resources.
		Close() error
	}
Publisher publishes messages to the broker.