Documentation
¶
Overview ¶
Package messaging provides a set of utilities for working with messaging systems. It includes functionality for sending and receiving messages, as well as managing message queues. This package supports various messaging protocols, including AMQP and MQTT. It provides a simple and consistent API for interacting with different messaging systems. The `Producer` interface is used for sending messages, while the `Receiver` interface is used for receiving messages. The `Provider` interface combines both and adds methods for setting up the provider, creating new messages, and closing the provider.
Note: This package requires a messaging server to be running in order to send/receive messages. Please refer to the documentation of the specific messaging protocol for more information on how to set up a server.
Index ¶
- Constants
- func GetOptValue[T any](key string, opts ...Option) (value T, has bool)
- func ResolveOptValue[T any](key string, optionsResolver *OptionsResolver) (value T, has bool)
- type BaseMessage
- func (bm *BaseMessage) GetBoolHeader(key string) (value bool, exists bool)
- func (bm *BaseMessage) GetFloat64Header(key string) (value float64, exists bool)
- func (bm *BaseMessage) GetFloatHeader(key string) (value float32, exists bool)
- func (bm *BaseMessage) GetHeader(key string) (value []byte, exists bool)
- func (bm *BaseMessage) GetInt16Header(key string) (value int16, exists bool)
- func (bm *BaseMessage) GetInt32Header(key string) (value int32, exists bool)
- func (bm *BaseMessage) GetInt64Header(key string) (value int64, exists bool)
- func (bm *BaseMessage) GetInt8Header(key string) (value int8, exists bool)
- func (bm *BaseMessage) GetIntHeader(key string) (value int, exists bool)
- func (bm *BaseMessage) GetStrHeader(key string) (value string, exists bool)
- func (bm *BaseMessage) Id() string
- func (bm *BaseMessage) ReadAsStr() string
- func (bm *BaseMessage) ReadBody() io.Reader
- func (bm *BaseMessage) ReadBytes() []byte
- func (bm *BaseMessage) ReadContent(out interface{}, contentType string) (err error)
- func (bm *BaseMessage) ReadJSON(out interface{}) (err error)
- func (bm *BaseMessage) ReadXML(out interface{}) (err error)
- func (bm *BaseMessage) SetBodyBytes(input []byte) (n int, err error)
- func (bm *BaseMessage) SetBodyStr(input string) (n int, err error)
- func (bm *BaseMessage) SetBoolHeader(key string, value bool)
- func (bm *BaseMessage) SetFloat64Header(key string, value float64)
- func (bm *BaseMessage) SetFloatHeader(key string, value float32)
- func (bm *BaseMessage) SetFrom(content io.Reader) (n int64, err error)
- func (bm *BaseMessage) SetHeader(key string, value []byte)
- func (bm *BaseMessage) SetInt16Header(key string, value int16)
- func (bm *BaseMessage) SetInt32Header(key string, value int32)
- func (bm *BaseMessage) SetInt64Header(key string, value int64)
- func (bm *BaseMessage) SetInt8Header(key string, value int8)
- func (bm *BaseMessage) SetIntHeader(key string, value int)
- func (bm *BaseMessage) SetStrHeader(key string, value string)
- func (bm *BaseMessage) WriteContent(input interface{}, contentType string) (err error)
- func (bm *BaseMessage) WriteJSON(input interface{}) (err error)
- func (bm *BaseMessage) WriteXML(input interface{}) (err error)
- type Body
- type Header
- type LocalMessage
- type LocalProvider
- func (lp *LocalProvider) AddListener(url *url.URL, listener func(msg Message), options ...Option) (err error)
- func (lp *LocalProvider) Close() (err error)
- func (lp *LocalProvider) Id() string
- func (lp *LocalProvider) NewMessage(scheme string, options ...Option) (msg Message, err error)
- func (lp *LocalProvider) Receive(url *url.URL, options ...Option) (msg Message, err error)
- func (lp *LocalProvider) ReceiveBatch(url *url.URL, options ...Option) (msgs []Message, err error)
- func (lp *LocalProvider) Schemes() (schemes []string)
- func (lp *LocalProvider) Send(url *url.URL, msg Message, options ...Option) (err error)
- func (lp *LocalProvider) SendBatch(url *url.URL, msgs []Message, options ...Option) (err error)
- func (lp *LocalProvider) Setup() (err error)
- type Manager
- type Message
- type Option
- type OptionsBuilder
- func (ob *OptionsBuilder) Add(key string, value interface{}) *OptionsBuilder
- func (ob *OptionsBuilder) AddCircuitBreaker(failureThreshold, successThreshold uint64, maxHalfOpen, timeout uint32) *OptionsBuilder
- func (ob *OptionsBuilder) AddNamedListener(name string) *OptionsBuilder
- func (ob *OptionsBuilder) AddRetryHandler(maxRetries, wait int) *OptionsBuilder
- func (ob *OptionsBuilder) Build() []Option
- type OptionsResolver
- type Producer
- type Provider
- type Receiver
Constants ¶
// Keys used to identify the well-known options of this package.
//
// Each key must be distinct. RetryOpts previously carried the same value as
// CircuitBreakerOpts ("CircuitBreakerOption"), so the two keys could not be
// told apart by value.
const (
	// CircuitBreakerOpts is the key for the circuit-breaker option.
	CircuitBreakerOpts = "CircuitBreakerOption"
	// RetryOpts is the key for the retry-handler option.
	RetryOpts = "RetryOption"
	// NamedListener is the key for the named-listener option.
	NamedListener = "NamedListener"
)
// LocalMsgScheme holds the scheme name "chan".
// NOTE(review): presumably the URL scheme served by LocalProvider — confirm.
const LocalMsgScheme = "chan"
Variables ¶
This section is empty.
Functions ¶
func GetOptValue ¶ added in v1.1.3
func ResolveOptValue ¶ added in v1.1.3
func ResolveOptValue[T any](key string, optionsResolver *OptionsResolver) (value T, has bool)
Types ¶
type BaseMessage ¶
// BaseMessage is a message type whose fields are all hidden in this listing.
// The methods documented on *BaseMessage cover every method named by the
// Header and Body interfaces. Instances are created with NewBaseMessage.
type BaseMessage struct {
// contains filtered or unexported fields
}
func NewBaseMessage ¶ added in v1.2.2
func NewBaseMessage() (baseMsg *BaseMessage, err error)
func (*BaseMessage) GetBoolHeader ¶
func (bm *BaseMessage) GetBoolHeader(key string) (value bool, exists bool)
func (*BaseMessage) GetFloat64Header ¶
func (bm *BaseMessage) GetFloat64Header(key string) (value float64, exists bool)
func (*BaseMessage) GetFloatHeader ¶
func (bm *BaseMessage) GetFloatHeader(key string) (value float32, exists bool)
func (*BaseMessage) GetHeader ¶
func (bm *BaseMessage) GetHeader(key string) (value []byte, exists bool)
func (*BaseMessage) GetInt16Header ¶
func (bm *BaseMessage) GetInt16Header(key string) (value int16, exists bool)
func (*BaseMessage) GetInt32Header ¶
func (bm *BaseMessage) GetInt32Header(key string) (value int32, exists bool)
func (*BaseMessage) GetInt64Header ¶
func (bm *BaseMessage) GetInt64Header(key string) (value int64, exists bool)
func (*BaseMessage) GetInt8Header ¶
func (bm *BaseMessage) GetInt8Header(key string) (value int8, exists bool)
func (*BaseMessage) GetIntHeader ¶
func (bm *BaseMessage) GetIntHeader(key string) (value int, exists bool)
func (*BaseMessage) GetStrHeader ¶
func (bm *BaseMessage) GetStrHeader(key string) (value string, exists bool)
func (*BaseMessage) Id ¶ added in v1.1.1
func (bm *BaseMessage) Id() string
func (*BaseMessage) ReadAsStr ¶
func (bm *BaseMessage) ReadAsStr() string
func (*BaseMessage) ReadBody ¶
func (bm *BaseMessage) ReadBody() io.Reader
func (*BaseMessage) ReadBytes ¶
func (bm *BaseMessage) ReadBytes() []byte
func (*BaseMessage) ReadContent ¶
func (bm *BaseMessage) ReadContent(out interface{}, contentType string) (err error)
func (*BaseMessage) ReadJSON ¶
func (bm *BaseMessage) ReadJSON(out interface{}) (err error)
func (*BaseMessage) ReadXML ¶
func (bm *BaseMessage) ReadXML(out interface{}) (err error)
func (*BaseMessage) SetBodyBytes ¶
func (bm *BaseMessage) SetBodyBytes(input []byte) (n int, err error)
func (*BaseMessage) SetBodyStr ¶
func (bm *BaseMessage) SetBodyStr(input string) (n int, err error)
func (*BaseMessage) SetBoolHeader ¶
func (bm *BaseMessage) SetBoolHeader(key string, value bool)
func (*BaseMessage) SetFloat64Header ¶
func (bm *BaseMessage) SetFloat64Header(key string, value float64)
func (*BaseMessage) SetFloatHeader ¶
func (bm *BaseMessage) SetFloatHeader(key string, value float32)
func (*BaseMessage) SetFrom ¶
func (bm *BaseMessage) SetFrom(content io.Reader) (n int64, err error)
func (*BaseMessage) SetHeader ¶
func (bm *BaseMessage) SetHeader(key string, value []byte)
func (*BaseMessage) SetInt16Header ¶
func (bm *BaseMessage) SetInt16Header(key string, value int16)
func (*BaseMessage) SetInt32Header ¶
func (bm *BaseMessage) SetInt32Header(key string, value int32)
func (*BaseMessage) SetInt64Header ¶
func (bm *BaseMessage) SetInt64Header(key string, value int64)
func (*BaseMessage) SetInt8Header ¶
func (bm *BaseMessage) SetInt8Header(key string, value int8)
func (*BaseMessage) SetIntHeader ¶
func (bm *BaseMessage) SetIntHeader(key string, value int)
func (*BaseMessage) SetStrHeader ¶
func (bm *BaseMessage) SetStrHeader(key string, value string)
func (*BaseMessage) WriteContent ¶
func (bm *BaseMessage) WriteContent(input interface{}, contentType string) (err error)
func (*BaseMessage) WriteJSON ¶
func (bm *BaseMessage) WriteJSON(input interface{}) (err error)
func (*BaseMessage) WriteXML ¶
func (bm *BaseMessage) WriteXML(input interface{}) (err error)
type Body ¶
// Body groups the methods used to write and read the payload of a Message.
//
// The Set*/Write* methods store content in the message; the Read* methods
// return it in the requested representation.
type Body interface {
	// SetBodyStr sets the string body to the Message structure.
	SetBodyStr(in string) (int, error)
	// SetBodyBytes sets the []byte body to the Message structure.
	SetBodyBytes(in []byte) (int, error)
	// SetFrom sets the body of the Message structure from the given Reader.
	SetFrom(content io.Reader) (int64, error)
	// WriteJSON sets the JSON body to the Message structure.
	WriteJSON(in interface{}) error
	// WriteXML sets the XML body to the Message structure.
	WriteXML(in interface{}) error
	// WriteContent sets a custom body, selected by contentType, to the Message structure.
	WriteContent(in interface{}, contentType string) error
	// ReadBody returns the body of the Message structure as a Reader.
	ReadBody() io.Reader
	// ReadBytes returns the body of the Message structure as a []byte.
	ReadBytes() []byte
	// ReadAsStr returns the body of the Message structure as a string.
	ReadAsStr() string
	// ReadJSON reads the JSON body of the Message structure into out.
	ReadJSON(out interface{}) error
	// ReadXML reads the XML body of the Message structure into out.
	ReadXML(out interface{}) error
	// ReadContent reads the body of the Message structure into out, based on contentType.
	ReadContent(out interface{}, contentType string) error
}
Body defines all the body interfaces required by the body of the messaging client
type Header ¶
// Header groups the identity and typed header accessors of a Message.
type Header interface {
	// Id reports the message id.
	Id() string

	// Setters: each stores value under key in the message headers.

	// SetHeader stores a raw []byte header value.
	SetHeader(key string, value []byte)
	// SetStrHeader stores a string header value.
	SetStrHeader(key string, value string)
	// SetBoolHeader stores a bool header value.
	SetBoolHeader(key string, value bool)
	// SetIntHeader stores an int header value.
	SetIntHeader(key string, value int)
	// SetInt8Header stores an int8 header value.
	SetInt8Header(key string, value int8)
	// SetInt16Header stores an int16 header value.
	SetInt16Header(key string, value int16)
	// SetInt32Header stores an int32 header value.
	SetInt32Header(key string, value int32)
	// SetInt64Header stores an int64 header value.
	SetInt64Header(key string, value int64)
	// SetFloatHeader stores a float32 header value.
	SetFloatHeader(key string, value float32)
	// SetFloat64Header stores a float64 header value.
	SetFloat64Header(key string, value float64)

	// Getters: each returns the header stored under key, if it exists,
	// as the named type.

	// GetHeader returns the header as a []byte.
	GetHeader(key string) (value []byte, exists bool)
	// GetStrHeader returns the header as a string.
	GetStrHeader(key string) (value string, exists bool)
	// GetBoolHeader returns the header as a bool.
	GetBoolHeader(key string) (value bool, exists bool)
	// GetIntHeader returns the header as an int.
	GetIntHeader(key string) (value int, exists bool)
	// GetInt8Header returns the header as an int8.
	GetInt8Header(key string) (value int8, exists bool)
	// GetInt16Header returns the header as an int16.
	GetInt16Header(key string) (value int16, exists bool)
	// GetInt32Header returns the header as an int32.
	GetInt32Header(key string) (value int32, exists bool)
	// GetInt64Header returns the header as an int64.
	GetInt64Header(key string) (value int64, exists bool)
	// GetFloatHeader returns the header as a float32.
	GetFloatHeader(key string) (value float32, exists bool)
	// GetFloat64Header returns the header as a float64.
	GetFloat64Header(key string) (value float64, exists bool)
}
Header defines all the header interfaces required by the messaging clients
type LocalMessage ¶
// LocalMessage embeds *BaseMessage, so the methods of *BaseMessage are
// promoted to it.
// NOTE(review): presumably the message type used by LocalProvider — confirm.
type LocalMessage struct {
*BaseMessage
}
type LocalProvider ¶
// LocalProvider's fields are all hidden in this listing. Its documented
// methods are AddListener, Close, Id, NewMessage, Receive, ReceiveBatch,
// Schemes, Send, SendBatch and Setup.
type LocalProvider struct {
// contains filtered or unexported fields
}
LocalProvider is an implementation of the Provider interface
func (*LocalProvider) AddListener ¶
func (*LocalProvider) Close ¶ added in v1.1.1
func (lp *LocalProvider) Close() (err error)
func (*LocalProvider) Id ¶ added in v1.1.1
func (lp *LocalProvider) Id() string
func (*LocalProvider) NewMessage ¶
func (lp *LocalProvider) NewMessage(scheme string, options ...Option) (msg Message, err error)
func (*LocalProvider) ReceiveBatch ¶
func (*LocalProvider) Schemes ¶
func (lp *LocalProvider) Schemes() (schemes []string)
func (*LocalProvider) Setup ¶
func (lp *LocalProvider) Setup() (err error)
type Manager ¶
Manager interface defines an abstraction for messaging providers that can be registered
func GetManager ¶ added in v1.1.1
func GetManager() Manager
GetManager returns the facade messaging instance
type Message ¶
// Message combines the Header and Body method sets with an acknowledgement hook.
type Message interface {
	Header
	Body

	// Rsvp is a facade for acknowledging the message to the provider.
	// The leading bool indicates acceptance or rejection; any Options that
	// follow can request further actions.
	// Whether this has any effect depends entirely on the provider's ability
	// to accept an acknowledgement.
	Rsvp(bool, ...Option) error
}
Message interface will be implemented by all third-party implementations, such as AWS (SNS, SQS), GCP (Pub/Sub, GCM), and messaging systems (AMQP, Kafka).
func NewLocalMessage ¶
type OptionsBuilder ¶
// OptionsBuilder's fields are all hidden in this listing. It is created with
// NewOptionsBuilder; its Add* methods each return *OptionsBuilder, and Build
// returns the resulting []Option.
type OptionsBuilder struct {
// contains filtered or unexported fields
}
func NewOptionsBuilder ¶
func NewOptionsBuilder() *OptionsBuilder
TODO check if we can pool this for performance
func (*OptionsBuilder) Add ¶
func (ob *OptionsBuilder) Add(key string, value interface{}) *OptionsBuilder
TODO check if you need to pool this for performance
func (*OptionsBuilder) AddCircuitBreaker ¶
func (ob *OptionsBuilder) AddCircuitBreaker(failureThreshold, successThreshold uint64, maxHalfOpen, timeout uint32) *OptionsBuilder
func (*OptionsBuilder) AddNamedListener ¶ added in v1.1.3
func (ob *OptionsBuilder) AddNamedListener(name string) *OptionsBuilder
func (*OptionsBuilder) AddRetryHandler ¶
func (ob *OptionsBuilder) AddRetryHandler(maxRetries, wait int) *OptionsBuilder
func (*OptionsBuilder) Build ¶
func (ob *OptionsBuilder) Build() []Option
type OptionsResolver ¶
// OptionsResolver's fields are all hidden in this listing. It is created from
// a list of Options with NewOptionsResolver, and its Get method takes a key
// and returns a value together with a has flag.
type OptionsResolver struct {
// contains filtered or unexported fields
}
func NewOptionsResolver ¶
func NewOptionsResolver(options ...Option) (optsResolver *OptionsResolver)
func (*OptionsResolver) Get ¶
func (or *OptionsResolver) Get(key string) (value interface{}, has bool)
type Producer ¶
// Producer is the sending side of a messaging provider.
type Producer interface {
	// Send sends one individual message to the url u.
	Send(u *url.URL, msg Message, opts ...Option) error

	// SendBatch sends a batch of messages to the url u.
	SendBatch(u *url.URL, msgs []Message, opts ...Option) error
}
Producer interface is used to send message(s) to a specific provider
type Provider ¶
// Provider is the full contract of a messaging provider: it embeds io.Closer,
// Producer and Receiver, and adds identification, scheme discovery, setup and
// message creation.
type Provider interface {
	// Embedded io.Closer.
	io.Closer
	// Embedded Producer interface.
	Producer
	// Embedded Receiver interface.
	Receiver

	// Id reports the id of the provider.
	Id() string
	// Schemes lists the URL schemes this provider supports.
	Schemes() []string
	// Setup is the provider's setup method.
	Setup() error
	// NewMessage creates a new message for clients to use; the scheme must be supplied.
	NewMessage(scheme string, opts ...Option) (Message, error)
}
Provider interface exposes methods for a messaging provider. It includes the Producer and Receiver interfaces. It also includes the Schemes method to get the supported schemes, the Setup method to perform initial setup, and the NewMessage method to create a new message.
type Receiver ¶
// Receiver is the receiving side of a messaging provider.
type Receiver interface {
	// Receive performs an on-demand receive of a single message.
	// Whether it waits for a message to arrive is left to the implementation.
	Receive(u *url.URL, opts ...Option) (Message, error)

	// ReceiveBatch performs an on-demand receive of a batch of messages.
	// Whether it waits for messages to arrive is left to the implementation.
	ReceiveBatch(u *url.URL, opts ...Option) ([]Message, error)

	// AddListener registers a listener for the message.
	AddListener(u *url.URL, listener func(msg Message), opts ...Option) error
}
Receiver interface provides the functions for receiving message(s).