Documentation ¶
Index ¶
- Constants
- Variables
- type BrokerOption
- func WithAckWait(ackWait time.Duration) BrokerOption
- func WithBackoffDurations(durations []time.Duration) BrokerOption
- func WithConsumerNamePrefix(prefix string) BrokerOption
- func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption
- func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption
- func WithMaxAge(maxAge time.Duration) BrokerOption
- func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
- func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption
- func WithStorageType(storage nats.StorageType) BrokerOption
- func WithStreamDescription(description string) BrokerOption
- type DirectMessaging
- type EnqueueOptions
- type MessageBroker
- type MessageQueue
- type MessageSubscription
- type NATsMessageQueueManager
- type PubSub
- type RetryConfig
- type Subscription
Constants ¶
Variables ¶
View Source
var (
ErrInvalidStreamName = errors.New("stream name cannot be empty")
ErrInvalidSubjects = errors.New("subjects cannot be empty")
ErrConsumerCreation = errors.New("failed to create consumer")
ErrStreamCreation = errors.New("failed to create stream")
ErrInvalidConfiguration = errors.New("invalid configuration")
ErrConnectionClosed = errors.New("connection is closed")
ErrSubscriptionClosed = errors.New("subscription is closed")
)
View Source
var (
ErrPermament = errors.New("Permanent messaging error")
)
Functions ¶
This section is empty.
Types ¶
type BrokerOption ¶ added in v0.3.0
type BrokerOption func(*brokerConfiguration)
func WithAckWait ¶
func WithAckWait(ackWait time.Duration) BrokerOption
func WithBackoffDurations ¶ added in v0.3.0
func WithBackoffDurations(durations []time.Duration) BrokerOption
func WithConsumerNamePrefix ¶
func WithConsumerNamePrefix(prefix string) BrokerOption
func WithDeliverPolicy ¶ added in v0.3.0
func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption
func WithDiscardPolicy ¶
func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) BrokerOption
func WithMaxDeliveryAttempts ¶
func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
func WithRetentionPolicy ¶ added in v0.3.0
func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption
func WithStorageType ¶ added in v0.3.0
func WithStorageType(storage nats.StorageType) BrokerOption
func WithStreamDescription ¶ added in v0.3.0
func WithStreamDescription(description string) BrokerOption
type DirectMessaging ¶
type DirectMessaging interface {
Listen(topic string, handler func(data []byte)) (Subscription, error)
SendToOther(topic string, data []byte) error
SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
SendToSelf(topic string, data []byte) error
}
func NewNatsDirectMessaging ¶
func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging
type EnqueueOptions ¶
type EnqueueOptions struct {
IdempotententKey string
}
type MessageBroker ¶ added in v0.3.0
type MessageBroker interface {
PublishMessage(ctx context.Context, subject string, data []byte) error
CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
Close() error
}
func NewJetStreamBroker ¶ added in v0.3.0
func NewJetStreamBroker(ctx context.Context, natsConn *nats.Conn, streamName string, subjects []string, opts ...BrokerOption) (MessageBroker, error)
type MessageQueue ¶
type MessageSubscription ¶ added in v0.3.0
type MessageSubscription interface {
Unsubscribe() error
}
type NATsMessageQueueManager ¶
type NATsMessageQueueManager struct {
// contains filtered or unexported fields
}
func NewNATsMessageQueueManager ¶
func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager
func (*NATsMessageQueueManager) NewMessageQueue ¶
func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue
type PubSub ¶
type PubSub interface {
Publish(topic string, message []byte) error
PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}
func NewNATSPubSub ¶
type RetryConfig ¶ added in v0.3.2
type Subscription ¶
type Subscription interface {
Unsubscribe() error
}
Click to show internal directories.
Click to hide internal directories.