protoflow

package module
v0.1.0 Latest
Published: Nov 18, 2025 License: MIT Imports: 30 Imported by: 0

README

Protoflow

Protoflow is a thin productivity layer on top of Watermill that helps you build protobuf or JSON event-driven services that run on Kafka, RabbitMQ, or AWS SNS/SQS. It wires the router, publisher, subscriber, default middleware stack, and typed handler helpers so you can focus on your domain logic instead of plumbing.

Features

  • Strongly typed handler registrations for protobuf (RegisterProtoHandler) and JSON (RegisterJSONHandler) payloads.
  • Built-in router wiring for Kafka, RabbitMQ, and AWS SNS/SQS transports selected via configuration.
  • Default middleware chain that injects correlation IDs, logs payloads, validates protobufs, stores outgoing messages in an outbox, traces with OpenTelemetry, retries with backoff, and routes poison messages.
  • Extension points for plugging in your own ProtoValidator, OutboxStore, and custom middleware registrations.
  • Helper utilities for publishing protobuf events (PublishProto/Service.PublishProto) and cloning metadata safely.

Installation

go get github.com/drblury/protoflow

Use Go 1.25.4 or newer: the module's go.mod declares go 1.25.4, so older toolchains will either refuse to build it or try to download a newer toolchain.

Quick Start

package main

import (
    "context"
    "errors"
    "log/slog"
    "os"

    "github.com/drblury/protoflow"
    orderpb "github.com/your-org/your-protos/gen/go/orders"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    cfg := &protoflow.Config{
        PubSubSystem:       "kafka",
        KafkaBrokers:       []string{"localhost:9092"},
        KafkaConsumerGroup: "orders-service",
        PoisonQueue:        "orders.poison",
    }

    svc := protoflow.NewService(cfg, logger, context.Background(), protoflow.ServiceDependencies{})

    err := protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*orderpb.OrderCreated]{
        Name:               "order-created",
        ConsumeQueue:       "orders.created",
        PublishQueue:       "orders.processed",
        ConsumeMessageType: &orderpb.OrderCreated{},
        ValidateOutgoing:   true,
        Handler: func(ctx context.Context, evt protoflow.ProtoMessageContext[*orderpb.OrderCreated]) ([]protoflow.ProtoMessageOutput, error) {
            // do work with evt.Payload
            metadata := evt.CloneMetadata()
            metadata["event_source"] = "orders-service"
            return []protoflow.ProtoMessageOutput{{
                Message:  &orderpb.OrderProcessed{OrderId: evt.Payload.OrderId},
                Metadata: metadata,
            }}, nil
        },
        Options: []protoflow.ProtoHandlerOption{
            protoflow.WithPublishMessageTypes(&orderpb.OrderProcessed{}),
        },
    })
    if err != nil {
        logger.Error("handler registration failed", "err", err)
        return
    }

    if err := svc.Start(context.Background()); err != nil && !errors.Is(err, context.Canceled) {
        logger.Error("service stopped", "err", err)
    }
}

Registering JSON handlers

If your payloads are JSON instead of protobuf, use RegisterJSONHandler:

err := protoflow.RegisterJSONHandler(svc, protoflow.JSONHandlerRegistration[*IncomingOrder, *OutgoingOrder]{
    Name:               "json-order-handler",
    ConsumeQueue:       "orders.json",
    PublishQueue:       "orders.json.out",
    ConsumeMessageType: &IncomingOrder{},
    Handler: func(ctx context.Context, evt protoflow.JSONMessageContext[*IncomingOrder]) ([]protoflow.JSONMessageOutput[*OutgoingOrder], error) {
        response := &OutgoingOrder{ID: evt.Payload.ID}
        return []protoflow.JSONMessageOutput[*OutgoingOrder]{{
            Message:  response,
            Metadata: evt.CloneMetadata(),
        }}, nil
    },
})

Producing events

Use the helper to publish protobuf messages outside of handlers (for example, from HTTP endpoints):

metadata := protoflow.Metadata{"event_source": "api"}
if err := svc.PublishProto(ctx, "orders.created", &orderpb.OrderCreated{OrderId: "123"}, metadata); err != nil {
    logger.Error("publish failed", "err", err)
}

Configuration reference

Config selects the transport and holds per-transport settings:

cfg := &protoflow.Config{
    PubSubSystem:       "kafka",            // or "rabbitmq" / "aws"
    KafkaBrokers:       []string{"broker"},
    KafkaConsumerGroup: "group",
    RabbitMQURL:        "amqp://guest:guest@localhost",
    AWSRegion:          "eu-west-1",
    AWSAccountID:       "123456789012",
    AWSEndpoint:        "http://localhost:4566", // optional (LocalStack)
    PoisonQueue:        "events.poison",
    RetryMaxRetries:    5,
}

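Only the fields required by the selected PubSubSystem are used. RetryMaxRetries, RetryInitialInterval, and RetryMaxInterval feed into the default retry middleware; fields left at their zero value fall back to the library defaults.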
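
The zero-value fallback for the retry settings can be pictured with the sketch below. It is not the library's code: retrySettings is a local stand-in for the three retry fields on Config, and the default values are hypothetical, since the actual defaults are not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// retrySettings mirrors the three retry fields on Config. It is a local
// stand-in so the sketch compiles without the protoflow module.
type retrySettings struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
}

// Hypothetical defaults, chosen only for illustration.
const (
	assumedMaxRetries      = 3
	assumedInitialInterval = 500 * time.Millisecond
	assumedMaxInterval     = 30 * time.Second
)

// withDefaults returns a copy of s in which every zero-valued field is
// replaced by the assumed default; explicitly set fields are kept.
func withDefaults(s retrySettings) retrySettings {
	if s.MaxRetries == 0 {
		s.MaxRetries = assumedMaxRetries
	}
	if s.InitialInterval == 0 {
		s.InitialInterval = assumedInitialInterval
	}
	if s.MaxInterval == 0 {
		s.MaxInterval = assumedMaxInterval
	}
	return s
}

func main() {
	// Only the retry count is set, as in the Config example above.
	fmt.Printf("%+v\n", withDefaults(retrySettings{MaxRetries: 5}))
}
```

One consequence of this pattern is that a zero value cannot be used to request "no retries"; whether protoflow offers another way to disable retries is not covered here.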

Service dependencies and middleware

ServiceDependencies lets you inject optional collaborators:

  • Validator (ProtoValidator) validates incoming protobuf payloads in ProtoValidateMiddleware and, for handlers registered with ValidateOutgoing, the events they emit.
  • Outbox (OutboxStore) stores emitted events before they are forwarded.
  • Middlewares ([]MiddlewareRegistration) are appended after the default chain.
  • DisableDefaultMiddlewares skips the built-in middleware stack so you can assemble your own.

The default middleware order is:

  1. Correlation ID
  2. Message logger
  3. Proto validation
  4. Outbox persistence
  5. OpenTelemetry tracer
  6. Retry with exponential backoff
  7. Poison queue forwarding
  8. Panic recoverer

You can register additional middleware by supplying ServiceDependencies.Middlewares or by calling Service.RegisterMiddleware manually.
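
To make the effect of middleware ordering concrete, here is a minimal sketch using simplified local types. handlerFunc and middleware are stand-ins for the Watermill handler and middleware function types (real handlers receive a *message.Message, not a string), and the sketch assumes that the first registered middleware ends up outermost.

```go
package main

import "fmt"

// handlerFunc and middleware are simplified local stand-ins; they are not
// the Watermill types.
type handlerFunc func(payload string) error
type middleware func(next handlerFunc) handlerFunc

// trace returns a middleware that records when it is entered and left.
func trace(name string, calls *[]string) middleware {
	return func(next handlerFunc) handlerFunc {
		return func(payload string) error {
			*calls = append(*calls, name+":before")
			err := next(payload)
			*calls = append(*calls, name+":after")
			return err
		}
	}
}

// chain wraps h so that the first middleware in mws is the outermost one,
// which is the ordering this sketch assumes for the router.
func chain(h handlerFunc, mws ...middleware) handlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// runDemo builds a two-layer chain and returns the recorded call order.
func runDemo() []string {
	var calls []string
	handler := func(string) error {
		calls = append(calls, "handler")
		return nil
	}
	// "default" is registered first, "custom" is appended after it.
	wrapped := chain(handler, trace("default", &calls), trace("custom", &calls))
	_ = wrapped("payload")
	return calls
}

func main() {
	fmt.Println(runDemo())
}
```

Under that assumption, a middleware appended after the default chain runs closest to the handler: it sees the message after the defaults have processed it and sees the handler's error before they do.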

Local development tips

  • AWS SNS/SQS: set Config.AWSEndpoint to your LocalStack URL to reuse the same code locally.
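  • Kafka: give each service its own KafkaConsumerGroup and share it across all instances of that service; a group that is unique per instance makes every instance receive every message.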
  • RabbitMQ: the connection is reused for both publisher and subscriber; supply TLS information via amqp.ConnectionConfig if needed.

Testing

The repository comes with unit tests that exercise service wiring, middleware, and handler helpers. Run them with:

go test ./...

Contributing

  1. Fork the repo and create a feature branch.
  2. Run go test ./... before opening a PR.
  3. Keep the README and package docs up to date when you add new features.

License

Protoflow is available under the MIT License.

Documentation

Overview

Package protoflow exposes a small framework on top of Watermill that wires message routers, publishers, subscribers, and middleware for protobuf and JSON driven services. It selects the desired Pub/Sub transport (Kafka, RabbitMQ, or AWS SNS/SQS) from Config, bootstraps the Watermill router, and registers a middleware chain that adds correlation IDs, logging, protobuf validation, outbox persistence, tracing, retries, and poison queue forwarding.

The Service type hosts the router and offers helpers for registering typed handlers via RegisterProtoHandler or RegisterJSONHandler. Typed registrations automatically marshal/unmarshal payloads, clone metadata safely, and can validate outgoing protobuf events when a ProtoValidator is provided. Service also exposes PublishProto so your HTTP or RPC handlers can emit events without touching the lower-level Watermill APIs.

Advanced users can extend the default middleware chain, plug in an OutboxStore to persist outgoing messages, or provide a ProtoValidator to enforce message contracts. See README.md for a full walkthrough.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateULID

func CreateULID() string

CreateULID returns a time-sortable ULID encoded as a 26-character string.

func Decode

func Decode(r io.Reader, v any) error

func Encode

func Encode(w io.Writer, v any) error

func Marshal

func Marshal(v any) ([]byte, error)

func MarshalIndent

func MarshalIndent(v any, prefix, indent string) ([]byte, error)

func NewMessageFromProto

func NewMessageFromProto(event proto.Message, metadata Metadata) (*message.Message, error)

NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.

func PublishProto

func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata Metadata) error

PublishProto marshals the proto payload and publishes it to the provided topic.

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error

RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.
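
The conversion from a typed handler to a raw one can be sketched with generics and encoding/json. This is an illustration of the general technique, not the package's implementation: typedHandler, rawHandler, adapt, and the two order types are hypothetical names, and metadata handling is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// typedHandler is a local stand-in for a typed handler: it receives a decoded
// payload of type T and returns outputs of type O.
type typedHandler[T any, O any] func(payload T) ([]O, error)

// rawHandler works on undecoded bytes, as a transport-level handler would.
type rawHandler func(data []byte) ([][]byte, error)

// adapt wraps a typed handler so it can be driven with raw JSON: the input
// is unmarshalled into T, the handler runs, and each output is marshalled.
func adapt[T any, O any](h typedHandler[T, O]) rawHandler {
	return func(data []byte) ([][]byte, error) {
		var payload T
		if err := json.Unmarshal(data, &payload); err != nil {
			return nil, fmt.Errorf("decode payload: %w", err)
		}
		outputs, err := h(payload)
		if err != nil {
			return nil, err
		}
		encoded := make([][]byte, 0, len(outputs))
		for _, out := range outputs {
			b, err := json.Marshal(out)
			if err != nil {
				return nil, fmt.Errorf("encode output: %w", err)
			}
			encoded = append(encoded, b)
		}
		return encoded, nil
	}
}

// IncomingOrder and OutgoingOrder are hypothetical payload types.
type IncomingOrder struct {
	ID string `json:"id"`
}

type OutgoingOrder struct {
	ID     string `json:"id"`
	Status string `json:"status"`
}

func main() {
	h := adapt[*IncomingOrder, *OutgoingOrder](func(in *IncomingOrder) ([]*OutgoingOrder, error) {
		return []*OutgoingOrder{{ID: in.ID, Status: "processed"}}, nil
	})
	out, err := h([]byte(`{"id":"42"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out[0]))
}
```

Because decoding happens before the handler is called, a malformed payload never reaches domain code; the adapter returns the decode error instead.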

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error

RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.

func Unmarshal

func Unmarshal(data []byte, v any) error

Types

type Config

type Config struct {
	// PubSubSystem selects the backing message infrastructure. Supported
	// values: "kafka", "rabbitmq", or "aws" (SNS/SQS).
	PubSubSystem string

	// Kafka configuration.
	KafkaBrokers       []string
	KafkaClientID      string
	KafkaConsumerGroup string

	// RabbitMQ configuration.
	RabbitMQURL string

	// PoisonQueue receives messages that cannot be processed even after
	// retries.
	PoisonQueue string

	// AWS (SNS/SQS) configuration.
	AWSRegion          string
	AWSAccountID       string
	AWSAccessKeyID     string
	AWSSecretAccessKey string
	// AWSEndpoint optionally points to a custom endpoint (for example,
	// Localstack in local development).
	AWSEndpoint string

	// RetryMiddleware tuning. Zero values fall back to library defaults.
	RetryMaxRetries      int
	RetryInitialInterval time.Duration
	RetryMaxInterval     time.Duration
}

Config groups the Pub/Sub settings required to initialise the Service. Each transport only uses the keys that are relevant to it.

type JSONHandlerRegistration

type JSONHandlerRegistration[T any, O any] struct {
	Name               string
	ConsumeQueue       string
	PublishQueue       string
	ConsumeMessageType T
	Handler            JSONMessageHandler[T, O]
}

JSONHandlerRegistration wires a typed JSON handler to the router.

type JSONMessageContext

type JSONMessageContext[T any] struct {
	Payload  T
	Metadata Metadata
}

JSONMessageContext exposes the incoming payload and metadata for JSON handlers.

func (JSONMessageContext[T]) CloneMetadata

func (c JSONMessageContext[T]) CloneMetadata() Metadata

CloneMetadata copies the current metadata map so handlers can mutate headers safely.

type JSONMessageHandler

type JSONMessageHandler[T any, O any] func(ctx context.Context, event JSONMessageContext[T]) ([]JSONMessageOutput[O], error)

JSONMessageHandler processes a JSON payload and returns the events to publish.

type JSONMessageOutput

type JSONMessageOutput[T any] struct {
	Message  T
	Metadata Metadata
}

JSONMessageOutput represents an event emitted by a JSON handler.

type Metadata

type Metadata map[string]string

Metadata represents the headers carried alongside an event.

type MiddlewareBuilder

type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)

MiddlewareBuilder constructs a handler middleware using the provided service instance.

type MiddlewareRegistration

type MiddlewareRegistration struct {
	Name       string
	Middleware message.HandlerMiddleware
	Builder    MiddlewareBuilder
}

MiddlewareRegistration captures how a middleware should be registered on a Service router.

func CorrelationIDMiddleware

func CorrelationIDMiddleware() MiddlewareRegistration

CorrelationIDMiddleware ensures each processed message carries a correlation identifier.

func DefaultMiddlewares

func DefaultMiddlewares() []MiddlewareRegistration

DefaultMiddlewares returns the standard middleware chain used by the Service constructor.

func LogMessagesMiddleware

func LogMessagesMiddleware(logger watermill.LoggerAdapter) MiddlewareRegistration

LogMessagesMiddleware logs the full payload and metadata of handled messages.

func OutboxMiddleware

func OutboxMiddleware() MiddlewareRegistration

OutboxMiddleware persists outgoing messages when an OutboxStore is configured.

func PoisonQueueMiddleware

func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration

PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.

func ProtoValidateMiddleware

func ProtoValidateMiddleware() MiddlewareRegistration

ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.

func RecovererMiddleware

func RecovererMiddleware() MiddlewareRegistration

RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.

func RetryMiddleware

func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration

RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).

func TracerMiddleware

func TracerMiddleware() MiddlewareRegistration

TracerMiddleware wraps handler execution in an OpenTelemetry span.

type OutboxStore

type OutboxStore interface {
	StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}

OutboxStore persists processed messages so they can be forwarded reliably.

type Producer

type Producer interface {
	PublishProto(ctx context.Context, topic string, event proto.Message, metadata Metadata) error
}

Producer emits proto-based events onto the configured transport.

type ProtoHandlerOption

type ProtoHandlerOption func(*protoHandlerOptions)

ProtoHandlerOption customises handler registration.

func WithPublishMessageTypes

func WithPublishMessageTypes(msgs ...proto.Message) ProtoHandlerOption

WithPublishMessageTypes registers extra proto schemas emitted by this handler. Use this when the handler may emit multiple message types.

type ProtoHandlerRegistration

type ProtoHandlerRegistration[T proto.Message] struct {
	Name               string
	ConsumeQueue       string
	PublishQueue       string
	ConsumeMessageType T
	Handler            ProtoMessageHandler[T]
	PublishMessageType proto.Message
	Options            []ProtoHandlerOption
	ValidateOutgoing   bool
}

ProtoHandlerRegistration configures a typed protobuf handler that automatically unmarshals incoming payloads and marshals emitted events.

type ProtoMessageContext

type ProtoMessageContext[T proto.Message] struct {
	Payload  T
	Metadata Metadata
}

ProtoMessageContext provides strongly typed access to the incoming message payload and its metadata.

func (ProtoMessageContext[T]) CloneMetadata

func (c ProtoMessageContext[T]) CloneMetadata() Metadata

CloneMetadata returns a copy of the current metadata map so handlers can safely mutate headers for outgoing events without touching the original map.
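
Why the copy matters can be shown with a few lines of plain Go. Metadata is redeclared locally with the same underlying type, clone is a hypothetical helper standing in for CloneMetadata, and the correlation_id key is only an example.

```go
package main

import "fmt"

// Metadata is redeclared here with the same underlying type as in the
// package docs so the sketch is self-contained.
type Metadata map[string]string

// clone returns an independent copy of m. Maps are reference types in Go,
// so assigning m to another variable would share the same storage.
func clone(m Metadata) Metadata {
	out := make(Metadata, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	incoming := Metadata{"correlation_id": "abc"}

	outgoing := clone(incoming)
	outgoing["event_source"] = "orders-service"

	// The incoming map is unchanged; only the copy carries the new header.
	fmt.Println(len(incoming), len(outgoing))
}
```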

type ProtoMessageHandler

type ProtoMessageHandler[T proto.Message] func(ctx context.Context, event ProtoMessageContext[T]) ([]ProtoMessageOutput, error)

ProtoMessageHandler processes a typed protobuf payload and returns the events to emit.

type ProtoMessageOutput

type ProtoMessageOutput struct {
	Message  proto.Message
	Metadata Metadata
}

ProtoMessageOutput describes an event that should be emitted after the handler succeeds.

type ProtoValidator

type ProtoValidator interface {
	Validate(proto.Message) error
}

ProtoValidator validates protobuf messages after they are unmarshalled.

type RetryMiddlewareConfig

type RetryMiddlewareConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
}

RetryMiddlewareConfig customises the retry middleware behaviour.

type Service

type Service struct {
	Conf *Config

	Logger watermill.LoggerAdapter
	// contains filtered or unexported fields
}

Service wires a Watermill router, publisher, subscriber, and middleware chain.

func NewService

func NewService(conf *Config, log *slog.Logger, ctx context.Context, deps ServiceDependencies) *Service

NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start.

func (*Service) PublishProto

func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata Metadata) error

PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.

func (*Service) RegisterMiddleware

func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error

RegisterMiddleware attaches the supplied middleware to the router.

func (*Service) RegisterProtoMessage

func (s *Service) RegisterProtoMessage(msg proto.Message)

RegisterProtoMessage exposes a proto message type for validation without registering a handler.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start runs the underlying Watermill router until the provided context is cancelled.

type ServiceDependencies

type ServiceDependencies struct {
	Outbox                    OutboxStore
	Validator                 ProtoValidator
	Middlewares               []MiddlewareRegistration // Appended after the default middleware chain.
	DisableDefaultMiddlewares bool                     // Skips registering the default middleware chain when true.
}

ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.

type UnprocessableEventError

type UnprocessableEventError struct {
	// contains filtered or unexported fields
}

UnprocessableEventError wraps payloads that failed validation or unmarshalling.

func (*UnprocessableEventError) Error

func (e *UnprocessableEventError) Error() string
