messaging

package
v0.18.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: Apache-2.0 Imports: 16 Imported by: 25

README

Messaging

messaging package defines Publisher, Subscriber and an aggregate Pubsub interface.

Subscriber interface defines methods used to subscribe to a message broker such as MQTT or NATS or RabbitMQ.

Publisher interface defines methods used to publish messages to a message broker such as MQTT or NATS or RabbitMQ.

Pubsub interface is composed of Publisher and Subscriber interface and can be used to send messages to as well as to receive messages from a message broker.

Documentation

Index

Constants

View Source
const (
	HealthTopicPrefix = "hc"
	StatusOK          = "ok"
)
View Source
const (
	MsgTopicPrefix     = 'm'
	ChannelTopicPrefix = 'c'
)

Variables

View Source
var (
	DefaultCacheConfig = CacheConfig{
		NumCounters: 2e5,
		MaxCost:     1 << 20,
		BufferItems: 64,
	}

	ErrMalformedTopic       = errors.New("malformed topic")
	ErrMalformedSubtopic    = errors.New("malformed subtopic")
	ErrEmptyRouteID         = errors.New("empty route or id")
	ErrFailedResolveDomain  = errors.New("failed to resolve domain route")
	ErrFailedResolveChannel = errors.New("failed to resolve channel route")
	ErrCreateCache          = errors.New("failed to create cache")
)
View Source
var File_pkg_messaging_message_proto protoreflect.FileDescriptor

Functions

func EncodeMessageMQTTTopic added in v0.17.0

func EncodeMessageMQTTTopic(m *Message) string

func EncodeMessageTopic added in v0.17.0

func EncodeMessageTopic(m *Message) string

func EncodeTopic added in v0.17.0

func EncodeTopic(domainID string, channelID string, subtopic string) string

func EncodeTopicSuffix added in v0.17.0

func EncodeTopicSuffix(domainID string, channelID string, subtopic string) string

func ParsePublishSubtopic added in v0.17.0

func ParsePublishSubtopic(subtopic string) (parseSubTopic string, err error)

func ParseSubscribeSubtopic added in v0.17.0

func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error)

Types

type AckType added in v0.17.0

type AckType int

AckType is used for message acknowledgement. It can be used for both successful and unsuccessful handling.

const (
	Ack        AckType = iota // regular acknowledgement
	DoubleAck                 // double ack in case of guaranteed delivery
	Nack                      // negative Ack
	InProgress                // restart delivery timer
	Term                      // terminate
	NoAck                     // do nothing
)

func (AckType) String added in v0.17.0

func (a AckType) String() string

type CacheConfig added in v0.17.0

type CacheConfig struct {
	NumCounters int64 `env:"NUM_COUNTERS" envDefault:"200000"`  // number of keys to track frequency of.
	MaxCost     int64 `env:"MAX_COST"     envDefault:"1048576"` // maximum cost of cache.
	BufferItems int64 `env:"BUFFER_ITEMS" envDefault:"64"`      // number of keys per Get buffer.
}

type DeliveryPolicy

type DeliveryPolicy uint8
const (
	// DeliverNewPolicy will only deliver new messages that are sent after the consumer is created.
	// This is the default policy.
	DeliverNewPolicy DeliveryPolicy = iota

	// DeliverAllPolicy starts delivering messages from the very beginning of a stream.
	DeliverAllPolicy
)

type Error added in v0.17.0

type Error interface {
	error
	Ack() AckType
}

MsgError is an error type for SuperMQ SDK.

func NewError added in v0.17.0

func NewError(err error, ack AckType) Error

NewError returns an Error setting the acknowledgement type.

type HealthInfo added in v0.17.0

type HealthInfo struct {
	// Status contains adapter status.
	Status string `json:"status"`

	// Protocol contains the protocol used.
	Protocol string `json:"protocol"`

	// Timestamp of health check.
	Timestamp time.Time `json:"timestamp"`
}

type Message

type Message struct {
	Channel   string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
	Domain    string `protobuf:"bytes,2,opt,name=domain,proto3" json:"domain,omitempty"`
	Subtopic  string `protobuf:"bytes,3,opt,name=subtopic,proto3" json:"subtopic,omitempty"`
	Publisher string `protobuf:"bytes,4,opt,name=publisher,proto3" json:"publisher,omitempty"`
	Protocol  string `protobuf:"bytes,5,opt,name=protocol,proto3" json:"protocol,omitempty"`
	Payload   []byte `protobuf:"bytes,6,opt,name=payload,proto3" json:"payload,omitempty"`
	Created   int64  `protobuf:"varint,7,opt,name=created,proto3" json:"created,omitempty"` // Unix timestamp in nanoseconds
	// contains filtered or unexported fields
}

Message represents a message emitted by the SuperMQ adapters layer.

func (*Message) Descriptor deprecated

func (*Message) Descriptor() ([]byte, []int)

Deprecated: Use Message.ProtoReflect.Descriptor instead.

func (*Message) GetChannel

func (x *Message) GetChannel() string

func (*Message) GetCreated

func (x *Message) GetCreated() int64

func (*Message) GetDomain added in v0.17.0

func (x *Message) GetDomain() string

func (*Message) GetPayload

func (x *Message) GetPayload() []byte

func (*Message) GetProtocol

func (x *Message) GetProtocol() string

func (*Message) GetPublisher

func (x *Message) GetPublisher() string

func (*Message) GetSubtopic

func (x *Message) GetSubtopic() string

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) ProtoReflect

func (x *Message) ProtoReflect() protoreflect.Message

func (*Message) Reset

func (x *Message) Reset()

func (*Message) String

func (x *Message) String() string

type MessageHandler

type MessageHandler interface {
	// Handle handles messages passed by underlying implementation.
	Handle(msg *Message) error

	// Cancel is used for cleanup during unsubscribing and it's optional.
	Cancel() error
}

MessageHandler represents Message handler for Subscriber.

type Option

type Option func(vals any) error

Option represents optional configuration for message broker.

This is used to provide optional configuration parameters to the underlying publisher and pubsub implementation so that it can be configured to meet the specific needs.

For example, it can be used to set the message prefix so that brokers can be used for event sourcing as well as internal message broker. Using value of type interface is not recommended but is the most suitable for this use case as options should be compiled with respect to the underlying broker which can either be RabbitMQ or NATS.

The example below shows how to set the prefix and jetstream stream for NATS.

Example:

broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js))

type PubSub

type PubSub interface {
	Publisher
	Subscriber
}

PubSub represents aggregation interface for publisher and subscriber.

type Publisher

type Publisher interface {
	// Publishes message to the stream.
	Publish(ctx context.Context, topic string, msg *Message) error

	// Close gracefully closes message publisher's connection.
	Close() error
}

Publisher specifies message publishing API.

type Subscriber

type Subscriber interface {
	// Subscribe subscribes to the message stream and consumes messages.
	Subscribe(ctx context.Context, cfg SubscriberConfig) error

	// Unsubscribe unsubscribes from the message stream and
	// stops consuming messages.
	Unsubscribe(ctx context.Context, id, topic string) error

	// Close gracefully closes message subscriber's connection.
	Close() error
}

Subscriber specifies message subscription API.

type SubscriberConfig

type SubscriberConfig struct {
	ID             string         // Unique identifier for the subscriber.
	ClientID       string         // Identifier of the client associated with this subscriber.
	Topic          string         // Topic to subscribe to.
	Handler        MessageHandler // Function that handles incoming messages.
	DeliveryPolicy DeliveryPolicy // DeliverPolicy defines from which point to start delivering messages.
	Ordered        bool           // Whether message delivery must preserve order.
}

SubscriberConfig defines the configuration for a subscriber that processes messages from a topic.

type TopicParser added in v0.17.0

type TopicParser interface {
	ParsePublishTopic(ctx context.Context, topic string, resolve bool) (domainID, channelID, subtopic string, topicType TopicType, err error)
	ParseSubscribeTopic(ctx context.Context, topic string, resolve bool) (domainID, channelID, subtopic string, topicType TopicType, err error)
}

TopicParser defines methods for parsing publish and subscribe topics. It uses a cache to store parsed topics for quick retrieval. It also resolves domain and channel IDs if requested.

func NewTopicParser added in v0.17.0

NewTopicParser creates a new instance of TopicParser.

type TopicResolver added in v0.17.0

type TopicResolver interface {
	Resolve(ctx context.Context, domain, channel string) (domainID string, channelID string, isRoute bool, err error)
	ResolveTopic(ctx context.Context, topic string) (rtopic string, err error)
}

TopicResolver contains definitions for resolving domain and channel IDs from their respective routes from the message topic.

func NewTopicResolver added in v0.17.0

func NewTopicResolver(channelsClient grpcChannelsV1.ChannelsServiceClient, domainsClient grpcDomainsV1.DomainsServiceClient) TopicResolver

NewTopicResolver creates a new instance of TopicResolver.

type TopicType added in v0.18.2

type TopicType uint8
const (
	InvalidType TopicType = iota
	MessageType
	HealthType
)

func ParsePublishTopic added in v0.17.0

func ParsePublishTopic(topic string) (domainID, chanID, subtopic string, topicType TopicType, err error)

func ParseSubscribeTopic added in v0.17.0

func ParseSubscribeTopic(topic string) (domainID string, chanID string, subtopic string, topicType TopicType, err error)

func ParseTopic added in v0.17.0

func ParseTopic(topic string) (domainID, chanID, subtopic string, topicType TopicType, err error)

ParseTopic parses a messaging topic string and returns the domain ID, channel ID, and subtopic. Supported formats (leading '/' optional):

m/<domain_id>/c/<channel_id>[/<subtopic>]
hc/<domain_id>

This is an optimized version with no regex and minimal allocations.

Directories

Path Synopsis
Package mqtt hold the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the SuperMQ IoT platform.
Package mqtt hold the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the SuperMQ IoT platform.
Package nats hold the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the SuperMQ IoT platform.
Package nats hold the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the SuperMQ IoT platform.
tracing
Package tracing provides tracing instrumentation for SuperMQ clients policies service.
Package tracing provides tracing instrumentation for SuperMQ clients policies service.
Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the SuperMQ IoT platform.
Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the SuperMQ IoT platform.
tracing
Package tracing provides tracing instrumentation for SuperMQ clients policies service.
Package tracing provides tracing instrumentation for SuperMQ clients policies service.
Package tracing provides tracing instrumentation for SuperMQ clients policies service.
Package tracing provides tracing instrumentation for SuperMQ clients policies service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL