broker

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2026 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

Package broker defines Servora's minimal message broker abstraction. Inspired by kratos-transport/broker interface design, simplified for Servora's event-bus use case (no RPC / Binder semantics).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	// Connect establishes a connection to the broker backend.
	Connect(ctx context.Context) error
	// Disconnect closes the connection and releases resources.
	Disconnect(ctx context.Context) error
	// Publish sends a message to the given topic.
	Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
	// Subscribe registers a handler for messages on the given topic.
	Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)
}

Broker is the top-level message broker interface: connect, publish, subscribe.

type Event

type Event interface {
	// Topic returns the topic this event was received from.
	Topic() string
	// Message returns the decoded message.
	Message() *Message
	// RawMessage returns the underlying broker-specific raw message object
	// (e.g. *kgo.Record for Kafka). Useful for advanced use cases.
	RawMessage() any
	// Ack acknowledges successful processing. The broker will not re-deliver.
	Ack() error
	// Nack signals a processing failure. The broker may re-deliver.
	Nack() error
	// Error returns any fetch-level error attached to this event.
	Error() error
}

Event wraps an incoming message and provides acknowledge semantics. Inspired by kratos-transport/broker Event interface.

type Handler

type Handler func(ctx context.Context, event Event) error

Handler is the function signature for message consumers.

func Chain

func Chain(h Handler, mws ...MiddlewareFunc) Handler

Chain applies multiple middleware functions to a handler (outermost first).

type Headers

type Headers map[string]string

Headers is a string key-value map for message metadata.

type Message

type Message struct {
	// Key is used for partition routing (e.g. entity ID).
	Key string
	// Headers carries metadata (trace IDs, content-type, schema version, etc.).
	Headers Headers
	// Body is the serialised payload (proto bytes, JSON, etc.).
	Body []byte
	// Partition is the partition this message was received from (consumer-side).
	Partition int32
	// Offset is the offset within the partition (consumer-side).
	Offset int64
}

Message is the unit of data exchanged on a topic.

func (*Message) GetHeader

func (m *Message) GetHeader(key string) string

type MiddlewareFunc

type MiddlewareFunc func(Handler) Handler

MiddlewareFunc wraps a Handler, enabling middleware chains for logging, tracing, retry, etc.

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption configures a single Publish call.

func WithPublishHeaders

func WithPublishHeaders(h Headers) PublishOption

WithPublishHeaders adds extra headers to a published message.

type PublishOptions

type PublishOptions struct {
	// Headers merged into the message headers (override message.Headers on conflict).
	Headers Headers
}

PublishOptions holds all per-publish settings.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption configures a Subscribe call.

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck disables automatic Ack on handler success. The handler is then responsible for calling Event.Ack() or Event.Nack().

func WithAutoAck

func WithAutoAck(v bool) SubscribeOption

WithAutoAck explicitly sets auto-ack mode (use DisableAutoAck() for the common disable case).

func WithMiddlewares

func WithMiddlewares(mws ...MiddlewareFunc) SubscribeOption

WithMiddlewares adds handler middleware functions to the subscription.

func WithQueue

func WithQueue(name string) SubscribeOption

WithQueue sets the consumer group / queue group name for competing-consumer delivery.

type SubscribeOptions

type SubscribeOptions struct {
	// AutoAck automatically calls Ack after the handler returns without error.
	// Defaults to true.
	AutoAck bool
	// Queue enables competing-consumer (queue group / consumer group) semantics.
	// Multiple subscribers with the same Queue value share the load.
	Queue string
	// Middlewares wraps the handler with a chain of MiddlewareFuncs (outermost first).
	Middlewares []MiddlewareFunc
}

SubscribeOptions holds all subscription settings.

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

NewSubscribeOptions creates SubscribeOptions with defaults applied.

type Subscriber

type Subscriber interface {
	// Topic returns the subscribed topic.
	Topic() string
	// Options returns the subscription options that were applied.
	Options() SubscribeOptions
	// Unsubscribe cancels this subscription.
	// Pass removeFromManager=true when called by user code; false when called
	// internally by broker cleanup to avoid double-locking.
	Unsubscribe(removeFromManager bool) error
}

Subscriber represents an active subscription that can be cancelled.

Directories

Path Synopsis
Package kafka provides a franz-go based implementation of pkg/broker.
Package kafka provides a franz-go based implementation of pkg/broker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL