Documentation ¶
Overview ¶
Package broker defines Servora's minimal message broker abstraction. The design is inspired by the kratos-transport/broker interface and simplified for Servora's event-bus use case (no RPC or Binder semantics).
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface {
	// Connect establishes a connection to the broker backend.
	Connect(ctx context.Context) error
	// Disconnect closes the connection and releases resources.
	Disconnect(ctx context.Context) error
	// Publish sends a message to the given topic.
	Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
	// Subscribe registers a handler for messages on the given topic.
	Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)
}
Broker is the top-level message broker interface: connect, publish, subscribe.
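The lifecycle the interface implies (connect, subscribe, publish, clean up) can be sketched against a small in-memory stand-in. Everything here is illustrative: memBroker, memSub, and memEvent are hypothetical, the Handler signature is an assumption because this page does not show it, and the variadic options are left out to keep the sketch short.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins so the sketch compiles on its own; options are omitted.
type Message struct{ Body []byte }

// Event is trimmed to the two methods used here.
type Event interface {
	Topic() string
	Message() *Message
}

// Handler's signature is an assumption; the source does not show it.
type Handler func(ctx context.Context, evt Event) error

type Subscriber interface {
	Unsubscribe(removeFromManager bool) error
}

type Broker interface {
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Publish(ctx context.Context, topic string, msg *Message) error
	Subscribe(ctx context.Context, topic string, h Handler) (Subscriber, error)
}

// memEvent, memSub, and memBroker are hypothetical in-memory implementations.
type memEvent struct {
	topic string
	msg   *Message
}

func (e memEvent) Topic() string     { return e.topic }
func (e memEvent) Message() *Message { return e.msg }

type memBroker struct{ handlers map[string]Handler }

type memSub struct {
	b     *memBroker
	topic string
}

func (s memSub) Unsubscribe(bool) error {
	delete(s.b.handlers, s.topic)
	return nil
}

// Connect and Disconnect are no-ops: there is no backend to reach.
func (b *memBroker) Connect(context.Context) error    { return nil }
func (b *memBroker) Disconnect(context.Context) error { return nil }

func (b *memBroker) Publish(ctx context.Context, topic string, msg *Message) error {
	if h, ok := b.handlers[topic]; ok {
		return h(ctx, memEvent{topic: topic, msg: msg}) // synchronous delivery
	}
	return nil
}

func (b *memBroker) Subscribe(_ context.Context, topic string, h Handler) (Subscriber, error) {
	if b.handlers == nil {
		b.handlers = map[string]Handler{}
	}
	b.handlers[topic] = h
	return memSub{b: b, topic: topic}, nil
}

// roundTrip walks the lifecycle: connect, subscribe, publish, then clean up.
func roundTrip(ctx context.Context, b Broker) (string, error) {
	if err := b.Connect(ctx); err != nil {
		return "", err
	}
	defer b.Disconnect(ctx)
	var got string
	sub, err := b.Subscribe(ctx, "orders.created", func(_ context.Context, evt Event) error {
		got = evt.Topic() + ":" + string(evt.Message().Body)
		return nil
	})
	if err != nil {
		return "", err
	}
	defer sub.Unsubscribe(true)
	err = b.Publish(ctx, "orders.created", &Message{Body: []byte("hello")})
	return got, err
}

func demo() (string, error) { return roundTrip(context.Background(), &memBroker{}) }

func main() { fmt.Println(demo()) }
```

Because roundTrip depends only on the interface, the same function would work unchanged with a real backend; only the value passed in differs.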
type Event ¶
type Event interface {
	// Topic returns the topic this event was received from.
	Topic() string
	// Message returns the decoded message.
	Message() *Message
	// RawMessage returns the underlying broker-specific raw message object
	// (e.g. *kgo.Record for Kafka). Useful for advanced use cases.
	RawMessage() any
	// Ack acknowledges successful processing. The broker will not re-deliver.
	Ack() error
	// Nack signals a processing failure. The broker may re-deliver.
	Nack() error
	// Error returns any fetch-level error attached to this event.
	Error() error
}
Event wraps an incoming message and provides acknowledgement semantics. It is inspired by the kratos-transport/broker Event interface.
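A handler that settles events itself might look roughly like the sketch below. It assumes a Handler signature of func(context.Context, Event) error, which this page does not show, and it assumes that an event carrying a fetch-level error has nothing to process. manualAck, fakeEvent, and the process callback are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins; Event is trimmed to the methods this sketch calls.
type Message struct{ Body []byte }

type Event interface {
	Message() *Message
	Ack() error
	Nack() error
	Error() error
}

// Handler's signature is an assumption; the source does not show it.
type Handler func(ctx context.Context, evt Event) error

// manualAck wraps a hypothetical process function and settles each event
// itself, as a handler must once automatic acknowledgement is disabled.
func manualAck(process func(*Message) error) Handler {
	return func(_ context.Context, evt Event) error {
		if err := evt.Error(); err != nil {
			// Assumption: a fetch-level error means there is no message to process.
			return err
		}
		if err := process(evt.Message()); err != nil {
			if nackErr := evt.Nack(); nackErr != nil {
				return nackErr
			}
			return err
		}
		return evt.Ack()
	}
}

// fakeEvent is a test double that records which settle call was made.
type fakeEvent struct {
	msg    *Message
	acked  bool
	nacked bool
}

func (e *fakeEvent) Message() *Message { return e.msg }
func (e *fakeEvent) Error() error      { return nil }

func (e *fakeEvent) Ack() error {
	e.acked = true
	return nil
}

func (e *fakeEvent) Nack() error {
	e.nacked = true
	return nil
}

// demo reports whether a good message was acked and a bad one nacked.
func demo() (okAcked, badNacked bool) {
	h := manualAck(func(m *Message) error {
		if len(m.Body) == 0 {
			return errors.New("empty body")
		}
		return nil
	})
	good := &fakeEvent{msg: &Message{Body: []byte("payload")}}
	bad := &fakeEvent{msg: &Message{}}
	_ = h(context.Background(), good)
	_ = h(context.Background(), bad)
	return good.acked && !good.nacked, bad.nacked && !bad.acked
}

func main() { fmt.Println(demo()) }
```

Since the broker may re-deliver after Nack, a handler written this way should tolerate seeing the same message more than once.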
type Handler ¶
Handler is the function signature for message consumers.
func Chain ¶
func Chain(h Handler, mws ...MiddlewareFunc) Handler
Chain applies the given middleware functions to a handler, outermost first: the first middleware in mws wraps all the others.
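The "outermost first" ordering can be illustrated with a small stand-in. This is a sketch under assumptions, not the package's code: the Handler and MiddlewareFunc shapes are guessed because this page does not show them, and chain and trace are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Assumed shapes; the source names these types without showing them.
type Event interface{ Topic() string }
type Handler func(ctx context.Context, evt Event) error
type MiddlewareFunc func(next Handler) Handler

// chain wraps h so that mws[0] ends up outermost: chain(h, a, b) == a(b(h)).
func chain(h Handler, mws ...MiddlewareFunc) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace is a hypothetical middleware that records when it runs.
func trace(name string, log *[]string) MiddlewareFunc {
	return func(next Handler) Handler {
		return func(ctx context.Context, evt Event) error {
			*log = append(*log, name+":before")
			err := next(ctx, evt)
			*log = append(*log, name+":after")
			return err
		}
	}
}

// demo returns the call order for chain(handler, logging, retry).
func demo() string {
	var log []string
	h := chain(
		func(context.Context, Event) error {
			log = append(log, "handler")
			return nil
		},
		trace("logging", &log),
		trace("retry", &log),
	)
	_ = h(context.Background(), nil)
	return strings.Join(log, " > ")
}

func main() {
	fmt.Println(demo())
	// logging:before > retry:before > handler > retry:after > logging:after
}
```

Iterating from the last middleware to the first is what puts the first one on the outside, so it sees the event first and the result last.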
type Message ¶
type Message struct {
	// Key is used for partition routing (e.g. entity ID).
	Key string
	// Headers carries metadata (trace IDs, content-type, schema version, etc.).
	Headers Headers
	// Body is the serialised payload (proto bytes, JSON, etc.).
	Body []byte
	// Partition is the partition this message was received from (consumer-side).
	Partition int32
	// Offset is the offset within the partition (consumer-side).
	Offset int64
}
Message is the unit of data exchanged on a topic.
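On the producer side, only Key, Headers, and Body would normally be set, since Partition and Offset are described as consumer-side fields. The sketch below builds such a message from a JSON payload. Headers is assumed to be a map[string]string (this page does not define it), and orderCreated, newOrderMessage, and the header names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Headers is assumed to be a string map; the source does not define it.
type Headers map[string]string

// Message mirrors the documented struct.
type Message struct {
	Key       string
	Headers   Headers
	Body      []byte
	Partition int32
	Offset    int64
}

// orderCreated is a hypothetical event payload.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// newOrderMessage builds a producer-side message: the entity ID becomes the
// routing key, and Partition and Offset stay at their zero values.
func newOrderMessage(ev orderCreated) (*Message, error) {
	body, err := json.Marshal(ev)
	if err != nil {
		return nil, err
	}
	return &Message{
		Key:     ev.ID,
		Headers: Headers{"content-type": "application/json", "schema-version": "1"},
		Body:    body,
	}, nil
}

// demo returns the key, body, and content type of a sample message.
func demo() (key, body, contentType string) {
	msg, err := newOrderMessage(orderCreated{ID: "42", Total: 1999})
	if err != nil {
		return "", "", ""
	}
	return msg.Key, string(msg.Body), msg.Headers["content-type"]
}

func main() { fmt.Println(demo()) }
```

Using the entity ID as the key means that, on a partitioned backend, messages for the same entity would typically be routed to the same partition.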
type MiddlewareFunc ¶
MiddlewareFunc wraps a Handler, enabling middleware chains for logging, tracing, retry, etc.
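As an illustration of the retry case mentioned above, a middleware might be sketched as follows. It assumes MiddlewareFunc has the shape func(Handler) Handler and that Handler takes a context and an Event; neither signature is shown on this page. The retry helper and its attempts parameter are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Assumed shapes; the source names these types without showing them.
type Event interface{ Topic() string }
type Handler func(ctx context.Context, evt Event) error
type MiddlewareFunc func(next Handler) Handler

// retry is a hypothetical middleware that calls next up to attempts times,
// stopping early on success or when the context is done.
func retry(attempts int) MiddlewareFunc {
	if attempts < 1 {
		attempts = 1
	}
	return func(next Handler) Handler {
		return func(ctx context.Context, evt Event) error {
			var err error
			for i := 0; i < attempts; i++ {
				if err = next(ctx, evt); err == nil {
					return nil
				}
				if ctx.Err() != nil {
					return ctx.Err()
				}
			}
			return err
		}
	}
}

// demo wraps a handler that fails twice before succeeding.
func demo() (calls int, err error) {
	flaky := func(context.Context, Event) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	err = retry(5)(flaky)(context.Background(), nil)
	return calls, err
}

func main() { fmt.Println(demo()) }
```

A production version would likely add a backoff between attempts; it is omitted here to keep the wrapping pattern visible.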
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption configures a single Publish call.
func WithPublishHeaders ¶
func WithPublishHeaders(h Headers) PublishOption
WithPublishHeaders adds extra headers to a published message.
type PublishOptions ¶
type PublishOptions struct {
	// Headers are merged into the message headers and override
	// Message.Headers on conflict.
	Headers Headers
}
PublishOptions holds all per-publish settings.
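The documented precedence (option headers win over message headers on conflict) can be sketched as below. Headers is assumed to be a map[string]string, withPublishHeaders is a stand-in for WithPublishHeaders, and effectiveHeaders is a hypothetical helper. Whether a real implementation copies the headers or mutates the message is not stated on this page; this sketch copies.

```go
package main

import "fmt"

// Headers is assumed to be a string map; the source does not define it.
type Headers map[string]string

type Message struct {
	Headers Headers
	Body    []byte
}

type PublishOptions struct{ Headers Headers }

type PublishOption func(*PublishOptions)

// withPublishHeaders is a stand-in for WithPublishHeaders.
func withPublishHeaders(h Headers) PublishOption {
	return func(o *PublishOptions) {
		if o.Headers == nil {
			o.Headers = Headers{}
		}
		for k, v := range h {
			o.Headers[k] = v
		}
	}
}

// effectiveHeaders is a hypothetical helper showing the documented
// precedence: option headers override message headers on conflict.
func effectiveHeaders(msg *Message, opts ...PublishOption) Headers {
	var po PublishOptions
	for _, opt := range opts {
		opt(&po)
	}
	out := make(Headers, len(msg.Headers)+len(po.Headers))
	for k, v := range msg.Headers {
		out[k] = v
	}
	for k, v := range po.Headers { // applied last, so these win
		out[k] = v
	}
	return out
}

// demo merges one conflicting and one non-conflicting header.
func demo() Headers {
	msg := &Message{Headers: Headers{
		"content-type": "application/json",
		"trace-id":     "from-message",
	}}
	return effectiveHeaders(msg, withPublishHeaders(Headers{"trace-id": "from-option"}))
}

func main() { fmt.Println(demo()) }
```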
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption configures a Subscribe call.
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
DisableAutoAck disables automatic Ack on handler success. The handler is then responsible for calling Event.Ack() or Event.Nack().
func WithAutoAck ¶
func WithAutoAck(v bool) SubscribeOption
WithAutoAck explicitly sets auto-ack mode (use DisableAutoAck() for the common disable case).
func WithMiddlewares ¶
func WithMiddlewares(mws ...MiddlewareFunc) SubscribeOption
WithMiddlewares adds handler middleware functions to the subscription.
func WithQueue ¶
func WithQueue(name string) SubscribeOption
WithQueue sets the consumer group / queue group name for competing-consumer delivery.
type SubscribeOptions ¶
type SubscribeOptions struct {
	// AutoAck automatically calls Ack after the handler returns without error.
	// Defaults to true.
	AutoAck bool
	// Queue enables competing-consumer (queue group / consumer group) semantics.
	// Multiple subscribers with the same Queue value share the load.
	Queue string
	// Middlewares wraps the handler with a chain of MiddlewareFuncs (outermost first).
	Middlewares []MiddlewareFunc
}
SubscribeOptions holds all subscription settings.
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
NewSubscribeOptions creates SubscribeOptions with defaults applied.
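The functional-options pattern behind this constructor can be sketched as follows. The lowercase functions are stand-ins written for this example rather than the package's code, and Middlewares is left out for brevity. The only behaviour taken from this page is that AutoAck defaults to true and that options are applied on top of the defaults.

```go
package main

import "fmt"

// SubscribeOptions mirrors the documented struct, minus Middlewares.
type SubscribeOptions struct {
	AutoAck bool
	Queue   string
}

type SubscribeOption func(*SubscribeOptions)

// withAutoAck is a stand-in for WithAutoAck.
func withAutoAck(v bool) SubscribeOption {
	return func(o *SubscribeOptions) { o.AutoAck = v }
}

// disableAutoAck is a stand-in for DisableAutoAck.
func disableAutoAck() SubscribeOption { return withAutoAck(false) }

// withQueue is a stand-in for WithQueue.
func withQueue(name string) SubscribeOption {
	return func(o *SubscribeOptions) { o.Queue = name }
}

// newSubscribeOptions starts from the documented default (AutoAck true)
// and then applies each option in order.
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
	o := SubscribeOptions{AutoAck: true}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// demo returns the defaults and a tuned configuration side by side.
func demo() (defaults, tuned SubscribeOptions) {
	return newSubscribeOptions(),
		newSubscribeOptions(withQueue("billing-workers"), disableAutoAck())
}

func main() { fmt.Println(demo()) }
```

Setting the default inside the constructor matters because the zero value of a bool is false; a bare SubscribeOptions{} literal would silently disable auto-ack.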
type Subscriber ¶
type Subscriber interface {
	// Topic returns the subscribed topic.
	Topic() string
	// Options returns the subscription options that were applied.
	Options() SubscribeOptions
	// Unsubscribe cancels this subscription.
	// Pass removeFromManager=true when called by user code; false when called
	// internally by broker cleanup to avoid double-locking.
	Unsubscribe(removeFromManager bool) error
}
Subscriber represents an active subscription that can be cancelled.
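The double-locking concern behind removeFromManager can be illustrated with a small sketch. The manager and subscription types here are hypothetical; how Servora tracks subscriptions internally is not described on this page. The sketch only shows why cleanup code that already holds the manager's lock must pass false.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical registry of active subscriptions.
type manager struct {
	mu   sync.Mutex
	subs map[string]*subscription
}

type subscription struct {
	topic  string
	mgr    *manager
	closed bool
}

func (m *manager) add(topic string) *subscription {
	m.mu.Lock()
	defer m.mu.Unlock()
	s := &subscription{topic: topic, mgr: m}
	m.subs[topic] = s
	return s
}

// Unsubscribe takes the manager lock only when asked to remove itself.
func (s *subscription) Unsubscribe(removeFromManager bool) error {
	if removeFromManager {
		s.mgr.mu.Lock()
		delete(s.mgr.subs, s.topic)
		s.mgr.mu.Unlock()
	}
	s.closed = true
	return nil
}

// closeAll holds the lock while iterating, so it must pass false:
// passing true would try to lock mu again and deadlock.
func (m *manager) closeAll() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for topic, s := range m.subs {
		if err := s.Unsubscribe(false); err != nil {
			return err
		}
		delete(m.subs, topic)
	}
	return nil
}

// demo exercises both paths and reports how many subscriptions remain.
func demo() (afterUser, afterClose int, allClosed bool) {
	m := &manager{subs: map[string]*subscription{}}
	a, b := m.add("a"), m.add("b")
	_ = a.Unsubscribe(true) // user path: takes the lock itself
	afterUser = len(m.subs)
	_ = m.closeAll() // cleanup path: lock is already held
	return afterUser, len(m.subs), a.closed && b.closed
}

func main() { fmt.Println(demo()) }
```

sync.Mutex is not reentrant, which is why the flag is needed at all: the same goroutine cannot lock it twice.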