messaging

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultAckWait             = 30 * time.Second
	DefaultMaxDeliveryAttempts = 3
	DefaultConsumerPrefix      = "consumer"
	DefaultStreamMaxAge        = 3 * time.Minute
	DefaultBackoffDuration     = 30 * time.Second
)

Variables

View Source
var (
	ErrInvalidStreamName    = errors.New("stream name cannot be empty")
	ErrInvalidSubjects      = errors.New("subjects cannot be empty")
	ErrConsumerCreation     = errors.New("failed to create consumer")
	ErrStreamCreation       = errors.New("failed to create stream")
	ErrInvalidConfiguration = errors.New("invalid configuration")
	ErrConnectionClosed     = errors.New("connection is closed")
	ErrSubscriptionClosed   = errors.New("subscription is closed")
)
View Source
var (
	ErrPermament = errors.New("Permanent messaging error")
)

Functions

This section is empty.

Types

type BrokerOption added in v0.3.0

type BrokerOption func(*brokerConfiguration)

func WithAckWait

func WithAckWait(ackWait time.Duration) BrokerOption

func WithBackoffDurations added in v0.3.0

func WithBackoffDurations(durations []time.Duration) BrokerOption

func WithConsumerNamePrefix

func WithConsumerNamePrefix(prefix string) BrokerOption

func WithDeliverPolicy added in v0.3.0

func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption

func WithDiscardPolicy

func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption

func WithMaxAge

func WithMaxAge(maxAge time.Duration) BrokerOption

func WithMaxDeliveryAttempts

func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption

func WithRetentionPolicy added in v0.3.0

func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption

func WithStorageType added in v0.3.0

func WithStorageType(storage nats.StorageType) BrokerOption

func WithStreamDescription added in v0.3.0

func WithStreamDescription(description string) BrokerOption

type DirectMessaging

type DirectMessaging interface {
	Listen(topic string, handler func(data []byte)) (Subscription, error)
	SendToOther(topic string, data []byte) error
	SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
	SendToSelf(topic string, data []byte) error
}

func NewNatsDirectMessaging

func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging

type EnqueueOptions

type EnqueueOptions struct {
	IdempotententKey string
}

type MessageBroker added in v0.3.0

type MessageBroker interface {
	PublishMessage(ctx context.Context, subject string, data []byte) error
	CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
	GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
	FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
	Close() error
}

func NewJetStreamBroker added in v0.3.0

func NewJetStreamBroker(
	ctx context.Context,
	natsConn *nats.Conn,
	streamName string,
	subjects []string,
	opts ...BrokerOption,
) (MessageBroker, error)

type MessageQueue

type MessageQueue interface {
	Enqueue(topic string, message []byte, options *EnqueueOptions) error
	Dequeue(topic string, handler func(message []byte) error) error
	Close()
}

type MessageSubscription added in v0.3.0

type MessageSubscription interface {
	Unsubscribe() error
}

type NATsMessageQueueManager

type NATsMessageQueueManager struct {
	// contains filtered or unexported fields
}

func NewNATsMessageQueueManager

func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager

func (*NATsMessageQueueManager) NewMessageQueue

func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue

type PubSub

type PubSub interface {
	Publish(topic string, message []byte) error
	PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
	Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}

func NewNATSPubSub

func NewNATSPubSub(natsConn *nats.Conn) PubSub

type RetryConfig added in v0.3.2

type RetryConfig struct {
	RetryAttempt       uint
	ExponentialBackoff bool
	Delay              time.Duration
	OnRetry            func(n uint, err error)
}

type Subscription

type Subscription interface {
	Unsubscribe() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL