Documentation ¶
Overview ¶
Package messaging provides transport-agnostic message producer and consumer abstractions for event-driven architectures.
It defines shared types (Message, Event, handler functions), interfaces (Producer, Consumer, ErrorClassifier, MetricsCollector), and generic patterns (AsRunner, ManagedConsumer, provider adapters) that are independent of any specific message broker.
Router ¶
Route incoming messages to different handlers based on topic, event type, or custom rules using Router. Supports exact match, wildcard patterns (e.g. "content.*"), and a default fallback handler.
BatchProducer ¶
Collect messages and flush in batches via BatchProducer. Supports size-triggered, time-triggered, and byte-triggered flushing with graceful shutdown.
Sub-packages ¶
- messaging/kafka: Kafka implementation (its consumer and producer sub-packages are built on franz-go)
- messaging/memory: In-memory broker for testing
- messaging/middleware: Transport-agnostic middleware (retry, DLQ, tracing, metrics, dedup, circuit breaker)
- messaging/testutil: Broker-agnostic mock producer/consumer for testing
Configuration ¶
Kafka-specific settings are provided via kafka.Config with ApplyDefaults()/Validate().
Index ¶
- Constants
- Variables
- func IsConnectionError(err error, extra ...string) bool
- func IsRetryableError(err error, extra ...string) bool
- func ParseData[D any](e Event) (D, error)
- type BatchConfig
- type BatchProducer
- type BinaryHandler
- type BrokerComponent
- type Consumer
- type ConsumerOption
- type ConsumerProviderAdapter
- type ConsumerRunner
- type ErrorClassifier
- type ErrorTranslator
- type Event
- type EventHandler
- type EventPublisher
- func (p *EventPublisher) Producer() Producer
- func (p *EventPublisher) Publish(ctx context.Context, topic, eventType string, data interface{}) error
- func (p *EventPublisher) PublishKeyed(ctx context.Context, topic, eventType string, data interface{}, key string) error
- func (p *EventPublisher) Source() string
- type HandlerMiddleware
- type JSONHandler
- type ManagedConsumer
- type ManagedConsumerConfig
- type Message
- type MessageHandler
- type MetricsCollector
- type Producer
- type ProducerCloser
- type ProducerOption
- type ProducerProviderAdapter
- type ProducerSinkProvider
- type Router
- type RouterOption
Constants ¶
const DefaultStopTimeout = 10 * time.Second
DefaultStopTimeout bounds ManagedConsumer.Stop when the caller's ctx has no deadline. This prevents a stuck consumer from blocking shutdown forever.
Variables ¶
var ConnectionPatterns = []string{
"connection refused",
"connection reset",
"broken pipe",
"i/o timeout",
"no route to host",
"network is unreachable",
"connection closed",
"dial tcp",
}
ConnectionPatterns contains generic connection error patterns common to most message brokers (TCP-level failures such as refused, reset, or timed-out connections and unreachable hosts).
var RetryablePatterns = []string{
"temporary",
"request timed out",
}
RetryablePatterns contains generic retryable error patterns that are not connection-specific but typically warrant a retry.
Functions ¶
func IsConnectionError ¶
func IsConnectionError(err error, extra ...string) bool
IsConnectionError reports whether err matches any connection pattern. The default ConnectionPatterns are always checked; additional broker-specific patterns can be supplied via the variadic argument.
func IsRetryableError ¶
func IsRetryableError(err error, extra ...string) bool
IsRetryableError reports whether err should trigger a retry. Connection errors are always retryable. Additional broker-specific retryable patterns can be supplied via the variadic argument.
Types ¶
type BatchConfig ¶
type BatchConfig struct {
MaxSize int // Max messages per batch (default: 100).
MaxWait time.Duration // Max time before forced flush (default: 5s).
MaxBytes int64 // Max total bytes per batch (0 = unlimited).
}
BatchConfig configures a BatchProducer.
type BatchProducer ¶
type BatchProducer struct {
// contains filtered or unexported fields
}
BatchProducer buffers messages and flushes them in batches via an underlying Producer. It is safe for concurrent use.
func NewBatchProducer ¶
func NewBatchProducer(p Producer, topic string, cfg BatchConfig) *BatchProducer
NewBatchProducer creates a BatchProducer that publishes to topic via p.
func (*BatchProducer) Close ¶
func (b *BatchProducer) Close(ctx context.Context) error
Close flushes remaining messages and stops the background timer.
type BinaryHandler ¶
BinaryHandler processes raw binary messages.
type BrokerComponent ¶
type BrokerComponent interface {
component.Component
// Producer creates a producer for the given topic.
Producer(topic string, opts ...ProducerOption) (Producer, error)
// Consumer registers a consumer for the given topics with the provided handler.
Consumer(topics []string, handler MessageHandler, opts ...ConsumerOption) error
}
BrokerComponent extends component.Component with producer/consumer factory methods. Implementations provide broker-specific creation logic while sharing the common lifecycle management from component.Component.
type Consumer ¶
type Consumer interface {
Consume(ctx context.Context, handler MessageHandler) error
Topic() string
Close() error
}
Consumer runs a blocking consume loop.
type ConsumerOption ¶
type ConsumerOption func(*consumerOptions)
ConsumerOption configures consumer creation.
type ConsumerProviderAdapter ¶
type ConsumerProviderAdapter struct {
// contains filtered or unexported fields
}
ConsumerProviderAdapter wraps a Consumer as a provider.Provider. This allows any messaging Consumer to participate in the provider framework.
func NewConsumerProviderAdapter ¶
func NewConsumerProviderAdapter(name string, c Consumer) *ConsumerProviderAdapter
NewConsumerProviderAdapter wraps a Consumer as a provider.Provider.
func (*ConsumerProviderAdapter) IsAvailable ¶
func (a *ConsumerProviderAdapter) IsAvailable(_ context.Context) bool
IsAvailable checks if the consumer is ready.
func (*ConsumerProviderAdapter) Name ¶
func (a *ConsumerProviderAdapter) Name() string
Name returns the provider's unique name.
type ConsumerRunner ¶
ConsumerRunner is used by Component to manage consumer lifecycle.
func AsRunner ¶
func AsRunner(c Consumer, h MessageHandler) ConsumerRunner
AsRunner wraps a Consumer with a MessageHandler to create a ConsumerRunner suitable for use with BrokerComponent or any component that manages consumer lifecycle via the ConsumerRunner interface.
type ErrorClassifier ¶
type ErrorClassifier interface {
// IsConnectionError checks if the error is a connection-level error.
IsConnectionError(err error) bool
// IsRetryableError determines if the error should trigger a retry.
IsRetryableError(err error) bool
}
ErrorClassifier categorizes errors for retry/circuit-breaker decisions. Each broker implementation provides its own classification logic.
type ErrorTranslator ¶
ErrorTranslator converts broker-specific errors to AppError. Each broker implementation maps its native errors to appropriate error codes, HTTP statuses, and retryable flags.
type Event ¶
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
ContentType string `json:"content_type,omitempty"`
Version string `json:"version,omitempty"`
Timestamp time.Time `json:"timestamp"`
Subject string `json:"subject,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
Event represents a structured event for domain messaging. Data is json.RawMessage so events can be forwarded without re-marshaling.
type EventHandler ¶
EventHandler processes structured events.
type EventPublisher ¶
type EventPublisher struct {
// contains filtered or unexported fields
}
EventPublisher is a convenience facade that wraps a Producer with a pre-configured source name. Every call to Publish or PublishKeyed automatically constructs an Event envelope (UUID, timestamp, source) so callers only provide the topic, event type, and payload.
func NewEventPublisher ¶
func NewEventPublisher(producer Producer, source string) *EventPublisher
NewEventPublisher creates an EventPublisher.
- producer: any Producer implementation (Kafka, in-memory, …).
- source: the originating service name embedded in every event.
func (*EventPublisher) Producer ¶
func (p *EventPublisher) Producer() Producer
Producer returns the underlying producer.
func (*EventPublisher) Publish ¶
func (p *EventPublisher) Publish(ctx context.Context, topic, eventType string, data interface{}) error
Publish sends a typed payload as a domain event.
An Event envelope is built with a fresh UUID, UTC timestamp, the configured source, and data marshaled from the generic payload.
func (*EventPublisher) PublishKeyed ¶
func (p *EventPublisher) PublishKeyed(ctx context.Context, topic, eventType string, data interface{}, key string) error
PublishKeyed sends a typed payload with an explicit partition key.
The key is set both as the Event.Subject and the Kafka partition key.
func (*EventPublisher) Source ¶
func (p *EventPublisher) Source() string
Source returns the configured source name.
type HandlerMiddleware ¶
type HandlerMiddleware func(MessageHandler) MessageHandler
HandlerMiddleware transforms a MessageHandler by wrapping it with additional behavior (logging, metrics, retry, tracing, etc.).
type JSONHandler ¶
JSONHandler processes JSON messages with automatic unmarshalling.
type ManagedConsumer ¶
type ManagedConsumer struct {
// contains filtered or unexported fields
}
ManagedConsumer wraps a Consumer with background lifecycle management. It runs the consume loop in a goroutine and provides Start/Stop/IsRunning.
func NewManagedConsumer ¶
func NewManagedConsumer(cfg ManagedConsumerConfig) *ManagedConsumer
NewManagedConsumer creates a managed consumer with lifecycle support. The consumer must already be created and configured.
func (*ManagedConsumer) IsRunning ¶
func (m *ManagedConsumer) IsRunning() bool
IsRunning returns whether the consumer is currently running.
func (*ManagedConsumer) Start ¶
func (m *ManagedConsumer) Start(ctx context.Context) error
Start begins consuming messages in a background goroutine.
func (*ManagedConsumer) Stop ¶
func (m *ManagedConsumer) Stop(ctx context.Context) error
Stop gracefully stops the consumer using the supplied ctx for the wait budget. If ctx has no deadline, DefaultStopTimeout (10s) is applied as a bounded fallback so a stuck consumer cannot block shutdown forever.
A nil ctx is treated as context.Background() with the default timeout.
func (*ManagedConsumer) Topic ¶
func (m *ManagedConsumer) Topic() string
Topic returns the topic this consumer is subscribed to.
type ManagedConsumerConfig ¶
type ManagedConsumerConfig struct {
Consumer Consumer
Handler MessageHandler
Log *logger.Logger
}
ManagedConsumerConfig holds configuration for creating a ManagedConsumer.
type Message ¶
type Message struct {
Key string `json:"key"`
Value []byte `json:"value"`
Topic string `json:"topic"`
Partition int `json:"partition"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
Headers map[string]string `json:"headers,omitempty"`
}
Message represents a broker message with both binary and JSON support.
func (Message) UnmarshalValueJSON ¶
UnmarshalValueJSON unmarshals the message value as JSON into v.
type MessageHandler ¶
MessageHandler processes domain messages (supports both binary and JSON).
func ChainHandlers ¶
func ChainHandlers(base MessageHandler, middlewares ...HandlerMiddleware) MessageHandler
ChainHandlers composes middlewares around a base handler. Middlewares are applied so that the first element in the slice is the outermost wrapper (executes first on the way in, last on the way out).
chain := ChainHandlers(base, logging, metrics, retry)
// execution order: logging → metrics → retry → base
type MetricsCollector ¶
type MetricsCollector interface {
// RecordPublish records a publish operation's outcome.
RecordPublish(topic string, duration time.Duration, err error)
// RecordConsume records a consume operation's outcome.
RecordConsume(topic string, duration time.Duration, err error)
}
MetricsCollector records messaging operational metrics. Each broker implementation provides its own metrics collection.
type Producer ¶
type Producer interface {
Publish(ctx context.Context, topic string, event Event, key ...string) error
PublishJSON(ctx context.Context, topic, key string, value interface{}) error
PublishBinary(ctx context.Context, topic, key string, data []byte) error
Close() error
}
Producer is a transport-agnostic message producer.
Three methods for three use cases:
- Publish: structured domain events (gokit Event with headers/metadata)
- PublishJSON: arbitrary data as JSON (direct marshal, no envelope)
- PublishBinary: raw bytes (protobuf, avro, etc. — zero encoding overhead)
type ProducerCloser ¶
type ProducerCloser interface {
Close() error
}
ProducerCloser is satisfied by any producer that can be closed.
type ProducerOption ¶
type ProducerOption func(*producerOptions)
ProducerOption configures producer creation.
type ProducerProviderAdapter ¶
type ProducerProviderAdapter struct {
// contains filtered or unexported fields
}
ProducerProviderAdapter wraps a Producer as a provider.Provider and provider.Sink[Message]. This allows any messaging Producer to participate in the provider framework — composable with resilience wrappers, selectable via Manager, and pipelineable.
func NewProducerProviderAdapter ¶
func NewProducerProviderAdapter(name string, p Producer) *ProducerProviderAdapter
NewProducerProviderAdapter wraps a Producer as a provider.Provider and Sink.
func (*ProducerProviderAdapter) IsAvailable ¶
func (a *ProducerProviderAdapter) IsAvailable(_ context.Context) bool
IsAvailable checks if the producer is ready.
func (*ProducerProviderAdapter) Name ¶
func (a *ProducerProviderAdapter) Name() string
Name returns the provider's unique name.
func (*ProducerProviderAdapter) Producer ¶
func (a *ProducerProviderAdapter) Producer() Producer
Producer returns the underlying Producer for direct access.
type ProducerSinkProvider ¶
type ProducerSinkProvider struct {
// contains filtered or unexported fields
}
ProducerSinkProvider wraps a Producer as a provider.Sink[Message]. Unlike ProducerProviderAdapter, which wraps with a fixed name, this allows a custom name independent of the underlying producer.
For batch writes, use the Producer directly. The Sink adapter sends one message at a time for composability.
func NewProducerSinkProvider ¶
func NewProducerSinkProvider(name string, p Producer) *ProducerSinkProvider
NewProducerSinkProvider wraps a Producer as a named Sink provider.
func (*ProducerSinkProvider) IsAvailable ¶
func (p *ProducerSinkProvider) IsAvailable(_ context.Context) bool
IsAvailable checks if the producer is ready.
func (*ProducerSinkProvider) Name ¶
func (p *ProducerSinkProvider) Name() string
Name returns the provider's unique name.
func (*ProducerSinkProvider) Producer ¶
func (p *ProducerSinkProvider) Producer() Producer
Producer returns the underlying Producer for direct access.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router routes incoming messages to handlers based on topic or custom key. It supports exact match, wildcard patterns (e.g. "content.*"), and a default fallback handler. Router is safe for concurrent use.
func (*Router) Default ¶
func (r *Router) Default(handler MessageHandler) *Router
Default sets the fallback handler for messages that match no registered pattern.
func (*Router) Handle ¶
func (r *Router) Handle(pattern string, handler MessageHandler) *Router
Handle registers a handler for the given pattern. Patterns support exact match ("content.discovered") or wildcard ("content.*") where "*" matches any suffix after the last dot.
func (*Router) Handler ¶
func (r *Router) Handler() MessageHandler
Handler returns a MessageHandler that routes messages based on registered patterns. The routing key is the message topic by default; use WithRouterKeyFunc to override.
type RouterOption ¶
type RouterOption func(*routerConfig)
RouterOption configures Router behavior.
func WithRouterKeyFunc ¶
func WithRouterKeyFunc(fn func(Message) string) RouterOption
WithRouterKeyFunc overrides the default routing key extractor. By default, the message topic is used as the routing key.
Directories ¶
| Path | Synopsis |
|---|---|
| bridge | Package bridge provides provider adapters that connect messaging primitives (Producer, Consumer) to the gokit provider pattern. |
| kafka | Package kafka provides Kafka producer and consumer lifecycle management as a gokit component. |
| kafka/consumer | Package consumer provides a Kafka consumer implementation built on franz-go. |
| kafka/producer | Package producer provides a Kafka producer implementation built on franz-go. |
| kafka/testutil | Package testutil provides Kafka-specific testing utilities for the messaging/kafka module. |
| memory | Package memory provides an in-memory messaging broker for testing. |
| middleware | Package middleware provides composable middleware for message handlers. |
| testutil | Package testutil provides broker-agnostic testing utilities for the messaging module. |