handlers

package
v0.5.1
Published: Dec 3, 2025 | License: MIT | Imports: 12 | Imported by: 0

Documentation

Constants

const (
	// MetadataKeyCorrelationID tracks related messages across services.
	MetadataKeyCorrelationID = "correlation_id"

	// MetadataKeyEventSchema identifies the proto message type.
	MetadataKeyEventSchema = "event_message_schema"

	// MetadataKeyQueueDepth indicates queue depth at time of enqueue.
	MetadataKeyQueueDepth = "protoflow_queue_depth"

	// MetadataKeyEnqueuedAt records when a message was enqueued.
	MetadataKeyEnqueuedAt = "protoflow_enqueued_at"

	// MetadataKeyTraceID stores distributed tracing ID.
	MetadataKeyTraceID = "trace_id"

	// MetadataKeySpanID stores distributed tracing span ID.
	MetadataKeySpanID = "span_id"
)

Metadata key constants used throughout protoflow. These keys are reserved and should not be used for custom metadata.
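Because these keys are reserved, code that attaches custom metadata may want to guard against collisions. The following is a minimal sketch of such a guard, not part of the package: `isReserved` and `setCustom` are hypothetical helpers, the constants are copied locally so the snippet compiles on its own, and metadata is assumed to behave like a `map[string]string` (the definition of `metadatapkg.Metadata` is not shown on this page).

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented constants so the sketch is self-contained.
const (
	MetadataKeyCorrelationID = "correlation_id"
	MetadataKeyEventSchema   = "event_message_schema"
	MetadataKeyQueueDepth    = "protoflow_queue_depth"
	MetadataKeyEnqueuedAt    = "protoflow_enqueued_at"
	MetadataKeyTraceID       = "trace_id"
	MetadataKeySpanID        = "span_id"
)

var errReservedKey = errors.New("metadata key is reserved by protoflow")

// isReserved is a hypothetical helper that reports whether key is one of
// the reserved metadata keys listed above.
func isReserved(key string) bool {
	switch key {
	case MetadataKeyCorrelationID, MetadataKeyEventSchema,
		MetadataKeyQueueDepth, MetadataKeyEnqueuedAt,
		MetadataKeyTraceID, MetadataKeySpanID:
		return true
	}
	return false
}

// setCustom is a hypothetical helper that writes a custom key into md and
// refuses reserved keys. Assumption: metadata is a map[string]string.
func setCustom(md map[string]string, key, value string) error {
	if isReserved(key) {
		return fmt.Errorf("%q: %w", key, errReservedKey)
	}
	md[key] = value
	return nil
}

func main() {
	md := map[string]string{}
	fmt.Println(setCustom(md, "tenant", "acme"))         // custom key: accepted
	fmt.Println(setCustom(md, MetadataKeyTraceID, "abc")) // reserved key: rejected
}
```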

Variables

This section is empty.

Functions

func BuildJSONHandler

func BuildJSONHandler[T any, O any](handler JSONMessageHandler[T, O], logger loggingpkg.ServiceLogger) (message.HandlerFunc, error)

BuildJSONHandler converts a typed JSON handler into a Watermill handler.
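To illustrate the kind of conversion this describes, here is a rough sketch of adapting a typed handler to a byte-level one: decode the payload, call the typed handler, encode each output. It is not the package's implementation. `adaptJSON` and `rawHandler` are hypothetical (a stand-in for Watermill's `message.HandlerFunc`), the local types are simplified mirrors of the documented ones, and `Metadata` is assumed to be a `map[string]string`.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Metadata is an assumed stand-in for metadatapkg.Metadata.
type Metadata map[string]string

// Simplified local mirrors of the documented types. The real context
// embeds MessageContextBase instead of holding Metadata directly.
type JSONMessageContext[T any] struct {
	Metadata Metadata
	Payload  T
}

type JSONMessageOutput[O any] struct {
	Message  O
	Metadata Metadata
}

type JSONMessageHandler[T any, O any] func(ctx context.Context, event JSONMessageContext[T]) ([]JSONMessageOutput[O], error)

// rawHandler is a hypothetical byte-level handler standing in for
// Watermill's message.HandlerFunc.
type rawHandler func(ctx context.Context, payload []byte, md Metadata) ([][]byte, error)

// adaptJSON is a hypothetical adapter: decode, call the typed handler,
// then encode every emitted message.
func adaptJSON[T any, O any](h JSONMessageHandler[T, O]) (rawHandler, error) {
	if h == nil {
		return nil, errors.New("handler is nil")
	}
	return func(ctx context.Context, payload []byte, md Metadata) ([][]byte, error) {
		var in T
		if err := json.Unmarshal(payload, &in); err != nil {
			return nil, fmt.Errorf("decode payload: %w", err)
		}
		outs, err := h(ctx, JSONMessageContext[T]{Metadata: md, Payload: in})
		if err != nil {
			return nil, err
		}
		encoded := make([][]byte, 0, len(outs))
		for _, o := range outs {
			b, err := json.Marshal(o.Message)
			if err != nil {
				return nil, fmt.Errorf("encode output: %w", err)
			}
			encoded = append(encoded, b)
		}
		return encoded, nil
	}, nil
}

type orderPlaced struct {
	ID string `json:"id"`
}

type orderAck struct {
	ID     string `json:"id"`
	Status string `json:"status"`
}

// ackOrder is an example typed handler that acknowledges every order.
func ackOrder(_ context.Context, ev JSONMessageContext[orderPlaced]) ([]JSONMessageOutput[orderAck], error) {
	ack := orderAck{ID: ev.Payload.ID, Status: "accepted"}
	return []JSONMessageOutput[orderAck]{{Message: ack}}, nil
}

// runExample pushes one raw payload through the adapted handler and
// returns the first encoded output.
func runExample() (string, error) {
	h, err := adaptJSON[orderPlaced, orderAck](ackOrder)
	if err != nil {
		return "", err
	}
	outs, err := h(context.Background(), []byte(`{"id":"42"}`), Metadata{})
	if err != nil {
		return "", err
	}
	return string(outs[0]), nil
}

func main() {
	fmt.Println(runExample())
}
```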

func BuildProtoHandler

func BuildProtoHandler[T proto.Message](prototype T, handler ProtoMessageHandler[T], validate func(proto.Message) error, factory ProtoMessageFactory, logger loggingpkg.ServiceLogger) (message.HandlerFunc, error)

BuildProtoHandler converts the typed handler into a Watermill handler using the provided factory.

func EnsureProtoPrototype

func EnsureProtoPrototype[T proto.Message](candidate T) (T, error)

Types

type JSONHandlerRegistration

type JSONHandlerRegistration[T any, O any] struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      JSONMessageHandler[T, O]
}

JSONHandlerRegistration wires a typed JSON handler to the router.

type JSONMessageContext

type JSONMessageContext[T any] struct {
	MessageContextBase
	Payload T
}

JSONMessageContext exposes the incoming payload and metadata for JSON handlers.

type JSONMessageHandler

type JSONMessageHandler[T any, O any] func(ctx context.Context, event JSONMessageContext[T]) ([]JSONMessageOutput[O], error)

JSONMessageHandler processes a JSON payload and returns the events to publish.

type JSONMessageOutput

type JSONMessageOutput[T any] struct {
	Message  T
	Metadata metadatapkg.Metadata
}

JSONMessageOutput represents an event emitted by a JSON handler.
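A handler with the shape of `JSONMessageHandler` might look like the sketch below, which fans one incoming payload out into several `JSONMessageOutput` values and returns an error for invalid input. The types are simplified local mirrors so the snippet runs on its own. `Order`, `Notice`, `splitOrder`, and `runSplit` are hypothetical, and `Metadata` is assumed to be a `map[string]string`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Metadata is an assumed stand-in for metadatapkg.Metadata.
type Metadata map[string]string

// Simplified local mirrors of the documented types.
type JSONMessageContext[T any] struct {
	Metadata Metadata
	Payload  T
}

type JSONMessageOutput[O any] struct {
	Message  O
	Metadata Metadata
}

// Order and Notice are hypothetical payload types.
type Order struct {
	ID    string
	Items int
}

type Notice struct {
	OrderID string
	Text    string
}

var errEmptyOrder = errors.New("order has no items")

// splitOrder emits one Notice per item and carries the incoming
// correlation ID over to every output.
func splitOrder(_ context.Context, ev JSONMessageContext[Order]) ([]JSONMessageOutput[Notice], error) {
	if ev.Payload.Items <= 0 {
		return nil, errEmptyOrder
	}
	outs := make([]JSONMessageOutput[Notice], 0, ev.Payload.Items)
	for i := 1; i <= ev.Payload.Items; i++ {
		outs = append(outs, JSONMessageOutput[Notice]{
			Message: Notice{
				OrderID: ev.Payload.ID,
				Text:    fmt.Sprintf("item %d of %d", i, ev.Payload.Items),
			},
			Metadata: Metadata{"correlation_id": ev.Metadata["correlation_id"]},
		})
	}
	return outs, nil
}

// runSplit invokes the handler directly and reports how many events it emitted.
func runSplit(items int) (int, error) {
	ev := JSONMessageContext[Order]{
		Metadata: Metadata{"correlation_id": "abc-123"},
		Payload:  Order{ID: "o-1", Items: items},
	}
	outs, err := splitOrder(context.Background(), ev)
	return len(outs), err
}

func main() {
	fmt.Println(runSplit(3))
	fmt.Println(runSplit(0))
}
```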

type MessageContextBase (added in v0.4.1)

type MessageContextBase struct {
	Metadata metadatapkg.Metadata
	Logger   loggingpkg.ServiceLogger
}

MessageContextBase provides common functionality for all message context types. It holds the metadata and logger shared by JSON and Proto handlers.

func (MessageContextBase) CloneMetadata (added in v0.4.1)

func (b MessageContextBase) CloneMetadata() metadatapkg.Metadata

CloneMetadata returns a copy of the current metadata map so handlers can safely mutate headers for outgoing events without touching the original map.
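The copy semantics can be illustrated with a small stand-in. This is a sketch under the assumption that `metadatapkg.Metadata` behaves like a `map[string]string`. The `CloneMetadata` body here is a local approximation, not the package's source.

```go
package main

import "fmt"

// Metadata is an assumed stand-in for metadatapkg.Metadata.
type Metadata map[string]string

// MessageContextBase is a reduced local mirror (the Logger field is omitted).
type MessageContextBase struct {
	Metadata Metadata
}

// CloneMetadata approximates the documented behavior: it returns a copy
// that callers can mutate without affecting the original map.
func (b MessageContextBase) CloneMetadata() Metadata {
	out := make(Metadata, len(b.Metadata))
	for k, v := range b.Metadata {
		out[k] = v
	}
	return out
}

func main() {
	base := MessageContextBase{Metadata: Metadata{"correlation_id": "abc-123"}}

	out := base.CloneMetadata()
	out["tenant"] = "acme" // only the copy changes

	fmt.Println(len(base.Metadata), len(out)) // prints: 1 2
}
```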

func (MessageContextBase) CorrelationID (added in v0.4.1)

func (b MessageContextBase) CorrelationID() string

CorrelationID returns the correlation ID from metadata, if present.

func (MessageContextBase) Get (added in v0.4.1)

func (b MessageContextBase) Get(key string) string

Get retrieves a metadata value by key.

type ProtoHandlerOption

type ProtoHandlerOption func(*protoHandlerOptions)

ProtoHandlerOption customises handler registration.

func WithPublishMessageTypes

func WithPublishMessageTypes(msgs ...proto.Message) ProtoHandlerOption

WithPublishMessageTypes registers extra proto schemas emitted by this handler. Use this when the handler may emit multiple message types.

type ProtoHandlerOptions

type ProtoHandlerOptions struct {
	AdditionalPublishTypes []proto.Message
}

ProtoHandlerOptions exposes the resolved handler configuration to callers.

func ApplyProtoHandlerOptions

func ApplyProtoHandlerOptions(opts []ProtoHandlerOption) ProtoHandlerOptions

ApplyProtoHandlerOptions resolves the supplied options into a concrete configuration.
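The option types above follow the functional-options pattern. The sketch below shows how such options could be resolved into a concrete struct. It is an approximation, not the package's source: the fields of the unexported `protoHandlerOptions` are assumed, `Message` is a local stand-in for `proto.Message`, and `named` is a hypothetical message type used only for the demo.

```go
package main

import "fmt"

// Message is a local stand-in for proto.Message so the sketch has no
// external dependencies.
type Message interface {
	Name() string
}

// protoHandlerOptions mirrors the unexported options type; its fields
// are an assumption.
type protoHandlerOptions struct {
	additionalPublishTypes []Message
}

type ProtoHandlerOption func(*protoHandlerOptions)

type ProtoHandlerOptions struct {
	AdditionalPublishTypes []Message
}

// WithPublishMessageTypes records extra message types on the options.
func WithPublishMessageTypes(msgs ...Message) ProtoHandlerOption {
	return func(o *protoHandlerOptions) {
		o.additionalPublishTypes = append(o.additionalPublishTypes, msgs...)
	}
}

// ApplyProtoHandlerOptions runs each option in order and exposes the
// result. Skipping nil options is a choice made for this sketch.
func ApplyProtoHandlerOptions(opts []ProtoHandlerOption) ProtoHandlerOptions {
	var internal protoHandlerOptions
	for _, opt := range opts {
		if opt != nil {
			opt(&internal)
		}
	}
	return ProtoHandlerOptions{AdditionalPublishTypes: internal.additionalPublishTypes}
}

// named is a hypothetical message type for the demo.
type named string

func (n named) Name() string { return string(n) }

func main() {
	resolved := ApplyProtoHandlerOptions([]ProtoHandlerOption{
		WithPublishMessageTypes(named("OrderShipped"), named("OrderFailed")),
		WithPublishMessageTypes(named("AuditEvent")),
	})
	for _, m := range resolved.AdditionalPublishTypes {
		fmt.Println(m.Name())
	}
}
```

Keeping the mutable options struct unexported and returning an exported snapshot lets callers inspect the resolved configuration without being able to construct options that bypass the `With...` helpers.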

type ProtoHandlerRegistration

type ProtoHandlerRegistration[T proto.Message] struct {
	Name             string
	ConsumeQueue     string
	PublishQueue     string
	Handler          ProtoMessageHandler[T]
	Options          []ProtoHandlerOption
	ValidateOutgoing bool
}

ProtoHandlerRegistration configures a typed protobuf handler that automatically unmarshals incoming payloads and marshals emitted events.

type ProtoMessageContext

type ProtoMessageContext[T proto.Message] struct {
	MessageContextBase
	Payload T
}

ProtoMessageContext provides strongly typed access to the incoming message payload.

type ProtoMessageFactory

type ProtoMessageFactory func(proto.Message, metadatapkg.Metadata) (*message.Message, error)

ProtoMessageFactory converts proto payloads into Watermill messages.

type ProtoMessageHandler

type ProtoMessageHandler[T proto.Message] func(ctx context.Context, event ProtoMessageContext[T]) ([]ProtoMessageOutput, error)

ProtoMessageHandler processes a typed protobuf payload and returns the events to emit.

type ProtoMessageOutput

type ProtoMessageOutput struct {
	Message  proto.Message
	Metadata metadatapkg.Metadata
}

ProtoMessageOutput describes an event that should be emitted after the handler succeeds.
