messaging

package
v1.2.183 Latest
Warning: This package is not in the latest version of its module.
Published: Feb 15, 2026 | License: MIT | Imports: 7 | Imported by: 12

Documentation

Constants

const EntityMessageTopic = "ENTITY"

EntityMessageTopic is a predefined topic name for messages that carry entities.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseMessage

type BaseMessage struct {
	MsgTopic     string `json:"topic"`     // The topic (channel or queue) of the message.
	MsgOpCode    int    `json:"opCode"`    // An operational code for the message.
	MsgVersion   string `json:"version"`   // The version of the message.
	MsgAddressee string `json:"addressee"` // The final recipient of the message.
	MsgSessionId string `json:"sessionId"` // A session ID for tracking related messages.
}

BaseMessage provides a basic implementation of the IMessage interface. It can be embedded in other message structs to provide default behavior for the common message attributes. @Data
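
The embedding pattern described above can be sketched as follows. This is only an illustration: BaseMessage and two of its accessors are redeclared locally so the block compiles on its own (the module's import path is not shown on this page), and OrderCreated, its OrderID field, and encodeOrder are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BaseMessage is a local copy of the struct documented above, redeclared
// only so that this sketch compiles without importing the package.
type BaseMessage struct {
	MsgTopic     string `json:"topic"`
	MsgOpCode    int    `json:"opCode"`
	MsgVersion   string `json:"version"`
	MsgAddressee string `json:"addressee"`
	MsgSessionId string `json:"sessionId"`
}

func (m *BaseMessage) Topic() string { return m.MsgTopic }
func (m *BaseMessage) OpCode() int   { return m.MsgOpCode }

// OrderCreated is a hypothetical message type that embeds BaseMessage.
type OrderCreated struct {
	BaseMessage
	OrderID string `json:"orderId"`
}

// encodeOrder marshals the message. encoding/json flattens the embedded
// struct's fields into the same JSON object as OrderID.
func encodeOrder(m *OrderCreated) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	msg := &OrderCreated{
		BaseMessage: BaseMessage{MsgTopic: "orders", MsgOpCode: 1},
		OrderID:     "A-42",
	}
	// Topic and OpCode are promoted from the embedded BaseMessage.
	fmt.Println(msg.Topic(), msg.OpCode())

	out, err := encodeOrder(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because the embedded field carries no JSON tag of its own, the common attributes appear at the top level of the encoded object rather than under a nested key.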

func (*BaseMessage) Addressee

func (m *BaseMessage) Addressee() string

Addressee returns the message's final addressee.

func (*BaseMessage) OpCode

func (m *BaseMessage) OpCode() int

OpCode returns the message's operational code.

func (*BaseMessage) Payload

func (m *BaseMessage) Payload() any

Payload returns nil, because BaseMessage does not carry a payload itself. Go has no method overriding, so a struct that embeds BaseMessage is expected to declare its own Payload method, which shadows this one.
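
A small sketch of how an embedding struct can replace the nil-returning Payload. The types base, textMessage, and payloader are hypothetical stand-ins declared here so the block is self-contained; they are not part of the package.

```go
package main

import "fmt"

// base mirrors the documented BaseMessage behavior: Payload returns nil.
type base struct{}

func (b *base) Payload() any { return nil }

// textMessage is a hypothetical struct that embeds base and declares its
// own Payload method.
type textMessage struct {
	base
	body string
}

// Payload shadows the method promoted from base.
func (t *textMessage) Payload() any { return t.body }

// payloader is the single IMessage method this sketch needs.
type payloader interface{ Payload() any }

func main() {
	msg := &textMessage{body: "hello"}

	var p payloader = msg
	fmt.Println(p.Payload()) // the outer method is selected

	// The embedded method is still reachable through the field name.
	fmt.Println(msg.base.Payload())
}
```

Note that shadowing is resolved statically: a method defined on the embedded type that calls Payload internally would still see the embedded version, not the outer one.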

func (*BaseMessage) SessionId

func (m *BaseMessage) SessionId() string

SessionId returns the message's session ID.

func (*BaseMessage) Topic

func (m *BaseMessage) Topic() string

Topic returns the message's topic.

func (*BaseMessage) Version added in v1.2.52

func (m *BaseMessage) Version() string

Version returns the message's version.

type EntityMessage added in v1.2.43

type EntityMessage struct {
	BaseMessage
	MsgPayload any `json:"payload"` // The payload of the message, typically an entity.
}

EntityMessage is a message structure for carrying generic entity data. It embeds BaseMessage and uses an `any` type for the payload. @Data

func (*EntityMessage) Payload added in v1.2.43

func (m *EntityMessage) Payload() any

Payload returns the message's payload.

type IMessage

type IMessage interface {
	// Topic returns the name of the topic, channel, or queue the message is associated with.
	Topic() string

	// OpCode returns the message's operational code, which can be used to indicate the type or purpose of the message.
	OpCode() int

	// Addressee returns the final recipient of the message. This is an optional field.
	Addressee() string

	// SessionId returns an identifier for a message exchange session. This ID is shared across all messages
	// belonging to the same session, allowing for conversational message patterns.
	SessionId() string

	// Version returns the version of the message, which can be used for compatibility and evolution of message formats.
	Version() string

	// Payload returns the body of the message, which can be of any type.
	Payload() any
}

IMessage defines the interface for a standard message used within the messaging system. It provides a structured way to define messages with common attributes like topic, operation code, and payload.

func GetMessage added in v1.2.57

func GetMessage[T any](topic string, payload T) IMessage

GetMessage creates and initializes a new generic Message with a given topic and payload. It automatically assigns a new session ID.

Type Parameters:

T: The type of the payload.

Parameters:

topic: The topic for the message.
payload: The payload for the message.

Returns:

A new, initialized IMessage.
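
The idea of a generic constructor that stamps a session ID can be sketched as below. How GetMessage actually generates its session ID is not documented here; the random 16-byte hex string, the message type, and the getMessage and newSessionID helpers are all assumptions made for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// message is a simplified, hypothetical stand-in for the package's generic
// Message type.
type message[T any] struct {
	Topic     string `json:"topic"`
	SessionID string `json:"sessionId"`
	Payload   T      `json:"payload"`
}

// newSessionID returns 16 random bytes encoded as 32 hex characters.
// The real package may use a different scheme.
func newSessionID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// getMessage builds a message with the given topic and typed payload and
// assigns it a fresh session ID.
func getMessage[T any](topic string, payload T) (*message[T], error) {
	id, err := newSessionID()
	if err != nil {
		return nil, err
	}
	return &message[T]{Topic: topic, SessionID: id, Payload: payload}, nil
}

func main() {
	msg, err := getMessage("orders", []int{1, 2, 3})
	if err != nil {
		panic(err)
	}
	// T is inferred as []int, so Payload needs no type assertion.
	fmt.Println(msg.Topic, len(msg.Payload), len(msg.SessionID))
}
```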

func NewEntityMessage added in v1.2.44

func NewEntityMessage() IMessage

NewEntityMessage is a factory function that creates a new instance of EntityMessage.

Returns:

A new IMessage of type EntityMessage.

func NewMessage added in v1.2.57

func NewMessage[T any]() IMessage

NewMessage creates a new instance of a generic Message.

Type Parameters:

T: The type of the payload.

Returns:

A new IMessage with a typed payload.

type IMessageBus

type IMessageBus interface {
	// Closer includes the Close() method for releasing resources.
	io.Closer

	// Ping tests the connectivity to the message bus.
	// It attempts to connect a specified number of times with a given interval.
	//
	// Parameters:
	//   retries: The number of times to retry connecting.
	//   intervalInSeconds: The time in seconds to wait between retries.
	//
	// Returns:
	//   An error if the connection cannot be established, otherwise nil.
	Ping(retries uint, intervalInSeconds uint) error

	// CloneMessageBus creates and returns a new instance of the message bus,
	// effectively cloning the current configuration and state.
	//
	// Returns:
	//   A new IMessageBus instance.
	//   An error if the cloning process fails.
	CloneMessageBus() (IMessageBus, error)

	// Publish sends one or more messages to a topic. This is part of the publish-subscribe pattern,
	// where messages are broadcast to all subscribers of a topic.
	//
	// Parameters:
	//   messages: A variadic slice of IMessage to be published.
	//
	// Returns:
	//   An error if publishing fails.
	Publish(messages ...IMessage) error

	// Subscribe creates a subscription to one or more topics, allowing a consumer to receive messages.
	//
	// Parameters:
	//   subscription: A name for the subscription, which can be used for durable subscriptions.
	//   mf: A MessageFactory function to create new message instances for unmarshalling.
	//   callback: The function to be called when a message is received.
	//   topics: A variadic slice of topic names to subscribe to.
	//
	// Returns:
	//   A unique string identifying the subscription.
	//   An error if the subscription fails.
	Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, topics ...string) (string, error)

	// Unsubscribe removes a subscription, stopping the flow of messages to it.
	//
	// Parameters:
	//   subscriptionId: The ID of the subscription to be removed.
	//
	// Returns:
	//   A boolean indicating whether the unsubscription was successful.
	Unsubscribe(subscriptionId string) bool

	// Push adds one or more messages to a queue. This is part of the queueing pattern,
	// where messages are processed by one of the competing consumers.
	//
	// Parameters:
	//   messages: A variadic slice of IMessage to be added to the queue.
	//
	// Returns:
	//   An error if the push operation fails.
	Push(messages ...IMessage) error

	// Pop removes and returns a message from a queue. It can block until a message is available
	// or a timeout occurs.
	//
	// Parameters:
	//   mf: A MessageFactory function to create a new message instance for unmarshalling.
	//   timeout: The maximum time to wait for a message.
	//   queue: A variadic slice of queue names to pop from.
	//
	// Returns:
	//   The IMessage that was popped from the queue.
	//   An error if the operation times out or fails.
	Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)

	// CreateProducer creates a message producer for a specific topic. A producer is responsible for
	// sending messages.
	//
	// Parameters:
	//   topic: The name of the topic for which to create the producer.
	//
	// Returns:
	//   An IMessageProducer instance.
	//   An error if the creation fails.
	CreateProducer(topic string) (IMessageProducer, error)

	// CreateConsumer creates a message consumer for one or more topics. A consumer is responsible for
	// receiving messages.
	//
	// Parameters:
	//   subscription: A name for the subscription, useful for durable consumers.
	//   mf: A MessageFactory function to create new message instances.
	//   topics: The topics to consume messages from.
	//
	// Returns:
	//   An IMessageConsumer instance.
	//   An error if the creation fails.
	CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)
}

IMessageBus defines the interface for a message bus, which provides a unified way to handle messaging through various implementations (e.g., in-memory, RabbitMQ, NATS). It supports both publish-subscribe and queueing patterns.

func NewInMemoryMessageBus

func NewInMemoryMessageBus() (mq IMessageBus, err error)

NewInMemoryMessageBus is a factory method that creates and returns a new instance of InMemoryMessageBus. It initializes the topics and queues maps.

type IMessageConsumer added in v1.2.47

type IMessageConsumer interface {
	// Closer includes the Close() method for releasing resources.
	io.Closer

	// Read retrieves a message from the topic, blocking until a new message arrives or a timeout is reached.
	// A timeout of 0 can be used to block indefinitely.
	//
	// A typical usage pattern is an infinite loop:
	//
	//  for {
	//      if msg, err := consumer.Read(time.Second * 5); err != nil {
	//          // Handle error, e.g., log it or break the loop
	//      } else {
	//          // Process the message, often in a separate goroutine
	//          go processThisMessage(msg)
	//      }
	//  }
	//
	// Parameters:
	//   timeout: The maximum duration to wait for a message.
	//
	Read(timeout time.Duration) (IMessage, error)
}

IMessageConsumer defines the interface for a message consumer, which is responsible for reading messages from a topic.

type IMessageProducer added in v1.2.2

type IMessageProducer interface {
	// Closer includes the Close() method for releasing resources.
	io.Closer

	// Publish sends one or more messages to the producer's topic.
	//
	// Parameters:
	//   messages: A variadic slice of IMessage to be published.
	//
	// Returns:
	//   An error if publishing fails.
	Publish(messages ...IMessage) error
}

IMessageProducer defines the interface for a message producer, which is responsible for publishing messages to a specific topic.

type InMemoryMessageBus

type InMemoryMessageBus struct {
	// contains filtered or unexported fields
}

InMemoryMessageBus represents an in-memory implementation of the IMessageBus interface. It simulates a message bus behavior within the application's memory, making it suitable for testing or simple use cases.

Fields:

mu: A RWMutex to ensure thread-safe access to topics and queues.
topics: A map where keys are topic names and values are slices of channels. Each channel represents a subscriber.
queues: A map where keys are queue names and values are Queue instances for message storage.

func (*InMemoryMessageBus) CloneMessageBus added in v1.2.62

func (m *InMemoryMessageBus) CloneMessageBus() (IMessageBus, error)

CloneMessageBus implements the cloning method of IMessageBus. The in-memory implementation does not create a copy: it returns a pointer to the same instance, because the state is shared.

Returns:

The same IMessageBus instance, not an independent copy.
An error if the cloning process fails.

func (*InMemoryMessageBus) Close added in v1.2.1

func (m *InMemoryMessageBus) Close() error

Close releases any resources held by the message bus. For the in-memory implementation, it simply logs a debug message as there are no external connections to close.

Returns:

An error if closing fails, otherwise nil.

func (*InMemoryMessageBus) CreateConsumer added in v1.2.47

func (m *InMemoryMessageBus) CreateConsumer(subscription string, mf MessageFactory, topics ...string) (IMessageConsumer, error)

CreateConsumer creates a message consumer for a specific topic.

Parameters:

subscription: A string identifying the subscription (not used in this implementation).
mf: A MessageFactory to create message instances.
topics: The topics to consume from. Only the first topic is used.

Returns:

An IMessageConsumer instance.
An error if creation fails.

func (*InMemoryMessageBus) CreateProducer added in v1.2.2

func (m *InMemoryMessageBus) CreateProducer(topic string) (IMessageProducer, error)

CreateProducer creates a message producer for a specific topic. For the in-memory bus, the bus itself acts as the producer.

Parameters:

topic: The topic for which to create the producer.

Returns:

An IMessageProducer instance.
An error if creation fails.

func (*InMemoryMessageBus) Ping

func (m *InMemoryMessageBus) Ping(retries uint, intervalInSeconds uint) error

Ping tests the connectivity of the message bus. For the in-memory implementation, it always returns nil, indicating that the bus is always available.

Parameters:

retries: The number of times to retry the ping. (Not used in this implementation).
intervalInSeconds: The interval in seconds between retries. (Not used in this implementation).

Returns:

An error if the ping fails, otherwise nil.

func (*InMemoryMessageBus) Pop

func (m *InMemoryMessageBus) Pop(mf MessageFactory, timeout time.Duration, queue ...string) (IMessage, error)

Pop removes and returns a message from one of the specified queues. It operates in one of two modes:

1. If timeout is 0, it attempts to pop a message immediately.
2. If timeout is greater than 0, it blocks until a message is available or the timeout is reached.

Parameters:

mf: A MessageFactory function to create message instances (not used in this implementation but required by the interface).
timeout: The duration to wait for a message before timing out.
queue: A variadic slice of queue names to pop from.

Returns:

The popped IMessage.
An error if no message is found, the timeout is reached, or another issue occurs.

func (*InMemoryMessageBus) Publish

func (m *InMemoryMessageBus) Publish(messages ...IMessage) error

Publish sends one or more messages to their respective topics. It iterates through the provided messages, marshals them, and sends the data to all subscriber channels for that topic.

Parameters:

messages: A variadic slice of IMessage to be published.

Returns:

An error if marshalling or sending fails, otherwise nil.
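
The marshal-and-fan-out behavior described above can be sketched with one buffered channel per subscriber. The bus, event, subscribe, and publish names are hypothetical and the buffer size of 8 is arbitrary; the real InMemoryMessageBus keeps its fields unexported and may differ in detail.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// event is a simplified message carrying only a topic and a body.
type event struct {
	Topic string `json:"topic"`
	Body  string `json:"payload"`
}

// bus keeps one buffered channel per subscriber, grouped by topic.
type bus struct {
	mu     sync.RWMutex
	topics map[string][]chan []byte
}

func newBus() *bus {
	return &bus{topics: make(map[string][]chan []byte)}
}

// subscribe registers a new subscriber channel for topic and returns it.
func (b *bus) subscribe(topic string) <-chan []byte {
	ch := make(chan []byte, 8)
	b.mu.Lock()
	b.topics[topic] = append(b.topics[topic], ch)
	b.mu.Unlock()
	return ch
}

// publish marshals each event once and sends the bytes to every subscriber
// of the event's topic. A send blocks if a subscriber's buffer is full.
func (b *bus) publish(events ...event) error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, e := range events {
		data, err := json.Marshal(e)
		if err != nil {
			return err
		}
		for _, ch := range b.topics[e.Topic] {
			ch <- data
		}
	}
	return nil
}

func main() {
	b := newBus()
	first := b.subscribe("news")
	second := b.subscribe("news")

	if err := b.publish(event{Topic: "news", Body: "hi"}); err != nil {
		panic(err)
	}
	// Both subscribers receive the same encoded message.
	fmt.Println(string(<-first))
	fmt.Println(string(<-second))
}
```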

func (*InMemoryMessageBus) Push

func (m *InMemoryMessageBus) Push(messages ...IMessage) error

Push adds one or more messages to a queue. The queue is determined by the message's topic. If the queue does not exist, it is created.

Parameters:

messages: A variadic slice of IMessage to be pushed to the queue.

Returns:

An error if any issue occurs, otherwise nil.
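
Routing by topic with on-demand queue creation can be sketched with a map of slices. The queueStore and message types here are hypothetical simplifications; the package's own Queue type is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in carrying only a topic and a body.
type message struct {
	Topic string
	Body  string
}

// queueStore maps a queue name to its pending messages.
type queueStore struct {
	mu     sync.Mutex
	queues map[string][]message
}

func newQueueStore() *queueStore {
	return &queueStore{queues: make(map[string][]message)}
}

// push appends each message to the queue named after its topic. Appending
// to a missing map entry starts from a nil slice, which creates the queue.
func (s *queueStore) push(msgs ...message) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, m := range msgs {
		s.queues[m.Topic] = append(s.queues[m.Topic], m)
	}
}

// size reports how many messages are waiting in the named queue.
func (s *queueStore) size(name string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.queues[name])
}

func main() {
	store := newQueueStore()
	store.push(
		message{Topic: "emails", Body: "welcome"},
		message{Topic: "emails", Body: "receipt"},
		message{Topic: "reports", Body: "daily"},
	)
	fmt.Println(store.size("emails"), store.size("reports"), store.size("unknown"))
}
```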

func (*InMemoryMessageBus) Subscribe

func (m *InMemoryMessageBus) Subscribe(subscription string, mf MessageFactory, callback SubscriptionCallback, topics ...string) (subscriptionId string, error error)

Subscribe creates a subscription to one or more topics. It sets up a channel for the subscriber and starts a goroutine to listen for messages on that channel. When a message is received, it's unmarshalled and passed to the provided callback function.

Parameters:

subscription: A string identifying the subscription (not used in this implementation).
mf: A MessageFactory function to create new message instances for unmarshalling.
callback: A SubscriptionCallback function to be executed when a message is received.
topics: A variadic slice of topic names to subscribe to.

Returns:

A unique subscription ID.
An error if the callback is nil or another issue occurs.
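
The listener goroutine described above can be sketched as follows. The note type and listen function are hypothetical, and skipping payloads that fail to decode is an assumption; the page does not say how the package handles unmarshalling errors.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// note is a simplified message with a topic and a string payload.
type note struct {
	Topic string `json:"topic"`
	Body  string `json:"payload"`
}

// listen starts a goroutine that decodes every payload arriving on ch and
// hands it to cb. The returned channel is closed once ch has been closed
// and fully drained.
func listen(ch <-chan []byte, cb func(note) bool) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for data := range ch {
			var n note
			if err := json.Unmarshal(data, &n); err != nil {
				continue // skip payloads that are not valid JSON
			}
			cb(n)
		}
	}()
	return done
}

func main() {
	ch := make(chan []byte, 2)
	var bodies []string
	done := listen(ch, func(n note) bool {
		bodies = append(bodies, n.Body)
		return true
	})

	ch <- []byte(`{"topic":"news","payload":"first"}`)
	ch <- []byte(`not json`)
	close(ch)

	// Waiting on done makes it safe to read bodies from this goroutine.
	<-done
	fmt.Println(bodies)
}
```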

func (*InMemoryMessageBus) Unsubscribe

func (m *InMemoryMessageBus) Unsubscribe(subscriptionId string) (success bool)

Unsubscribe removes a subscription. In this in-memory implementation, this is a no-op.

Parameters:

subscriptionId: The ID of the subscription to remove.

Returns:

A boolean indicating if the unsubscription was successful (always true).

type InMemoryMessageConsumer added in v1.2.47

type InMemoryMessageConsumer struct {
	// contains filtered or unexported fields
}

InMemoryMessageConsumer represents a consumer for the in-memory message bus.

func (*InMemoryMessageConsumer) Close added in v1.2.47

func (m *InMemoryMessageConsumer) Close() error

Close releases resources used by the consumer. For the in-memory implementation, this is a no-op.

Returns:

An error if closing fails.

func (*InMemoryMessageConsumer) Read added in v1.2.47

func (m *InMemoryMessageConsumer) Read(timeout time.Duration) (IMessage, error)

Read retrieves a message from the consumer's topic. It blocks until a message is available or the specified timeout is reached.

Parameters:

timeout: The duration to wait for a message.

Returns:

The IMessage read from the topic.
An error if the timeout is reached or another issue occurs.

type Message added in v1.2.42

type Message[T any] struct {
	BaseMessage
	MsgPayload T `json:"payload"` // The typed data payload of the message.
}

Message is a generic message structure that embeds BaseMessage and adds a typed payload.

Type Parameters:

T: The type of the payload.

@Data

type MessageFactory

type MessageFactory func() IMessage

MessageFactory is a function type that serves as a factory for creating IMessage instances. It is often used in consumers to generate the correct message type for unmarshalling.
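
A sketch of why a consumer needs a factory: the decoder cannot know the concrete message type, so it asks the factory for an empty instance and unmarshals into it. IMessage is trimmed to two methods here, and textMessage and decode are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// IMessage is reduced to the two methods this sketch uses.
type IMessage interface {
	Topic() string
	Payload() any
}

// MessageFactory matches the documented function type.
type MessageFactory func() IMessage

// textMessage is a hypothetical concrete message with a string payload.
type textMessage struct {
	MsgTopic   string `json:"topic"`
	MsgPayload string `json:"payload"`
}

func (t *textMessage) Topic() string { return t.MsgTopic }
func (t *textMessage) Payload() any  { return t.MsgPayload }

// decode asks the factory for an empty message and unmarshals data into
// it. The factory must return a pointer so that json.Unmarshal can fill in
// the fields.
func decode(data []byte, mf MessageFactory) (IMessage, error) {
	msg := mf()
	if err := json.Unmarshal(data, msg); err != nil {
		return nil, err
	}
	return msg, nil
}

func main() {
	factory := func() IMessage { return &textMessage{} }

	msg, err := decode([]byte(`{"topic":"chat","payload":"hi"}`), factory)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Topic(), msg.Payload())
}
```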

type SubscriptionCallback

type SubscriptionCallback func(msg IMessage) bool

SubscriptionCallback is a function type for handling messages received from a subscription. It takes an IMessage as input and returns a boolean indicating if the message was successfully processed (acknowledged).
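
A minimal sketch of a callback that acknowledges only some messages. What a given bus implementation does with a false return value is not specified on this page; collecting the unacknowledged messages, as dispatch does here, is an assumption. IMessage is trimmed to a single method, and note and dispatch are hypothetical names.

```go
package main

import "fmt"

// IMessage is reduced to the one method this sketch uses.
type IMessage interface{ Topic() string }

// SubscriptionCallback matches the documented function type.
type SubscriptionCallback func(msg IMessage) bool

// note is a hypothetical message that only knows its topic.
type note struct{ topic string }

func (n note) Topic() string { return n.topic }

// dispatch calls cb for each message and returns the messages that were
// not acknowledged, in their original order.
func dispatch(cb SubscriptionCallback, msgs ...IMessage) []IMessage {
	var rejected []IMessage
	for _, m := range msgs {
		if !cb(m) {
			rejected = append(rejected, m)
		}
	}
	return rejected
}

func main() {
	// Acknowledge only messages on the "orders" topic.
	onlyOrders := func(msg IMessage) bool { return msg.Topic() == "orders" }

	rejected := dispatch(onlyOrders, note{"orders"}, note{"audit"}, note{"orders"})
	for _, m := range rejected {
		fmt.Println("not acknowledged:", m.Topic())
	}
}
```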
