Documentation
Types ¶
type GobMarshaler ¶
type GobMarshaler struct{}
GobMarshaler is a marshaler that uses encoding/gob to marshal Watermill messages.
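To make the gob-based approach concrete, the following is a minimal sketch of a gob round trip. The `envelope` type and the `encodeGob`/`decodeGob` helpers are hypothetical stand-ins introduced for illustration; they are not the types or functions this package uses internally.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// envelope is a hypothetical stand-in for a Watermill message.
// gob only encodes exported fields, so all fields are exported.
type envelope struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// encodeGob serializes the envelope with encoding/gob.
func encodeGob(e envelope) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeGob restores an envelope from gob-encoded bytes.
func decodeGob(data []byte) (envelope, error) {
	var e envelope
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&e)
	return e, err
}

func main() {
	in := envelope{
		UUID:     "id-1",
		Metadata: map[string]string{"k": "v"},
		Payload:  []byte("hello"),
	}
	data, err := encodeGob(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeGob(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.UUID, out.Metadata["k"], string(out.Payload))
}
```

With this style of marshaling the whole message, including UUID and metadata, travels inside the encoded body, so consumers need a gob decoder to read it.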
type JSONMarshaler ¶ added in v0.0.4
type JSONMarshaler struct{}
JSONMarshaler uses encoding/json to marshal Watermill messages.
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface {
Marshaler
Unmarshaler
}
type NATSMarshaler ¶ added in v0.0.4
type NATSMarshaler struct{}
NATSMarshaler uses NATS headers to marshal directly between Watermill and NATS formats. The Watermill UUID is stored under _watermill_message_uuid.
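The header-based mapping can be sketched roughly as follows. This is an illustration under stated assumptions, not the package's code: `wireMsg` and `appMsg` are hypothetical stand-ins for the NATS and Watermill message types, and copying every metadata entry into its own header is an assumption about how metadata might be carried. Only the `_watermill_message_uuid` key comes from the documentation above.

```go
package main

import "fmt"

// uuidHeader is the key named in the NATSMarshaler documentation.
const uuidHeader = "_watermill_message_uuid"

// wireMsg is a hypothetical stand-in for a NATS message with headers.
type wireMsg struct {
	Subject string
	Header  map[string][]string
	Data    []byte
}

// appMsg is a hypothetical stand-in for a Watermill message.
type appMsg struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// toWire keeps the payload untouched and moves UUID and metadata into headers.
func toWire(subject string, m appMsg) wireMsg {
	h := map[string][]string{uuidHeader: {m.UUID}}
	for k, v := range m.Metadata {
		h[k] = []string{v}
	}
	return wireMsg{Subject: subject, Header: h, Data: m.Payload}
}

// fromWire reverses toWire: every header except the UUID becomes metadata.
func fromWire(w wireMsg) appMsg {
	m := appMsg{Metadata: map[string]string{}, Payload: w.Data}
	for k, v := range w.Header {
		if len(v) == 0 {
			continue
		}
		if k == uuidHeader {
			m.UUID = v[0]
			continue
		}
		m.Metadata[k] = v[0]
	}
	return m
}

func main() {
	w := toWire("orders.id-1", appMsg{
		UUID:     "id-1",
		Metadata: map[string]string{"trace": "abc"},
		Payload:  []byte("hi"),
	})
	back := fromWire(w)
	fmt.Println(w.Header[uuidHeader][0], back.UUID, back.Metadata["trace"], string(back.Payload))
}
```

Because the payload is passed through unchanged, consumers that are not Watermill-aware can still read the message body directly.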
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher provides the JetStream implementation for Watermill publish operations.
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisher creates a new Publisher.
func NewPublisherWithNatsConn ¶
func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisherWithNatsConn creates a new Publisher with the provided NATS connection.
type PublisherConfig ¶
type PublisherConfig struct {
// URL is the NATS URL.
URL string
// NatsOptions are custom options for a connection.
NatsOptions []nats.Option
// JetstreamOptions are custom Jetstream options for a connection.
JetstreamOptions []nats.JSOpt
// Marshaler is the marshaler used to marshal messages between Watermill and wire formats.
Marshaler Marshaler
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
SubjectCalculator SubjectCalculator
// AutoProvision bypasses client validation and provisioning of streams
AutoProvision bool
// PublishOptions are custom publish options applied to every publication.
PublishOptions []nats.PubOpt
// TrackMsgId uses the nats.MsgId option with the message UUID to prevent duplication.
TrackMsgId bool
}
PublisherConfig is the configuration to create a publisher
func (PublisherConfig) GetPublisherPublishConfig ¶
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig
GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established
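The relationship between the two configuration types can be sketched as a plain field copy. The types below are simplified, hypothetical mirrors (interfaces and option slices are replaced with strings), so this shows only the idea of dropping connection-only fields, not the real method body.

```go
package main

import "fmt"

// fullConfig mirrors the shape of PublisherConfig with simplified field types.
type fullConfig struct {
	URL           string   // connection-only
	NatsOptions   []string // connection-only, stand-in for []nats.Option
	Marshaler     string   // stand-in for the Marshaler interface
	AutoProvision bool
	TrackMsgId    bool
}

// publishConfig mirrors PublisherPublishConfig: the fields needed per publish call.
type publishConfig struct {
	Marshaler     string
	AutoProvision bool
	TrackMsgId    bool
}

// publishSubset copies the per-publish fields and drops the connection-only ones.
func (c fullConfig) publishSubset() publishConfig {
	return publishConfig{
		Marshaler:     c.Marshaler,
		AutoProvision: c.AutoProvision,
		TrackMsgId:    c.TrackMsgId,
	}
}

func main() {
	full := fullConfig{URL: "nats://localhost:4222", Marshaler: "gob", TrackMsgId: true}
	fmt.Printf("%+v\n", full.publishSubset())
}
```

This split is what lets NewPublisherWithNatsConn accept only the subset: the caller already owns the connection, so the URL and connection options are not needed.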
func (PublisherConfig) Validate ¶
func (c PublisherConfig) Validate() error
Validate ensures configuration is valid before use
type PublisherPublishConfig ¶
type PublisherPublishConfig struct {
// Marshaler is the marshaler used to marshal messages between Watermill and wire formats.
Marshaler Marshaler
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
SubjectCalculator SubjectCalculator
// AutoProvision bypasses client validation and provisioning of streams
AutoProvision bool
// JetstreamOptions are custom Jetstream options for a connection.
JetstreamOptions []nats.JSOpt
// PublishOptions are custom publish options applied to every publication.
PublishOptions []nats.PubOpt
// TrackMsgId uses the nats.MsgId option with the message UUID to prevent duplication.
TrackMsgId bool
}
PublisherPublishConfig is the configuration subset needed for an individual publish call
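The effect of TrackMsgId can be illustrated with a small in-memory model of ID-based de-duplication. The `dedupStore` type is hypothetical and exists only for this sketch; a real server would typically bound the set of remembered IDs by a time window, which is omitted here.

```go
package main

import "fmt"

// dedupStore is a hypothetical in-memory model of a stream that rejects
// messages whose ID it has already seen.
type dedupStore struct {
	seen     map[string]bool
	payloads []string
}

func newDedupStore() *dedupStore {
	return &dedupStore{seen: map[string]bool{}}
}

// publish stores the payload unless msgID was seen before.
// It reports whether the payload was actually stored.
func (s *dedupStore) publish(msgID, payload string) bool {
	if s.seen[msgID] {
		return false // duplicate: same message UUID published twice
	}
	s.seen[msgID] = true
	s.payloads = append(s.payloads, payload)
	return true
}

func main() {
	s := newDedupStore()
	fmt.Println(s.publish("uuid-1", "created")) // first publish is stored
	fmt.Println(s.publish("uuid-1", "created")) // retry with same UUID is dropped
	fmt.Println(len(s.payloads))
}
```

Using the message UUID as the ID means a publisher that retries after a timeout does not produce a second copy, provided the retry reuses the same message.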
type SubjectCalculator ¶ added in v0.0.2
SubjectCalculator is a function used to calculate nats subject(s) for the given topic.
type Subjects ¶ added in v0.0.3
Subjects contains the NATS subject details (primary plus all additional) for a given Watermill topic.
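A subject calculator following the documented default pattern "{topic}.*" might look like the sketch below. The exact definitions of SubjectCalculator and Subjects are not shown in this reference, so the `subjects` struct, its `Primary` and `Additional` field names, the `calculator` function type, and the `withAudit` example are all assumptions made for illustration.

```go
package main

import "fmt"

// subjects is a hypothetical stand-in for the Subjects type:
// one primary subject plus any additional ones.
type subjects struct {
	Primary    string
	Additional []string
}

// all returns the primary subject followed by the additional ones.
func (s subjects) all() []string {
	return append([]string{s.Primary}, s.Additional...)
}

// calculator has the assumed shape of a SubjectCalculator.
type calculator func(topic string) subjects

// wildcardCalculator follows the documented default pattern "{topic}.*".
func wildcardCalculator(topic string) subjects {
	return subjects{Primary: fmt.Sprintf("%s.*", topic)}
}

// withAudit is a hypothetical custom calculator that adds a second subject.
func withAudit(topic string) subjects {
	return subjects{
		Primary:    topic + ".*",
		Additional: []string{"audit." + topic},
	}
}

func main() {
	for _, calc := range []calculator{wildcardCalculator, withAudit} {
		fmt.Println(calc("orders").all())
	}
}
```

Supplying a custom calculator is useful when existing streams already use a subject layout that differs from the default wildcard.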
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber provides the JetStream implementation for Watermill subscribe operations.
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriber creates a new Subscriber.
func NewSubscriberWithNatsConn ¶
func NewSubscriberWithNatsConn(conn *nats.Conn, config SubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriberWithNatsConn creates a new Subscriber with the provided NATS connection.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes the subscriber and the underlying connection. It will attempt to wait for in-flight messages to complete.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribe
type SubscriberConfig ¶
type SubscriberConfig struct {
// URL is the URL to the broker
URL string
// QueueGroup is the JetStream queue group.
//
// All subscriptions with the same queue name (regardless of the connection they originate from)
// will form a queue group. Each message will be delivered to only one subscriber per queue group,
// using queuing semantics.
//
// It is recommended to set it with DurableName.
// For non durable queue subscribers, when the last member leaves the group,
// that group is removed. A durable queue group (DurableName) allows you to have all members leave
// but still maintain state. When a member re-joins, it starts at the last position in that group.
//
// When QueueGroup is empty, subscribe without QueueGroup will be used.
QueueGroup string
// DurableName is the JetStream durable name.
//
// Subscriptions may also specify a “durable name” which will survive client restarts.
// Durable subscriptions cause the server to track the last acknowledged message
// sequence number for a client and durable name. When the client restarts/resubscribes,
// and uses the same client ID and durable name, the server will resume delivery beginning
// with the earliest unacknowledged message for this durable subscription.
//
// Doing this causes the JetStream server to track
// the last acknowledged message for that ClientID + DurableName.
DurableName string
// SubscribersCount determines how many concurrent subscribers should be started.
SubscribersCount int
// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
CloseTimeout time.Duration
// AckWaitTimeout determines how long the subscriber waits for Ack/Nack. When no Ack/Nack is received in time, the message is redelivered.
// It is mapped to stan.AckWait option.
AckWaitTimeout time.Duration
// SubscribeTimeout determines how long subscriber will wait for a successful subscription
SubscribeTimeout time.Duration
// NatsOptions are custom []nats.Option passed to the connection.
// It is also used to provide connection parameters, for example:
// nats.URL("nats://localhost:4222")
NatsOptions []nats.Option
// JetstreamOptions are custom Jetstream options for a connection.
JetstreamOptions []nats.JSOpt
// Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
Unmarshaler Unmarshaler
// SubscribeOptions defines nats options to be used when subscribing
SubscribeOptions []nats.SubOpt
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
SubjectCalculator SubjectCalculator
// AutoProvision bypasses client validation and provisioning of streams
AutoProvision bool
// AckSync enables synchronous acknowledgement (needed for exactly once processing)
AckSync bool
}
SubscriberConfig is the configuration to create a subscriber
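The queue group semantics described for QueueGroup, where each message is delivered to only one subscriber in the group, can be modelled with several goroutines reading from one shared channel. This is a local simulation with hypothetical names; it involves no NATS server and only illustrates the delivery guarantee.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver hands each message to exactly one of the workers sharing a queue
// and returns how many times each message was handled.
func deliver(msgs []string, workers int) map[string]int {
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All workers compete for messages on the same channel,
			// like members of one queue group.
			for m := range queue {
				mu.Lock()
				handled[m]++
				mu.Unlock()
			}
		}()
	}

	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	counts := deliver([]string{"a", "b", "c", "d"}, 3)
	fmt.Println(counts["a"], counts["b"], counts["c"], counts["d"])
}
```

Subscribers that should each receive every message would instead use different queue group names, or none at all.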
func (*SubscriberConfig) GetSubscriberSubscriptionConfig ¶ added in v0.0.2
func (c *SubscriberConfig) GetSubscriberSubscriptionConfig() SubscriberSubscriptionConfig
GetSubscriberSubscriptionConfig gets the configuration subset needed for individual subscribe calls once a connection has been established
type SubscriberSubscriptionConfig ¶
type SubscriberSubscriptionConfig struct {
// Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
Unmarshaler Unmarshaler
// QueueGroup is the JetStream queue group.
//
// All subscriptions with the same queue name (regardless of the connection they originate from)
// will form a queue group. Each message will be delivered to only one subscriber per queue group,
// using queuing semantics.
//
// It is recommended to set it with DurableName.
// For non durable queue subscribers, when the last member leaves the group,
// that group is removed. A durable queue group (DurableName) allows you to have all members leave
// but still maintain state. When a member re-joins, it starts at the last position in that group.
//
// When QueueGroup is empty, subscribe without QueueGroup will be used.
QueueGroup string
// DurableName is the JetStream durable name.
//
// Subscriptions may also specify a “durable name” which will survive client restarts.
// Durable subscriptions cause the server to track the last acknowledged message
// sequence number for a client and durable name. When the client restarts/resubscribes,
// and uses the same client ID and durable name, the server will resume delivery beginning
// with the earliest unacknowledged message for this durable subscription.
//
// Doing this causes the JetStream server to track
// the last acknowledged message for that ClientID + DurableName.
DurableName string
// SubscribersCount determines how many concurrent subscribers should be started.
SubscribersCount int
// AckWaitTimeout determines how long the subscriber waits for Ack/Nack. When no Ack/Nack is received in time, the message is redelivered.
// It is mapped to stan.AckWait option.
AckWaitTimeout time.Duration
// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
CloseTimeout time.Duration
// SubscribeTimeout determines how long subscriber will wait for a successful subscription
SubscribeTimeout time.Duration
// JetstreamOptions are custom Jetstream options for a connection.
JetstreamOptions []nats.JSOpt
// SubscribeOptions defines nats options to be used when subscribing
SubscribeOptions []nats.SubOpt
// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*")
SubjectCalculator SubjectCalculator
// AutoProvision bypasses client validation and provisioning of streams
AutoProvision bool
// AckSync enables synchronous acknowledgement (needed for exactly once processing)
AckSync bool
}
SubscriberSubscriptionConfig is the configuration subset needed for an individual subscribe call.
func (*SubscriberSubscriptionConfig) Validate ¶
func (c *SubscriberSubscriptionConfig) Validate() error
Validate ensures configuration is valid before use