Documentation
¶
Index ¶
- Constants
- func AppName(name string) utils.OptionFunc[useOption]
- func Async(strategies ...strategy.Strategy) utils.OptionFunc[pubOption]
- func ChannelLen(channelLength int) utils.OptionFunc[subOption]
- func Construct(ctx context.Context, confs map[string]*Conf, opts ...utils.OptionExtender) func()
- func EventHandler[T eventual](hdr eventHandler[T]) eventHandler[T]
- func EventHandlerWithMsg[T eventual](hdr eventHandlerWithMsg[T]) eventHandlerWithMsg[T]
- func Events[T eventual](events ...Event[T]) utils.OptionFunc[eventPubOption[T]]
- func Messages(messages ...Message) utils.OptionFunc[pubOption]
- func NewEventPublisherDI[T eventual](name string, opts ...utils.OptionExtender) func() EventPublisher[T]
- func NewEventSubscriberDI[T eventual](name string, opts ...utils.OptionExtender) func() EventSubscriber[T]
- func Objects[T any](objectUUIDGenFunc func(T) string, objects ...any) utils.OptionFunc[pubOption]
- type Conf
- type Event
- func EventCreated[T eventual](id string, createdAt time.Time, payload T) Event[T]
- func EventDeleted[T eventual](id string, deletedAt time.Time, payload T) Event[T]
- func EventUpdated[T eventual](id string, updatedAt time.Time, payload T) Event[T]
- func NewEvent[T eventual](id string, createdAt, updatedAt, deletedAt time.Time, payload T) Event[T]
- func UntimedEvent[T eventual](id string, payload T) Event[T]
- type EventPublisher
- type EventSubscriber
- type HandlerFunc
- type IRouter
- type Message
- type Publisher
- type Subscriber
Constants ¶
View Source
const ( ErrDuplicatedSubscriberName utils.Error = "duplicated mq subscriber name" ErrDuplicatedPublisherName utils.Error = "duplicated mq publisher name" ErrDuplicatedRouterName utils.Error = "duplicated mq router name" ErrEventHandlerConflict utils.Error = "conflict with event handler and message handler" ErrNotImplement utils.Error = "mq not implement" )
Variables ¶
This section is empty.
Functions ¶
func AppName ¶
func AppName(name string) utils.OptionFunc[useOption]
func ChannelLen ¶
func ChannelLen(channelLength int) utils.OptionFunc[subOption]
func EventHandler ¶
func EventHandler[T eventual](hdr eventHandler[T]) eventHandler[T]
func EventHandlerWithMsg ¶
func EventHandlerWithMsg[T eventual](hdr eventHandlerWithMsg[T]) eventHandlerWithMsg[T]
func Events ¶
func Events[T eventual](events ...Event[T]) utils.OptionFunc[eventPubOption[T]]
func Messages ¶
func Messages(messages ...Message) utils.OptionFunc[pubOption]
func NewEventPublisherDI ¶
func NewEventPublisherDI[T eventual](name string, opts ...utils.OptionExtender) func() EventPublisher[T]
func NewEventSubscriberDI ¶
func NewEventSubscriberDI[T eventual](name string, opts ...utils.OptionExtender) func() EventSubscriber[T]
Types ¶
type Conf ¶
type Conf struct {
Topic string `yaml:"topic" json:"topic" toml:"topic"`
Type mqType `yaml:"type" json:"type" toml:"type"`
Producer bool `yaml:"producer" json:"producer" toml:"producer" default:"true"`
Consumer bool `yaml:"consumer" json:"consumer" toml:"consumer"`
ConsumerGroup string `yaml:"consumer_group" json:"consumer_group" toml:"consumer_group"`
ConsumerConcurrency int `yaml:"consumer_concurrency" json:"consumer_concurrency" toml:"consumer_concurrency"`
Endpoint *endpointConf `yaml:"endpoint" json:"endpoint" toml:"endpoint"`
Persistent bool `yaml:"persistent" json:"persistent" toml:"persistent"`
SerializeType string `yaml:"serialize_type" json:"serialize_type" toml:"serialize_type"`
CompressType string `yaml:"compress_type" json:"compress_type" toml:"compress_type"`
EnableLogger bool `yaml:"enable_logger" json:"enable_logger" toml:"enable_logger" default:"false"`
Logger string `yaml:"logger" json:"logger" toml:"logger" default:"github.com/wfusion/gofusion/log/customlogger.mqLogger"`
LogInstance string `yaml:"log_instance" json:"log_instance" toml:"log_instance" default:"default"`
// mongo, mysql, mariadb option
MessageScheme string `yaml:"message_scheme" json:"message_scheme" toml:"message_scheme" default:"watermill_message"`
SeriesScheme string `yaml:"series_scheme" json:"series_scheme" toml:"series_scheme" default:"watermill_series"`
ConsumerScheme string `yaml:"consumer_scheme" json:"consumer_scheme" toml:"consumer_scheme" default:"watermill_subscriber"`
ConsumeMiddlewares []*middlewareConf `yaml:"consume_middlewares" json:"consume_middlewares" toml:"consume_middlewares"`
}
Conf mq config nolint: revive // struct tag too long issue
type Event ¶
type Event[T eventual] interface {
ID() string
Type() string
CreatedAt() time.Time
UpdatedAt() time.Time
DeletedAt() time.Time
Payload() T
Context() context.Context
Ack() bool
Nack() bool
}
func EventCreated ¶
func EventDeleted ¶
func EventUpdated ¶
func UntimedEvent ¶
type EventPublisher ¶
type EventPublisher[T eventual] interface {
// PublishEvent publishes provided messages to given topic.
//
// PublishEvent can be synchronous or asynchronous - it depends on the implementation.
//
// Most publishers implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// PublishEvent must be thread safe.
PublishEvent(ctx context.Context, opts ...utils.OptionExtender) error
}
func NewEventPublisher ¶
func NewEventPublisher[T eventual](name string, opts ...utils.OptionExtender) EventPublisher[T]
type EventSubscriber ¶
type EventSubscriber[T eventual] interface {
// SubscribeEvent returns output channel with events from provided topic.
// Channel is closed, when Close() was called on the subscriber.
//
// When provided ctx is cancelled, subscriber will close subscribe and close output channel.
// Provided ctx is set to all produced messages.
SubscribeEvent(ctx context.Context, opts ...utils.OptionExtender) (<-chan Event[T], error)
}
func NewEventSubscriber ¶
func NewEventSubscriber[T eventual](name string, opts ...utils.OptionExtender) EventSubscriber[T]
type HandlerFunc ¶
type IRouter ¶
type IRouter interface {
Handle(handlerName string, hdr any, opts ...utils.OptionExtender)
Serve() error
Start()
Running() <-chan struct{}
// contains filtered or unexported methods
}
type Message ¶
type Message interface {
ID() string
Payload() []byte
RawMessage() any
Context() context.Context
Object() any
Ack() bool
Nack() bool
}
func NewMessage ¶
type Publisher ¶
type Publisher interface {
// Publish publishes provided messages to given topic.
//
// Publish can be synchronous or asynchronous - it depends on the implementation.
//
// Most publishers implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// Publish must be thread safe.
Publish(ctx context.Context, opts ...utils.OptionExtender) error
// PublishRaw publishes provided raw messages to given topic.
//
// PublishRaw can be synchronous or asynchronous - it depends on the implementation.
//
// Most publishers implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// PublishRaw must be thread safe.
PublishRaw(ctx context.Context, opts ...utils.OptionExtender) error
// contains filtered or unexported methods
}
type Subscriber ¶
type Subscriber interface {
// Subscribe returns output channel with messages from provided topic.
// Channel is closed, when Close() was called on the subscriber.
//
// When provided ctx is cancelled, subscriber will close subscribe and close output channel.
// Provided ctx is set to all produced messages.
Subscribe(ctx context.Context, opts ...utils.OptionExtender) (<-chan Message, error)
// SubscribeRaw returns output channel with original messages from provided topic.
// Channel is closed, when Close() was called on the subscriber.
//
// When provided ctx is cancelled, subscriber will close subscribe and close output channel.
// Provided ctx is set to all produced messages.
SubscribeRaw(ctx context.Context, opts ...utils.OptionExtender) (<-chan Message, error)
// contains filtered or unexported methods
}
func Sub ¶
func Sub(name string, opts ...utils.OptionExtender) Subscriber
Click to show internal directories.
Click to hide internal directories.