Documentation
¶
Index ¶
- Constants
- type BaseMessage
- type EntityMessage
- type IMessage
- type IMessageBus
- type IMessageConsumer
- type IMessageProducer
- type InMemoryMessageBus
- func (m *InMemoryMessageBus) CloneMessageBus() (IMessageBus, error)
- func (m *InMemoryMessageBus) Close() error
- func (m *InMemoryMessageBus) CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)
- func (m *InMemoryMessageBus) CreateProducer(topic string) (IMessageProducer, error)
- func (m *InMemoryMessageBus) Ping(retries uint, intervalInSeconds uint) error
- func (m *InMemoryMessageBus) Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)
- func (m *InMemoryMessageBus) Publish(messages ...IMessage) error
- func (m *InMemoryMessageBus) Push(messages ...IMessage) error
- func (m *InMemoryMessageBus) Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, ...) (subscriptionId string, error error)
- func (m *InMemoryMessageBus) Unsubscribe(subscriptionId string) (success bool)
- type InMemoryMessageConsumer
- type Message
- type MessageFactory
- type SubscriptionCallback
Constants ¶
const EntityMessageTopic = "ENTITY"
EntityMessageTopic is a predefined topic name for messages that carry entities.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseMessage ¶
type BaseMessage struct {
MsgTopic string `json:"topic"` // The topic (channel or queue) of the message.
MsgOpCode int `json:"opCode"` // An operational code for the message.
MsgVersion string `json:"version"` // The version of the message.
MsgAddressee string `json:"addressee"` // The final recipient of the message.
MsgSessionId string `json:"sessionId"` // A session ID for tracking related messages.
}
BaseMessage provides a basic implementation of the IMessage interface. It can be embedded in other message structs to provide default behavior for the common message attributes. @Data
func (*BaseMessage) Addressee ¶
func (m *BaseMessage) Addressee() string
Addressee returns the message's final addressee.
func (*BaseMessage) OpCode ¶
func (m *BaseMessage) OpCode() int
OpCode returns the message's operational code.
func (*BaseMessage) Payload ¶
func (m *BaseMessage) Payload() any
Payload returns nil, as the BaseMessage does not carry a payload itself. This method is intended to be overridden by embedding structs.
func (*BaseMessage) SessionId ¶
func (m *BaseMessage) SessionId() string
SessionId returns the message's session ID.
func (*BaseMessage) Version ¶ added in v1.2.52
func (m *BaseMessage) Version() string
Version returns the message's version.
type EntityMessage ¶ added in v1.2.43
type EntityMessage struct {
BaseMessage
MsgPayload any `json:"payload"` // The payload of the message, typically an entity.
}
EntityMessage is a message structure for carrying generic entity data. It embeds BaseMessage and uses an `any` type for the payload. @Data
func (*EntityMessage) Payload ¶ added in v1.2.43
func (m *EntityMessage) Payload() any
Payload returns the message's payload.
type IMessage ¶
type IMessage interface {
// Topic returns the name of the topic, channel, or queue the message is associated with.
Topic() string
// OpCode returns the message's operational code, which can be used to indicate the type or purpose of the message.
OpCode() int
// Addressee returns the final recipient of the message. This is an optional field.
Addressee() string
// SessionId returns an identifier for a message exchange session. This ID is shared across all messages
// belonging to the same session, allowing for conversational message patterns.
SessionId() string
// Version returns the version of the message, which can be used for compatibility and evolution of message formats.
Version() string
// Payload returns the body of the message, which can be of any type.
Payload() any
}
IMessage defines the interface for a standard message used within the messaging system. It provides a structured way to define messages with common attributes like topic, operation code, and payload.
func GetMessage ¶ added in v1.2.57
GetMessage creates and initializes a new generic Message with a given topic and payload. It automatically assigns a new session ID.
Type Parameters:
T: The type of the payload.
Parameters:
topic: The topic for the message. payload: The payload for the message.
Returns:
A new, initialized IMessage.
func NewEntityMessage ¶ added in v1.2.44
func NewEntityMessage() IMessage
NewEntityMessage is a factory function that creates a new instance of EntityMessage.
Returns:
A new IMessage of type EntityMessage.
func NewMessage ¶ added in v1.2.57
NewMessage creates a new instance of a generic Message.
Type Parameters:
T: The type of the payload.
Returns:
A new IMessage with a typed payload.
type IMessageBus ¶
type IMessageBus interface {
// Closer includes the Close() method for releasing resources.
io.Closer
// Ping tests the connectivity to the message bus.
// It attempts to connect a specified number of times with a given interval.
//
// Parameters:
// retries: The number of times to retry connecting.
// intervalInSeconds: The time in seconds to wait between retries.
//
// Returns:
// An error if the connection cannot be established, otherwise nil.
Ping(retries uint, intervalInSeconds uint) error
// CloneMessageBus creates and returns a new instance of the message bus,
// effectively cloning the current configuration and state.
//
// Returns:
// A new IMessageBus instance.
// An error if the cloning process fails.
CloneMessageBus() (IMessageBus, error)
// Publish sends one or more messages to a topic. This is part of the publish-subscribe pattern,
// where messages are broadcast to all subscribers of a topic.
//
// Parameters:
// messages: A variadic slice of IMessage to be published.
//
// Returns:
// An error if publishing fails.
Publish(messages ...IMessage) error
// Subscribe creates a subscription to one or more topics, allowing a consumer to receive messages.
//
// Parameters:
// subscription: A name for the subscription, which can be used for durable subscriptions.
// mf: A MessageFactory function to create new message instances for unmarshalling.
// callback: The function to be called when a message is received.
// topics: A variadic slice of topic names to subscribe to.
//
// Returns:
// A unique string identifying the subscription.
// An error if the subscription fails.
Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, topics ...string) (string, error)
// Unsubscribe removes a subscription, stopping the flow of messages to it.
//
// Parameters:
// subscriptionId: The ID of the subscription to be removed.
//
// Returns:
// A boolean indicating whether the unsubscription was successful.
Unsubscribe(subscriptionId string) bool
// Push adds one or more messages to a queue. This is part of the queueing pattern,
// where messages are processed by one of the competing consumers.
//
// Parameters:
// messages: A variadic slice of IMessage to be added to the queue.
//
// Returns:
// An error if the push operation fails.
Push(messages ...IMessage) error
// Pop removes and returns a message from a queue. It can block until a message is available
// or a timeout occurs.
//
// Parameters:
// mf: A MessageFactory function to create a new message instance for unmarshalling.
// timeout: The maximum time to wait for a message.
// queue: A variadic slice of queue names to pop from.
//
// Returns:
// The IMessage that was popped from the queue.
// An error if the operation times out or fails.
Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)
// CreateProducer creates a message producer for a specific topic. A producer is responsible for
// sending messages.
//
// Parameters:
// topic: The name of the topic for which to create the producer.
//
// Returns:
// An IMessageProducer instance.
// An error if the creation fails.
CreateProducer(topic string) (IMessageProducer, error)
// CreateConsumer creates a message consumer for one or more topics. A consumer is responsible for
// receiving messages.
//
// Parameters:
// subscription: A name for the subscription, useful for durable consumers.
// mf: A MessageFactory function to create new message instances.
// topics: The topics to consume messages from.
//
// Returns:
// An IMessageConsumer instance.
// An error if the creation fails.
CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)
}
IMessageBus defines the interface for a message bus, which provides a unified way to handle messaging through various implementations (e.g., in-memory, RabbitMQ, NATS). It supports both publish-subscribe and queueing patterns.
func NewInMemoryMessageBus ¶
func NewInMemoryMessageBus() (mq IMessageBus, err error)
NewInMemoryMessageBus is a factory method that creates and returns a new instance of InMemoryMessageBus. It initializes the topics and queues maps.
type IMessageConsumer ¶ added in v1.2.47
type IMessageConsumer interface {
// Closer includes the Close() method for releasing resources.
io.Closer
// Read retrieves a message from the topic, blocking until a new message arrives or a timeout is reached.
// A timeout of 0 can be used to block indefinitely.
//
// A typical usage pattern is an infinite loop:
//
// for {
// if msg, err := consumer.Read(time.Second * 5); err != nil {
// // Handle error, e.g., log it or break the loop
// } else {
// // Process the message, often in a separate goroutine
// go processThisMessage(msg)
// }
// }
//
// Parameters:
// timeout: The maximum duration to wait for a message.
//
Read(timeout time.Duration) (IMessage, error)
}
IMessageConsumer defines the interface for a message consumer, which is responsible for reading messages from a topic.
type IMessageProducer ¶ added in v1.2.2
type IMessageProducer interface {
// Closer includes the Close() method for releasing resources.
io.Closer
// Publish sends one or more messages to the producer's topic.
//
// Parameters:
// messages: A variadic slice of IMessage to be published.
//
// Returns:
// An error if publishing fails.
Publish(messages ...IMessage) error
}
IMessageProducer defines the interface for a message producer, which is responsible for publishing messages to a specific topic.
type InMemoryMessageBus ¶
type InMemoryMessageBus struct {
// contains filtered or unexported fields
}
InMemoryMessageBus represents an in-memory implementation of the IMessageBus interface. It simulates a message bus behavior within the application's memory, making it suitable for testing or simple use cases.
Fields:
mu: A RWMutex to ensure thread-safe access to topics and queues. topics: A map where keys are topic names and values are slices of channels. Each channel represents a subscriber. queues: A map where keys are queue names and values are Queue instances for message storage.
func (*InMemoryMessageBus) CloneMessageBus ¶ added in v1.2.62
func (m *InMemoryMessageBus) CloneMessageBus() (IMessageBus, error)
CloneMessageBus returns a clone (copy) of the message bus instance. In this in-memory implementation, it returns a pointer to the same instance, as the state is shared.
Returns:
A new IMessageBus instance that is a copy of the original. An error if the cloning process fails.
func (*InMemoryMessageBus) Close ¶ added in v1.2.1
func (m *InMemoryMessageBus) Close() error
Close releases any resources held by the message bus. For the in-memory implementation, it simply logs a debug message as there are no external connections to close.
Returns:
An error if closing fails, otherwise nil.
func (*InMemoryMessageBus) CreateConsumer ¶ added in v1.2.47
func (m *InMemoryMessageBus) CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)
CreateConsumer creates a message consumer for a specific topic.
Parameters:
subscription: A string identifying the subscription (not used in this implementation). mf: A MessageFactory to create message instances. topics: The topics to consume from. Only the first topic is used.
Returns:
An IMessageConsumer instance. An error if creation fails.
func (*InMemoryMessageBus) CreateProducer ¶ added in v1.2.2
func (m *InMemoryMessageBus) CreateProducer(topic string) (IMessageProducer, error)
CreateProducer creates a message producer for a specific topic. For the in-memory bus, the bus itself acts as the producer.
Parameters:
topic: The topic for which to create the producer.
Returns:
An IMessageProducer instance. An error if creation fails.
func (*InMemoryMessageBus) Ping ¶
func (m *InMemoryMessageBus) Ping(retries uint, intervalInSeconds uint) error
Ping tests the connectivity of the message bus. For the in-memory implementation, it always returns nil, indicating that the bus is always available.
Parameters:
retries: The number of times to retry the ping. (Not used in this implementation). intervalInSeconds: The interval in seconds between retries. (Not used in this implementation).
Returns:
An error if the ping fails, otherwise nil.
func (*InMemoryMessageBus) Pop ¶
func (m *InMemoryMessageBus) Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)
Pop removes and returns a message from one of the specified queues. It can operate in two modes: 1. If timeout is 0, it attempts to pop a message immediately. 2. If timeout is greater than 0, it blocks until a message is available or the timeout is reached.
Parameters:
mf: A MessageFactory function to create message instances (not used in this implementation but required by the interface). timeout: The duration to wait for a message before timing out. queue: A variadic slice of queue names to pop from.
Returns:
The popped IMessage. An error if no message is found, the timeout is reached, or another issue occurs.
func (*InMemoryMessageBus) Publish ¶
func (m *InMemoryMessageBus) Publish(messages ...IMessage) error
Publish sends one or more messages to their respective topics. It iterates through the provided messages, marshals them, and sends the data to all subscriber channels for that topic.
Parameters:
messages: A variadic slice of IMessage to be published.
Returns:
An error if marshalling or sending fails, otherwise nil.
func (*InMemoryMessageBus) Push ¶
func (m *InMemoryMessageBus) Push(messages ...IMessage) error
Push adds one or more messages to a queue. The queue is determined by the message's topic. If the queue does not exist, it is created.
Parameters:
messages: A variadic slice of IMessage to be pushed to the queue.
Returns:
An error if any issue occurs, otherwise nil.
func (*InMemoryMessageBus) Subscribe ¶
func (m *InMemoryMessageBus) Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, topics ...string) (subscriptionId string, error error)
Subscribe creates a subscription to one or more topics. It sets up a channel for the subscriber and starts a goroutine to listen for messages on that channel. When a message is received, it's unmarshalled and passed to the provided callback function.
Parameters:
subscription: A string identifying the subscription (not used in this implementation). mf: A MessageFactory function to create new message instances for unmarshalling. callback: A SubscriptionCallback function to be executed when a message is received. topics: A variadic slice of topic names to subscribe to.
Returns:
A unique subscription ID. An error if the callback is nil or another issue occurs.
func (*InMemoryMessageBus) Unsubscribe ¶
func (m *InMemoryMessageBus) Unsubscribe(subscriptionId string) (success bool)
Unsubscribe removes a subscription. In this in-memory implementation, this is a no-op.
Parameters:
subscriptionId: The ID of the subscription to remove.
Returns:
A boolean indicating if the unsubscription was successful (always true).
type InMemoryMessageConsumer ¶ added in v1.2.47
type InMemoryMessageConsumer struct {
// contains filtered or unexported fields
}
InMemoryMessageConsumer represents a consumer for the in-memory message bus.
func (*InMemoryMessageConsumer) Close ¶ added in v1.2.47
func (m *InMemoryMessageConsumer) Close() error
Close releases resources used by the consumer. For the in-memory implementation, this is a no-op.
Returns:
An error if closing fails.
func (*InMemoryMessageConsumer) Read ¶ added in v1.2.47
func (m *InMemoryMessageConsumer) Read(timeout time.Duration) (IMessage, error)
Read retrieves a message from the consumer's topic. It blocks until a message is available or the specified timeout is reached.
Parameters:
timeout: The duration to wait for a message.
Returns:
The IMessage read from the topic. An error if the timeout is reached or another issue occurs.
type Message ¶ added in v1.2.42
type Message[T any] struct { BaseMessage MsgPayload T `json:"payload"` // The typed data payload of the message. }
Message is a generic message structure that embeds BaseMessage and adds a typed payload.
Type Parameters:
T: The type of the payload.
@Data
type MessageFactory ¶
type MessageFactory func() IMessage
MessageFactory is a function type that serves as a factory for creating IMessage instances. It is often used in consumers to generate the correct message type for unmarshalling.
type SubscriptionCallback ¶
SubscriptionCallback is a function type for handling messages received from a subscription. It takes an IMessage as input and returns a boolean indicating if the message was successfully processed (acknowledged).