Documentation
¶
Index ¶
- Constants
- func HandleError(msg *Message, error *errors.Error)
- func HandlePanic(msg *Message)
- func Publish(topic any, message *Message, brokerType ...BrokerType) (error *errors.Error)
- func PublishNewMsg[T any](ctx context.Context, topic any, content T, groupId ...string)
- func PublishWithCtx(ctx context.Context, topic any, message *Message) *errors.Error
- func RegisterTopic(topic any, handler func(message *Message) *errors.Error) (uint64, *errors.Error)
- func RegisterTopicSeq(topic any, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error)
- func ReplayDLQ(topic any, limit int, brokerType ...BrokerType) *errors.Error
- func ToMap(param interface{}) map[string]interface{}
- func UnSubscribe(topic any, subID uint64) *errors.Error
- type BrokerType
- type Message
- func (m *Message) DeepCopy() *Message
- func (m *Message) GetValue(key string) interface{}
- func (m *Message) GetValueFloat64(key string) float64
- func (m *Message) GetValueInt64(key string) int64
- func (m *Message) GetValueStr(key string) string
- func (m *Message) LowerContentKey()
- func (m *Message) SetValue(key string, value interface{})
- type MessageBroker
- type MessageService
- func (s *MessageService) Publish(ctx context.Context, topic any, message *Message) (error *errors.Error)
- func (s *MessageService) PublishNewMsg(ctx context.Context, topic any, content any, groupId ...string)
- func (s *MessageService) RegisterTopic(topic any, handler func(message *Message) *errors.Error) (uint64, *errors.Error)
- func (s *MessageService) RegisterTopicSeq(topic any, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error)
- func (s *MessageService) ReplayDLQ(topic any, limit int) *errors.Error
- func (s *MessageService) UnRegisterTopic(topic any, subID uint64) *errors.Error
- type MessageType
- type SeqOption
- type SimpleMessageBroker
- func (mb *SimpleMessageBroker) Publish(topic string, message *Message) *errors.Error
- func (mb *SimpleMessageBroker) PublishGroup(topic string, groupID string, message *Message) *errors.Error
- func (mb *SimpleMessageBroker) ReplayDLQ(topic string, limit int) *errors.Error
- func (mb *SimpleMessageBroker) StartListening()
- func (mb *SimpleMessageBroker) StartStoreListener(topic string)
- func (mb *SimpleMessageBroker) Startup(topic string) *errors.Error
- func (mb *SimpleMessageBroker) StopListening()
- func (mb *SimpleMessageBroker) Subscribe(topic string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)
- func (mb *SimpleMessageBroker) SubscribeGroup(topic string, groupID string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)
- func (mb *SimpleMessageBroker) SubscribeSeq(topic string, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error)
- func (mb *SimpleMessageBroker) TopicList() []string
- func (mb *SimpleMessageBroker) UnSubscribe(topic string, subID uint64) *errors.Error
Constants ¶
View Source
const ( Local = iota RabbitMQ // RabbitMQ Redis // Redis )
View Source
const (
MsgStartup = "messagex.startup"
)
Variables ¶
This section is empty.
Functions ¶
func HandleError ¶
func HandlePanic ¶
func HandlePanic(msg *Message)
func Publish ¶
func Publish(topic any, message *Message, brokerType ...BrokerType) (error *errors.Error)
func PublishNewMsg ¶
func PublishWithCtx ¶
func RegisterTopic ¶
func RegisterTopicSeq ¶ added in v0.0.49
func ReplayDLQ ¶ added in v0.0.49
func ReplayDLQ(topic any, limit int, brokerType ...BrokerType) *errors.Error
func ToMap ¶
func ToMap(param interface{}) map[string]interface{}
ToMap converts a struct to a map[string]interface{} where the keys are the struct's field names and the values are the respective field values. Note: This function only works with structs and will return nil for non-struct parameters.
Types ¶
type BrokerType ¶
type BrokerType int
type Message ¶
type Message struct {
Ctx context.Context `json:"-"`
ID string
GroupID string
SubID uint64
Topic string
Retry int
Content map[string]interface{}
}
func (*Message) GetValueFloat64 ¶
func (*Message) GetValueInt64 ¶
func (*Message) GetValueStr ¶
func (*Message) LowerContentKey ¶
func (m *Message) LowerContentKey()
type MessageBroker ¶
type MessageBroker interface {
Subscribe(topic string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)
SubscribeGroup(topic string, groupID string, handler func(message *Message) *errors.Error) (uint64, *errors.Error) // 指定groupID
SubscribeSeq(topic string, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error) // 顺序消费,handler 同步执行
ReplayDLQ(topic string, limit int) *errors.Error // 将DLQ消息重新入队,limit<=0 表示全部
UnSubscribe(topic string, subID uint64) *errors.Error
Publish(topic string, message *Message) *errors.Error
PublishGroup(topic string, groupID string, message *Message) *errors.Error // 指定groupID
TopicList() []string
}
MessageBroker 定义了消息代理的行为。
type MessageService ¶
type MessageService struct {
BrokerType BrokerType
Broker MessageBroker
}
func GetDef ¶
func GetDef() *MessageService
func Ins ¶ added in v0.0.47
func Ins(brokerType BrokerType) *MessageService
func (*MessageService) PublishNewMsg ¶ added in v0.0.47
func (*MessageService) RegisterTopic ¶ added in v0.0.47
func (*MessageService) RegisterTopicSeq ¶ added in v0.0.49
func (*MessageService) ReplayDLQ ¶ added in v0.0.49
func (s *MessageService) ReplayDLQ(topic any, limit int) *errors.Error
func (*MessageService) UnRegisterTopic ¶ added in v0.0.47
func (s *MessageService) UnRegisterTopic(topic any, subID uint64) *errors.Error
type MessageType ¶
type MessageType string
type SeqOption ¶ added in v0.0.49
type SeqOption func(*seqConfig)
func WithDLQTopic ¶ added in v0.0.49
func WithMaxRetry ¶ added in v0.0.49
func WithRetryIntervals ¶ added in v0.0.49
type SimpleMessageBroker ¶
type SimpleMessageBroker struct {
// contains filtered or unexported fields
}
func NewSimple ¶
func NewSimple() *SimpleMessageBroker
func NewSimpleByType ¶ added in v0.0.47
func NewSimpleByType(brokerType BrokerType) *SimpleMessageBroker
func (*SimpleMessageBroker) Publish ¶
func (mb *SimpleMessageBroker) Publish(topic string, message *Message) *errors.Error
func (*SimpleMessageBroker) PublishGroup ¶
func (*SimpleMessageBroker) ReplayDLQ ¶ added in v0.0.49
func (mb *SimpleMessageBroker) ReplayDLQ(topic string, limit int) *errors.Error
func (*SimpleMessageBroker) StartListening ¶
func (mb *SimpleMessageBroker) StartListening()
func (*SimpleMessageBroker) StartStoreListener ¶ added in v0.0.47
func (mb *SimpleMessageBroker) StartStoreListener(topic string)
func (*SimpleMessageBroker) Startup ¶
func (mb *SimpleMessageBroker) Startup(topic string) *errors.Error
func (*SimpleMessageBroker) StopListening ¶
func (mb *SimpleMessageBroker) StopListening()
func (*SimpleMessageBroker) SubscribeGroup ¶
func (*SimpleMessageBroker) SubscribeSeq ¶ added in v0.0.49
func (*SimpleMessageBroker) TopicList ¶
func (mb *SimpleMessageBroker) TopicList() []string
func (*SimpleMessageBroker) UnSubscribe ¶
func (mb *SimpleMessageBroker) UnSubscribe(topic string, subID uint64) *errors.Error
Click to show internal directories.
Click to hide internal directories.