Documentation
¶
Index ¶
- Constants
- Variables
- type BrokerOption
- func WithAckWait(ackWait time.Duration) BrokerOption
- func WithBackoffDurations(durations []time.Duration) BrokerOption
- func WithConsumerNamePrefix(prefix string) BrokerOption
- func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption
- func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption
- func WithMaxAckPending(maxAckPending int) BrokerOption
- func WithMaxAge(maxAge time.Duration) BrokerOption
- func WithMaxBytes(maxBytes int64) BrokerOption
- func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
- func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption
- func WithStorageType(storage nats.StorageType) BrokerOption
- func WithStreamDescription(description string) BrokerOption
- type DirectMessaging
- type EnqueueOptions
- type MessageBroker
- type MessageQueue
- type MessageSubscription
- type NATsMessageQueueManager
- type PubSub
- type RetryConfig
- type Subscription
Constants ¶
Variables ¶
View Source
var ( ErrInvalidStreamName = errors.New("stream name cannot be empty") ErrInvalidSubjects = errors.New("subjects cannot be empty") ErrConsumerCreation = errors.New("failed to create consumer") ErrStreamCreation = errors.New("failed to create stream") ErrInvalidConfiguration = errors.New("invalid configuration") ErrConnectionClosed = errors.New("connection is closed") ErrSubscriptionClosed = errors.New("subscription is closed") )
View Source
var (
ErrPermament = errors.New("Permanent messaging error")
)
Functions ¶
This section is empty.
Types ¶
type BrokerOption ¶
type BrokerOption func(*brokerConfiguration)
func WithAckWait ¶
func WithAckWait(ackWait time.Duration) BrokerOption
func WithBackoffDurations ¶
func WithBackoffDurations(durations []time.Duration) BrokerOption
func WithConsumerNamePrefix ¶
func WithConsumerNamePrefix(prefix string) BrokerOption
func WithDeliverPolicy ¶
func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption
func WithDiscardPolicy ¶
func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption
func WithMaxAckPending ¶
func WithMaxAckPending(maxAckPending int) BrokerOption
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) BrokerOption
func WithMaxBytes ¶
func WithMaxBytes(maxBytes int64) BrokerOption
WithMaxAckPending sets the maximum number of unACKed messages the JetStream server will deliver to this consumer at once. Once the limit is reached, JetStream holds back new messages until existing ones are ACKed or NAKed. This provides natural backpressure: messages stay durably in the stream instead of being pushed to consumers that have no capacity to process them. Set this to match the desired processing concurrency (e.g. 2 for keygen, 20 for signing).
func WithMaxDeliveryAttempts ¶
func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
func WithRetentionPolicy ¶
func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption
func WithStorageType ¶
func WithStorageType(storage nats.StorageType) BrokerOption
func WithStreamDescription ¶
func WithStreamDescription(description string) BrokerOption
type DirectMessaging ¶
type DirectMessaging interface {
Listen(topic string, handler func(data []byte)) (Subscription, error)
SendToOther(topic string, data []byte) error
SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
SendToSelf(topic string, data []byte) error
}
func NewNatsDirectMessaging ¶
func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging
type EnqueueOptions ¶
type EnqueueOptions struct {
IdempotententKey string
}
type MessageBroker ¶
type MessageBroker interface {
PublishMessage(ctx context.Context, subject string, data []byte) error
CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
Close() error
}
func NewJetStreamBroker ¶
func NewJetStreamBroker( ctx context.Context, natsConn *nats.Conn, streamName string, subjects []string, opts ...BrokerOption, ) (MessageBroker, error)
type MessageQueue ¶
type MessageSubscription ¶
type MessageSubscription interface {
Unsubscribe() error
}
type NATsMessageQueueManager ¶
type NATsMessageQueueManager struct {
// contains filtered or unexported fields
}
func NewNATsMessageQueueManager ¶
func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager
func (*NATsMessageQueueManager) NewMessageQueue ¶
func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue
type PubSub ¶
type PubSub interface {
Publish(topic string, message []byte) error
PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}
func NewNATSPubSub ¶
type RetryConfig ¶
type Subscription ¶
type Subscription interface {
Unsubscribe() error
}
Click to show internal directories.
Click to hide internal directories.