broker

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package broker abstracts an event broker behind transport-agnostic Publisher and Consumer contracts. Messages travel the wire as CloudEvents v1.0 structured-mode JSON; this package owns that envelope so every transport (e.g. broker/rabbitmq) speaks the same shape and consumers can dedupe on the CloudEvents id.

The interfaces here have no transport dependencies. Concrete adapters live in sub-packages and implement Publisher and worker.Runnable (for consumers).

Index

Constants

View Source
const (
	// SpecVersion is the CloudEvents spec version stamped on every envelope.
	SpecVersion = "1.0"
	// ContentTypeCloudEvents is the wire content type for the structured-mode
	// JSON envelope produced by MarshalCloudEvent.
	ContentTypeCloudEvents = "application/cloudevents+json"
)

Variables

This section is empty.

Functions

func MarshalCloudEvent

func MarshalCloudEvent(m Message) ([]byte, error)

MarshalCloudEvent serializes m as a CloudEvents v1.0 structured-mode JSON envelope: context attributes (id/type/source/...) live at the top level and the business payload goes under "data". Transport fields (Exchange, RoutingKey, Headers) are NOT part of the envelope — they are applied by the transport at publish time.

Types

type Action

type Action int

Action is a consumer's verdict on a delivery, returned by a Handler.

const (
	// Ack acknowledges successful processing; the broker drops the message.
	Ack Action = iota
	// Requeue negatively acknowledges and asks the broker to redeliver later.
	Requeue
	// Discard negatively acknowledges without requeue (drop / dead-letter).
	Discard
)

type Handler

type Handler func(ctx context.Context, m Message) Action

Handler processes one received message and returns the broker Action. It must be safe for concurrent use when the consumer runs with concurrency > 1.

type Message

type Message struct {

	// ID is the unique event id; consumers dedupe on it. Defaults to a fresh
	// UUID at publish time when empty.
	ID string
	// Type is the CloudEvents type, e.g. "widget.created".
	Type string
	// Source identifies the producer; a publisher fills it from its configured
	// source when empty.
	Source string
	// Subject is the optional CloudEvents subject.
	Subject string
	// Time is the event timestamp; defaults to now at publish time when zero.
	Time time.Time
	// DataContentType is the MIME type of Data, e.g. "application/json".
	DataContentType string
	// Data is the business payload (NOT the envelope).
	Data []byte

	// Exchange is the target exchange (AMQP) or topic namespace.
	Exchange string
	// RoutingKey routes the message within the exchange.
	RoutingKey string
	// Headers are transport headers (e.g. W3C trace context).
	Headers map[string]string
}

Message is a broker message in CloudEvents terms plus the transport routing needed to deliver it. Data is the business payload only — the CloudEvents envelope is built around it by MarshalCloudEvent.

func UnmarshalCloudEvent

func UnmarshalCloudEvent(body []byte) (Message, error)

UnmarshalCloudEvent parses a structured-mode JSON envelope back into a Message. Transport fields are left zero; the caller fills Headers/Exchange/ RoutingKey from the delivery if needed.

type Publisher

type Publisher interface {
	// Publish delivers m. Implementations should provide at-least-once
	// semantics (e.g. AMQP publisher confirms) so callers can treat a nil error
	// as durably accepted by the broker.
	Publish(ctx context.Context, m Message) error
	// Close releases publisher resources.
	Close() error
}

Publisher publishes messages to the broker.

Directories

Path Synopsis
Package rabbitmq implements the broker abstraction over RabbitMQ using github.com/wagslane/go-rabbitmq, which adds automatic reconnection and publisher confirms on top of amqp091.
Package rabbitmq implements the broker abstraction over RabbitMQ using github.com/wagslane/go-rabbitmq, which adds automatic reconnection and publisher confirms on top of amqp091.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL