protoflow

package module
v0.3.1
Published: Nov 21, 2025 License: MIT Imports: 10 Imported by: 0

README

Protoflow

Protoflow is a thin productivity layer on top of Watermill. It wires routers, publishers, subscribers, middleware, and typed handler helpers so you can build protobuf or JSON services without reimplementing the plumbing for Kafka, RabbitMQ, or AWS SNS/SQS. The domain-facing API stays in the root module, while the heavy lifting lives under internal/runtime/.

Feature highlights

  • Typed handler registrations for protobuf (RegisterProtoHandler) and JSON (RegisterJSONHandler).
  • Built-in router wiring for Kafka, RabbitMQ, and AWS SNS/SQS selected via configuration.
  • Default middleware chain with correlation IDs, structured logging, proto validation, outbox persistence, OpenTelemetry traces, retries, poison queue routing, and panic recovery.
  • Extension points for custom validators, outbox stores, middleware registrations, and transport factories.
  • Publishing helpers so services emit protobuf events safely from any component.
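To make the idea of a middleware chain concrete, here is a minimal sketch in plain Go. It does not use Protoflow's or Watermill's real types: message, handlerFunc, middleware, chain, correlationID, and recoverer are hypothetical names invented for illustration, and the "correlation_id" metadata key is an assumption. It only shows how wrappers for correlation IDs and panic recovery can be composed around a handler.

```go
package main

import "fmt"

// message, handlerFunc, and middleware are hypothetical stand-ins,
// not Protoflow or Watermill types.
type message struct {
	Metadata map[string]string
	Payload  []byte
}

type handlerFunc func(msg *message) error

type middleware func(next handlerFunc) handlerFunc

// chain wraps h so that the first middleware in mws runs outermost.
func chain(h handlerFunc, mws ...middleware) handlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// correlationID makes sure a message carries a correlation ID before the
// next handler runs. The metadata key is an assumption of this sketch.
func correlationID(newID func() string) middleware {
	return func(next handlerFunc) handlerFunc {
		return func(msg *message) error {
			if msg.Metadata == nil {
				msg.Metadata = map[string]string{}
			}
			if msg.Metadata["correlation_id"] == "" {
				msg.Metadata["correlation_id"] = newID()
			}
			return next(msg)
		}
	}
}

// recoverer converts a panic in the wrapped handler into an ordinary error.
func recoverer(next handlerFunc) handlerFunc {
	return func(msg *message) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panicked: %v", r)
			}
		}()
		return next(msg)
	}
}

func main() {
	h := chain(
		func(msg *message) error { panic("boom") },
		correlationID(func() string { return "id-1" }),
		recoverer,
	)
	msg := &message{}
	err := h(msg)
	fmt.Println(msg.Metadata["correlation_id"], err)
}
```

In this sketch the order of the arguments to chain decides which wrapper sees the message first; the order Protoflow uses for its default chain is not shown here.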

Quick start

  1. Install the module: go get github.com/drblury/protoflow (Go 1.25+).
  2. Pick a transport in protoflow.Config.
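  3. Create a Service, register handlers, then call Start.

// ctx is a context.Context owned by the caller; must is a caller-defined
// helper that panics on a non-nil error.
cfg := &protoflow.Config{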
    PubSubSystem:       "kafka",
    KafkaBrokers:       []string{"localhost:9092"},
    KafkaConsumerGroup: "orders-service",
    PoisonQueue:        "orders.poison",
}

logger := protoflow.NewSlogServiceLogger(slog.Default())
svc := protoflow.NewService(cfg, logger, ctx, protoflow.ServiceDependencies{})

must(protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*models.OrderCreated]{
    Name:         "orders-created",
    ConsumeQueue: "orders.created",
    Handler: func(ctx context.Context, evt protoflow.ProtoMessageContext[*models.OrderCreated]) ([]protoflow.ProtoMessageOutput, error) {
        return nil, nil
    },
}))

go func() { _ = svc.Start(ctx) }()

Set PublishQueue whenever a handler should emit follow-up events. For typed handler patterns, metadata helpers, and publishing utilities see the Handlers guide.
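The quick start uses a must helper and a ctx value without defining them. The sketch below shows one way they might look, using only the standard library. must, blockingStart, and runDemo are hypothetical names, and blockingStart merely stands in for a blocking call such as svc.Start(ctx); it is not Protoflow code. Unlike the quick start, the sketch keeps the returned error instead of discarding it.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// must panics on a non-nil error. This is an assumed shape for the helper
// the quick start calls.
func must(err error) {
	if err != nil {
		panic(err)
	}
}

// blockingStart is a hypothetical stand-in for a blocking call such as
// svc.Start(ctx): it returns only once ctx is done.
func blockingStart(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runDemo wires a signal-aware context, runs blockingStart in a goroutine,
// and reports why it stopped.
func runDemo() string {
	// Cancel on Ctrl+C or SIGTERM so the service can shut down cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// The short timeout exists only so this sketch exits on its own.
	ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()

	errCh := make(chan error, 1)
	go func() { errCh <- blockingStart(ctx) }()

	return (<-errCh).Error()
}

func main() {
	must(nil) // a nil error is a no-op
	fmt.Println("stopped:", runDemo())
}
```

In a real service you would drop the timeout and let the signal decide when the context ends.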

Logging

protoflow.ServiceLogger is the single logging contract the router, middleware, and transports rely on. Wrap whichever logger you already use and supply it to NewService:

  • protoflow.NewSlogServiceLogger adapts a log/slog logger, mapping levels onto Watermill so structured fields stay intact.
  • protoflow.NewEntryServiceLogger takes any entry-style logger that satisfies the generic EntryLoggerAdapter[T] constraint (for example logrus, zerolog’s contextual logger, or your own facade) and turns it into a ServiceLogger without touching Watermill APIs.

type contextualLogger interface {
    WithField(key string, value any) contextualLogger
    WithError(err error) contextualLogger
    Info(args ...any)
    Debug(args ...any)
    Error(args ...any)
    Trace(args ...any)
}

svc := protoflow.NewService(cfg,
    protoflow.NewEntryServiceLogger(contextualLoggerInstance),
    ctx,
    protoflow.ServiceDependencies{},
)

Under the hood Protoflow converts ServiceLogger back into a Watermill LoggerAdapter, so you only have to think about one logging abstraction. More details live in the Handlers guide.
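The signature NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) uses a self-referential generic constraint. The sketch below illustrates that pattern with the standard library only. entryLike, memEntry, logInfo, and demo are hypothetical names, and the method set of entryLike is an assumption; the real EntryLoggerAdapter[T] may require different methods. The point is that WithField can return the logger's own concrete type rather than an interface.

```go
package main

import (
	"fmt"
	"strings"
)

// entryLike is a hypothetical constraint in the spirit of
// EntryLoggerAdapter[T]: WithField returns the concrete type T.
type entryLike[T any] interface {
	WithField(key string, value any) T
	Info(args ...any)
}

// memEntry is a toy entry-style logger that records lines in memory.
type memEntry struct {
	fields []string
	sink   *[]string
}

// WithField returns a copy of the entry with one more key=value pair.
func (e memEntry) WithField(key string, value any) memEntry {
	fields := append(append([]string{}, e.fields...), fmt.Sprintf("%s=%v", key, value))
	return memEntry{fields: fields, sink: e.sink}
}

// Info appends the message followed by the accumulated fields to the sink.
func (e memEntry) Info(args ...any) {
	line := fmt.Sprint(args...) + " " + strings.Join(e.fields, " ")
	*e.sink = append(*e.sink, strings.TrimSpace(line))
}

// logInfo accepts any logger whose WithField returns its own type, so the
// chained Info call is resolved at compile time.
func logInfo[T entryLike[T]](entry T, msg, key string, value any) {
	entry.WithField(key, value).Info(msg)
}

// demo logs one line through the generic helper and returns it.
func demo() string {
	var lines []string
	logInfo(memEntry{sink: &lines}, "handler registered", "queue", "orders.created")
	return lines[0]
}

func main() {
	fmt.Println(demo())
}
```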

Documentation

Deeper guides live under docs/ so the README stays focused:

  • Handlers — protobuf and JSON handlers, metadata cloning, publishing helpers, logging adapters.
  • Configuration — transport knobs, middleware stack, dependency injection, custom transport factories.
  • Development — Taskfile commands, local broker tips, and testing instructions.

doc.go carries the package-level API docs published on pkg.go.dev.

Examples

examples/ contains runnable scenarios you can launch with go run ./examples/<name>:

  • simple – raw Watermill handler registration.
  • json – typed JSON handlers with metadata enrichment.
  • proto – protobuf handlers backed by generated models.
  • full – hybrid example with custom middleware, validators, an in-memory outbox, and a periodic publisher.

Use them as blueprints and swap the broker configuration for your environment.

Development workflow

taskfile.yml defines repeatable tasks—task lint, task test, and transport-specific helpers—so contributors share the same local workflow. See the development guide for the full command list plus local broker hints.

Run the full test suite with go test ./... (or task test) before sending changes.

Contribution guidelines

  1. Fork the repo and branch from main.
  2. Run task lint and task test (or the equivalent commands) before opening a PR.
  3. Add or update docs inside docs/ and package comments when you ship new features.
  4. Keep commits focused; attach context in PR descriptions so reviewers understand the transport, middleware, or handler surface you touched.

License

Protoflow is available under the MIT License.

Documentation

Overview

Package protoflow is a small layer on top of Watermill that wires routers, publishers, subscribers, and middleware for protobuf- or JSON-driven services. It reads the target transport (Kafka, RabbitMQ, or AWS SNS/SQS) from Config, bootstraps the Watermill router, and registers the default middleware chain for correlation IDs, logging, validation, outbox persistence, tracing, retries, and poison queue forwarding.
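As a rough illustration of how retries and poison queue forwarding can fit together, here is a minimal standard-library sketch. handler, poisonQueue, withRetry, and withPoisonQueue are hypothetical names, not Protoflow or Watermill APIs, and the sketch retries immediately without any backoff. It assumes that a message which still fails after its retries is parked in the poison queue and then acknowledged.

```go
package main

import (
	"errors"
	"fmt"
)

// errCannotProcess is a sample failure used by the demo handler.
var errCannotProcess = errors.New("cannot process")

// handler processes one payload; a hypothetical stand-in for a message handler.
type handler func(payload string) error

// poisonQueue collects payloads that kept failing. In a real service this
// would be a broker topic rather than a slice.
type poisonQueue struct{ items []string }

// withRetry calls h once and then up to maxRetries more times on failure.
func withRetry(h handler, maxRetries int) handler {
	return func(payload string) error {
		var err error
		for attempt := 0; attempt <= maxRetries; attempt++ {
			if err = h(payload); err == nil {
				return nil
			}
		}
		return fmt.Errorf("gave up after %d attempts: %w", maxRetries+1, err)
	}
}

// withPoisonQueue parks the payload in pq when h fails and returns nil,
// which this sketch treats as acknowledging the message.
func withPoisonQueue(h handler, pq *poisonQueue) handler {
	return func(payload string) error {
		if err := h(payload); err != nil {
			pq.items = append(pq.items, payload)
		}
		return nil
	}
}

func main() {
	attempts := 0
	flaky := func(payload string) error {
		attempts++
		if payload == "bad" {
			return errCannotProcess
		}
		return nil
	}

	pq := &poisonQueue{}
	h := withPoisonQueue(withRetry(flaky, 2), pq)
	_ = h("ok")
	_ = h("bad")
	fmt.Println(attempts, pq.items)
}
```

Placing the retry wrapper inside the poison queue wrapper means a message is only parked after every retry has failed.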

Service hosts the router and exposes typed helpers: RegisterProtoHandler and RegisterJSONHandler take care of marshaling, metadata cloning, and optional protobuf validation, while Service.PublishProto lets HTTP/RPC handlers emit events without touching low-level Watermill APIs. A minimal setup therefore involves filling Config, creating a Service, registering handlers, and calling Start; see README.md for a copy/paste quick start snippet.
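Metadata cloning matters because a handler that mutates incoming metadata in place can leak changes into other consumers of the same message. The sketch below shows the general idea with the standard library's maps package. The metadata type here is an assumed map-shaped stand-in, not the real Metadata type, and outgoing is a hypothetical helper; the key names are illustrative.

```go
package main

import (
	"fmt"
	"maps"
)

// metadata is an assumed map-shaped stand-in for message metadata.
type metadata map[string]string

// outgoing copies the incoming metadata and then applies overrides, so the
// incoming map is never mutated.
func outgoing(in, overrides metadata) metadata {
	out := maps.Clone(in)
	if out == nil {
		out = metadata{}
	}
	maps.Copy(out, overrides)
	return out
}

func main() {
	in := metadata{"correlation_id": "abc"}
	out := outgoing(in, metadata{"event_type": "orders.shipped"})
	fmt.Println(len(in), len(out), out["correlation_id"])
}
```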

When you need more control, ServiceDependencies exposes well-scoped hooks: bring your own OutboxStore, ProtoValidator, middleware registrations, or even an entire TransportFactory to plug in custom brokers. The README organises these knobs by topic so you can dive into the exact setting you want to adjust without rereading the whole guide.

Index

Constants

This section is empty.

Variables

var (
	NewService = runtimepkg.NewService

	RegisterMessageHandler  = runtimepkg.RegisterMessageHandler
	WithPublishMessageTypes = handlerpkg.WithPublishMessageTypes

	DefaultMiddlewares      = runtimepkg.DefaultMiddlewares
	CorrelationIDMiddleware = runtimepkg.CorrelationIDMiddleware
	LogMessagesMiddleware   = runtimepkg.LogMessagesMiddleware
	ProtoValidateMiddleware = runtimepkg.ProtoValidateMiddleware
	OutboxMiddleware        = runtimepkg.OutboxMiddleware
	TracerMiddleware        = runtimepkg.TracerMiddleware
	RetryMiddleware         = runtimepkg.RetryMiddleware
	PoisonQueueMiddleware   = runtimepkg.PoisonQueueMiddleware
	RecovererMiddleware     = runtimepkg.RecovererMiddleware

	Marshal       = jsoncodec.Marshal
	MarshalIndent = jsoncodec.MarshalIndent
	Unmarshal     = jsoncodec.Unmarshal
	Encode        = jsoncodec.Encode
	Decode        = jsoncodec.Decode

	ErrServiceRequired             = errspkg.ErrServiceRequired
	ErrHandlerRequired             = errspkg.ErrHandlerRequired
	ErrConsumeQueueRequired        = errspkg.ErrConsumeQueueRequired
	ErrHandlerNameRequired         = errspkg.ErrHandlerNameRequired
	ErrConsumeMessageTypeRequired  = errspkg.ErrConsumeMessageTypeRequired
	ErrConsumeMessagePointerNeeded = errspkg.ErrConsumeMessagePointerNeeded
	ErrPublisherRequired           = errspkg.ErrPublisherRequired
	ErrTopicRequired               = errspkg.ErrTopicRequired

	NewSlogServiceLogger = loggingpkg.NewSlogServiceLogger

	NewMetadata = metadatapkg.New

	CreateULID = idspkg.CreateULID
)
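The exported Err... values are sentinel errors, so callers can test for them with errors.Is. The sketch below demonstrates the pattern using local sentinels that mirror two of the names above. The registration struct, the register function, and the validation rules are assumptions made for illustration; they do not describe how RegisterProtoHandler actually validates its input.

```go
package main

import (
	"errors"
	"fmt"
)

// Local sentinels standing in for exported values such as
// ErrHandlerNameRequired and ErrConsumeQueueRequired.
var (
	errHandlerNameRequired  = errors.New("handler name is required")
	errConsumeQueueRequired = errors.New("consume queue is required")
)

// registration is a hypothetical, trimmed-down registration struct.
type registration struct {
	Name         string
	ConsumeQueue string
}

// register validates r and wraps the matching sentinel with context.
func register(r registration) error {
	if r.Name == "" {
		return fmt.Errorf("register handler: %w", errHandlerNameRequired)
	}
	if r.ConsumeQueue == "" {
		return fmt.Errorf("register handler %q: %w", r.Name, errConsumeQueueRequired)
	}
	return nil
}

// describe maps an error onto a short diagnosis. errors.Is still matches
// after the sentinel has been wrapped with %w.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errHandlerNameRequired):
		return "missing name"
	case errors.Is(err, errConsumeQueueRequired):
		return "missing queue"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	fmt.Println(describe(register(registration{})))
	fmt.Println(describe(register(registration{Name: "orders-created"})))
	fmt.Println(describe(register(registration{Name: "orders-created", ConsumeQueue: "orders.created"})))
}
```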

Functions

func MustProtoMessage added in v0.2.3

func MustProtoMessage[T proto.Message]() T

func NewProtoMessage added in v0.2.3

func NewProtoMessage[T proto.Message]() (T, error)

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error

Types

type Config

type Config = configpkg.Config

type EntryLogger added in v0.2.1

type EntryLogger = loggingpkg.EntryLogger

type EntryLoggerAdapter added in v0.2.2

type EntryLoggerAdapter[T any] = loggingpkg.EntryLoggerAdapter[T]

type JSONHandlerRegistration

type JSONHandlerRegistration[T any, O any] = handlerpkg.JSONHandlerRegistration[T, O]

type JSONMessageContext

type JSONMessageContext[T any] = handlerpkg.JSONMessageContext[T]

type JSONMessageHandler

type JSONMessageHandler[T any, O any] = handlerpkg.JSONMessageHandler[T, O]

type JSONMessageOutput

type JSONMessageOutput[T any] = handlerpkg.JSONMessageOutput[T]

type LogFields added in v0.2.0

type LogFields = loggingpkg.LogFields

type MessageHandlerRegistration added in v0.2.0

type MessageHandlerRegistration = runtimepkg.MessageHandlerRegistration

type Metadata

type Metadata = metadatapkg.Metadata

type MiddlewareBuilder

type MiddlewareBuilder = runtimepkg.MiddlewareBuilder

type MiddlewareRegistration

type MiddlewareRegistration = runtimepkg.MiddlewareRegistration

type OutboxStore

type OutboxStore = runtimepkg.OutboxStore

type Producer

type Producer = runtimepkg.Producer

type ProtoHandlerOption

type ProtoHandlerOption = handlerpkg.ProtoHandlerOption

type ProtoHandlerRegistration

type ProtoHandlerRegistration[T proto.Message] = handlerpkg.ProtoHandlerRegistration[T]

type ProtoMessageContext

type ProtoMessageContext[T proto.Message] = handlerpkg.ProtoMessageContext[T]

type ProtoMessageHandler

type ProtoMessageHandler[T proto.Message] = handlerpkg.ProtoMessageHandler[T]

type ProtoMessageOutput

type ProtoMessageOutput = handlerpkg.ProtoMessageOutput

type ProtoValidator

type ProtoValidator = runtimepkg.ProtoValidator

type RetryMiddlewareConfig

type RetryMiddlewareConfig = runtimepkg.RetryMiddlewareConfig

type Service

type Service = runtimepkg.Service

type ServiceDependencies

type ServiceDependencies = runtimepkg.ServiceDependencies

type ServiceLogger added in v0.2.0

type ServiceLogger = loggingpkg.ServiceLogger

func NewEntryServiceLogger added in v0.2.1

func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger

type Transport added in v0.3.0

type Transport = transportpkg.Transport

type TransportFactory added in v0.3.0

type TransportFactory = transportpkg.Factory

type UnprocessableEventError

type UnprocessableEventError = runtimepkg.UnprocessableEventError

Directories

examples/
  full (command)
  json (command)
  proto (command)
  simple (command)
internal/
