Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeMessageMQTTTopic(m *Message) string
- func EncodeMessageTopic(m *Message) string
- func EncodeTopic(domainID string, channelID string, subtopic string) string
- func EncodeTopicSuffix(domainID string, channelID string, subtopic string) string
- func ParsePublishSubtopic(subtopic string) (parseSubTopic string, err error)
- func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error)
- type AckType
- type CacheConfig
- type DeliveryPolicy
- type Error
- type HealthInfo
- type Message
- func (*Message) Descriptor() ([]byte, []int) deprecated
- func (x *Message) GetChannel() string
- func (x *Message) GetCreated() int64
- func (x *Message) GetDomain() string
- func (x *Message) GetPayload() []byte
- func (x *Message) GetProtocol() string
- func (x *Message) GetPublisher() string
- func (x *Message) GetSubtopic() string
- func (*Message) ProtoMessage()
- func (x *Message) ProtoReflect() protoreflect.Message
- func (x *Message) Reset()
- func (x *Message) String() string
- type MessageHandler
- type Option
- type PubSub
- type Publisher
- type Subscriber
- type SubscriberConfig
- type TopicParser
- type TopicResolver
- type TopicType
- func ParsePublishTopic(topic string) (domainID, chanID, subtopic string, topicType TopicType, err error)
- func ParseSubscribeTopic(topic string) (domainID string, chanID string, subtopic string, topicType TopicType, ...)
- func ParseTopic(topic string) (domainID, chanID, subtopic string, topicType TopicType, err error)
Constants ¶
const (
	HealthTopicPrefix = "hc"
	StatusOK          = "ok"
)
const (
	MsgTopicPrefix     = 'm'
	ChannelTopicPrefix = 'c'
)
Variables ¶
var (
	DefaultCacheConfig = CacheConfig{
		NumCounters: 2e5,
		MaxCost:     1 << 20,
		BufferItems: 64,
	}
	ErrMalformedTopic       = errors.New("malformed topic")
	ErrMalformedSubtopic    = errors.New("malformed subtopic")
	ErrEmptyRouteID         = errors.New("empty route or id")
	ErrFailedResolveDomain  = errors.New("failed to resolve domain route")
	ErrFailedResolveChannel = errors.New("failed to resolve channel route")
	ErrCreateCache          = errors.New("failed to create cache")
)
var File_pkg_messaging_message_proto protoreflect.FileDescriptor
Functions ¶
func EncodeMessageMQTTTopic ¶ added in v0.17.0
func EncodeMessageTopic ¶ added in v0.17.0
func EncodeTopic ¶ added in v0.17.0
func EncodeTopicSuffix ¶ added in v0.17.0
func ParsePublishSubtopic ¶ added in v0.17.0
func ParseSubscribeSubtopic ¶ added in v0.17.0
Types ¶
type AckType ¶ added in v0.17.0
type AckType int
AckType is used for message acknowledgement. It can be used for both successful and unsuccessful handling.
type CacheConfig ¶ added in v0.17.0
type DeliveryPolicy ¶
type DeliveryPolicy uint8
const (
	// DeliverNewPolicy will only deliver new messages that are sent after the consumer is created.
	// This is the default policy.
	DeliverNewPolicy DeliveryPolicy = iota
	// DeliverAllPolicy starts delivering messages from the very beginning of a stream.
	DeliverAllPolicy
)
type HealthInfo ¶ added in v0.17.0
type Message ¶
type Message struct {
Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
Domain string `protobuf:"bytes,2,opt,name=domain,proto3" json:"domain,omitempty"`
Subtopic string `protobuf:"bytes,3,opt,name=subtopic,proto3" json:"subtopic,omitempty"`
Publisher string `protobuf:"bytes,4,opt,name=publisher,proto3" json:"publisher,omitempty"`
Protocol string `protobuf:"bytes,5,opt,name=protocol,proto3" json:"protocol,omitempty"`
Payload []byte `protobuf:"bytes,6,opt,name=payload,proto3" json:"payload,omitempty"`
Created int64 `protobuf:"varint,7,opt,name=created,proto3" json:"created,omitempty"` // Unix timestamp in nanoseconds
// contains filtered or unexported fields
}
Message represents a message emitted by the SuperMQ adapters layer.
func (*Message) Descriptor
deprecated
func (*Message) GetChannel ¶
func (*Message) GetCreated ¶
func (*Message) GetPayload ¶
func (*Message) GetProtocol ¶
func (*Message) GetPublisher ¶
func (*Message) GetSubtopic ¶
func (*Message) ProtoMessage ¶
func (*Message) ProtoMessage()
func (*Message) ProtoReflect ¶
func (x *Message) ProtoReflect() protoreflect.Message
type MessageHandler ¶
type MessageHandler interface {
// Handle handles messages passed by underlying implementation.
Handle(msg *Message) error
// Cancel is used for cleanup during unsubscribing and it's optional.
Cancel() error
}
MessageHandler represents a Message handler for a Subscriber.
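As an illustration of the interface, here is one possible handler, sketched with local stand-in types so that it compiles on its own. The countingHandler type and its behaviour (counting payload bytes per channel, refusing messages after Cancel) are assumptions made for the example and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message and MessageHandler are local stand-ins mirroring the shapes
// documented above; they are not imported from the real package.
type Message struct {
	Channel string
	Payload []byte
}

type MessageHandler interface {
	Handle(msg *Message) error
	Cancel() error
}

// countingHandler is a hypothetical handler that sums payload bytes per channel.
type countingHandler struct {
	mu     sync.Mutex
	bytes  map[string]int
	closed bool
}

func newCountingHandler() *countingHandler {
	return &countingHandler{bytes: make(map[string]int)}
}

// Handle records the payload size. It rejects nil messages and any
// message that arrives after Cancel has been called.
func (h *countingHandler) Handle(msg *Message) error {
	if msg == nil {
		return errors.New("nil message")
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.closed {
		return errors.New("handler cancelled")
	}
	h.bytes[msg.Channel] += len(msg.Payload)
	return nil
}

// Cancel marks the handler as closed; calling it repeatedly is harmless.
func (h *countingHandler) Cancel() error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.closed = true
	return nil
}

// Compile-time check that countingHandler satisfies the interface.
var _ MessageHandler = (*countingHandler)(nil)

func main() {
	h := newCountingHandler()
	if err := h.Handle(&Message{Channel: "c1", Payload: []byte("hello")}); err != nil {
		fmt.Println("handle error:", err)
	}
	fmt.Println(h.bytes["c1"])
}
```

The mutex is there because a broker implementation may invoke Handle from more than one goroutine; whether a given broker does so is not stated in this documentation.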
type Option ¶
Option represents optional configuration for the message broker.
It supplies optional configuration parameters to the underlying publisher and pubsub implementations so that they can be configured to meet specific needs.
For example, it can set the message prefix so that brokers can be used for event sourcing as well as for internal messaging. Using a value of type interface is generally not recommended, but it is the most suitable choice here because options must be compiled against the underlying broker, which can be either RabbitMQ or NATS.
The example below shows how to set the prefix and JetStream stream for NATS.
Example:
broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js))
type PubSub ¶
type PubSub interface {
Publisher
Subscriber
}
PubSub is an aggregate interface that combines Publisher and Subscriber.
type Publisher ¶
type Publisher interface {
// Publish publishes a message to the stream.
Publish(ctx context.Context, topic string, msg *Message) error
// Close gracefully closes message publisher's connection.
Close() error
}
Publisher specifies message publishing API.
type Subscriber ¶
type Subscriber interface {
// Subscribe subscribes to the message stream and consumes messages.
Subscribe(ctx context.Context, cfg SubscriberConfig) error
// Unsubscribe unsubscribes from the message stream and
// stops consuming messages.
Unsubscribe(ctx context.Context, id, topic string) error
// Close gracefully closes message subscriber's connection.
Close() error
}
Subscriber specifies message subscription API.
type SubscriberConfig ¶
type SubscriberConfig struct {
ID string // Unique identifier for the subscriber.
ClientID string // Identifier of the client associated with this subscriber.
Topic string // Topic to subscribe to.
Handler MessageHandler // Handler that processes incoming messages.
DeliveryPolicy DeliveryPolicy // DeliveryPolicy defines from which point to start delivering messages.
Ordered bool // Whether message delivery must preserve order.
}
SubscriberConfig defines the configuration for a subscriber that processes messages from a topic.
type TopicParser ¶ added in v0.17.0
type TopicParser interface {
ParsePublishTopic(ctx context.Context, topic string, resolve bool) (domainID, channelID, subtopic string, topicType TopicType, err error)
ParseSubscribeTopic(ctx context.Context, topic string, resolve bool) (domainID, channelID, subtopic string, topicType TopicType, err error)
}
TopicParser defines methods for parsing publish and subscribe topics. It uses a cache to store parsed topics for quick retrieval. It also resolves domain and channel IDs if requested.
func NewTopicParser ¶ added in v0.17.0
func NewTopicParser(cfg CacheConfig, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) (TopicParser, error)
NewTopicParser creates a new instance of TopicParser.
type TopicResolver ¶ added in v0.17.0
type TopicResolver interface {
Resolve(ctx context.Context, domain, channel string) (domainID string, channelID string, isRoute bool, err error)
ResolveTopic(ctx context.Context, topic string) (rtopic string, err error)
}
TopicResolver defines methods for resolving domain and channel IDs from their respective routes in the message topic.
func NewTopicResolver ¶ added in v0.17.0
func NewTopicResolver(channelsClient grpcChannelsV1.ChannelsServiceClient, domainsClient grpcDomainsV1.DomainsServiceClient) TopicResolver
NewTopicResolver creates a new instance of TopicResolver.
type TopicType ¶ added in v0.18.2
type TopicType uint8
func ParsePublishTopic ¶ added in v0.17.0
func ParseSubscribeTopic ¶ added in v0.17.0
func ParseTopic ¶ added in v0.17.0
ParseTopic parses a messaging topic string and returns the domain ID, channel ID, subtopic, and topic type. Supported formats (leading '/' optional):
m/<domain_id>/c/<channel_id>[/<subtopic>]
hc/<domain_id>
This is an optimized version with no regex and minimal allocations.
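The two topic formats can be parsed without regular expressions using plain prefix checks and byte searches. The sketch below is one way to do that and is not the package's actual implementation: parseTopic, topicKind, kindMessage, kindHealth, and errMalformedTopic are hypothetical names, and the choices to reject an hc topic with extra segments and to return the subtopic unmodified are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errMalformedTopic is a local stand-in for the package's ErrMalformedTopic.
var errMalformedTopic = errors.New("malformed topic")

// topicKind is a hypothetical stand-in for TopicType, whose values are
// not listed in this documentation.
type topicKind uint8

const (
	kindMessage topicKind = iota
	kindHealth
)

// parseTopic splits a topic of the form
//   m/<domain_id>/c/<channel_id>[/<subtopic>]   or   hc/<domain_id>
// using only substring operations, so no new strings are allocated.
func parseTopic(topic string) (domainID, chanID, subtopic string, kind topicKind, err error) {
	// The leading '/' is optional.
	topic = strings.TrimPrefix(topic, "/")

	switch {
	case strings.HasPrefix(topic, "hc/"):
		domainID = topic[len("hc/"):]
		// Assumption: a health topic carries exactly one segment.
		if domainID == "" || strings.IndexByte(domainID, '/') >= 0 {
			return "", "", "", 0, errMalformedTopic
		}
		return domainID, "", "", kindHealth, nil

	case strings.HasPrefix(topic, "m/"):
		rest := topic[len("m/"):]
		i := strings.IndexByte(rest, '/')
		if i <= 0 { // missing separator or empty domain ID
			return "", "", "", 0, errMalformedTopic
		}
		domainID, rest = rest[:i], rest[i+1:]
		if !strings.HasPrefix(rest, "c/") {
			return "", "", "", 0, errMalformedTopic
		}
		rest = rest[len("c/"):]
		chanID = rest
		if j := strings.IndexByte(rest, '/'); j >= 0 {
			chanID, subtopic = rest[:j], rest[j+1:]
		}
		if chanID == "" {
			return "", "", "", 0, errMalformedTopic
		}
		return domainID, chanID, subtopic, kindMessage, nil
	}
	return "", "", "", 0, errMalformedTopic
}

func main() {
	d, c, s, k, err := parseTopic("/m/domain-1/c/channel-1/temperature/room")
	fmt.Println(d, c, s, k, err)
}
```

Because every returned string is a slice of the input, the function performs no allocations of its own, which is the property the note above about avoiding regex is pointing at.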
Directories ¶
| Path | Synopsis |
|---|---|
| mqtt | Package mqtt holds the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the SuperMQ IoT platform. |
| nats | Package nats holds the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the SuperMQ IoT platform. |
| nats/tracing | Package tracing provides tracing instrumentation for SuperMQ clients policies service. |
| rabbitmq | Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the SuperMQ IoT platform. |
| rabbitmq/tracing | Package tracing provides tracing instrumentation for SuperMQ clients policies service. |
| tracing | Package tracing provides tracing instrumentation for SuperMQ clients policies service. |