messaging

package
v0.0.0-...-c01ef8f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultAckWait             = 30 * time.Second
	DefaultMaxDeliveryAttempts = 3
	DefaultConsumerPrefix      = "consumer"
	DefaultStreamMaxAge        = 3 * time.Minute
	DefaultBackoffDuration     = 30 * time.Second
)

Variables

View Source
var (
	ErrInvalidStreamName    = errors.New("stream name cannot be empty")
	ErrInvalidSubjects      = errors.New("subjects cannot be empty")
	ErrConsumerCreation     = errors.New("failed to create consumer")
	ErrStreamCreation       = errors.New("failed to create stream")
	ErrInvalidConfiguration = errors.New("invalid configuration")
	ErrConnectionClosed     = errors.New("connection is closed")
	ErrSubscriptionClosed   = errors.New("subscription is closed")
)
View Source
var (
	ErrPermament = errors.New("Permanent messaging error")
)

Functions

This section is empty.

Types

type BrokerOption

type BrokerOption func(*brokerConfiguration)

func WithAckWait

func WithAckWait(ackWait time.Duration) BrokerOption

func WithBackoffDurations

func WithBackoffDurations(durations []time.Duration) BrokerOption

func WithConsumerNamePrefix

func WithConsumerNamePrefix(prefix string) BrokerOption

func WithDescription

func WithDescription(description string) BrokerOption

Broker option functions

func WithMaxAge

func WithMaxAge(maxAge time.Duration) BrokerOption

func WithMaxDeliveryAttempts

func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption

type DirectMessaging

type DirectMessaging interface {
	Listen(topic string, handler func(data []byte)) (Subscription, error)
	SendToOther(topic string, data []byte) error
	SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
	SendToSelf(topic string, data []byte) error
}

func NewNatsDirectMessaging

func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging

func NewRabbitMQDirectMessaging

func NewRabbitMQDirectMessaging(conn *amqp.Connection) (DirectMessaging, error)

type EnqueueOptions

type EnqueueOptions struct {
	IdempotententKey string
}

type Message

type Message interface {
	Data() []byte
	Headers() map[string][]string
	Subject() string
	Ack() error
	Nak() error
	Term() error
	InProgress() error
	Metadata() (*MessageMetadata, error)
}

Message represents a generic message interface for both NATS and RabbitMQ

type MessageBroker

type MessageBroker interface {
	PublishMessage(ctx context.Context, subject string, data []byte) error
	CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg Message)) (MessageSubscription, error)
	GetStreamInfo(ctx context.Context) (*StreamInfo, error)
	FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg Message)) error
	Close() error
}

MessageBroker interface - updated to use custom Message type

func NewRabbitMQBroker

func NewRabbitMQBroker(
	ctx context.Context,
	conn *amqp.Connection,
	streamName string,
	subjects []string,
	opts ...BrokerOption,
) (MessageBroker, error)

type MessageMetadata

type MessageMetadata struct {
	Sequence     SequencePair
	NumDelivered int
	Timestamp    time.Time
}

MessageMetadata contains message delivery metadata

type MessageQueue

type MessageQueue interface {
	Enqueue(topic string, message []byte, options *EnqueueOptions) error
	Dequeue(topic string, handler func(message []byte) error) error
	Close()
}

type MessageSubscription

type MessageSubscription interface {
	Unsubscribe() error
}

type NATsMessageQueueManager

type NATsMessageQueueManager struct {
	// contains filtered or unexported fields
}

func NewNATsMessageQueueManager

func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager

func (*NATsMessageQueueManager) NewMessageQueue

func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue

type NatsBrokerOption

type NatsBrokerOption func(*natsBrokerConfiguration)

func WithDeliverPolicy

func WithDeliverPolicy(policy jetstream.DeliverPolicy) NatsBrokerOption

func WithDiscardPolicy

func WithDiscardPolicy(policy nats.DiscardPolicy) NatsBrokerOption

func WithNATsMaxAge

func WithNATsMaxAge(maxAge time.Duration) NatsBrokerOption

func WithNAtsAckWait

func WithNAtsAckWait(ackWait time.Duration) NatsBrokerOption

func WithNAtsBackoffDurations

func WithNAtsBackoffDurations(durations []time.Duration) NatsBrokerOption

func WithNAtsConsumerNamePrefix

func WithNAtsConsumerNamePrefix(prefix string) NatsBrokerOption

func WithNAtsMaxDeliveryAttempts

func WithNAtsMaxDeliveryAttempts(maxAttempts int) NatsBrokerOption

func WithRetentionPolicy

func WithRetentionPolicy(policy nats.RetentionPolicy) NatsBrokerOption

func WithStorageType

func WithStorageType(storage nats.StorageType) NatsBrokerOption

func WithStreamDescription

func WithStreamDescription(description string) NatsBrokerOption

type NatsMessageBroker

type NatsMessageBroker interface {
	PublishMessage(ctx context.Context, subject string, data []byte) error
	CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
	GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
	FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
	Close() error
}

func NewJetStreamBroker

func NewJetStreamBroker(
	ctx context.Context,
	natsConn *nats.Conn,
	streamName string,
	subjects []string,
	opts ...NatsBrokerOption,
) (NatsMessageBroker, error)

type NatsMessageSubscription

type NatsMessageSubscription interface {
	Unsubscribe() error
}

type PubSub

type PubSub interface {
	Publish(topic string, message []byte) error
	PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
	Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}

func NewNATSPubSub

func NewNATSPubSub(natsConn *nats.Conn) PubSub

func NewRabbitMQPubSub

func NewRabbitMQPubSub(conn *amqp.Connection) (PubSub, error)

type RabbitMQMessageQueueManager

type RabbitMQMessageQueueManager struct {
	// contains filtered or unexported fields
}

func NewRabbitMQMessageQueueManager

func NewRabbitMQMessageQueueManager(exchangeName string, subjectWildCards []string, conn *amqp.Connection) *RabbitMQMessageQueueManager

func (*RabbitMQMessageQueueManager) NewMessageQueue

func (m *RabbitMQMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue

type RetryConfig

type RetryConfig struct {
	RetryAttempt       uint
	ExponentialBackoff bool
	Delay              time.Duration
	OnRetry            func(n uint, err error)
}

type SequencePair

type SequencePair struct {
	Consumer uint64
	Stream   uint64
}

SequencePair represents sequence numbers

type StreamConfig

type StreamConfig struct {
	Name        string
	Subjects    []string
	Description string
}

StreamConfig represents stream configuration

type StreamInfo

type StreamInfo struct {
	Config StreamConfig
	State  StreamState
}

StreamInfo contains stream metadata

type StreamState

type StreamState struct {
	Messages  uint64
	Bytes     uint64
	FirstSeq  uint64
	LastSeq   uint64
	Consumers int
}

StreamState represents stream state

type Subscription

type Subscription interface {
	Unsubscribe() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL