subscriber

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2025 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SubscriptionsPendingCount = "nats.subscriptions.pending.msgs"
	SubscriptionsPendingBytes = "nats.subscriptions.pending.bytes"
	SubscriptionsDroppedMsgs  = "nats.subscriptions.dropped.count"
	SubscriptionCountMsgs     = "nats.subscriptions.send.count"

	Bytes string = "By"

	Subject = attribute.Key("subject")
)
View Source
const (
	HeaderConsumerError = "Error"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	Conn() *nats.Conn
	Context() context.Context
	Logger() client.Logger
	Config() *client.Config
	QueueSubscribeSync(subject, queue string) (*nats.Subscription, error)
	Meter() metric.Meter
	WithMeter(metric.Meter)
}

type MsgHandler

type MsgHandler func(ctx context.Context, msg *nats.Msg) error

func WithResponseOnError

func WithResponseOnError(logger client.Logger, handler MsgHandler) MsgHandler

WithResponseOnError wraps handler where if handler returns a non-nil error, the msg is then responded to with said error's string in the HeaderConsumerError header. Such message then causes nats publisher to return the error when reading response.

Note: this wrapper should NOT be used for observer-like handlers that do not send success responses. If subscribed to the same subject with an actual responder, the latter's response can potentially get snubbed.

type Subscriber

type Subscriber struct {
	Client
	*worker.Pool
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(c Client) *Subscriber

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) ForceClose

func (s *Subscriber) ForceClose()

func (*Subscriber) Get

func (s *Subscriber) Get(subject, queue string) *Subscription

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(subject, queue string, handler MsgHandler)

func (*Subscriber) SubscribeWithParameters

func (s *Subscriber) SubscribeWithParameters(
	buffer int, timeout time.Duration, subject, queue string, handler MsgHandler,
)

func (*Subscriber) Wait

func (s *Subscriber) Wait() error

type Subscription

type Subscription struct {
	Client
	*nats.Subscription
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(c Client, subject, queue string) (*Subscription, error)

func (*Subscription) Process

func (s *Subscription) Process(
	ctx context.Context,
	buffer int,
	timeout time.Duration,
	handler MsgHandler,
) error

func (*Subscription) Stop

func (s *Subscription) Stop() error

type SubscriptionDetails

type SubscriptionDetails struct {
	Pending      int64
	PendingBytes int64
	Dropped      int64
	Delivered    int64
}

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL