runtime

package
v0.3.2 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 24, 2025 License: MIT Imports: 24 Imported by: 0

Documentation

Functions

func MustProtoMessage

func MustProtoMessage[T proto.Message]() T

MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.

func NewMessageFromProto

func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)

NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.

func NewProtoMessage

func NewProtoMessage[T proto.Message]() (T, error)

NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.
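The difference between the error-returning and panicking constructors can be sketched with the standard library alone. The block below is a minimal illustration, not the package's implementation: `newZero`, `mustZero`, and the `event` type are hypothetical stand-ins, and the use of `reflect` is an assumption (the real helpers may rely on protobuf reflection instead).

```go
package main

import (
	"fmt"
	"reflect"
)

// event is a hypothetical stand-in for a generated protobuf message type.
type event struct {
	ID string
}

// newZero returns a usable zero value of T. For pointer types it allocates
// the pointed-to value so callers never receive a nil pointer.
func newZero[T any]() (T, error) {
	var zero T
	t := reflect.TypeOf(zero)
	if t == nil {
		// T is an interface type, so there is no concrete type to allocate.
		return zero, fmt.Errorf("cannot instantiate an interface type")
	}
	if t.Kind() != reflect.Pointer {
		return zero, nil
	}
	v, ok := reflect.New(t.Elem()).Interface().(T)
	if !ok {
		return zero, fmt.Errorf("cannot instantiate %v", t)
	}
	return v, nil
}

// mustZero follows the Must* convention: it panics instead of returning an error.
func mustZero[T any]() T {
	v, err := newZero[T]()
	if err != nil {
		panic(err)
	}
	return v
}

func main() {
	e := mustZero[*event]()
	e.ID = "42" // safe: e points at an allocated event
	fmt.Println(e.ID)
}
```

The panicking variant suits package-level initialisation where a failure is a programming error; the error-returning variant suits code paths that can report the failure to a caller.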

func PublishProto

func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata metadatapkg.Metadata) error

PublishProto marshals the proto payload and publishes it to the provided topic.

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error

RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.

func RegisterMessageHandler

func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error

RegisterMessageHandler attaches the provided handler to the service router.

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error

RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.

Types

type MessageHandlerRegistration

type MessageHandlerRegistration struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      message.HandlerFunc
	Subscriber   message.Subscriber
	Publisher    message.Publisher
}

MessageHandlerRegistration wires a raw Watermill handler without typed helpers.

type MiddlewareBuilder

type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)

MiddlewareBuilder constructs a handler middleware using the provided service instance.

type MiddlewareRegistration

type MiddlewareRegistration struct {
	Name       string
	Middleware message.HandlerMiddleware
	Builder    MiddlewareBuilder
}

MiddlewareRegistration captures how a middleware should be registered on a Service router.
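A registration carries either a ready-made middleware or a builder that needs the Service. The documentation does not say which field wins when both are set, so the sketch below assumes the ready-made middleware takes precedence. All types here (`service`, `middleware`, `registration`) and the `resolve` function are simplified, hypothetical stand-ins for the real ones.

```go
package main

import "fmt"

// Simplified stand-ins for message.HandlerFunc, message.HandlerMiddleware,
// *Service, MiddlewareBuilder, and MiddlewareRegistration.
type handlerFunc func(payload string) error
type middleware func(next handlerFunc) handlerFunc
type service struct{ name string }
type builder func(*service) (middleware, error)

type registration struct {
	Name       string
	Middleware middleware
	Builder    builder
}

// resolve picks the middleware to install. Precedence is an assumption:
// a ready-made Middleware wins, otherwise the Builder is invoked.
func resolve(s *service, r registration) (middleware, error) {
	switch {
	case r.Middleware != nil:
		return r.Middleware, nil
	case r.Builder != nil:
		return r.Builder(s)
	default:
		return nil, fmt.Errorf("middleware %q: neither Middleware nor Builder is set", r.Name)
	}
}

func main() {
	svc := &service{name: "orders"}
	reg := registration{
		Name: "tagging",
		// The builder closes over the service to read its name.
		Builder: func(s *service) (middleware, error) {
			return func(next handlerFunc) handlerFunc {
				return func(payload string) error {
					return next(s.name + ":" + payload)
				}
			}, nil
		},
	}
	mw, err := resolve(svc, reg)
	if err != nil {
		panic(err)
	}
	_ = mw(func(p string) error { fmt.Println(p); return nil })("created")
}
```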

func CorrelationIDMiddleware

func CorrelationIDMiddleware() MiddlewareRegistration

CorrelationIDMiddleware ensures each processed message carries a correlation identifier.
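As a rough illustration of what "ensures each processed message carries a correlation identifier" can mean, the sketch below keeps an existing identifier and generates one only when it is missing. The metadata key `correlation_id`, the `msg` type, and the random hex format are assumptions; the real middleware operates on Watermill messages.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// correlationKey is a hypothetical metadata key; the real key may differ.
const correlationKey = "correlation_id"

// msg is a simplified stand-in for a Watermill message.
type msg struct {
	Metadata map[string]string
	Payload  []byte
}

type handlerFunc func(m *msg) error

// newID returns 16 random bytes as a 32-character hex string.
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// correlationID sets a correlation identifier only when none is present,
// so identifiers assigned upstream flow through unchanged.
func correlationID(next handlerFunc) handlerFunc {
	return func(m *msg) error {
		if m.Metadata == nil {
			m.Metadata = map[string]string{}
		}
		if m.Metadata[correlationKey] == "" {
			m.Metadata[correlationKey] = newID()
		}
		return next(m)
	}
}

func main() {
	h := correlationID(func(m *msg) error {
		fmt.Println("handling with", m.Metadata[correlationKey])
		return nil
	})
	_ = h(&msg{Payload: []byte("{}")})
}
```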

func DefaultMiddlewares

func DefaultMiddlewares() []MiddlewareRegistration

DefaultMiddlewares returns the standard middleware chain used by the Service constructor.

func LogMessagesMiddleware

func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration

LogMessagesMiddleware logs the full payload and metadata of handled messages.

func MetricsMiddleware (added in v0.3.2)

func MetricsMiddleware() MiddlewareRegistration

MetricsMiddleware adds Prometheus metrics to the handler.

func OutboxMiddleware

func OutboxMiddleware() MiddlewareRegistration

OutboxMiddleware persists outgoing messages when an OutboxStore is configured.

func PoisonQueueMiddleware

func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration

PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.
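The filter argument decides which handler errors are routed to the poison queue. One plausible filter, sketched below, matches errors that can never succeed on retry and lets transient ones through. The `unprocessableError` type is a local stand-in for UnprocessableEventError, and `isPoison` and `exampleErrors` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// unprocessableError is a local stand-in for UnprocessableEventError.
type unprocessableError struct{ reason string }

func (e *unprocessableError) Error() string { return "unprocessable event: " + e.reason }

// isPoison has the func(error) bool shape expected by the filter parameter.
// errors.As also matches when the error has been wrapped with %w.
func isPoison(err error) bool {
	var target *unprocessableError
	return errors.As(err, &target)
}

// exampleErrors returns one wrapped permanent failure and one transient failure.
func exampleErrors() (permanent, transient error) {
	permanent = fmt.Errorf("handle order: %w", &unprocessableError{reason: "bad payload"})
	transient = errors.New("connection reset")
	return permanent, transient
}

func main() {
	permanent, transient := exampleErrors()
	fmt.Println(isPoison(permanent), isPoison(transient))
}
```

With the real package, a function of this shape would be passed as `PoisonQueueMiddleware(isPoison)`, matching against `*UnprocessableEventError` instead of the stand-in.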

func ProtoValidateMiddleware

func ProtoValidateMiddleware() MiddlewareRegistration

ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.

func RecovererMiddleware

func RecovererMiddleware() MiddlewareRegistration

RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.
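Converting a panic into an error is a standard Go pattern built on `defer` and `recover` with a named return value. The sketch below shows the pattern on a simplified handler type; `handlerFunc` and `recoverer` are stand-ins, and the error message format is an assumption.

```go
package main

import "fmt"

// handlerFunc is a simplified stand-in for message.HandlerFunc.
type handlerFunc func(payload string) error

// recoverer wraps next so that a panic surfaces as an ordinary error.
// The named result err lets the deferred function overwrite the return value.
func recoverer(next handlerFunc) handlerFunc {
	return func(payload string) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panicked: %v", r)
			}
		}()
		return next(payload)
	}
}

func main() {
	h := recoverer(func(string) error { panic("boom") })
	fmt.Println(h("payload"))
}
```

Because the panic becomes a normal error, middlewares further out in the chain (retry, poison queue) can treat it like any other handler failure.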

func RetryMiddleware

func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration

RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).

func TracerMiddleware

func TracerMiddleware() MiddlewareRegistration

TracerMiddleware wraps handler execution in an OpenTelemetry span.

type OutboxStore

type OutboxStore interface {
	StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}

OutboxStore persists processed messages so they can be forwarded reliably.

type Producer

type Producer interface {
	PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
}

Producer emits proto-based events onto the configured transport.

type ProtoValidator

type ProtoValidator interface {
	Validate(value any) error
}

ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.
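A custom validator only needs the single `Validate(value any) error` method. The sketch below delegates to the payload when it knows how to validate itself. The `selfValidating` interface, `delegatingValidator`, and the `orderCreated` type are hypothetical; a protovalidate-backed implementation would look different.

```go
package main

import (
	"errors"
	"fmt"
)

// ProtoValidator is reproduced from the package documentation.
type ProtoValidator interface {
	Validate(value any) error
}

// selfValidating is a hypothetical interface for payloads that validate themselves.
type selfValidating interface {
	Validate() error
}

// delegatingValidator rejects nil payloads and defers to the payload's own
// Validate method when it has one. Other payloads are accepted as-is.
type delegatingValidator struct{}

func (delegatingValidator) Validate(value any) error {
	if value == nil {
		return errors.New("payload is nil")
	}
	if v, ok := value.(selfValidating); ok {
		return v.Validate()
	}
	return nil
}

// orderCreated is a hypothetical payload type.
type orderCreated struct{ OrderID string }

func (o *orderCreated) Validate() error {
	if o.OrderID == "" {
		return errors.New("order_id is required")
	}
	return nil
}

var _ ProtoValidator = delegatingValidator{}

func main() {
	var v ProtoValidator = delegatingValidator{}
	fmt.Println(v.Validate(&orderCreated{}))
	fmt.Println(v.Validate(&orderCreated{OrderID: "o-1"}))
}
```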

type RetryMiddlewareConfig

type RetryMiddlewareConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
}

RetryMiddlewareConfig customises the retry middleware behaviour.
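The three fields suggest a capped exponential backoff, with defaults substituted for zero values. The sketch below illustrates that reading. The default values (3 retries, 100ms, 1s) and the doubling factor are assumptions chosen for the example, not the package's documented defaults, and `retryConfig` is a local copy of the struct.

```go
package main

import (
	"fmt"
	"time"
)

// retryConfig mirrors the fields of RetryMiddlewareConfig.
type retryConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
}

// withDefaults replaces zero-valued fields. The values are illustrative only.
func (c retryConfig) withDefaults() retryConfig {
	if c.MaxRetries == 0 {
		c.MaxRetries = 3
	}
	if c.InitialInterval == 0 {
		c.InitialInterval = 100 * time.Millisecond
	}
	if c.MaxInterval == 0 {
		c.MaxInterval = time.Second
	}
	return c
}

// backoff returns the wait before the given retry (1-based): the initial
// interval doubled for each earlier retry, capped at MaxInterval.
func (c retryConfig) backoff(attempt int) time.Duration {
	d := c.InitialInterval
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= c.MaxInterval {
			return c.MaxInterval
		}
	}
	if d > c.MaxInterval {
		return c.MaxInterval
	}
	return d
}

func main() {
	cfg := retryConfig{MaxRetries: 5}.withDefaults()
	for attempt := 1; attempt <= cfg.MaxRetries; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt, cfg.backoff(attempt))
	}
}
```

Under these assumed defaults the waits are 100ms, 200ms, 400ms, 800ms, and then 1s for every later retry.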

type Service

type Service struct {
	Conf   *configpkg.Config
	Logger loggingpkg.ServiceLogger
	// contains filtered or unexported fields
}

Service wires a Watermill router, publisher, subscriber, and middleware chain.

func NewService

NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start.

func (*Service) PublishProto

func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error

PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.

func (*Service) RegisterMiddleware

func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error

RegisterMiddleware attaches the supplied middleware to the router.

func (*Service) RegisterProtoMessage

func (s *Service) RegisterProtoMessage(msg proto.Message)

RegisterProtoMessage exposes a proto message type for validation without registering a handler.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start runs the underlying Watermill router until the provided context is cancelled.
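Because Start blocks until the context is cancelled, the caller controls shutdown through the context it passes in. The sketch below shows that calling pattern against a stand-in type. `fakeService`, `starter`, `runFor`, and `demoTimeout` are hypothetical, and returning nil on cancellation is an assumption; the documentation does not state what the real Start returns in that case.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// starter captures the documented Start method signature.
type starter interface {
	Start(ctx context.Context) error
}

// fakeService is a stand-in that blocks until the context is done.
// Returning nil on cancellation is an assumption of this sketch.
type fakeService struct{}

func (fakeService) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// demoTimeout keeps the example short; a real service runs until shutdown.
const demoTimeout = 50 * time.Millisecond

// runFor starts s and cancels its context after d, which makes Start return.
func runFor(s starter, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return s.Start(ctx)
}

func main() {
	err := runFor(fakeService{}, demoTimeout)
	fmt.Println("stopped, err =", err)
}
```

In a long-running process the timeout would usually be replaced by a context from `signal.NotifyContext`, so that an interrupt signal triggers the same cancellation path.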

type ServiceDependencies

type ServiceDependencies struct {
	Outbox                    OutboxStore
	Validator                 ProtoValidator
	Middlewares               []MiddlewareRegistration // Appended after the default middleware chain.
	DisableDefaultMiddlewares bool                     // Skips registering the default middleware chain when true.
	TransportFactory          transportpkg.Factory
}

ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.
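The field comments describe how the final middleware chain is assembled: defaults first (unless disabled), then the caller's additions. The sketch below models that rule with plain strings. `buildChain` is a hypothetical helper, and the contents and order of the default chain shown in `main` are invented for the example.

```go
package main

import "fmt"

// buildChain applies the documented rule: extra middlewares are appended
// after the default chain, and the defaults are skipped when disabled.
func buildChain(defaults, extra []string, disableDefaults bool) []string {
	chain := make([]string, 0, len(defaults)+len(extra))
	if !disableDefaults {
		chain = append(chain, defaults...)
	}
	return append(chain, extra...)
}

func main() {
	// Hypothetical default chain; the real contents and order may differ.
	defaults := []string{"recoverer", "correlation-id", "retry"}
	extra := []string{"audit"}

	fmt.Println(buildChain(defaults, extra, false))
	fmt.Println(buildChain(defaults, extra, true))
}
```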

type UnprocessableEventError

type UnprocessableEventError struct {
	// contains filtered or unexported fields
}

UnprocessableEventError wraps payloads that failed validation or unmarshalling.

func (*UnprocessableEventError) Error

func (e *UnprocessableEventError) Error() string
