Documentation
¶
Index ¶
- Constants
- Variables
- type BrokerOption
- func WithAckWait(ackWait time.Duration) BrokerOption
- func WithBackoffDurations(durations []time.Duration) BrokerOption
- func WithConsumerNamePrefix(prefix string) BrokerOption
- func WithDescription(description string) BrokerOption
- func WithMaxAge(maxAge time.Duration) BrokerOption
- func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
- type DirectMessaging
- type EnqueueOptions
- type Message
- type MessageBroker
- type MessageMetadata
- type MessageQueue
- type MessageSubscription
- type NATsMessageQueueManager
- type NatsBrokerOption
- func WithDeliverPolicy(policy jetstream.DeliverPolicy) NatsBrokerOption
- func WithDiscardPolicy(policy nats.DiscardPolicy) NatsBrokerOption
- func WithNATsMaxAge(maxAge time.Duration) NatsBrokerOption
- func WithNAtsAckWait(ackWait time.Duration) NatsBrokerOption
- func WithNAtsBackoffDurations(durations []time.Duration) NatsBrokerOption
- func WithNAtsConsumerNamePrefix(prefix string) NatsBrokerOption
- func WithNAtsMaxDeliveryAttempts(maxAttempts int) NatsBrokerOption
- func WithRetentionPolicy(policy nats.RetentionPolicy) NatsBrokerOption
- func WithStorageType(storage nats.StorageType) NatsBrokerOption
- func WithStreamDescription(description string) NatsBrokerOption
- type NatsMessageBroker
- type NatsMessageSubscription
- type PubSub
- type RabbitMQMessageQueueManager
- type RetryConfig
- type SequencePair
- type StreamConfig
- type StreamInfo
- type StreamState
- type Subscription
Constants ¶
Variables ¶
View Source
var ( ErrInvalidStreamName = errors.New("stream name cannot be empty") ErrInvalidSubjects = errors.New("subjects cannot be empty") ErrConsumerCreation = errors.New("failed to create consumer") ErrStreamCreation = errors.New("failed to create stream") ErrInvalidConfiguration = errors.New("invalid configuration") ErrConnectionClosed = errors.New("connection is closed") ErrSubscriptionClosed = errors.New("subscription is closed") )
View Source
var (
ErrPermament = errors.New("Permanent messaging error")
)
Functions ¶
This section is empty.
Types ¶
type BrokerOption ¶
type BrokerOption func(*brokerConfiguration)
func WithAckWait ¶
func WithAckWait(ackWait time.Duration) BrokerOption
func WithBackoffDurations ¶
func WithBackoffDurations(durations []time.Duration) BrokerOption
func WithConsumerNamePrefix ¶
func WithConsumerNamePrefix(prefix string) BrokerOption
func WithDescription ¶
func WithDescription(description string) BrokerOption
Broker option functions: each With… function in this group returns a BrokerOption.
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) BrokerOption
func WithMaxDeliveryAttempts ¶
func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
type DirectMessaging ¶
type DirectMessaging interface {
Listen(topic string, handler func(data []byte)) (Subscription, error)
SendToOther(topic string, data []byte) error
SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
SendToSelf(topic string, data []byte) error
}
func NewNatsDirectMessaging ¶
func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging
func NewRabbitMQDirectMessaging ¶
func NewRabbitMQDirectMessaging(conn *amqp.Connection) (DirectMessaging, error)
type EnqueueOptions ¶
type EnqueueOptions struct {
IdempotententKey string
}
type Message ¶
type Message interface {
Data() []byte
Headers() map[string][]string
Subject() string
Ack() error
Nak() error
Term() error
InProgress() error
Metadata() (*MessageMetadata, error)
}
Message is a generic message interface used for both NATS and RabbitMQ.
type MessageBroker ¶
type MessageBroker interface {
PublishMessage(ctx context.Context, subject string, data []byte) error
CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg Message)) (MessageSubscription, error)
GetStreamInfo(ctx context.Context) (*StreamInfo, error)
FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg Message)) error
Close() error
}
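MessageBroker is the broker interface whose subscription and fetch handlers receive this package's Message type (NatsMessageBroker, by contrast, passes jetstream.Msg to its handlers).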
func NewRabbitMQBroker ¶
func NewRabbitMQBroker( ctx context.Context, conn *amqp.Connection, streamName string, subjects []string, opts ...BrokerOption, ) (MessageBroker, error)
type MessageMetadata ¶
type MessageMetadata struct {
Sequence SequencePair
NumDelivered int
Timestamp time.Time
}
MessageMetadata contains message delivery metadata
type MessageQueue ¶
type MessageSubscription ¶
type MessageSubscription interface {
Unsubscribe() error
}
type NATsMessageQueueManager ¶
type NATsMessageQueueManager struct {
// contains filtered or unexported fields
}
func NewNATsMessageQueueManager ¶
func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager
func (*NATsMessageQueueManager) NewMessageQueue ¶
func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue
type NatsBrokerOption ¶
type NatsBrokerOption func(*natsBrokerConfiguration)
func WithDeliverPolicy ¶
func WithDeliverPolicy(policy jetstream.DeliverPolicy) NatsBrokerOption
func WithDiscardPolicy ¶
func WithDiscardPolicy(policy nats.DiscardPolicy) NatsBrokerOption
func WithNATsMaxAge ¶
func WithNATsMaxAge(maxAge time.Duration) NatsBrokerOption
func WithNAtsAckWait ¶
func WithNAtsAckWait(ackWait time.Duration) NatsBrokerOption
func WithNAtsBackoffDurations ¶
func WithNAtsBackoffDurations(durations []time.Duration) NatsBrokerOption
func WithNAtsConsumerNamePrefix ¶
func WithNAtsConsumerNamePrefix(prefix string) NatsBrokerOption
func WithNAtsMaxDeliveryAttempts ¶
func WithNAtsMaxDeliveryAttempts(maxAttempts int) NatsBrokerOption
func WithRetentionPolicy ¶
func WithRetentionPolicy(policy nats.RetentionPolicy) NatsBrokerOption
func WithStorageType ¶
func WithStorageType(storage nats.StorageType) NatsBrokerOption
func WithStreamDescription ¶
func WithStreamDescription(description string) NatsBrokerOption
type NatsMessageBroker ¶
type NatsMessageBroker interface {
PublishMessage(ctx context.Context, subject string, data []byte) error
CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
Close() error
}
func NewJetStreamBroker ¶
func NewJetStreamBroker( ctx context.Context, natsConn *nats.Conn, streamName string, subjects []string, opts ...NatsBrokerOption, ) (NatsMessageBroker, error)
type NatsMessageSubscription ¶
type NatsMessageSubscription interface {
Unsubscribe() error
}
type PubSub ¶
type PubSub interface {
Publish(topic string, message []byte) error
PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}
func NewNATSPubSub ¶
func NewRabbitMQPubSub ¶
func NewRabbitMQPubSub(conn *amqp.Connection) (PubSub, error)
type RabbitMQMessageQueueManager ¶
type RabbitMQMessageQueueManager struct {
// contains filtered or unexported fields
}
func NewRabbitMQMessageQueueManager ¶
func NewRabbitMQMessageQueueManager(exchangeName string, subjectWildCards []string, conn *amqp.Connection) *RabbitMQMessageQueueManager
func (*RabbitMQMessageQueueManager) NewMessageQueue ¶
func (m *RabbitMQMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue
type RetryConfig ¶
type SequencePair ¶
SequencePair represents sequence numbers
type StreamConfig ¶
StreamConfig represents stream configuration
type StreamInfo ¶
type StreamInfo struct {
Config StreamConfig
State StreamState
}
StreamInfo contains stream metadata
type StreamState ¶
type StreamState struct {
Messages uint64
Bytes uint64
FirstSeq uint64
LastSeq uint64
Consumers int
}
StreamState represents stream state
type Subscription ¶
type Subscription interface {
Unsubscribe() error
}
Click to show internal directories.
Click to hide internal directories.