Documentation
¶
Index ¶
- Constants
- Variables
- type BrokerOption
- func WithAckWait(ackWait time.Duration) BrokerOption
- func WithBackoffDurations(durations []time.Duration) BrokerOption
- func WithConsumerNamePrefix(prefix string) BrokerOption
- func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption
- func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption
- func WithMaxAckPending(maxAckPending int) BrokerOption
- func WithMaxAge(maxAge time.Duration) BrokerOption
- func WithMaxBytes(maxBytes int64) BrokerOption
- func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
- func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption
- func WithStorageType(storage nats.StorageType) BrokerOption
- func WithStreamDescription(description string) BrokerOption
- type DirectMessaging
- type EnqueueOptions
- type MessageBroker
- type MessageQueue
- type MessageSubscription
- type NATsMessageQueueManager
- type PubSub
- type RetryConfig
- type Subscription
Constants ¶
Variables ¶
View Source
var ( ErrInvalidStreamName = errors.New("stream name cannot be empty") ErrInvalidSubjects = errors.New("subjects cannot be empty") ErrConsumerCreation = errors.New("failed to create consumer") ErrStreamCreation = errors.New("failed to create stream") ErrInvalidConfiguration = errors.New("invalid configuration") ErrConnectionClosed = errors.New("connection is closed") ErrSubscriptionClosed = errors.New("subscription is closed") )
View Source
var (
ErrPermament = errors.New("Permanent messaging error")
)
Functions ¶
This section is empty.
Types ¶
type BrokerOption ¶ added in v0.3.0
type BrokerOption func(*brokerConfiguration)
func WithAckWait ¶
func WithAckWait(ackWait time.Duration) BrokerOption
func WithBackoffDurations ¶ added in v0.3.0
func WithBackoffDurations(durations []time.Duration) BrokerOption
func WithConsumerNamePrefix ¶
func WithConsumerNamePrefix(prefix string) BrokerOption
func WithDeliverPolicy ¶ added in v0.3.0
func WithDeliverPolicy(policy jetstream.DeliverPolicy) BrokerOption
func WithDiscardPolicy ¶
func WithDiscardPolicy(policy nats.DiscardPolicy) BrokerOption
func WithMaxAckPending ¶ added in v0.3.5
func WithMaxAckPending(maxAckPending int) BrokerOption
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) BrokerOption
func WithMaxBytes ¶ added in v0.3.5
func WithMaxBytes(maxBytes int64) BrokerOption
WithMaxAckPending sets the maximum number of unACKed messages the JetStream server will deliver to this consumer at once. Once the limit is reached, JetStream holds back new messages until existing ones are ACKed or NAKed. This provides natural backpressure: messages stay durably in the stream instead of being pushed to consumers that have no capacity to process them. Set this to match the desired processing concurrency (e.g. 2 for keygen, 20 for signing).
func WithMaxDeliveryAttempts ¶
func WithMaxDeliveryAttempts(maxAttempts int) BrokerOption
func WithRetentionPolicy ¶ added in v0.3.0
func WithRetentionPolicy(policy nats.RetentionPolicy) BrokerOption
func WithStorageType ¶ added in v0.3.0
func WithStorageType(storage nats.StorageType) BrokerOption
func WithStreamDescription ¶ added in v0.3.0
func WithStreamDescription(description string) BrokerOption
type DirectMessaging ¶
type DirectMessaging interface {
Listen(topic string, handler func(data []byte)) (Subscription, error)
SendToOther(topic string, data []byte) error
SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
SendToSelf(topic string, data []byte) error
}
func NewNatsDirectMessaging ¶
func NewNatsDirectMessaging(natsConn *nats.Conn) DirectMessaging
type EnqueueOptions ¶
type EnqueueOptions struct {
IdempotententKey string
}
type MessageBroker ¶ added in v0.3.0
type MessageBroker interface {
PublishMessage(ctx context.Context, subject string, data []byte) error
CreateSubscription(ctx context.Context, consumerName, subject string, handler func(msg jetstream.Msg)) (MessageSubscription, error)
GetStreamInfo(ctx context.Context) (*jetstream.StreamInfo, error)
FetchMessages(ctx context.Context, consumerName, subject string, batchSize int, handler func(msg jetstream.Msg)) error
Close() error
}
func NewJetStreamBroker ¶ added in v0.3.0
func NewJetStreamBroker( ctx context.Context, natsConn *nats.Conn, streamName string, subjects []string, opts ...BrokerOption, ) (MessageBroker, error)
type MessageQueue ¶
type MessageSubscription ¶ added in v0.3.0
type MessageSubscription interface {
Unsubscribe() error
}
type NATsMessageQueueManager ¶
type NATsMessageQueueManager struct {
// contains filtered or unexported fields
}
func NewNATsMessageQueueManager ¶
func NewNATsMessageQueueManager(queueName string, subjectWildCards []string, nc *nats.Conn) *NATsMessageQueueManager
func (*NATsMessageQueueManager) NewMessageQueue ¶
func (m *NATsMessageQueueManager) NewMessageQueue(consumerName string) MessageQueue
type PubSub ¶
type PubSub interface {
Publish(topic string, message []byte) error
PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
}
func NewNATSPubSub ¶
type RetryConfig ¶ added in v0.3.2
type Subscription ¶
type Subscription interface {
Unsubscribe() error
}
Click to show internal directories.
Click to hide internal directories.