protoflow

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2025 License: MIT Imports: 30 Imported by: 0

README

Protoflow

Protoflow is a thin productivity layer on top of Watermill that helps you build protobuf or JSON event-driven services that run on Kafka, RabbitMQ, or AWS SNS/SQS. It wires the router, publisher, subscriber, default middleware stack, and typed handler helpers so you can focus on your domain logic instead of plumbing.

Features

  • Strongly typed handler registrations for protobuf (RegisterProtoHandler) and JSON (RegisterJSONHandler) payloads.
  • Built-in router wiring for Kafka, RabbitMQ, and AWS SNS/SQS transports selected via configuration.
  • Default middleware chain that injects correlation IDs, logs payloads, validates protobufs, stores outgoing messages in an outbox, traces with OpenTelemetry, retries with backoff, and routes poison messages.
  • Extension points for plugging in your own ProtoValidator, OutboxStore, and custom middleware registrations.
  • Helper utilities for publishing protobuf events (PublishProto/Service.PublishProto) and cloning metadata safely.

Installation

go get github.com/drblury/protoflow

Go 1.25+ is recommended because the module itself targets Go 1.25.4 in go.mod.

Examples

Each directory under examples/ is a runnable scenario that you can execute with go run ./examples/<name>:

  • examples/simple registers an untyped handler via RegisterMessageHandler and works directly with Watermill messages.
  • examples/json wires up a typed JSON handler that forwards enriched metadata.
  • examples/proto showcases protobuf handlers backed by the generated files in models/.
  • examples/full demonstrates protobuf, JSON, and raw handlers alongside custom middleware, a validator, an in-memory outbox, and a periodic publisher.

Use these as blueprints and adjust the hardcoded broker configuration to match your environment.

Usage

Registering JSON handlers

If your payloads are JSON instead of protobuf, use RegisterJSONHandler:

err := protoflow.RegisterJSONHandler(svc, protoflow.JSONHandlerRegistration[*IncomingOrder, *OutgoingOrder]{
    Name:               "json-order-handler",
    ConsumeQueue:       "orders.json",
    PublishQueue:       "orders.json.out",
    Handler: func(ctx context.Context, evt protoflow.JSONMessageContext[*IncomingOrder]) ([]protoflow.JSONMessageOutput[*OutgoingOrder], error) {
        response := &OutgoingOrder{ID: evt.Payload.ID}
        return []protoflow.JSONMessageOutput[*OutgoingOrder]{{
            Message:  response,
            Metadata: evt.Metadata.With("processed_by", "json-handler"),
        }}, nil
    },
})
Producing events

Use the helper to publish protobuf messages outside of handlers (for example, from HTTP endpoints):

metadata := protoflow.Metadata{"event_source": "api"}
if err := svc.PublishProto(ctx, "orders.created", &models.OrderCreated{OrderId: "123"}, metadata); err != nil {
    logger.Error("publish failed", "err", err)
}
Logging

NewService expects a ServiceLogger. You can obtain one by wrapping:

  • a standard library slog.Logger via protoflow.NewSlogServiceLogger
  • any Watermill LoggerAdapter via protoflow.NewWatermillServiceLogger
  • entry-style loggers (for example loggers that expose WithField/WithError chains) via protoflow.NewEntryServiceLogger
entry := customLogger.WithContext(ctx) // implements the Entry-style API
svc := protoflow.NewService(cfg, protoflow.NewEntryServiceLogger(entry), ctx, protoflow.ServiceDependencies{})

This lets consumers plug Protoflow into existing logging stacks without having to retool to slog or Watermill-specific adapters.

Configuration reference

Config selects the transport and holds per-transport settings:

cfg := &protoflow.Config{
    PubSubSystem:       "kafka",            // or "rabbitmq" / "aws"
    KafkaBrokers:       []string{"broker"},
    KafkaConsumerGroup: "group",
    RabbitMQURL:        "amqp://guest:guest@localhost",
    AWSRegion:          "eu-west-1",
    AWSAccountID:       "123456789012",
    AWSEndpoint:        "http://localhost:4566", // optional (LocalStack)
    PoisonQueue:        "events.poison",
    RetryMaxRetries:    5,
}

Only the fields required by the selected PubSubSystem are used. The retry-related settings feed into the default retry middleware.

Service dependencies and middleware

ServiceDependencies lets you inject optional collaborators:

  • Validator (ProtoValidator) validates protobuf payloads in the ProtoValidateMiddleware and optionally outgoing events. Implementations just need to provide a Validate(value any) error method (wrapping protovalidate.Validator, go-playground/validator, etc.).
  • Outbox (OutboxStore) stores emitted events before they are forwarded.
  • Middlewares ([]MiddlewareRegistration) are appended after the default chain.
  • DisableDefaultMiddlewares skips the built-in middleware stack so you can assemble your own.

The default middleware order is:

  1. Correlation ID
  2. Message logger
  3. Proto validation
  4. Outbox persistence
  5. OpenTelemetry tracer
  6. Retry with exponential backoff
  7. Poison queue forwarding
  8. Panic recoverer

You can register additional middleware by supplying ServiceDependencies.Middlewares or by calling Service.RegisterMiddleware manually.

Local development tips

  • AWS SNS/SQS: set Config.AWSEndpoint to your LocalStack URL to reuse the same code locally.
  • Kafka: ensure KafkaConsumerGroup is unique per service instance.
  • RabbitMQ: the connection is reused for both publisher and subscriber; supply TLS information via amqp.ConnectionConfig if needed.

Testing

The repository comes with unit tests that exercise service wiring, middleware, and handler helpers. Run them with:

go test ./...

Contributing

  1. Fork the repo and create a feature branch.
  2. Run go test ./... before opening a PR.
  3. Keep the README and package docs up to date when you add new features.

License

Protoflow is available under the MIT License.

Documentation

Overview

Package protoflow exposes a small framework on top of Watermill that wires message routers, publishers, subscribers, and middleware for protobuf and JSON driven services. It selects the desired Pub/Sub transport (Kafka, RabbitMQ, or AWS SNS/SQS) from Config, bootstraps the Watermill router, and registers a middleware chain that adds correlation IDs, logging, protobuf validation, outbox persistence, tracing, retries, and poison queue forwarding.

The Service type hosts the router and offers helpers for registering typed handlers via RegisterProtoHandler or RegisterJSONHandler. Typed registrations automatically marshal/unmarshal payloads, clone metadata safely, and can validate outgoing protobuf events when a ProtoValidator is provided. Service also exposes PublishProto so your HTTP or RPC handlers can emit events without touching the lower-level Watermill APIs.

Advanced users can extend the default middleware chain, plug in an OutboxStore to persist outgoing messages, or provide a ProtoValidator to enforce message contracts. See README.md for a full walkthrough.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrServiceRequired             = errors.New("protoflow: event service is required")
	ErrHandlerRequired             = errors.New("protoflow: handler function is required")
	ErrConsumeQueueRequired        = errors.New("protoflow: consume queue is required")
	ErrHandlerNameRequired         = errors.New("protoflow: handler name is required")
	ErrConsumeMessageTypeRequired  = errors.New("protoflow: consume message type is required")
	ErrConsumeMessagePointerNeeded = errors.New("protoflow: consume message type must be a pointer")
	ErrPublisherRequired           = errors.New("protoflow: publisher is required")
	ErrTopicRequired               = errors.New("protoflow: topic is required")
)

Functions

func CreateULID

func CreateULID() string

CreateULID returns a time-sortable ULID encoded as a 26-character string.

func Decode

func Decode(r io.Reader, v any) error

func Encode

func Encode(w io.Writer, v any) error

func Marshal

func Marshal(v any) ([]byte, error)

func MarshalIndent

func MarshalIndent(v any, prefix, indent string) ([]byte, error)

func MustProtoMessage added in v0.2.3

func MustProtoMessage[T proto.Message]() T

MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.

func NewMessageFromProto

func NewMessageFromProto(event proto.Message, metadata Metadata) (*message.Message, error)

NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.

func NewProtoMessage added in v0.2.3

func NewProtoMessage[T proto.Message]() (T, error)

NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.

func PublishProto

func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata Metadata) error

PublishProto marshals the proto payload and publishes it to the provided topic.

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error

RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.

func RegisterMessageHandler added in v0.2.0

func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error

RegisterMessageHandler attaches the provided handler to the service router.

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error

RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.

func Unmarshal

func Unmarshal(data []byte, v any) error

Types

type Config

type Config struct {
	// PubSubSystem selects the backing message infrastructure. Supported
	// values: "kafka", "rabbitmq", or "aws" (SNS/SQS).
	PubSubSystem string

	// Kafka configuration.
	KafkaBrokers       []string
	KafkaClientID      string
	KafkaConsumerGroup string

	// RabbitMQ configuration.
	RabbitMQURL string

	// PoisonQueue receives messages that cannot be processed even after
	// retries.
	PoisonQueue string

	// AWS (SNS/SQS) configuration.
	AWSRegion          string
	AWSAccountID       string
	AWSAccessKeyID     string
	AWSSecretAccessKey string
	// AWSEndpoint optionally points to a custom endpoint (for example,
	// Localstack in local development).
	AWSEndpoint string

	// RetryMiddleware tuning. Zero values fall back to library defaults.
	RetryMaxRetries      int
	RetryInitialInterval time.Duration
	RetryMaxInterval     time.Duration
}

Config groups the Pub/Sub settings required to initialise the Service. Each transport only uses the keys that are relevant to it.

type EntryLogger added in v0.2.1

type EntryLogger interface {
	EntryLoggerAdapter[EntryLogger]
}

EntryLogger represents the legacy non-generic entry adapter constraint. It remains exported so existing code that referenced protoflow.EntryLogger keeps compiling, but NewEntryServiceLogger now works with any logger that satisfies EntryLoggerAdapter[T].

type EntryLoggerAdapter added in v0.2.2

type EntryLoggerAdapter[T any] interface {
	Error(args ...any)
	Info(args ...any)
	Debug(args ...any)
	Trace(args ...any)
	WithError(err error) T
	WithField(key string, value any) T
}

EntryLoggerAdapter captures the capabilities required by NewEntryServiceLogger. The constraint is generic so third-party entry-like loggers (for example, loggers whose methods return their own concrete interface type) can be used without additional wrappers.

type JSONHandlerRegistration

type JSONHandlerRegistration[T any, O any] struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      JSONMessageHandler[T, O]
}

JSONHandlerRegistration wires a typed JSON handler to the router.

type JSONMessageContext

type JSONMessageContext[T any] struct {
	Payload  T
	Metadata Metadata
}

JSONMessageContext exposes the incoming payload and metadata for JSON handlers.

func (JSONMessageContext[T]) CloneMetadata

func (c JSONMessageContext[T]) CloneMetadata() Metadata

CloneMetadata copies the current metadata map so handlers can mutate headers safely.

type JSONMessageHandler

type JSONMessageHandler[T any, O any] func(ctx context.Context, event JSONMessageContext[T]) ([]JSONMessageOutput[O], error)

JSONMessageHandler processes a JSON payload and returns the events to publish.

type JSONMessageOutput

type JSONMessageOutput[T any] struct {
	Message  T
	Metadata Metadata
}

JSONMessageOutput represents an event emitted by a JSON handler.

type LogFields added in v0.2.0

type LogFields map[string]any

LogFields represents structured logging key/value pairs used by Protoflow.

type MessageHandlerRegistration added in v0.2.0

type MessageHandlerRegistration struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      message.HandlerFunc
	Subscriber   message.Subscriber
	Publisher    message.Publisher
}

MessageHandlerRegistration wires a raw Watermill handler without typed helpers.

type Metadata

type Metadata map[string]string

Metadata represents the headers carried alongside an event.

func NewMetadata added in v0.2.3

func NewMetadata(pairs ...string) Metadata

NewMetadata constructs a Metadata map from alternating key/value pairs.

func (Metadata) With added in v0.2.3

func (m Metadata) With(key, value string) Metadata

With returns a cloned metadata map containing the provided key/value pair.

func (Metadata) WithAll added in v0.2.3

func (m Metadata) WithAll(entries Metadata) Metadata

WithAll returns a cloned metadata map containing the supplied entries.

type MiddlewareBuilder

type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)

MiddlewareBuilder constructs a handler middleware using the provided service instance.

type MiddlewareRegistration

type MiddlewareRegistration struct {
	Name       string
	Middleware message.HandlerMiddleware
	Builder    MiddlewareBuilder
}

MiddlewareRegistration captures how a middleware should be registered on a Service router.

func CorrelationIDMiddleware

func CorrelationIDMiddleware() MiddlewareRegistration

CorrelationIDMiddleware ensures each processed message carries a correlation identifier.

func DefaultMiddlewares

func DefaultMiddlewares() []MiddlewareRegistration

DefaultMiddlewares returns the standard middleware chain used by the Service constructor.

func LogMessagesMiddleware

func LogMessagesMiddleware(logger watermill.LoggerAdapter) MiddlewareRegistration

LogMessagesMiddleware logs the full payload and metadata of handled messages.

func OutboxMiddleware

func OutboxMiddleware() MiddlewareRegistration

OutboxMiddleware persists outgoing messages when an OutboxStore is configured.

func PoisonQueueMiddleware

func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration

PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.

func ProtoValidateMiddleware

func ProtoValidateMiddleware() MiddlewareRegistration

ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.

func RecovererMiddleware

func RecovererMiddleware() MiddlewareRegistration

RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.

func RetryMiddleware

func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration

RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).

func TracerMiddleware

func TracerMiddleware() MiddlewareRegistration

TracerMiddleware wraps handler execution in an OpenTelemetry span.

type OutboxStore

type OutboxStore interface {
	StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}

OutboxStore persists processed messages so they can be forwarded reliably.

type Producer

type Producer interface {
	PublishProto(ctx context.Context, topic string, event proto.Message, metadata Metadata) error
}

Producer emits proto-based events onto the configured transport.

type ProtoHandlerOption

type ProtoHandlerOption func(*protoHandlerOptions)

ProtoHandlerOption customises handler registration.

func WithPublishMessageTypes

func WithPublishMessageTypes(msgs ...proto.Message) ProtoHandlerOption

WithPublishMessageTypes registers extra proto schemas emitted by this handler. Use this when the handler may emit multiple message types.

type ProtoHandlerRegistration

type ProtoHandlerRegistration[T proto.Message] struct {
	Name             string
	ConsumeQueue     string
	PublishQueue     string
	Handler          ProtoMessageHandler[T]
	Options          []ProtoHandlerOption
	ValidateOutgoing bool
}

ProtoHandlerRegistration configures a typed protobuf handler that automatically unmarshals incoming payloads and marshals emitted events.

type ProtoMessageContext

type ProtoMessageContext[T proto.Message] struct {
	Payload  T
	Metadata Metadata
}

ProtoMessageContext provides strongly typed access to the incoming message payload

func (ProtoMessageContext[T]) CloneMetadata

func (c ProtoMessageContext[T]) CloneMetadata() Metadata

CloneMetadata returns a copy of the current metadata map so handlers can safely mutate headers for outgoing events without touching the original map.

type ProtoMessageHandler

type ProtoMessageHandler[T proto.Message] func(ctx context.Context, event ProtoMessageContext[T]) ([]ProtoMessageOutput, error)

ProtoMessageHandler processes a typed protobuf payload and returns the events to emit.

type ProtoMessageOutput

type ProtoMessageOutput struct {
	Message  proto.Message
	Metadata Metadata
}

ProtoMessageOutput describes an event that should be emitted after the handler succeeds.

type ProtoValidator

type ProtoValidator interface {
	Validate(value any) error
}

ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.

type RetryMiddlewareConfig

type RetryMiddlewareConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
}

RetryMiddlewareConfig customises the retry middleware behaviour.

type Service

type Service struct {
	Conf *Config

	Logger watermill.LoggerAdapter
	// contains filtered or unexported fields
}

Service wires a Watermill router, publisher, subscriber, and middleware chain.

func NewService

func NewService(conf *Config, log ServiceLogger, ctx context.Context, deps ServiceDependencies) *Service

NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start.

func (*Service) PublishProto

func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata Metadata) error

PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.

func (*Service) RegisterMiddleware

func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error

RegisterMiddleware attaches the supplied middleware to the router.

func (*Service) RegisterProtoMessage

func (s *Service) RegisterProtoMessage(msg proto.Message)

RegisterProtoMessage exposes a proto message type for validation without registering a handler.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start runs the underlying Watermill router until the provided context is cancelled.

type ServiceDependencies

type ServiceDependencies struct {
	Outbox                    OutboxStore
	Validator                 ProtoValidator
	Middlewares               []MiddlewareRegistration // Appended after the default middleware chain.
	DisableDefaultMiddlewares bool                     // Skips registering the default middleware chain when true.
}

ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.

type ServiceLogger added in v0.2.0

type ServiceLogger interface {
	With(fields LogFields) ServiceLogger
	Debug(msg string, fields LogFields)
	Info(msg string, fields LogFields)
	Error(msg string, err error, fields LogFields)
	Trace(msg string, fields LogFields)
}

ServiceLogger is the minimal logging contract required by Protoflow services. It maps directly onto Watermill's logging needs so applications can adapt their existing loggers without depending on slog.

func NewEntryServiceLogger added in v0.2.1

func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger

NewEntryServiceLogger wraps an EntryLogger (for example a logrus.Entry) so it can be consumed by Protoflow services without forcing additional logging adapters.

func NewSlogServiceLogger added in v0.2.0

func NewSlogServiceLogger(log *slog.Logger) ServiceLogger

NewSlogServiceLogger wraps a slog.Logger so it satisfies the ServiceLogger interface. This matches the previous behaviour of Protoflow services.

func NewWatermillServiceLogger added in v0.2.0

func NewWatermillServiceLogger(logger watermill.LoggerAdapter) ServiceLogger

NewWatermillServiceLogger wraps an existing Watermill LoggerAdapter so it can be supplied to NewService.

type UnprocessableEventError

type UnprocessableEventError struct {
	// contains filtered or unexported fields
}

UnprocessableEventError wraps payloads that failed validation or unmarshalling.

func (*UnprocessableEventError) Error

func (e *UnprocessableEventError) Error() string

Directories

Path Synopsis
examples
full command
json command
proto command
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL