subscriber

package
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2025 License: GPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SubscriptionsPendingCount = "queue.subscriptions.pending.msgs"
	SubscriptionsPendingBytes = "queue.subscriptions.pending.bytes"
	SubscriptionsDroppedMsgs  = "queue.subscriptions.dropped.count"
	SubscriptionCountMsgs     = "queue.subscriptions.send.count"

	Bytes string = "By"

	Subject = attribute.Key("subject")
)
View Source
const (
	HeaderConsumerError = "Error"
)

Variables

This section is empty.

Functions

func GetConcurrentSize

func GetConcurrentSize(cfg interfaces.Config) int

func WithResponseOnError

func WithResponseOnError(logger client.Logger, handler interfaces.MsgHandler) interfaces.MsgHandler

WithResponseOnError wraps handler where if handler returns a non-nil error, the msg is then responded to with said error's string in the HeaderConsumerError header. Such message then causes nats publisher to return the error when reading response.

Note: this wrapper should NOT be used for observer-like handlers that do not send success responses. If subscribed to the same subject with an actual responder, the latter's response can potentially get snubbed.

Types

type Subscriber

type Subscriber struct {
	interfaces.Client
	*worker.Pool
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(c interfaces.Client) *Subscriber

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) ForceClose

func (s *Subscriber) ForceClose()

func (*Subscriber) Get

func (s *Subscriber) Get(subject, queue string) *Subscription

func (*Subscriber) GetSubs

func (s *Subscriber) GetSubs() *memory.SafeMap[string, *Subscription]

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(subject, queue string, handler interfaces.MsgHandler)

func (*Subscriber) SubscribeWithParameters

func (s *Subscriber) SubscribeWithParameters(
	buffer int, timeout time.Duration, subject, queue string, handler interfaces.MsgHandler,
)

func (*Subscriber) Wait

func (s *Subscriber) Wait() error

type Subscription

type Subscription struct {
	interfaces.Client

	Subscription interfaces.Subscription
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(c interfaces.Client, subject, queue string) (*Subscription, error)

func (*Subscription) Process

func (s *Subscription) Process(
	ctx context.Context,
	buffer int,
	timeout time.Duration,
	handler interfaces.MsgHandler,
) error

func (*Subscription) Stop

func (s *Subscription) Stop() error

type SubscriptionDetails

type SubscriptionDetails struct {
	Pending      int64
	PendingBytes int64
	Dropped      int64
	Delivered    int64
}

Directories

Path Synopsis
mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL