messagex

package
v0.0.52 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2026 License: MIT Imports: 19 Imported by: 3

Documentation

Index

Constants

View Source
// Three consecutive untyped integer constants generated with iota:
// Local = 0, RabbitMQ = 1, Redis = 2.
// NOTE(review): presumably these are the values passed where a BrokerType is
// expected (e.g. Ins, NewSimpleByType); the declaration itself does not give
// them the BrokerType type — confirm against callers.
const (
	Local    = iota
	RabbitMQ // RabbitMQ
	Redis    // Redis
)
View Source
// MsgStartup is the untyped string constant "messagex.startup".
// NOTE(review): presumably a topic or message name tied to broker startup —
// its use is not visible in this listing; confirm.
const (
	MsgStartup = "messagex.startup"
)

Variables

This section is empty.

Functions

func HandleError

func HandleError(msg *Message, error *errors.Error)

func HandlePanic

func HandlePanic(msg *Message)

func Publish

func Publish(topic any, message *Message, brokerType ...BrokerType) (error *errors.Error)

func PublishNewMsg

func PublishNewMsg[T any](ctx context.Context, topic any, content T, groupId ...string)

func PublishWithCtx

func PublishWithCtx(ctx context.Context, topic any, message *Message) *errors.Error

func RegisterTopic

func RegisterTopic(topic any, handler func(message *Message) *errors.Error) (uint64, *errors.Error)

func RegisterTopicSeq added in v0.0.49

func RegisterTopicSeq(topic any, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error)

func ReplayDLQ added in v0.0.49

func ReplayDLQ(topic any, limit int, brokerType ...BrokerType) *errors.Error

func ToMap

func ToMap(param interface{}) map[string]interface{}

ToMap converts a struct to a map[string]interface{} where the keys are the struct's field names and the values are the respective field values. Note: This function only works with structs and will return nil for non-struct parameters.

func UnSubscribe

func UnSubscribe(topic any, subID uint64) *errors.Error

Types

type BrokerType

// BrokerType is an int-based type. In this package it is the parameter type of
// Ins and NewSimpleByType, the optional trailing argument of Publish and
// ReplayDLQ, and the type of the MessageService.BrokerType field.
type BrokerType int

type Message

// Message is the value handed to the publish functions and to subscription
// handlers, which have the form func(message *Message) *errors.Error.
type Message struct {
	// Ctx is excluded from JSON encoding by the `json:"-"` tag.
	Ctx     context.Context `json:"-"`
	// NOTE(review): presumably a unique identifier for the message — confirm.
	ID      string
	// NOTE(review): presumably matches the groupID used by
	// SubscribeGroup/PublishGroup — confirm.
	GroupID string
	// NOTE(review): same type (uint64) as the subscription ID returned by the
	// Subscribe/RegisterTopic functions; presumably that ID — confirm.
	SubID   uint64
	// NOTE(review): presumably the topic the message was published on — confirm.
	Topic   string
	// NOTE(review): presumably a retry counter — confirm who increments it.
	Retry   int
	// Content is a string-keyed map of arbitrary values.
	// NOTE(review): GetValue/SetValue and the typed getters take a string key
	// and presumably read and write this map — confirm.
	Content map[string]interface{}
}

func (*Message) DeepCopy

func (m *Message) DeepCopy() *Message

func (*Message) GetValue

func (m *Message) GetValue(key string) interface{}

func (*Message) GetValueFloat64

func (m *Message) GetValueFloat64(key string) float64

func (*Message) GetValueInt64

func (m *Message) GetValueInt64(key string) int64

func (*Message) GetValueStr

func (m *Message) GetValueStr(key string) string

func (*Message) LowerContentKey

func (m *Message) LowerContentKey()

func (*Message) SetValue

func (m *Message) SetValue(key string, value interface{})

type MessageBroker

// MessageBroker is the interface a broker implementation must satisfy:
// subscribing (plain, by group, or sequential), unsubscribing, publishing
// (plain or by group), replaying dead-letter-queue messages, and listing topics.
// Subscribe-style methods return a uint64 subscription ID, which is the value
// UnSubscribe takes as subID.
type MessageBroker interface {
	Subscribe(topic string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)
	SubscribeGroup(topic string, groupID string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)  // subscribe with an explicit groupID
	SubscribeSeq(topic string, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error) // sequential consumption; the handler runs synchronously
	ReplayDLQ(topic string, limit int) *errors.Error                                                                    // re-enqueue DLQ messages; limit <= 0 means all of them
	UnSubscribe(topic string, subID uint64) *errors.Error
	Publish(topic string, message *Message) *errors.Error
	PublishGroup(topic string, groupID string, message *Message) *errors.Error // publish with an explicit groupID
	TopicList() []string
}

MessageBroker defines the behavior of a message broker.

type MessageService

// MessageService pairs a BrokerType value with a MessageBroker implementation.
// Instances are returned by GetDef and Ins.
type MessageService struct {
	// BrokerType records which kind of broker this service is associated with.
	BrokerType BrokerType
	// Broker is the MessageBroker held by this service.
	// NOTE(review): presumably the service methods delegate to it — confirm.
	Broker     MessageBroker
}

func GetDef

func GetDef() *MessageService

func Ins added in v0.0.47

func Ins(brokerType BrokerType) *MessageService

func (*MessageService) Publish added in v0.0.47

func (s *MessageService) Publish(ctx context.Context, topic any, message *Message) (error *errors.Error)

func (*MessageService) PublishNewMsg added in v0.0.47

func (s *MessageService) PublishNewMsg(ctx context.Context, topic any, content any, groupId ...string)

func (*MessageService) RegisterTopic added in v0.0.47

func (s *MessageService) RegisterTopic(topic any, handler func(message *Message) *errors.Error) (uint64, *errors.Error)

func (*MessageService) RegisterTopicSeq added in v0.0.49

func (s *MessageService) RegisterTopicSeq(topic any, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error)

func (*MessageService) ReplayDLQ added in v0.0.49

func (s *MessageService) ReplayDLQ(topic any, limit int) *errors.Error

func (*MessageService) UnRegisterTopic added in v0.0.47

func (s *MessageService) UnRegisterTopic(topic any, subID uint64) *errors.Error

type MessageType

// MessageType is a string-based type.
// NOTE(review): no function, field, or constant in this listing uses it —
// confirm whether it is still needed.
type MessageType string

type SeqOption added in v0.0.49

// SeqOption is a function that receives a pointer to the unexported seqConfig.
// Values of this type are returned by WithDLQTopic, WithMaxRetry, and
// WithRetryIntervals, and are accepted variadically by RegisterTopicSeq and
// SubscribeSeq.
type SeqOption func(*seqConfig)

func WithDLQTopic added in v0.0.49

func WithDLQTopic(dlq string) SeqOption

func WithMaxRetry added in v0.0.49

func WithMaxRetry(max int) SeqOption

func WithRetryIntervals added in v0.0.49

func WithRetryIntervals(itv ...time.Duration) SeqOption

type SimpleMessageBroker

// SimpleMessageBroker is a broker type whose fields are all unexported.
// Values are created with NewSimple or NewSimpleByType. The methods listed on
// *SimpleMessageBroker include every method of the MessageBroker interface,
// plus Startup, StartListening, StartStoreListener, and StopListening.
type SimpleMessageBroker struct {
	// contains filtered or unexported fields
}

func NewSimple

func NewSimple() *SimpleMessageBroker

func NewSimpleByType added in v0.0.47

func NewSimpleByType(brokerType BrokerType) *SimpleMessageBroker

func (*SimpleMessageBroker) Publish

func (mb *SimpleMessageBroker) Publish(topic string, message *Message) *errors.Error

func (*SimpleMessageBroker) PublishGroup

func (mb *SimpleMessageBroker) PublishGroup(topic string, groupID string, message *Message) *errors.Error

func (*SimpleMessageBroker) ReplayDLQ added in v0.0.49

func (mb *SimpleMessageBroker) ReplayDLQ(topic string, limit int) *errors.Error

func (*SimpleMessageBroker) StartListening

func (mb *SimpleMessageBroker) StartListening()

func (*SimpleMessageBroker) StartStoreListener added in v0.0.47

func (mb *SimpleMessageBroker) StartStoreListener(topic string)

func (*SimpleMessageBroker) Startup

func (mb *SimpleMessageBroker) Startup(topic string) *errors.Error

func (*SimpleMessageBroker) StopListening

func (mb *SimpleMessageBroker) StopListening()

func (*SimpleMessageBroker) Subscribe

func (mb *SimpleMessageBroker) Subscribe(topic string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)

func (*SimpleMessageBroker) SubscribeGroup

func (mb *SimpleMessageBroker) SubscribeGroup(topic string, groupID string, handler func(message *Message) *errors.Error) (uint64, *errors.Error)

func (*SimpleMessageBroker) SubscribeSeq added in v0.0.49

func (mb *SimpleMessageBroker) SubscribeSeq(topic string, handler func(message *Message) *errors.Error, opts ...SeqOption) (uint64, *errors.Error)

func (*SimpleMessageBroker) TopicList

func (mb *SimpleMessageBroker) TopicList() []string

func (*SimpleMessageBroker) UnSubscribe

func (mb *SimpleMessageBroker) UnSubscribe(topic string, subID uint64) *errors.Error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL