Published: Apr 27, 2026 License: MIT Imports: 12 Imported by: 0

README

gokit/messaging

Transport-agnostic message producer/consumer abstraction with Kafka provider, in-memory broker for testing, and composable middleware.

Overview

The messaging module provides a unified interface for publishing and consuming messages across different transports. It defines core types (Message, Event), producer/consumer interfaces, and higher-level patterns like routing, batching, and managed consumers — all independent of any specific broker.

Concrete implementations (Kafka, in-memory) plug into these interfaces, so application code stays transport-agnostic. A middleware system allows cross-cutting concerns (retry, dead-letter, tracing, deduplication, metrics) to be composed around any handler.

Installation

go get github.com/kbukum/gokit/messaging

Quick Start

package main

import (
	"context"
	"fmt"

	"github.com/kbukum/gokit/messaging"
	"github.com/kbukum/gokit/messaging/memory"
)

func main() {
	ctx := context.Background()

	// Create an in-memory broker (great for testing)
	broker := memory.NewBroker()
	producer := broker.Producer()
	consumer := broker.Consumer("user.events")

	// Publish a structured event
	event, _ := messaging.NewEvent("user.created", "auth-service", map[string]string{
		"id": "user-123", "email": "user@example.com",
	}, "user-123")

	_ = producer.Publish(ctx, "user.events", event)

	// Consume messages
	go consumer.Consume(ctx, func(ctx context.Context, msg messaging.Message) error {
		fmt.Printf("Received: %s\n", msg.Value)
		return nil
	})
}

API Reference

Core Types
  • Message — Low-level broker message with key, value, topic, partition, offset, headers
  • Event — Structured domain event with ID, type, source, timestamp, and JSON data
  • Producer — Interface with Publish, PublishJSON, and PublishBinary methods
  • Consumer — Interface with blocking Consume loop and Topic/Close methods
  • MessageHandler — func(ctx, Message) error, the core handler signature
  • HandlerMiddleware — func(MessageHandler) MessageHandler, composable middleware
  • ErrorClassifier — Categorizes errors as connection-level or retryable
  • BrokerComponent — Factory + lifecycle interface for broker implementations
Publishing Methods
  • Publish(ctx, topic, event, key...) — structured Event with headers
  • PublishJSON(ctx, topic, key, value) — direct JSON marshaling
  • PublishBinary(ctx, topic, key, data) — raw bytes (protobuf, avro)
Higher-Level Patterns
  • NewRouter() — route messages by topic with exact match, wildcard (*), and default fallback
  • NewBatchProducer(producer, topic, cfg) — buffer and flush on size, time, or byte thresholds
  • NewManagedConsumer(cfg) — background consumer lifecycle with start/stop/status
  • ChainHandlers(base, mw...) — compose handler middlewares in order

Sub-Packages

kafka/ — Kafka Implementation

Production-ready Kafka producer and consumer using segmentio/kafka-go. Supports TLS, SASL authentication, consumer groups, compression, and configurable batching.

import (
	"github.com/kbukum/gokit/messaging/kafka"
	kafkaproducer "github.com/kbukum/gokit/messaging/kafka/producer"
)

cfg := kafka.Config{
	Brokers:     []string{"localhost:9092"},
	GroupID:     "my-service",
	Compression: "snappy",
}
cfg.ApplyDefaults()

producer, _ := kafkaproducer.NewProducer(cfg, log)
defer producer.Close()

_ = producer.Publish(ctx, "events", event)

memory/ — In-Memory Broker

Channel-based broker for unit and integration tests. No external dependencies.

broker := memory.NewBroker()
producer := broker.Producer()
consumer := broker.Consumer("my-topic")

// Publish and assert
_ = producer.PublishBinary(ctx, "my-topic", "key", []byte("hello"))
memory.AssertPublished(t, broker, "my-topic", func(msg messaging.Message) bool {
	return string(msg.Value) == "hello"
})

middleware/ — Composable Middleware

Cross-cutting concerns that wrap any MessageHandler:

  • RetryHandler — Automatic retry with configurable backoff
  • DeadLetterProducer — Route failed messages to a dead-letter topic
  • TracingHandler — OpenTelemetry trace context propagation
  • CircuitBreakerHandler — Circuit breaker around message processing
  • DedupHandler — LRU-based message deduplication with TTL
  • InstrumentHandler — OTel counters and histogram for processing metrics

handler := messaging.ChainHandlers(
	baseHandler,
	middleware.RetryHandler(retryConfig),
	middleware.TracingHandler,
	middleware.InstrumentHandler("events", "my-service"),
)

bridge/ — Provider Integration

Adapts messaging primitives to the gokit provider pattern for use in DAGs and pipelines:

  • ProducerAsSink — wraps a Producer as a provider.Sink[Message]
  • EventProducerAsSink — wraps a Producer as a provider.Sink[Event]
  • ConsumerAsStream — wraps a Consumer as a provider.Stream

testutil/ — Test Mocks

Broker-agnostic mocks for unit testing:

  • MockProducer — records published messages with error injection
  • ChannelConsumer — pre-fed message consumer for handler testing
  • Component — mock BrokerComponent for lifecycle tests

Testing

cd messaging
go test -race ./...

Contributing

Please refer to the root CONTRIBUTING.md for guidelines.

Documentation

Overview

Package messaging provides transport-agnostic message producer and consumer abstractions for event-driven architectures.

It defines shared types (Message, Event, handler functions), interfaces (Producer, Consumer, ErrorClassifier, MetricsCollector), and generic patterns (AsRunner, ManagedConsumer, provider adapters) that are independent of any specific message broker.

Router

Route incoming messages to different handlers based on topic, event type, or custom rules using Router. Supports exact match, wildcard patterns (e.g. "content.*"), and a default fallback handler.

BatchProducer

Collect messages and flush in batches via BatchProducer. Supports size-triggered, time-triggered, and byte-triggered flushing with graceful shutdown.

Sub-packages

  • messaging/kafka: Kafka implementation using segmentio/kafka-go
  • messaging/memory: In-memory broker for testing
  • messaging/middleware: Transport-agnostic middleware (retry, DLQ, tracing, metrics, dedup, circuit breaker)
  • messaging/testutil: Broker-agnostic mock producer/consumer for testing

Configuration

Kafka-specific settings are provided via kafka.Config with ApplyDefaults()/Validate().

Index

Constants

const DefaultStopTimeout = 10 * time.Second

DefaultStopTimeout bounds ManagedConsumer.Stop when the caller's ctx has no deadline. Prevents a stuck consumer from blocking shutdown forever.

Variables

var ConnectionPatterns = []string{
	"connection refused",
	"connection reset",
	"broken pipe",
	"i/o timeout",
	"no route to host",
	"network is unreachable",
	"connection closed",
	"dial tcp",
}

ConnectionPatterns contains generic connection error patterns common to most message brokers (TCP-level failures, DNS errors, etc.).

var RetryablePatterns = []string{
	"temporary",
	"request timed out",
}

RetryablePatterns contains generic retryable error patterns that are not connection-specific but typically warrant a retry.

Functions

func IsConnectionError

func IsConnectionError(err error, extra ...string) bool

IsConnectionError checks if err matches any connection pattern. Default ConnectionPatterns are always checked; additional broker-specific patterns can be appended via the variadic argument.

func IsRetryableError

func IsRetryableError(err error, extra ...string) bool

IsRetryableError checks if err should trigger a retry. Connection errors are always retryable. Additional broker-specific retryable patterns can be appended via the variadic argument.

func ParseData

func ParseData[D any](e Event) (D, error)

ParseData unmarshals the event's Data into a typed value.

Types

type BatchConfig

type BatchConfig struct {
	MaxSize  int           // Max messages per batch (default: 100).
	MaxWait  time.Duration // Max time before forced flush (default: 5s).
	MaxBytes int64         // Max total bytes per batch (0 = unlimited).
}

BatchConfig configures a BatchProducer.

type BatchProducer

type BatchProducer struct {
	// contains filtered or unexported fields
}

BatchProducer buffers messages and flushes them in batches via an underlying Producer. It is safe for concurrent use.

func NewBatchProducer

func NewBatchProducer(p Producer, topic string, cfg BatchConfig) *BatchProducer

NewBatchProducer creates a BatchProducer that publishes to topic via p.

func (*BatchProducer) Close

func (b *BatchProducer) Close(ctx context.Context) error

Close flushes remaining messages and stops the background timer.

func (*BatchProducer) Flush

func (b *BatchProducer) Flush(ctx context.Context) error

Flush forces sending all buffered messages immediately.

func (*BatchProducer) Send

func (b *BatchProducer) Send(ctx context.Context, msg Message) error

Send buffers a message. A flush is triggered automatically when MaxSize or MaxBytes limits are reached.

type BinaryHandler

type BinaryHandler func(ctx context.Context, key string, value []byte) error

BinaryHandler processes raw binary messages.

type BrokerComponent

type BrokerComponent interface {
	component.Component

	// Producer creates a producer for the given topic.
	Producer(topic string, opts ...ProducerOption) (Producer, error)

	// Consumer registers a consumer for the given topics with the provided handler.
	Consumer(topics []string, handler MessageHandler, opts ...ConsumerOption) error
}

BrokerComponent extends component.Component with producer/consumer factory methods. Implementations provide broker-specific creation logic while sharing the common lifecycle management from component.Component.

type Consumer

type Consumer interface {
	Consume(ctx context.Context, handler MessageHandler) error
	Topic() string
	Close() error
}

Consumer runs a blocking consume loop.

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption configures consumer creation.

type ConsumerProviderAdapter

type ConsumerProviderAdapter struct {
	// contains filtered or unexported fields
}

ConsumerProviderAdapter wraps a Consumer as a provider.Provider. This allows any messaging Consumer to participate in the provider framework.

func NewConsumerProviderAdapter

func NewConsumerProviderAdapter(name string, c Consumer) *ConsumerProviderAdapter

NewConsumerProviderAdapter wraps a Consumer as a provider.Provider.

func (*ConsumerProviderAdapter) IsAvailable

func (a *ConsumerProviderAdapter) IsAvailable(_ context.Context) bool

IsAvailable checks if the consumer is ready.

func (*ConsumerProviderAdapter) Name

func (a *ConsumerProviderAdapter) Name() string

Name returns the provider's unique name.

type ConsumerRunner

type ConsumerRunner interface {
	Consume(ctx context.Context) error
	Close() error
	Topic() string
}

ConsumerRunner is used by Component to manage consumer lifecycle.

func AsRunner

func AsRunner(c Consumer, h MessageHandler) ConsumerRunner

AsRunner wraps a Consumer with a MessageHandler to create a ConsumerRunner suitable for use with BrokerComponent or any component that manages consumer lifecycle via the ConsumerRunner interface.

type ErrorClassifier

type ErrorClassifier interface {
	// IsConnectionError checks if the error is a connection-level error.
	IsConnectionError(err error) bool
	// IsRetryableError determines if the error should trigger a retry.
	IsRetryableError(err error) bool
}

ErrorClassifier categorizes errors for retry/circuit-breaker decisions. Each broker implementation provides its own classification logic.

type ErrorTranslator

type ErrorTranslator interface {
	Translate(err error, topic string) *apperrors.AppError
}

ErrorTranslator converts broker-specific errors to AppError. Each broker implementation maps its native errors to appropriate error codes, HTTP statuses, and retryable flags.

type Event

type Event struct {
	ID          string          `json:"id"`
	Type        string          `json:"type"`
	Source      string          `json:"source"`
	ContentType string          `json:"content_type,omitempty"`
	Version     string          `json:"version,omitempty"`
	Timestamp   time.Time       `json:"timestamp"`
	Subject     string          `json:"subject,omitempty"`
	Data        json.RawMessage `json:"data,omitempty"`
}

Event represents a structured event for domain messaging. Data is json.RawMessage so events can be forwarded without re-marshaling.

func NewEvent

func NewEvent[D any](eventType, source string, data D, subject ...string) (Event, error)

NewEvent creates an Event with auto-generated ID and timestamp. Data is marshaled to json.RawMessage automatically. Returns an error if data cannot be marshaled to JSON.

func (Event) ToJSON

func (e Event) ToJSON() ([]byte, error)

ToJSON marshals the event to JSON.

type EventHandler

type EventHandler func(ctx context.Context, event Event) error

EventHandler processes structured events.

type EventPublisher

type EventPublisher struct {
	// contains filtered or unexported fields
}

EventPublisher is a convenience facade that wraps a Producer with a pre-configured source name. Every call to Publish or PublishKeyed automatically constructs an Event envelope (UUID, timestamp, source) so callers only provide the topic, event type, and payload.

func NewEventPublisher

func NewEventPublisher(producer Producer, source string) *EventPublisher

NewEventPublisher creates an EventPublisher.

  • producer: any Producer implementation (Kafka, in-memory, …).
  • source: the originating service name embedded in every event.

func (*EventPublisher) Producer

func (p *EventPublisher) Producer() Producer

Producer returns the underlying producer.

func (*EventPublisher) Publish

func (p *EventPublisher) Publish(ctx context.Context, topic, eventType string, data interface{}) error

Publish sends a typed payload as a domain event.

An Event envelope is built with a fresh UUID, UTC timestamp, the configured source, and data marshaled from the generic payload.

func (*EventPublisher) PublishKeyed

func (p *EventPublisher) PublishKeyed(ctx context.Context, topic, eventType string, data interface{}, key string) error

PublishKeyed sends a typed payload with an explicit partition key.

The key is set both as the Event.Subject and the Kafka partition key.

func (*EventPublisher) Source

func (p *EventPublisher) Source() string

Source returns the configured source name.

type HandlerMiddleware

type HandlerMiddleware func(MessageHandler) MessageHandler

HandlerMiddleware transforms a MessageHandler by wrapping it with additional behavior (logging, metrics, retry, tracing, etc.).

type JSONHandler

type JSONHandler[T any] func(ctx context.Context, data T) error

JSONHandler processes JSON messages with automatic unmarshaling.

type ManagedConsumer

type ManagedConsumer struct {
	// contains filtered or unexported fields
}

ManagedConsumer wraps a Consumer with background lifecycle management. It runs the consume loop in a goroutine and provides Start/Stop/IsRunning.

func NewManagedConsumer

func NewManagedConsumer(cfg ManagedConsumerConfig) *ManagedConsumer

NewManagedConsumer creates a managed consumer with lifecycle support. The consumer must already be created and configured.

func (*ManagedConsumer) IsRunning

func (m *ManagedConsumer) IsRunning() bool

IsRunning returns whether the consumer is currently running.

func (*ManagedConsumer) Start

func (m *ManagedConsumer) Start(ctx context.Context) error

Start begins consuming messages in a background goroutine.

func (*ManagedConsumer) Stop

func (m *ManagedConsumer) Stop(ctx context.Context) error

Stop gracefully stops the consumer using the supplied ctx for the wait budget. If ctx has no deadline, DefaultStopTimeout (10s) is applied as a bounded fallback so a stuck consumer cannot block shutdown forever.

A nil ctx is treated as context.Background() with the default timeout.

func (*ManagedConsumer) Topic

func (m *ManagedConsumer) Topic() string

Topic returns the topic this consumer is subscribed to.

type ManagedConsumerConfig

type ManagedConsumerConfig struct {
	Consumer Consumer
	Handler  MessageHandler
	Log      *logger.Logger
}

ManagedConsumerConfig holds configuration for creating a ManagedConsumer.

type Message

type Message struct {
	Key       string            `json:"key"`
	Value     []byte            `json:"value"`
	Topic     string            `json:"topic"`
	Partition int               `json:"partition"`
	Offset    int64             `json:"offset"`
	Timestamp time.Time         `json:"timestamp"`
	Headers   map[string]string `json:"headers,omitempty"`
}

Message represents a broker message with both binary and JSON support.

func (Message) IsJSON

func (m Message) IsJSON() bool

IsJSON checks if the message appears to be JSON.

func (Message) ToEvent

func (m Message) ToEvent() (Event, error)

ToEvent converts the message to an Event (assumes JSON content).

func (Message) UnmarshalValueJSON

func (m Message) UnmarshalValueJSON(v interface{}) error

UnmarshalValueJSON unmarshals the message value as JSON into v.

type MessageHandler

type MessageHandler func(ctx context.Context, msg Message) error

MessageHandler processes domain messages (supports both binary and JSON).

func ChainHandlers

func ChainHandlers(base MessageHandler, middlewares ...HandlerMiddleware) MessageHandler

ChainHandlers composes middlewares around a base handler. Middlewares are applied so that the first element in the slice is the outermost wrapper (executes first on the way in, last on the way out).

chain := ChainHandlers(base, logging, metrics, retry)
// execution order: logging → metrics → retry → base

type MetricsCollector

type MetricsCollector interface {
	// RecordPublish records a publish operation's outcome.
	RecordPublish(topic string, duration time.Duration, err error)
	// RecordConsume records a consume operation's outcome.
	RecordConsume(topic string, duration time.Duration, err error)
}

MetricsCollector records messaging operational metrics. Each broker implementation provides its own metrics collection.

type Producer

type Producer interface {
	Publish(ctx context.Context, topic string, event Event, key ...string) error
	PublishJSON(ctx context.Context, topic, key string, value interface{}) error
	PublishBinary(ctx context.Context, topic, key string, data []byte) error
	Close() error
}

Producer is a transport-agnostic message producer.

Three methods for three use cases:

  • Publish: structured domain events (gokit Event with headers/metadata)
  • PublishJSON: arbitrary data as JSON (direct marshal, no envelope)
  • PublishBinary: raw bytes (protobuf, avro, etc. — zero encoding overhead)

type ProducerCloser

type ProducerCloser interface {
	Close() error
}

ProducerCloser is satisfied by any producer that can be closed.

type ProducerOption

type ProducerOption func(*producerOptions)

ProducerOption configures producer creation.

type ProducerProviderAdapter

type ProducerProviderAdapter struct {
	// contains filtered or unexported fields
}

ProducerProviderAdapter wraps a Producer as a provider.Provider and provider.Sink[Message]. This allows any messaging Producer to participate in the provider framework — composable with resilience wrappers, selectable via Manager, and pipelineable.

func NewProducerProviderAdapter

func NewProducerProviderAdapter(name string, p Producer) *ProducerProviderAdapter

NewProducerProviderAdapter wraps a Producer as a provider.Provider and Sink.

func (*ProducerProviderAdapter) IsAvailable

func (a *ProducerProviderAdapter) IsAvailable(_ context.Context) bool

IsAvailable checks if the producer is ready.

func (*ProducerProviderAdapter) Name

func (a *ProducerProviderAdapter) Name() string

Name returns the provider's unique name.

func (*ProducerProviderAdapter) Producer

func (a *ProducerProviderAdapter) Producer() Producer

Producer returns the underlying Producer for direct access.

func (*ProducerProviderAdapter) Send

func (a *ProducerProviderAdapter) Send(ctx context.Context, msg Message) error

Send writes a domain Message to the underlying producer.

type ProducerSinkProvider

type ProducerSinkProvider struct {
	// contains filtered or unexported fields
}

ProducerSinkProvider wraps a Producer as a provider.Sink[Message]. Unlike ProducerProviderAdapter which wraps with a fixed name, this allows a custom name independent of the underlying producer.

For batch writes, use the Producer directly. The Sink adapter sends one message at a time for composability.

func NewProducerSinkProvider

func NewProducerSinkProvider(name string, p Producer) *ProducerSinkProvider

NewProducerSinkProvider wraps a Producer as a named Sink provider.

func (*ProducerSinkProvider) IsAvailable

func (p *ProducerSinkProvider) IsAvailable(_ context.Context) bool

IsAvailable checks if the producer is ready.

func (*ProducerSinkProvider) Name

func (p *ProducerSinkProvider) Name() string

Name returns the provider's unique name.

func (*ProducerSinkProvider) Producer

func (p *ProducerSinkProvider) Producer() Producer

Producer returns the underlying Producer for direct access.

func (*ProducerSinkProvider) Send

func (p *ProducerSinkProvider) Send(ctx context.Context, msg Message) error

Send writes a domain Message via the underlying producer.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router routes incoming messages to handlers based on topic or custom key. It supports exact match, wildcard patterns (e.g. "content.*"), and a default fallback handler. Router is safe for concurrent use.

func NewRouter

func NewRouter(opts ...RouterOption) *Router

NewRouter creates a new Router.

func (*Router) Default

func (r *Router) Default(handler MessageHandler) *Router

Default sets the fallback handler for messages that match no registered pattern.

func (*Router) Handle

func (r *Router) Handle(pattern string, handler MessageHandler) *Router

Handle registers a handler for the given pattern. Patterns support exact match ("content.discovered") or wildcard ("content.*") where "*" matches any suffix after the last dot.

func (*Router) Handler

func (r *Router) Handler() MessageHandler

Handler returns a MessageHandler that routes messages based on registered patterns. The routing key is the message topic by default; use WithRouterKeyFunc to override.

type RouterOption

type RouterOption func(*routerConfig)

RouterOption configures Router behavior.

func WithRouterKeyFunc

func WithRouterKeyFunc(fn func(Message) string) RouterOption

WithRouterKeyFunc overrides the default routing key extractor. By default, the message topic is used as the routing key.

Directories

  • bridge — Package bridge provides provider adapters that connect messaging primitives (Producer, Consumer) to the gokit provider pattern.
  • kafka — Package kafka provides Kafka producer and consumer lifecycle management as a gokit component.
  • kafka/consumer — Package consumer provides a Kafka consumer implementation built on franz-go.
  • kafka/producer — Package producer provides a Kafka producer implementation built on franz-go.
  • kafka/testutil — Package testutil provides Kafka-specific testing utilities for the messaging/kafka module.
  • memory — Package memory provides an in-memory messaging broker for testing.
  • middleware — Package middleware provides composable middleware for message handlers.
  • testutil — Package testutil provides broker-agnostic testing utilities for the messaging module.
