Documentation ¶
Overview ¶
Package protoflow exposes a small framework on top of Watermill that wires message routers, publishers, subscribers, and middleware for protobuf- and JSON-driven services. It selects the desired Pub/Sub transport (Kafka, RabbitMQ, or AWS SNS/SQS) from Config, bootstraps the Watermill router, and registers a middleware chain that adds correlation IDs, logging, protobuf validation, outbox persistence, tracing, retries, and poison queue forwarding.
The Service type hosts the router and offers helpers for registering typed handlers via RegisterProtoHandler or RegisterJSONHandler. Typed registrations automatically marshal/unmarshal payloads, clone metadata safely, and can validate outgoing protobuf events when a ProtoValidator is provided. Service also exposes PublishProto so your HTTP or RPC handlers can emit events without touching the lower-level Watermill APIs.
Advanced users can extend the default middleware chain, plug in an OutboxStore to persist outgoing messages, or provide a ProtoValidator to enforce message contracts. See README.md for a full walkthrough.
Index ¶
- Variables
- func CreateULID() string
- func Decode(r io.Reader, v any) error
- func Encode(w io.Writer, v any) error
- func Marshal(v any) ([]byte, error)
- func MarshalIndent(v any, prefix, indent string) ([]byte, error)
- func MustProtoMessage[T proto.Message]() T
- func NewMessageFromProto(event proto.Message, metadata Metadata) (*message.Message, error)
- func NewProtoMessage[T proto.Message]() (T, error)
- func PublishProto(ctx context.Context, publisher message.Publisher, topic string, ...) error
- func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error
- func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error
- func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error
- func Unmarshal(data []byte, v any) error
- type Config
- type EntryLogger
- type EntryLoggerAdapter
- type JSONHandlerRegistration
- type JSONMessageContext
- type JSONMessageHandler
- type JSONMessageOutput
- type LogFields
- type MessageHandlerRegistration
- type Metadata
- type MiddlewareBuilder
- type MiddlewareRegistration
- func CorrelationIDMiddleware() MiddlewareRegistration
- func DefaultMiddlewares() []MiddlewareRegistration
- func LogMessagesMiddleware(logger watermill.LoggerAdapter) MiddlewareRegistration
- func OutboxMiddleware() MiddlewareRegistration
- func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration
- func ProtoValidateMiddleware() MiddlewareRegistration
- func RecovererMiddleware() MiddlewareRegistration
- func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration
- func TracerMiddleware() MiddlewareRegistration
- type OutboxStore
- type Producer
- type ProtoHandlerOption
- type ProtoHandlerRegistration
- type ProtoMessageContext
- type ProtoMessageHandler
- type ProtoMessageOutput
- type ProtoValidator
- type RetryMiddlewareConfig
- type Service
- type ServiceDependencies
- type ServiceLogger
- type UnprocessableEventError
Constants ¶
This section is empty.
Variables ¶
var (
	ErrServiceRequired             = errors.New("protoflow: event service is required")
	ErrHandlerRequired             = errors.New("protoflow: handler function is required")
	ErrConsumeQueueRequired        = errors.New("protoflow: consume queue is required")
	ErrHandlerNameRequired         = errors.New("protoflow: handler name is required")
	ErrConsumeMessageTypeRequired  = errors.New("protoflow: consume message type is required")
	ErrConsumeMessagePointerNeeded = errors.New("protoflow: consume message type must be a pointer")
	ErrPublisherRequired           = errors.New("protoflow: publisher is required")
	ErrTopicRequired               = errors.New("protoflow: topic is required")
)
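Because these are package-level sentinel values, callers can branch on them with errors.Is. The sketch below is self-contained: it redeclares two of the sentinels locally and uses a hypothetical checkPublish helper in place of the real publish path, so it illustrates the matching pattern only and says nothing about the order in which protoflow performs its own checks.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinels from the list above, so the sketch compiles
// without importing protoflow.
var (
	ErrPublisherRequired = errors.New("protoflow: publisher is required")
	ErrTopicRequired     = errors.New("protoflow: topic is required")
)

// checkPublish is a hypothetical stand-in for argument validation.
func checkPublish(hasPublisher bool, topic string) error {
	if !hasPublisher {
		return ErrPublisherRequired
	}
	if topic == "" {
		// Wrapping with %w keeps the sentinel reachable through errors.Is.
		return fmt.Errorf("publish order event: %w", ErrTopicRequired)
	}
	return nil
}

// isTopicMissing reports whether err is, or wraps, ErrTopicRequired.
func isTopicMissing(err error) bool {
	return errors.Is(err, ErrTopicRequired)
}

func main() {
	err := checkPublish(true, "")
	fmt.Println(isTopicMissing(err)) // prints "true"
	fmt.Println(err)                 // prints "publish order event: protoflow: topic is required"
}
```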
Functions ¶
func CreateULID ¶
func CreateULID() string
CreateULID returns a time-sortable ULID encoded as a 26-character string.
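To make "time-sortable" and "26-character" concrete, here is a minimal sketch of the ULID layout: a 48-bit millisecond timestamp followed by 80 random bits, written out in Crockford base32. The newULID function is hypothetical and is not how CreateULID is implemented; it only shows why IDs created later compare greater as plain strings.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"time"
)

// crockford is the Crockford base32 alphabet used by ULIDs (no I, L, O, U).
// Its characters are in ascending ASCII order, so string order matches
// numeric order.
const crockford = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

// newULID is a hypothetical helper: ms is a Unix timestamp in milliseconds.
func newULID(ms uint64) (string, error) {
	var raw [16]byte
	// Big-endian 48-bit timestamp in the first 6 bytes.
	ts := ms
	for i := 5; i >= 0; i-- {
		raw[i] = byte(ts)
		ts >>= 8
	}
	// 80 bits of randomness in the remaining 10 bytes.
	if _, err := rand.Read(raw[6:]); err != nil {
		return "", err
	}
	// View the 16 bytes as a 128-bit integer split into two halves.
	var hi, lo uint64
	for i := 0; i < 8; i++ {
		hi = hi<<8 | uint64(raw[i])
		lo = lo<<8 | uint64(raw[i+8])
	}
	// Emit 26 characters of 5 bits each, least significant first.
	var out [26]byte
	for i := 25; i >= 0; i-- {
		out[i] = crockford[lo&31]
		lo = lo>>5 | hi<<59
		hi >>= 5
	}
	return string(out[:]), nil
}

func main() {
	id, err := newULID(uint64(time.Now().UnixMilli()))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id, len(id))
}
```

The first 10 characters carry the timestamp, which is why sorting the strings sorts the IDs by creation time at millisecond granularity.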
func MustProtoMessage ¶ added in v0.2.3
MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.
func NewMessageFromProto ¶
NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.
func NewProtoMessage ¶ added in v0.2.3
NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.
func PublishProto ¶
func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata Metadata) error
PublishProto marshals the proto payload and publishes it to the provided topic.
func RegisterJSONHandler ¶
func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error
RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.
func RegisterMessageHandler ¶ added in v0.2.0
func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error
RegisterMessageHandler attaches the provided handler to the service router.
func RegisterProtoHandler ¶
func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error
RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.
Types ¶
type Config ¶
type Config struct {
// PubSubSystem selects the backing message infrastructure. Supported
// values: "kafka", "rabbitmq", or "aws" (SNS/SQS).
PubSubSystem string
// Kafka configuration.
KafkaBrokers []string
KafkaClientID string
KafkaConsumerGroup string
// RabbitMQ configuration.
RabbitMQURL string
// PoisonQueue receives messages that cannot be processed even after
// retries.
PoisonQueue string
// AWS (SNS/SQS) configuration.
AWSRegion string
AWSAccountID string
AWSAccessKeyID string
AWSSecretAccessKey string
// AWSEndpoint optionally points to a custom endpoint (for example,
// Localstack in local development).
AWSEndpoint string
// RetryMiddleware tuning. Zero values fall back to library defaults.
RetryMaxRetries int
RetryInitialInterval time.Duration
RetryMaxInterval time.Duration
}
Config groups the Pub/Sub settings required to initialise the Service. Each transport only uses the keys that are relevant to it.
type EntryLogger ¶ added in v0.2.1
type EntryLogger interface {
EntryLoggerAdapter[EntryLogger]
}
EntryLogger represents the legacy non-generic entry adapter constraint. It remains exported so existing code that referenced protoflow.EntryLogger keeps compiling, but NewEntryServiceLogger now works with any logger that satisfies EntryLoggerAdapter[T].
type EntryLoggerAdapter ¶ added in v0.2.2
type EntryLoggerAdapter[T any] interface {
	Error(args ...any)
	Info(args ...any)
	Debug(args ...any)
	Trace(args ...any)
	WithError(err error) T
	WithField(key string, value any) T
}
EntryLoggerAdapter captures the capabilities required by NewEntryServiceLogger. The constraint is generic so third-party entry-like loggers (for example, loggers whose methods return their own concrete interface type) can be used without additional wrappers.
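The self-referential constraint is easier to see with a concrete type. The sketch below copies the interface locally and defines a hypothetical memEntry logger whose WithField and WithError return *memEntry rather than an interface; the generic withComponent helper is likewise made up for illustration and stands in for any function constrained the same way as NewEntryServiceLogger.

```go
package main

import "fmt"

// EntryLoggerAdapter is copied from the declaration above so the sketch
// compiles without importing protoflow.
type EntryLoggerAdapter[T any] interface {
	Error(args ...any)
	Info(args ...any)
	Debug(args ...any)
	Trace(args ...any)
	WithError(err error) T
	WithField(key string, value any) T
}

// memEntry is a hypothetical logger that records lines in memory. Its
// With* methods return its own concrete type.
type memEntry struct {
	fields map[string]any
	lines  *[]string
}

func newMemEntry() *memEntry {
	return &memEntry{fields: map[string]any{}, lines: &[]string{}}
}

func (e *memEntry) log(level string, args ...any) {
	*e.lines = append(*e.lines, level+": "+fmt.Sprint(args...))
}

func (e *memEntry) Error(args ...any) { e.log("error", args...) }
func (e *memEntry) Info(args ...any)  { e.log("info", args...) }
func (e *memEntry) Debug(args ...any) { e.log("debug", args...) }
func (e *memEntry) Trace(args ...any) { e.log("trace", args...) }

func (e *memEntry) WithError(err error) *memEntry { return e.WithField("error", err) }

// WithField returns a child entry with one extra field; the parent is untouched.
func (e *memEntry) WithField(key string, value any) *memEntry {
	next := &memEntry{fields: map[string]any{}, lines: e.lines}
	for k, v := range e.fields {
		next.fields[k] = v
	}
	next.fields[key] = value
	return next
}

// withComponent accepts any T that satisfies EntryLoggerAdapter[T].
func withComponent[T EntryLoggerAdapter[T]](entry T, name string) T {
	return entry.WithField("component", name)
}

func main() {
	entry := withComponent(newMemEntry(), "billing")
	entry.Info("started")
	fmt.Println((*entry.lines)[0], entry.fields["component"]) // prints "info: started billing"
}
```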
type JSONHandlerRegistration ¶
type JSONHandlerRegistration[T any, O any] struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      JSONMessageHandler[T, O]
}
JSONHandlerRegistration wires a typed JSON handler to the router.
type JSONMessageContext ¶
JSONMessageContext exposes the incoming payload and metadata for JSON handlers.
func (JSONMessageContext[T]) CloneMetadata ¶
func (c JSONMessageContext[T]) CloneMetadata() Metadata
CloneMetadata copies the current metadata map so handlers can mutate headers safely.
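The reason for cloning is that Go maps are reference types: writing to the incoming map would change it for every other reader. A minimal sketch, assuming Metadata is a string-to-string map (its declaration is not shown on this page) and using a hypothetical clone function and made-up header keys:

```go
package main

import "fmt"

// Metadata is assumed here to be a string map; the real declaration is not
// shown in this document.
type Metadata map[string]string

// clone returns an independent copy of m.
func clone(m Metadata) Metadata {
	out := make(Metadata, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	incoming := Metadata{"correlation_id": "abc-123"} // hypothetical key
	outgoing := clone(incoming)
	outgoing["event_source"] = "billing" // only the copy changes

	fmt.Println(len(incoming), len(outgoing)) // prints "1 2"
}
```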
type JSONMessageHandler ¶
type JSONMessageHandler[T any, O any] func(ctx context.Context, event JSONMessageContext[T]) ([]JSONMessageOutput[O], error)
JSONMessageHandler processes a JSON payload and returns the events to publish.
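The general technique behind a typed JSON handler can be sketched with generics and encoding/json: decode the raw payload into T, call the typed function, then encode each output O. The adapt function and the orderPlaced/orderConfirmed types below are hypothetical, and the sketch deliberately drops the context and metadata that the real JSONMessageHandler receives; it is not protoflow's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderPlaced and orderConfirmed are hypothetical payload types.
type orderPlaced struct {
	ID string `json:"id"`
}

type orderConfirmed struct {
	OrderID string `json:"order_id"`
}

// adapt turns a typed handler into one that works on raw JSON bytes.
func adapt[T any, O any](h func(T) ([]O, error)) func([]byte) ([][]byte, error) {
	return func(raw []byte) ([][]byte, error) {
		var in T
		if err := json.Unmarshal(raw, &in); err != nil {
			return nil, fmt.Errorf("decode payload: %w", err)
		}
		outs, err := h(in)
		if err != nil {
			return nil, err
		}
		encoded := make([][]byte, 0, len(outs))
		for _, o := range outs {
			b, err := json.Marshal(o)
			if err != nil {
				return nil, fmt.Errorf("encode output: %w", err)
			}
			encoded = append(encoded, b)
		}
		return encoded, nil
	}
}

// confirm is the typed business logic; it never sees raw bytes.
func confirm(in orderPlaced) ([]orderConfirmed, error) {
	return []orderConfirmed{{OrderID: in.ID}}, nil
}

func main() {
	handle := adapt(confirm)
	out, err := handle([]byte(`{"id":"42"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out[0])) // prints {"order_id":"42"}
}
```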
type JSONMessageOutput ¶
JSONMessageOutput represents an event emitted by a JSON handler.
type LogFields ¶ added in v0.2.0
LogFields represents structured logging key/value pairs used by Protoflow.
type MessageHandlerRegistration ¶ added in v0.2.0
type MessageHandlerRegistration struct {
Name string
ConsumeQueue string
PublishQueue string
Handler message.HandlerFunc
Subscriber message.Subscriber
Publisher message.Publisher
}
MessageHandlerRegistration wires a raw Watermill handler without typed helpers.
type Metadata ¶
Metadata represents the headers carried alongside an event.
func NewMetadata ¶ added in v0.2.3
NewMetadata constructs a Metadata map from alternating key/value pairs.
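The signature of NewMetadata is not shown on this page, so the following is only a sketch of the "alternating key/value pairs" idea. It assumes string arguments and a string-map Metadata, and it chooses to drop a trailing key that has no value; the real function may handle that case differently.

```go
package main

import "fmt"

// Metadata is assumed to be a string map for this sketch.
type Metadata map[string]string

// newMetadata is a hypothetical equivalent of NewMetadata: arguments are
// read as key1, value1, key2, value2, ...
func newMetadata(pairs ...string) Metadata {
	md := make(Metadata, len(pairs)/2)
	// i+1 < len(pairs) skips a dangling key at the end (an assumption).
	for i := 0; i+1 < len(pairs); i += 2 {
		md[pairs[i]] = pairs[i+1]
	}
	return md
}

func main() {
	md := newMetadata("tenant", "acme", "source", "billing")
	fmt.Println(len(md), md["tenant"], md["source"]) // prints "2 acme billing"
}
```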
type MiddlewareBuilder ¶
type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)
MiddlewareBuilder constructs a handler middleware using the provided service instance.
type MiddlewareRegistration ¶
type MiddlewareRegistration struct {
Name string
Middleware message.HandlerMiddleware
Builder MiddlewareBuilder
}
MiddlewareRegistration captures how a middleware should be registered on a Service router.
func CorrelationIDMiddleware ¶
func CorrelationIDMiddleware() MiddlewareRegistration
CorrelationIDMiddleware ensures each processed message carries a correlation identifier.
func DefaultMiddlewares ¶
func DefaultMiddlewares() []MiddlewareRegistration
DefaultMiddlewares returns the standard middleware chain used by the Service constructor.
func LogMessagesMiddleware ¶
func LogMessagesMiddleware(logger watermill.LoggerAdapter) MiddlewareRegistration
LogMessagesMiddleware logs the full payload and metadata of handled messages.
func OutboxMiddleware ¶
func OutboxMiddleware() MiddlewareRegistration
OutboxMiddleware persists outgoing messages when an OutboxStore is configured.
func PoisonQueueMiddleware ¶
func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration
PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.
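A filter is just a func(error) bool, so it can use errors.As to single out one class of failure. The sketch below uses a local unprocessableError type as a stand-in for protoflow.UnprocessableEventError, whose fields are unexported, and a hypothetical onlyUnprocessable filter; returning true means the message matches and would be forwarded to the poison queue.

```go
package main

import (
	"errors"
	"fmt"
)

// unprocessableError is a local stand-in for UnprocessableEventError.
type unprocessableError struct{ reason string }

func (e *unprocessableError) Error() string { return "unprocessable event: " + e.reason }

// errTimeout represents a transient failure that should be retried instead.
var errTimeout = errors.New("upstream timeout")

// onlyUnprocessable has the filter shape func(error) bool. It matches the
// error itself or any error that wraps it.
func onlyUnprocessable(err error) bool {
	var target *unprocessableError
	return errors.As(err, &target)
}

func main() {
	wrapped := fmt.Errorf("handler failed: %w", &unprocessableError{reason: "bad payload"})
	fmt.Println(onlyUnprocessable(wrapped))    // prints "true"
	fmt.Println(onlyUnprocessable(errTimeout)) // prints "false"
}
```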
func ProtoValidateMiddleware ¶
func ProtoValidateMiddleware() MiddlewareRegistration
ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.
func RecovererMiddleware ¶
func RecovererMiddleware() MiddlewareRegistration
RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.
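Converting a panic into an error relies on a deferred recover that writes to a named return value. The following sketch shows that mechanism on a plain func() error; recoverToError is a hypothetical name and the error text is made up, so treat it as an illustration of the idea, not the middleware's actual code.

```go
package main

import "fmt"

// recoverToError runs handler and turns a panic into a returned error.
// The named result err lets the deferred function overwrite the return value.
func recoverToError(handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return handler()
}

func main() {
	err := recoverToError(func() error {
		panic("boom")
	})
	fmt.Println(err) // prints "handler panicked: boom"
}
```

Once the panic is an ordinary error, later stages such as retries or poison queue forwarding can treat it like any other handler failure.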
func RetryMiddleware ¶
func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration
RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).
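"Defaults applied to zero values" usually means each field is checked independently and replaced only when it is left at its zero value. The sketch below shows that pattern on a local copy of RetryMiddlewareConfig. The three assumed* constants are placeholders chosen for illustration; this page does not state the library's real defaults.

```go
package main

import (
	"fmt"
	"time"
)

// RetryMiddlewareConfig mirrors the struct documented on this page.
type RetryMiddlewareConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
}

// Placeholder defaults, NOT the library's actual values.
const (
	assumedMaxRetries      = 3
	assumedInitialInterval = 100 * time.Millisecond
	assumedMaxInterval     = 5 * time.Second
)

// withDefaults is a hypothetical helper that fills in zero-valued fields.
func withDefaults(cfg RetryMiddlewareConfig) RetryMiddlewareConfig {
	if cfg.MaxRetries == 0 {
		cfg.MaxRetries = assumedMaxRetries
	}
	if cfg.InitialInterval == 0 {
		cfg.InitialInterval = assumedInitialInterval
	}
	if cfg.MaxInterval == 0 {
		cfg.MaxInterval = assumedMaxInterval
	}
	return cfg
}

func main() {
	// Only MaxRetries is set; the two intervals fall back to placeholders.
	cfg := withDefaults(RetryMiddlewareConfig{MaxRetries: 7})
	fmt.Println(cfg.MaxRetries, cfg.InitialInterval, cfg.MaxInterval) // prints "7 100ms 5s"
}
```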
func TracerMiddleware ¶
func TracerMiddleware() MiddlewareRegistration
TracerMiddleware wraps handler execution in an OpenTelemetry span.
type OutboxStore ¶
type OutboxStore interface {
StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}
OutboxStore persists processed messages so they can be forwarded reliably.
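For tests or local development, the interface can be satisfied by a small in-memory type. The sketch below copies the interface locally; memoryOutbox, record, and storeOne are hypothetical names, and a production store would write to a durable database instead of a slice.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// OutboxStore is copied from the declaration above so the sketch compiles
// without importing protoflow.
type OutboxStore interface {
	StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}

// record is one persisted outgoing message.
type record struct {
	EventType, UUID, Payload string
}

// memoryOutbox keeps records in a slice guarded by a mutex.
type memoryOutbox struct {
	mu      sync.Mutex
	records []record
}

// Compile-time check that *memoryOutbox satisfies the interface.
var _ OutboxStore = (*memoryOutbox)(nil)

func (m *memoryOutbox) StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.records = append(m.records, record{EventType: eventType, UUID: uuid, Payload: payload})
	return nil
}

// storeOne is a small convenience wrapper using a background context.
func storeOne(store OutboxStore, eventType, uuid, payload string) error {
	return store.StoreOutgoingMessage(context.Background(), eventType, uuid, payload)
}

func main() {
	box := &memoryOutbox{}
	if err := storeOne(box, "orders.OrderPlaced", "id-1", `{"id":"42"}`); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(box.records), box.records[0].EventType) // prints "1 orders.OrderPlaced"
}
```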
type Producer ¶
type Producer interface {
PublishProto(ctx context.Context, topic string, event proto.Message, metadata Metadata) error
}
Producer emits proto-based events onto the configured transport.
type ProtoHandlerOption ¶
type ProtoHandlerOption func(*protoHandlerOptions)
ProtoHandlerOption customises handler registration.
func WithPublishMessageTypes ¶
func WithPublishMessageTypes(msgs ...proto.Message) ProtoHandlerOption
WithPublishMessageTypes registers extra proto schemas emitted by this handler. Use this when the handler may emit multiple message types.
type ProtoHandlerRegistration ¶
type ProtoHandlerRegistration[T proto.Message] struct {
	Name             string
	ConsumeQueue     string
	PublishQueue     string
	Handler          ProtoMessageHandler[T]
	Options          []ProtoHandlerOption
	ValidateOutgoing bool
}
ProtoHandlerRegistration configures a typed protobuf handler that automatically unmarshals incoming payloads and marshals emitted events.
type ProtoMessageContext ¶
ProtoMessageContext provides strongly typed access to the incoming message payload.
func (ProtoMessageContext[T]) CloneMetadata ¶
func (c ProtoMessageContext[T]) CloneMetadata() Metadata
CloneMetadata returns a copy of the current metadata map so handlers can safely mutate headers for outgoing events without touching the original map.
type ProtoMessageHandler ¶
type ProtoMessageHandler[T proto.Message] func(ctx context.Context, event ProtoMessageContext[T]) ([]ProtoMessageOutput, error)
ProtoMessageHandler processes a typed protobuf payload and returns the events to emit.
type ProtoMessageOutput ¶
ProtoMessageOutput describes an event that should be emitted after the handler succeeds.
type ProtoValidator ¶
ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.
type RetryMiddlewareConfig ¶
type RetryMiddlewareConfig struct {
MaxRetries int
InitialInterval time.Duration
MaxInterval time.Duration
}
RetryMiddlewareConfig customises the retry middleware behaviour.
type Service ¶
type Service struct {
Conf *Config
Logger watermill.LoggerAdapter
// contains filtered or unexported fields
}
Service wires a Watermill router, publisher, subscriber, and middleware chain.
func NewService ¶
func NewService(conf *Config, log ServiceLogger, ctx context.Context, deps ServiceDependencies) *Service
NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start.
func (*Service) PublishProto ¶
func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata Metadata) error
PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.
func (*Service) RegisterMiddleware ¶
func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error
RegisterMiddleware attaches the supplied middleware to the router.
func (*Service) RegisterProtoMessage ¶
RegisterProtoMessage exposes a proto message type for validation without registering a handler.
type ServiceDependencies ¶
type ServiceDependencies struct {
Outbox OutboxStore
Validator ProtoValidator
Middlewares []MiddlewareRegistration // Appended after the default middleware chain.
DisableDefaultMiddlewares bool // Skips registering the default middleware chain when true.
}
ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.
type ServiceLogger ¶ added in v0.2.0
type ServiceLogger interface {
With(fields LogFields) ServiceLogger
Debug(msg string, fields LogFields)
Info(msg string, fields LogFields)
Error(msg string, err error, fields LogFields)
Trace(msg string, fields LogFields)
}
ServiceLogger is the minimal logging contract required by Protoflow services. It maps directly onto Watermill's logging needs so applications can adapt their existing loggers without depending on slog.
func NewEntryServiceLogger ¶ added in v0.2.1
func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger
NewEntryServiceLogger wraps an entry-style logger that satisfies EntryLoggerAdapter[T] (for example a *logrus.Entry) so it can be consumed by Protoflow services without forcing additional logging adapters.
func NewSlogServiceLogger ¶ added in v0.2.0
func NewSlogServiceLogger(log *slog.Logger) ServiceLogger
NewSlogServiceLogger wraps a slog.Logger so it satisfies the ServiceLogger interface. This matches the previous behaviour of Protoflow services.
func NewWatermillServiceLogger ¶ added in v0.2.0
func NewWatermillServiceLogger(logger watermill.LoggerAdapter) ServiceLogger
NewWatermillServiceLogger wraps an existing Watermill LoggerAdapter so it can be supplied to NewService.
type UnprocessableEventError ¶
type UnprocessableEventError struct {
// contains filtered or unexported fields
}
UnprocessableEventError wraps payloads that failed validation or unmarshalling.
func (*UnprocessableEventError) Error ¶
func (e *UnprocessableEventError) Error() string