messagebus

package
v3.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2026 | License: MIT | Imports: 17 | Imported by: 0

Documentation

Overview

Package messagebus provides a transport-agnostic asynchronous message bus with a middleware stack, routing, and a consumer worker.

Index

Constants

View Source
const (
	ServiceBus            = "service.messagebus.bus"
	ServiceHandlerLocator = "service.messagebus.handler_locator"
)
View Source
const (
	StampNameBusName    = "bus_name"
	StampNameSent       = "sent"
	StampNameReceived   = "received"
	StampNameHandled    = "handled"
	StampNameRedelivery = "redelivery"
	StampNameDelay      = "delay"
)

Variables

This section is empty.

Functions

func BusMustFromContainer

func BusMustFromContainer(serviceContainer containercontract.Container) messagebuscontract.Bus

func BusMustFromResolver

func BusMustFromResolver(resolver containercontract.Resolver) messagebuscontract.Bus

func EnsureEnvelope

func EnsureEnvelope(message any) messagebuscontract.Envelope

func HandlerLocatorMustFromContainer

func HandlerLocatorMustFromContainer(serviceContainer containercontract.Container) messagebuscontract.HandlerLocator

func LastStampOfType

func LastStampOfType[T messagebuscontract.Stamp](envelopeInstance messagebuscontract.Envelope) (T, bool)

func NewEnvelope

func NewEnvelope(message any, stamps ...messagebuscontract.Stamp) messagebuscontract.Envelope

func NewHandleMessageMiddlewareWithOptions

func NewHandleMessageMiddlewareWithOptions(
	locator messagebuscontract.HandlerLocator,
	options HandleOptions,
) messagebuscontract.Middleware

func NewSendMessageMiddleware

func NewSendMessageMiddleware(routingByType map[reflect.Type]TransportRouting) messagebuscontract.Middleware

func NewSendMessageMiddlewareFromRouting

func NewSendMessageMiddlewareFromRouting(routing *Routing) messagebuscontract.Middleware

func RedeliveryCount

func RedeliveryCount(envelopeInstance messagebuscontract.Envelope) int

func RegisterHandler

func RegisterHandler[T any](
	locator *HandlerLocator,
	handle func(runtimeInstance runtimecontract.Runtime, message T) error,
)

Types

type BusNameStamp

type BusNameStamp struct {
	BusName string
}

func (BusNameStamp) StampName

func (instance BusNameStamp) StampName() string

type ConsumeCommand

type ConsumeCommand struct {
	// contains filtered or unexported fields
}

func NewConsumeCommand

func NewConsumeCommand(
	bus messagebuscontract.Bus,
	transports map[string]messagebuscontract.Transport,
) *ConsumeCommand

func NewConsumeCommandWithRetry

func NewConsumeCommandWithRetry(
	bus messagebuscontract.Bus,
	transports map[string]messagebuscontract.Transport,
	retryPolicy RetryPolicy,
) *ConsumeCommand

func (*ConsumeCommand) Description

func (instance *ConsumeCommand) Description() string

func (*ConsumeCommand) Flags

func (instance *ConsumeCommand) Flags() []clicontract.Flag

func (*ConsumeCommand) Name

func (instance *ConsumeCommand) Name() string

func (*ConsumeCommand) Run

func (instance *ConsumeCommand) Run(
	runtimeInstance runtimecontract.Runtime,
	commandContext *clicontract.CommandContext,
) error

func (*ConsumeCommand) WithShutdownGrace

func (instance *ConsumeCommand) WithShutdownGrace(grace time.Duration) *ConsumeCommand

type DelayStamp

type DelayStamp struct {
	Delay time.Duration
}

func (DelayStamp) StampName

func (instance DelayStamp) StampName() string

type HandleOptions

type HandleOptions struct {
	RequireHandler bool
}

type HandledStamp

type HandledStamp struct {
	HandlerName string
}

func (HandledStamp) StampName

func (instance HandledStamp) StampName() string

type HandlerLocator

type HandlerLocator struct {
	// contains filtered or unexported fields
}

func NewHandlerLocator

func NewHandlerLocator() *HandlerLocator

func (*HandlerLocator) HandlersFor

func (instance *HandlerLocator) HandlersFor(message any) []messagebuscontract.MessageHandler

func (*HandlerLocator) Register

func (instance *HandlerLocator) Register(messageType reflect.Type, handler messagebuscontract.MessageHandler)

type InMemoryTransport

type InMemoryTransport struct {
	// contains filtered or unexported fields
}

func NewInMemoryTransport

func NewInMemoryTransport(bufferSize int) *InMemoryTransport

func (*InMemoryTransport) Ack

func (instance *InMemoryTransport) Ack(
	runtimeInstance runtimecontract.Runtime,
	envelopeInstance messagebuscontract.Envelope,
) error

func (*InMemoryTransport) Close

func (instance *InMemoryTransport) Close(runtimeInstance runtimecontract.Runtime) error

func (*InMemoryTransport) Nack

func (instance *InMemoryTransport) Nack(
	runtimeInstance runtimecontract.Runtime,
	envelopeInstance messagebuscontract.Envelope,
	requeue bool,
) error

func (*InMemoryTransport) Receive

func (instance *InMemoryTransport) Receive(
	runtimeInstance runtimecontract.Runtime,
) (<-chan messagebuscontract.Envelope, error)

func (*InMemoryTransport) Send

func (instance *InMemoryTransport) Send(
	runtimeInstance runtimecontract.Runtime,
	envelopeInstance messagebuscontract.Envelope,
) error

func (*InMemoryTransport) WithLogger

func (instance *InMemoryTransport) WithLogger(logger loggingcontract.Logger) *InMemoryTransport

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(name string, middlewares ...messagebuscontract.Middleware) *Manager

func (*Manager) Dispatch

func (instance *Manager) Dispatch(
	runtimeInstance runtimecontract.Runtime,
	message any,
	stamps ...messagebuscontract.Stamp,
) (messagebuscontract.Envelope, error)

type ReceivedStamp

type ReceivedStamp struct {
	TransportName string
}

func (ReceivedStamp) StampName

func (instance ReceivedStamp) StampName() string

type RedeliveryStamp

type RedeliveryStamp struct {
	Count int
}

func (RedeliveryStamp) StampName

func (instance RedeliveryStamp) StampName() string

type RetryPolicy

type RetryPolicy struct {
	MaxRetries          int
	BaseDelay           time.Duration
	FailureTransport    messagebuscontract.Transport
	MaxDelay            time.Duration
	FailureRequeueDelay time.Duration
}

type Routing

type Routing struct {
	// contains filtered or unexported fields
}

func NewRouting

func NewRouting() *Routing

func RouteType

func RouteType[T any](routing *Routing, name string, transport messagebuscontract.Transport) *Routing

type SentStamp

type SentStamp struct {
	TransportName string
}

func (SentStamp) StampName

func (instance SentStamp) StampName() string

type TransportRouting

type TransportRouting struct {
	Name      string
	Transport messagebuscontract.Transport
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL