Documentation ¶
Overview ¶
Package messagebus provides a transport-agnostic asynchronous message bus with a middleware stack, routing, and a consumer worker.
Index ¶
- Constants
- func BusMustFromContainer(serviceContainer containercontract.Container) messagebuscontract.Bus
- func BusMustFromResolver(resolver containercontract.Resolver) messagebuscontract.Bus
- func EnsureEnvelope(message any) messagebuscontract.Envelope
- func HandlerLocatorMustFromContainer(serviceContainer containercontract.Container) messagebuscontract.HandlerLocator
- func HandlerLocatorMustFromResolver(resolver containercontract.Resolver) messagebuscontract.HandlerLocator
- func LastStampOfType[T messagebuscontract.Stamp](envelopeInstance messagebuscontract.Envelope) (T, bool)
- func NewEnvelope(message any, stamps ...messagebuscontract.Stamp) messagebuscontract.Envelope
- func NewHandleMessageMiddleware(locator messagebuscontract.HandlerLocator) messagebuscontract.Middleware
- func NewHandleMessageMiddlewareWithOptions(locator messagebuscontract.HandlerLocator, options HandleOptions) messagebuscontract.Middleware
- func NewSendMessageMiddleware(routingByType map[reflect.Type]TransportRouting) messagebuscontract.Middleware
- func NewSendMessageMiddlewareFromRouting(routing *Routing) messagebuscontract.Middleware
- func RedeliveryCount(envelopeInstance messagebuscontract.Envelope) int
- func RegisterHandler[T any](locator *HandlerLocator, ...)
- type BusNameStamp
- type ConsumeCommand
- func (instance *ConsumeCommand) Description() string
- func (instance *ConsumeCommand) Flags() []clicontract.Flag
- func (instance *ConsumeCommand) Name() string
- func (instance *ConsumeCommand) Run(runtimeInstance runtimecontract.Runtime, ...) error
- func (instance *ConsumeCommand) WithShutdownGrace(grace time.Duration) *ConsumeCommand
- type DelayStamp
- type HandleOptions
- type HandledStamp
- type HandlerLocator
- type InMemoryTransport
- func (instance *InMemoryTransport) Ack(runtimeInstance runtimecontract.Runtime, ...) error
- func (instance *InMemoryTransport) Close(runtimeInstance runtimecontract.Runtime) error
- func (instance *InMemoryTransport) Nack(runtimeInstance runtimecontract.Runtime, ...) error
- func (instance *InMemoryTransport) Receive(runtimeInstance runtimecontract.Runtime) (<-chan messagebuscontract.Envelope, error)
- func (instance *InMemoryTransport) Send(runtimeInstance runtimecontract.Runtime, ...) error
- func (instance *InMemoryTransport) WithLogger(logger loggingcontract.Logger) *InMemoryTransport
- type Manager
- type ReceivedStamp
- type RedeliveryStamp
- type RetryPolicy
- type Routing
- type SentStamp
- type TransportRouting
Constants ¶
View Source
const (
	ServiceBus            = "service.messagebus.bus"
	ServiceHandlerLocator = "service.messagebus.handler_locator"
)
View Source
const (
	StampNameBusName    = "bus_name"
	StampNameSent       = "sent"
	StampNameReceived   = "received"
	StampNameHandled    = "handled"
	StampNameRedelivery = "redelivery"
	StampNameDelay      = "delay"
)
Variables ¶
This section is empty.
Functions ¶
func BusMustFromContainer ¶
func BusMustFromContainer(serviceContainer containercontract.Container) messagebuscontract.Bus
func BusMustFromResolver ¶
func BusMustFromResolver(resolver containercontract.Resolver) messagebuscontract.Bus
func EnsureEnvelope ¶
func EnsureEnvelope(message any) messagebuscontract.Envelope
func HandlerLocatorMustFromContainer ¶
func HandlerLocatorMustFromContainer(serviceContainer containercontract.Container) messagebuscontract.HandlerLocator
func HandlerLocatorMustFromResolver ¶
func HandlerLocatorMustFromResolver(resolver containercontract.Resolver) messagebuscontract.HandlerLocator
func LastStampOfType ¶
func LastStampOfType[T messagebuscontract.Stamp](envelopeInstance messagebuscontract.Envelope) (T, bool)
func NewEnvelope ¶
func NewEnvelope(message any, stamps ...messagebuscontract.Stamp) messagebuscontract.Envelope
func NewHandleMessageMiddleware ¶
func NewHandleMessageMiddleware(locator messagebuscontract.HandlerLocator) messagebuscontract.Middleware
func NewHandleMessageMiddlewareWithOptions ¶
func NewHandleMessageMiddlewareWithOptions(
	locator messagebuscontract.HandlerLocator,
	options HandleOptions,
) messagebuscontract.Middleware
func NewSendMessageMiddleware ¶
func NewSendMessageMiddleware(routingByType map[reflect.Type]TransportRouting) messagebuscontract.Middleware
func NewSendMessageMiddlewareFromRouting ¶
func NewSendMessageMiddlewareFromRouting(routing *Routing) messagebuscontract.Middleware
func RedeliveryCount ¶
func RedeliveryCount(envelopeInstance messagebuscontract.Envelope) int
func RegisterHandler ¶
func RegisterHandler[T any](
	locator *HandlerLocator,
	handle func(runtimeInstance runtimecontract.Runtime, message T) error,
)
Types ¶
type BusNameStamp ¶
type BusNameStamp struct {
BusName string
}
func (BusNameStamp) StampName ¶
func (instance BusNameStamp) StampName() string
type ConsumeCommand ¶
type ConsumeCommand struct {
// contains filtered or unexported fields
}
func NewConsumeCommand ¶
func NewConsumeCommand(
	bus messagebuscontract.Bus,
	transports map[string]messagebuscontract.Transport,
) *ConsumeCommand
func NewConsumeCommandWithRetry ¶
func NewConsumeCommandWithRetry(
	bus messagebuscontract.Bus,
	transports map[string]messagebuscontract.Transport,
	retryPolicy RetryPolicy,
) *ConsumeCommand
func (*ConsumeCommand) Description ¶
func (instance *ConsumeCommand) Description() string
func (*ConsumeCommand) Flags ¶
func (instance *ConsumeCommand) Flags() []clicontract.Flag
func (*ConsumeCommand) Name ¶
func (instance *ConsumeCommand) Name() string
func (*ConsumeCommand) Run ¶
func (instance *ConsumeCommand) Run(
	runtimeInstance runtimecontract.Runtime,
	commandContext *clicontract.CommandContext,
) error
func (*ConsumeCommand) WithShutdownGrace ¶
func (instance *ConsumeCommand) WithShutdownGrace(grace time.Duration) *ConsumeCommand
type DelayStamp ¶
func (DelayStamp) StampName ¶
func (instance DelayStamp) StampName() string
type HandleOptions ¶
type HandleOptions struct {
RequireHandler bool
}
type HandledStamp ¶
type HandledStamp struct {
HandlerName string
}
func (HandledStamp) StampName ¶
func (instance HandledStamp) StampName() string
type HandlerLocator ¶
type HandlerLocator struct {
// contains filtered or unexported fields
}
func NewHandlerLocator ¶
func NewHandlerLocator() *HandlerLocator
func (*HandlerLocator) HandlersFor ¶
func (instance *HandlerLocator) HandlersFor(message any) []messagebuscontract.MessageHandler
func (*HandlerLocator) Register ¶
func (instance *HandlerLocator) Register(messageType reflect.Type, handler messagebuscontract.MessageHandler)
type InMemoryTransport ¶
type InMemoryTransport struct {
// contains filtered or unexported fields
}
func NewInMemoryTransport ¶
func NewInMemoryTransport(bufferSize int) *InMemoryTransport
func (*InMemoryTransport) Ack ¶
func (instance *InMemoryTransport) Ack(
	runtimeInstance runtimecontract.Runtime,
	envelopeInstance messagebuscontract.Envelope,
) error
func (*InMemoryTransport) Close ¶
func (instance *InMemoryTransport) Close(runtimeInstance runtimecontract.Runtime) error
func (*InMemoryTransport) Nack ¶
func (instance *InMemoryTransport) Nack(
	runtimeInstance runtimecontract.Runtime,
	envelopeInstance messagebuscontract.Envelope,
	requeue bool,
) error
func (*InMemoryTransport) Receive ¶
func (instance *InMemoryTransport) Receive(
	runtimeInstance runtimecontract.Runtime,
) (<-chan messagebuscontract.Envelope, error)
func (*InMemoryTransport) Send ¶
func (instance *InMemoryTransport) Send(
	runtimeInstance runtimecontract.Runtime,
	envelopeInstance messagebuscontract.Envelope,
) error
func (*InMemoryTransport) WithLogger ¶
func (instance *InMemoryTransport) WithLogger(logger loggingcontract.Logger) *InMemoryTransport
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
func NewManager(name string, middlewares ...messagebuscontract.Middleware) *Manager
func (*Manager) Dispatch ¶
func (instance *Manager) Dispatch(
	runtimeInstance runtimecontract.Runtime,
	message any,
	stamps ...messagebuscontract.Stamp,
) (messagebuscontract.Envelope, error)
type ReceivedStamp ¶
type ReceivedStamp struct {
TransportName string
}
func (ReceivedStamp) StampName ¶
func (instance ReceivedStamp) StampName() string
type RedeliveryStamp ¶
type RedeliveryStamp struct {
Count int
}
func (RedeliveryStamp) StampName ¶
func (instance RedeliveryStamp) StampName() string
type RetryPolicy ¶
type Routing ¶
type Routing struct {
// contains filtered or unexported fields
}
func NewRouting ¶
func NewRouting() *Routing
type TransportRouting ¶
type TransportRouting struct {
Name string
Transport messagebuscontract.Transport
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.