Documentation
¶
Overview ¶
Package protoflow is a small layer on top of Watermill that wires routers, publishers, subscribers, and middleware for protobuf- or JSON-driven services. It reads the target transport (Kafka, RabbitMQ, or AWS SNS/SQS) from Config, bootstraps the Watermill router, and registers the default middleware chain for correlation IDs, logging, validation, outbox persistence, tracing, retries, and poison queue forwarding.
Service hosts the router and exposes typed helpers: RegisterProtoHandler and RegisterJSONHandler take care of marshaling, metadata cloning, and optional protobuf validation, while Service.PublishProto lets HTTP/RPC handlers emit events without touching low-level Watermill APIs. A minimal setup therefore involves filling in Config, creating a Service, registering handlers, and calling Start; see README.md for a copy/paste quick start snippet.
When you need more control, ServiceDependencies exposes well-scoped hooks: bring your own OutboxStore, ProtoValidator, middleware registrations, or even an entire TransportFactory to plug in custom brokers. The README organizes these knobs by topic so you can dive into the exact setting you want to adjust without rereading the whole guide.
Index ¶
- Variables
- func MustProtoMessage[T proto.Message]() T
- func NewProtoMessage[T proto.Message]() (T, error)
- func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error
- func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error
- type Config
- type EntryLogger
- type EntryLoggerAdapter
- type JSONHandlerRegistration
- type JSONMessageContext
- type JSONMessageHandler
- type JSONMessageOutput
- type LogFields
- type MessageHandlerRegistration
- type Metadata
- type MiddlewareBuilder
- type MiddlewareRegistration
- type OutboxStore
- type Producer
- type ProtoHandlerOption
- type ProtoHandlerRegistration
- type ProtoMessageContext
- type ProtoMessageHandler
- type ProtoMessageOutput
- type ProtoValidator
- type RetryMiddlewareConfig
- type Service
- type ServiceDependencies
- type ServiceLogger
- type Transport
- type TransportFactory
- type UnprocessableEventError
Constants ¶
This section is empty.
Variables ¶
var (
	NewService = runtimepkg.NewService
	RegisterMessageHandler = runtimepkg.RegisterMessageHandler
	WithPublishMessageTypes = handlerpkg.WithPublishMessageTypes
	DefaultMiddlewares = runtimepkg.DefaultMiddlewares
	CorrelationIDMiddleware = runtimepkg.CorrelationIDMiddleware
	LogMessagesMiddleware = runtimepkg.LogMessagesMiddleware
	ProtoValidateMiddleware = runtimepkg.ProtoValidateMiddleware
	OutboxMiddleware = runtimepkg.OutboxMiddleware
	TracerMiddleware = runtimepkg.TracerMiddleware
	RetryMiddleware = runtimepkg.RetryMiddleware
	PoisonQueueMiddleware = runtimepkg.PoisonQueueMiddleware
	RecovererMiddleware = runtimepkg.RecovererMiddleware
	Marshal = jsoncodec.Marshal
	MarshalIndent = jsoncodec.MarshalIndent
	Unmarshal = jsoncodec.Unmarshal
	Encode = jsoncodec.Encode
	Decode = jsoncodec.Decode
	ErrServiceRequired = errspkg.ErrServiceRequired
	ErrHandlerRequired = errspkg.ErrHandlerRequired
	ErrConsumeQueueRequired = errspkg.ErrConsumeQueueRequired
	ErrHandlerNameRequired = errspkg.ErrHandlerNameRequired
	ErrConsumeMessageTypeRequired = errspkg.ErrConsumeMessageTypeRequired
	ErrConsumeMessagePointerNeeded = errspkg.ErrConsumeMessagePointerNeeded
	ErrPublisherRequired = errspkg.ErrPublisherRequired
	ErrTopicRequired = errspkg.ErrTopicRequired
	NewSlogServiceLogger = loggingpkg.NewSlogServiceLogger
	NewMetadata = metadatapkg.New
	CreateULID = idspkg.CreateULID
)
Functions ¶
func MustProtoMessage ¶ added in v0.2.3
func MustProtoMessage[T proto.Message]() T
func NewProtoMessage ¶ added in v0.2.3
func NewProtoMessage[T proto.Message]() (T, error)
func RegisterJSONHandler ¶
func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error
Types ¶
type EntryLogger ¶ added in v0.2.1
type EntryLogger = loggingpkg.EntryLogger
type EntryLoggerAdapter ¶ added in v0.2.2
type EntryLoggerAdapter[T any] = loggingpkg.EntryLoggerAdapter[T]
type JSONHandlerRegistration ¶
type JSONHandlerRegistration[T any, O any] = handlerpkg.JSONHandlerRegistration[T, O]
type JSONMessageContext ¶
type JSONMessageContext[T any] = handlerpkg.JSONMessageContext[T]
type JSONMessageHandler ¶
type JSONMessageHandler[T any, O any] = handlerpkg.JSONMessageHandler[T, O]
type JSONMessageOutput ¶
type JSONMessageOutput[T any] = handlerpkg.JSONMessageOutput[T]
type LogFields ¶ added in v0.2.0
type LogFields = loggingpkg.LogFields
type MessageHandlerRegistration ¶ added in v0.2.0
type MessageHandlerRegistration = runtimepkg.MessageHandlerRegistration
type Metadata ¶
type Metadata = metadatapkg.Metadata
type MiddlewareBuilder ¶
type MiddlewareBuilder = runtimepkg.MiddlewareBuilder
type MiddlewareRegistration ¶
type MiddlewareRegistration = runtimepkg.MiddlewareRegistration
type OutboxStore ¶
type OutboxStore = runtimepkg.OutboxStore
type Producer ¶
type Producer = runtimepkg.Producer
type ProtoHandlerOption ¶
type ProtoHandlerOption = handlerpkg.ProtoHandlerOption
type ProtoHandlerRegistration ¶
type ProtoHandlerRegistration[T proto.Message] = handlerpkg.ProtoHandlerRegistration[T]
type ProtoMessageContext ¶
type ProtoMessageContext[T proto.Message] = handlerpkg.ProtoMessageContext[T]
type ProtoMessageHandler ¶
type ProtoMessageHandler[T proto.Message] = handlerpkg.ProtoMessageHandler[T]
type ProtoMessageOutput ¶
type ProtoMessageOutput = handlerpkg.ProtoMessageOutput
type ProtoValidator ¶
type ProtoValidator = runtimepkg.ProtoValidator
type RetryMiddlewareConfig ¶
type RetryMiddlewareConfig = runtimepkg.RetryMiddlewareConfig
type Service ¶
type Service = runtimepkg.Service
type ServiceDependencies ¶
type ServiceDependencies = runtimepkg.ServiceDependencies
type ServiceLogger ¶ added in v0.2.0
type ServiceLogger = loggingpkg.ServiceLogger
func NewEntryServiceLogger ¶ added in v0.2.1
func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger
type Transport ¶ added in v0.3.0
type Transport = transportpkg.Transport
type TransportFactory ¶ added in v0.3.0
type TransportFactory = transportpkg.Factory
type UnprocessableEventError ¶
type UnprocessableEventError = runtimepkg.UnprocessableEventError