Documentation
¶
Index ¶
- Constants
- func GetConcurrentSize(cfg interfaces.Config) int
- func WithResponseOnError(logger client.Logger, handler interfaces.MsgHandler) interfaces.MsgHandler
- type Subscriber
- func (s *Subscriber) Close() error
- func (s *Subscriber) ForceClose()
- func (s *Subscriber) Get(subject, queue string) *Subscription
- func (s *Subscriber) GetSubs() *memory.SafeMap[string, *Subscription]
- func (s *Subscriber) Subscribe(subject, queue string, handler interfaces.MsgHandler)
- func (s *Subscriber) SubscribeWithParameters(buffer int, timeout time.Duration, subject, queue string, ...)
- func (s *Subscriber) Wait() error
- type Subscription
- type SubscriptionDetails
Constants ¶
View Source
const ( SubscriptionsPendingCount = "queue.subscriptions.pending.msgs" SubscriptionsPendingBytes = "queue.subscriptions.pending.bytes" SubscriptionsDroppedMsgs = "queue.subscriptions.dropped.count" SubscriptionCountMsgs = "queue.subscriptions.send.count" Bytes string = "By" Subject = attribute.Key("subject") )
View Source
const (
HeaderConsumerError = "Error"
)
Variables ¶
This section is empty.
Functions ¶
func GetConcurrentSize ¶
func GetConcurrentSize(cfg interfaces.Config) int
func WithResponseOnError ¶
func WithResponseOnError(logger client.Logger, handler interfaces.MsgHandler) interfaces.MsgHandler
WithResponseOnError wraps handler where if handler returns a non-nil error, the msg is then responded to with said error's string in the HeaderConsumerError header. Such message then causes nats publisher to return the error when reading response.
Note: this wrapper should NOT be used for observer-like handlers that do not send success responses. If subscribed to the same subject with an actual responder, the latter's response can potentially get snubbed.
Types ¶
type Subscriber ¶
type Subscriber struct {
interfaces.Client
*worker.Pool
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(c interfaces.Client) *Subscriber
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) ForceClose ¶
func (s *Subscriber) ForceClose()
func (*Subscriber) Get ¶
func (s *Subscriber) Get(subject, queue string) *Subscription
func (*Subscriber) GetSubs ¶
func (s *Subscriber) GetSubs() *memory.SafeMap[string, *Subscription]
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(subject, queue string, handler interfaces.MsgHandler)
func (*Subscriber) SubscribeWithParameters ¶
func (s *Subscriber) SubscribeWithParameters( buffer int, timeout time.Duration, subject, queue string, handler interfaces.MsgHandler, )
func (*Subscriber) Wait ¶
func (s *Subscriber) Wait() error
type Subscription ¶
type Subscription struct {
interfaces.Client
Subscription interfaces.Subscription
// contains filtered or unexported fields
}
func NewSubscription ¶
func NewSubscription(c interfaces.Client, subject, queue string) (*Subscription, error)
func (*Subscription) Process ¶
func (s *Subscription) Process( ctx context.Context, buffer int, timeout time.Duration, handler interfaces.MsgHandler, ) error
func (*Subscription) Stop ¶
func (s *Subscription) Stop() error
Click to show internal directories.
Click to hide internal directories.