messaging

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level default values.
// NOTE(review): presumably each one is the fallback for the similarly named
// With* broker option — confirm against the broker constructor.
const (
	// DefaultAckWait is 30 seconds.
	DefaultAckWait time.Duration = 30 * time.Second

	// DefaultMaxDeliveryAttempts is the untyped integer constant 3.
	DefaultMaxDeliveryAttempts = 3

	// DefaultConsumerPrefix is the untyped string constant "consumer".
	DefaultConsumerPrefix = "consumer"

	// DefaultStreamMaxAge is seven days (7 * 24 hours).
	DefaultStreamMaxAge time.Duration = 7 * 24 * time.Hour

	// DefaultStreamMaxBytes is 100,000,000, typed as int64.
	DefaultStreamMaxBytes int64 = 100_000_000

	// DefaultBackoffDuration is 30 seconds.
	DefaultBackoffDuration time.Duration = 30 * time.Second
)

Variables

View Source
// Sentinel errors exported by the package. Each is a distinct value created
// with errors.New, so callers can compare against them (for example with
// errors.Is).
// NOTE(review): which functions return each error is not visible here —
// confirm against the implementations before relying on a specific one.
var (
	// Input-validation errors: an empty stream name or an empty subject list.
	ErrInvalidStreamName    = errors.New("stream name cannot be empty")
	ErrInvalidSubjects      = errors.New("subjects cannot be empty")
	// Creation failures for a consumer or a stream.
	ErrConsumerCreation     = errors.New("failed to create consumer")
	ErrStreamCreation       = errors.New("failed to create stream")
	// Generic configuration error.
	ErrInvalidConfiguration = errors.New("invalid configuration")
	// Use-after-close errors for a connection or a subscription.
	ErrConnectionClosed     = errors.New("connection is closed")
	ErrSubscriptionClosed   = errors.New("subscription is closed")
)
View Source
var (
	// ErrPermanent is the sentinel for a permanent messaging error.
	// NOTE(review): presumably handlers return or wrap it to signal that a
	// message must not be retried — confirm against the consumers of this
	// value. The message text is capitalized, contrary to Go convention; it is
	// left unchanged because callers or logs may match on it.
	ErrPermanent = errors.New("Permanent messaging error")

	// ErrPermament is the original, misspelled name of ErrPermanent. It is the
	// same error value, so errors.Is matches either name.
	//
	// Deprecated: use ErrPermanent instead.
	ErrPermament = ErrPermanent
)

Functions

This section is empty.

Types

type BrokerOption

// BrokerOption is a functional option: a function that receives a pointer to
// the package's unexported brokerConfiguration. The With* functions in this
// package return BrokerOption values, and NewJetStreamBroker accepts them as
// its variadic opts parameter.
type BrokerOption func(*brokerConfiguration)

func WithAckWait

func WithAckWait(ackWait time.Duration) BrokerOption

func WithBackoffDurations

func WithBackoffDurations(durations []time.Duration) BrokerOption

func WithConsumerNamePrefix

func WithConsumerNamePrefix(prefix string) BrokerOption

func WithDeliverPolicy

func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption

func WithDiscardPolicy

func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption

func WithMaxAckPending

func WithMaxAckPending(maxAckPending int) BrokerOption

func WithMaxAge

func WithMaxAge(maxAge time.Duration) BrokerOption

func WithMaxBytes

func WithMaxBytes(maxBytes int64) BrokerOption

Note: this description belongs to WithMaxAckPending, not WithMaxBytes; the doc comment is attached to the wrong function in the source. WithMaxAckPending sets the maximum number of unacknowledged (un-ACKed) messages the JetStream server will deliver to this consumer at once. Once the limit is reached, JetStream holds back new messages until existing ones are ACKed or NAKed. This provides natural backpressure: messages stay durably in the stream instead of being pushed to consumers that have no capacity to process them. Set this to match the desired processing concurrency (e.g. 2 for keygen, 20 for signing).

func WithMaxDeliveryAttempts

func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption

func WithRetentionPolicy

func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption

func WithStorageType

func WithStorageType(storage nats.StorageType) BrokerOption

func WithStreamDescription

func WithStreamDescription(description string) BrokerOption

type DirectMessaging

// DirectMessaging groups topic-addressed send and listen operations on raw
// byte payloads. NewNatsDirectMessaging returns a value of this type built
// from a *nats.Conn.
type DirectMessaging interface {
	// Listen takes a topic and a handler that accepts a []byte payload, and
	// returns a Subscription (which exposes Unsubscribe) or an error.
	Listen(topic string, handler func(data []byte)) (Subscription, error)
	// SendToOther takes a topic and a payload and reports failure via error.
	// NOTE(review): how "other" differs from "self" is not visible here —
	// confirm against the implementation.
	SendToOther(topic string, data []byte) error
	// SendToOtherWithRetry takes the same topic and data parameters as
	// SendToOther plus a RetryConfig.
	// NOTE(review): presumably config governs how the send is retried — confirm.
	SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
	// SendToSelf takes a topic and a payload and reports failure via error.
	SendToSelf(topic string, data []byte) error
}

func NewNatsDirectMessaging

func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging

type EnqueueOptions

// EnqueueOptions is the options type passed by pointer to MessageQueue.Enqueue.
type EnqueueOptions struct {
	// IdempotententKey is presumably a deduplication key for the enqueued
	// message — TODO confirm against the Enqueue implementation.
	// NOTE(review): the name is misspelled (intended "Idempotent"); the field is
	// exported, so renaming it would break existing callers.
	IdempotententKey string
}

type MessageBroker

// MessageBroker is the interface returned by NewJetStreamBroker. Every method
// except Close takes a context.Context as its first parameter.
type MessageBroker interface {
	// PublishMessage takes a subject and a raw payload; failure is reported
	// via the returned error.
	PublishMessage(ctx context.Context, subject string, data []byte) error
	// CreateSubscription takes a consumer name, a subject, and a handler that
	// accepts a jetstream.Msg. It returns a MessageSubscription (which exposes
	// Unsubscribe) or an error.
	CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
	// GetStreamInfo returns a *jetstream.StreamInfo or an error.
	GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
	// FetchMessages takes the same consumerName, subject, and handler
	// parameters as CreateSubscription plus a batchSize, and returns only an
	// error.
	// NOTE(review): presumably pulls up to batchSize messages — confirm.
	FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
	// Close returns an error; what it releases is not visible here.
	Close() error
}

func NewJetStreamBroker

func NewJetStreamBroker(
	ctx context.Context,
	natsConn *nats.Conn,
	streamName string,
	subjects []string,
	opts ...BrokerOption,
) (MessageBroker, error)

type MessageQueue

// MessageQueue is the interface returned by
// NATsMessageQueueManager.NewMessageQueue.
type MessageQueue interface {
	// Enqueue takes a topic, a raw message, and a pointer to EnqueueOptions.
	// NOTE(review): whether options may be nil is not visible here — confirm.
	Enqueue(topic string, message []byte, options *EnqueueOptions) error
	// Dequeue takes a topic and a handler that accepts a raw message and
	// returns an error.
	// NOTE(review): presumably a non-nil handler error affects acknowledgement
	// or redelivery — confirm against the implementation.
	Dequeue(topic string, handler func(message []byte) error) error
	// Close returns nothing, unlike MessageBroker.Close, which returns an error.
	Close()
}

type MessageSubscription

// MessageSubscription is the handle returned by
// MessageBroker.CreateSubscription. Its method set is identical to that of
// Subscription.
type MessageSubscription interface {
	// Unsubscribe reports failure via the returned error.
	Unsubscribe() error
}

type NATsMessageQueueManager

type NATsMessageQueueManager struct {
	// contains filtered or unexported fields
}

func NewNATsMessageQueueManager

func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager

func (*NATsMessageQueueManager) NewMessageQueue

func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue

type PubSub

// PubSub is the interface returned by NewNATSPubSub, which is built from a
// *nats.Conn.
type PubSub interface {
	// Publish takes a topic and a raw message; failure is reported via error.
	Publish(topic string, message []byte) error
	// PublishWithReply takes a topic, a reply string, a raw payload, and a
	// map of string headers.
	// NOTE(review): presumably reply is the NATS reply subject — confirm.
	PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
	// Subscribe takes a topic and a handler that accepts a *nats.Msg, and
	// returns a Subscription (which exposes Unsubscribe) or an error.
	Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}

func NewNATSPubSub

func NewNATSPubSub(natsConn *nats.Conn) PubSub

type RetryConfig

// RetryConfig is the type of the config parameter of
// DirectMessaging.SendToOtherWithRetry.
type RetryConfig struct {
	// RetryAttempt is an unsigned count.
	// NOTE(review): presumably the maximum number of attempts — confirm
	// whether it includes the first try.
	RetryAttempt       uint
	// ExponentialBackoff presumably selects exponentially growing delays over
	// a fixed delay — TODO confirm.
	ExponentialBackoff bool
	// Delay presumably is the (base) wait between attempts — TODO confirm.
	Delay              time.Duration
	// OnRetry is a callback taking a number n and an error.
	// NOTE(review): presumably invoked on each retry with the attempt index
	// and the last error — confirm.
	OnRetry            func(n uint, err error)
}

type Subscription

// Subscription is the handle returned by DirectMessaging.Listen and
// PubSub.Subscribe. Its method set is identical to that of
// MessageSubscription.
type Subscription interface {
	// Unsubscribe reports failure via the returned error.
	Unsubscribe() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL