protoflow

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2025 License: MIT Imports: 10 Imported by: 0

README

Protoflow

Go Reference Go Report Card CI Coverage Latest Tag License

Stop writing plumbing. Start shipping features.

Protoflow is a productivity layer for Watermill that simplifies event-driven architecture. It manages routers, publishers, subscribers, and middleware so you can focus on your domain logic.

Whether you are using Protobufs or JSON, Protoflow provides a type-safe, production-ready foundation for Kafka, RabbitMQ, AWS SNS/SQS, NATS, and Go Channels.

Feature Highlights

  • Type-Safe Handlers: Generic RegisterProtoHandler and RegisterJSONHandler helpers keep your code clean.
  • Instant Wiring: Switch between Kafka, RabbitMQ, AWS SNS/SQS, NATS, and Go Channels with a single config change.
  • Batteries Included: A robust default middleware stack with correlation IDs, structured logging, validation, outbox pattern, OpenTelemetry tracing, retries, poison queues, and panic recovery.
  • Developer Experience: Built for clarity and ease of use. Extension points exist where you need them, but the defaults just work.
  • Safe Publishing: Emit events from anywhere in your application with confidence using our publishing helpers.

Quick Start

  1. Install: go get github.com/drblury/protoflow (Go 1.25+).
  2. Configure: Set up protoflow.Config.
  3. Launch: Create a Service, register your handlers, and Start.
// 1. Configure your transport (Kafka, RabbitMQ, AWS, NATS, Channel, etc.)
cfg := &protoflow.Config{
    PubSubSystem: "channel", // Use in-memory channel for simple pub/sub
    PoisonQueue:  "orders.poison",
}

// 2. Use your preferred logger (slog, logrus, zap, etc.)
logger := protoflow.NewSlogServiceLogger(slog.Default())
svc := protoflow.NewService(cfg, logger, ctx, protoflow.ServiceDependencies{})

// 3. Register a strongly-typed handler
must(protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*models.OrderCreated]{
    Name:         "orders-created",
    ConsumeQueue: "orders.created",
    Handler: func(ctx context.Context, evt protoflow.ProtoMessageContext[*models.OrderCreated]) ([]protoflow.ProtoMessageOutput, error) {
        // Your business logic goes here!
        evt.Logger.Info("Order received", protoflow.LogFields{"id": evt.Payload.OrderId})
        return nil, nil
    },
}))

// 4. Start the service
go func() { _ = svc.Start(ctx) }()

Want to emit events? Need metadata handling? Check out the Handlers Guide.

Logging

protoflow.ServiceLogger unifies logging across the router, middleware, and transports. Just wrap your favorite logger and pass it to NewService:

  • protoflow.NewSlogServiceLogger: Adapts log/slog (standard library).
  • protoflow.NewEntryServiceLogger: Adapts any structured logger (logrus, zerolog, etc.) that fits the EntryLoggerAdapter[T] interface.
// Use your existing logger instance
svc := protoflow.NewService(cfg,
    protoflow.NewEntryServiceLogger(myFancyLogger),
    ctx,
    protoflow.ServiceDependencies{},
)

We handle the translation to Watermill's logger interface.

Documentation

Examples

Check out examples/ for runnable code:

  • simple: A basic example of Protoflow.
  • json: Typed JSON handlers with metadata enrichment.
  • proto: Protobuf handlers with generated models.
  • full: A comprehensive example with custom middleware, validators, outbox, and more.

Run them with: go run ./examples/<name>

Development Workflow

We use task to manage development commands.

  • task lint: Keep the code sparkling.
  • task test: Run the gauntlet.

See the Development Guide for the full menu.

Run the full test suite with go test ./... (or task test) before sending changes.

Contribution guidelines

  1. Fork the repo and branch from main.
  2. Run task lint and task test (or the equivalent commands) before opening a PR.
  3. Add or update docs inside docs/ and package comments when you ship new features.
  4. Keep commits focused; attach context in PR descriptions so reviewers understand the transport, middleware, or handler surface you touched.

License

Protoflow is available under the MIT License.

Documentation

Overview

Package protoflow is a small layer on top of Watermill that wires routers, publishers, subscribers, and middleware for protobuf- or JSON-driven services. It reads the target transport (Kafka, RabbitMQ, or AWS SNS/SQS) from Config, bootstraps the Watermill router, and registers the default middleware chain for correlation IDs, logging, validation, outbox persistence, tracing, retries, and poison queue forwarding.

Service hosts the router and exposes typed helpers: RegisterProtoHandler and RegisterJSONHandler take care of marshaling, metadata cloning, and optional protobuf validation, while Service.PublishProto lets HTTP/RPC handlers emit events without touching low-level Watermill APIs. A minimal setup therefore involves filling Config, creating a Service, registering handlers, and calling Start; see README.md for a copy/paste quick start snippet.

When you need more control, ServiceDependencies exposes well-scoped hooks: bring your own OutboxStore, ProtoValidator, middleware registrations, or even an entire TransportFactory to plug in custom brokers. The README organises these knobs by topic so you can dive into the exact setting you want to adjust without rereading the whole guide.

Index

Constants

This section is empty.

Variables

View Source
var (
	NewService = runtimepkg.NewService

	RegisterMessageHandler  = runtimepkg.RegisterMessageHandler
	WithPublishMessageTypes = handlerpkg.WithPublishMessageTypes

	DefaultMiddlewares      = runtimepkg.DefaultMiddlewares
	CorrelationIDMiddleware = runtimepkg.CorrelationIDMiddleware
	LogMessagesMiddleware   = runtimepkg.LogMessagesMiddleware
	ProtoValidateMiddleware = runtimepkg.ProtoValidateMiddleware
	OutboxMiddleware        = runtimepkg.OutboxMiddleware
	TracerMiddleware        = runtimepkg.TracerMiddleware
	MetricsMiddleware       = runtimepkg.MetricsMiddleware
	RetryMiddleware         = runtimepkg.RetryMiddleware
	PoisonQueueMiddleware   = runtimepkg.PoisonQueueMiddleware
	RecovererMiddleware     = runtimepkg.RecovererMiddleware

	Marshal       = jsoncodec.Marshal
	MarshalIndent = jsoncodec.MarshalIndent
	Unmarshal     = jsoncodec.Unmarshal
	Encode        = jsoncodec.Encode
	Decode        = jsoncodec.Decode

	ErrServiceRequired             = errspkg.ErrServiceRequired
	ErrHandlerRequired             = errspkg.ErrHandlerRequired
	ErrConsumeQueueRequired        = errspkg.ErrConsumeQueueRequired
	ErrHandlerNameRequired         = errspkg.ErrHandlerNameRequired
	ErrConsumeMessageTypeRequired  = errspkg.ErrConsumeMessageTypeRequired
	ErrConsumeMessagePointerNeeded = errspkg.ErrConsumeMessagePointerNeeded
	ErrPublisherRequired           = errspkg.ErrPublisherRequired
	ErrTopicRequired               = errspkg.ErrTopicRequired

	NewSlogServiceLogger = loggingpkg.NewSlogServiceLogger

	NewMetadata = metadatapkg.New

	CreateULID = idspkg.CreateULID
)

Functions

func MustProtoMessage added in v0.2.3

func MustProtoMessage[T proto.Message]() T

func NewProtoMessage added in v0.2.3

func NewProtoMessage[T proto.Message]() (T, error)

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error

Types

type Config

type Config = configpkg.Config

type EntryLogger added in v0.2.1

type EntryLogger = loggingpkg.EntryLogger

type EntryLoggerAdapter added in v0.2.2

type EntryLoggerAdapter[T any] = loggingpkg.EntryLoggerAdapter[T]

type HandlerInfo added in v0.3.3

type HandlerInfo = runtimepkg.HandlerInfo

type HandlerStats added in v0.3.3

type HandlerStats = runtimepkg.HandlerStats

type JSONHandlerRegistration

type JSONHandlerRegistration[T any, O any] = handlerpkg.JSONHandlerRegistration[T, O]

type JSONMessageContext

type JSONMessageContext[T any] = handlerpkg.JSONMessageContext[T]

type JSONMessageHandler

type JSONMessageHandler[T any, O any] = handlerpkg.JSONMessageHandler[T, O]

type JSONMessageOutput

type JSONMessageOutput[T any] = handlerpkg.JSONMessageOutput[T]

type LogFields added in v0.2.0

type LogFields = loggingpkg.LogFields

type MessageHandlerRegistration added in v0.2.0

type MessageHandlerRegistration = runtimepkg.MessageHandlerRegistration

type Metadata

type Metadata = metadatapkg.Metadata

type MiddlewareBuilder

type MiddlewareBuilder = runtimepkg.MiddlewareBuilder

type MiddlewareRegistration

type MiddlewareRegistration = runtimepkg.MiddlewareRegistration

type OutboxStore

type OutboxStore = runtimepkg.OutboxStore

type Producer

type Producer = runtimepkg.Producer

type ProtoHandlerOption

type ProtoHandlerOption = handlerpkg.ProtoHandlerOption

type ProtoHandlerRegistration

type ProtoHandlerRegistration[T proto.Message] = handlerpkg.ProtoHandlerRegistration[T]

type ProtoMessageContext

type ProtoMessageContext[T proto.Message] = handlerpkg.ProtoMessageContext[T]

type ProtoMessageHandler

type ProtoMessageHandler[T proto.Message] = handlerpkg.ProtoMessageHandler[T]

type ProtoMessageOutput

type ProtoMessageOutput = handlerpkg.ProtoMessageOutput

type ProtoValidator

type ProtoValidator = runtimepkg.ProtoValidator

type RetryMiddlewareConfig

type RetryMiddlewareConfig = runtimepkg.RetryMiddlewareConfig

type Service

type Service = runtimepkg.Service

type ServiceDependencies

type ServiceDependencies = runtimepkg.ServiceDependencies

type ServiceLogger added in v0.2.0

type ServiceLogger = loggingpkg.ServiceLogger

func NewEntryServiceLogger added in v0.2.1

func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger

type Transport added in v0.3.0

type Transport = transportpkg.Transport

type TransportFactory added in v0.3.0

type TransportFactory = transportpkg.Factory

type UnprocessableEventError

type UnprocessableEventError = runtimepkg.UnprocessableEventError

Directories

Path Synopsis
examples
full command
json command
proto command
simple command
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL