Documentation
¶
Index ¶
- type Config
- type DefaultMarshaler
- type PubSub
- func (p *PubSub) Close() error
- func (p *PubSub) IsConnected() bool
- func (p *PubSub) Publish(topic string, messages ...*message.Message) error
- func (p *PubSub) PublishWithOpts(topic string, msg *message.Message, opts ...spi.Option) error
- func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
- func (p *PubSub) SubscribeWithOpts(ctx context.Context, topic string, opts ...spi.Option) (<-chan *message.Message, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
URI string
MaxConnectRetries int
MaxConnectionChannels int
MaxRedeliveryAttempts int
RedeliveryMultiplier float64
RedeliveryInitialInterval time.Duration
MaxRedeliveryInterval time.Duration
PublisherChannelPoolSize int
PublisherConfirmDelivery bool
}
Config holds the configuration for the publisher/subscriber.
type DefaultMarshaler ¶ added in v1.0.0
type DefaultMarshaler struct {
// PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
// for example add CorrelationId and ContentType:
//
// amqp.DefaultMarshaler{
// PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
// publishing.CorrelationId = "correlation"
// publishing.ContentType = "application/json"
//
// return publishing
// },
// }
PostprocessPublishing func(amqp.Publishing) amqp.Publishing
// When true, DeliveryMode will be not set to Persistent.
//
// DeliveryMode Transient means higher throughput, but messages will not be
// restored on broker restart. The delivery mode of publishings is unrelated
// to the durability of the queues they reside on. Transient messages will
// not be restored to durable queues, persistent messages will be restored to
// durable queues and lost on non-durable queues during server restart.
NotPersistentDeliveryMode bool
// Header used to store and read message UUID.
//
// If value is empty, defaultMessageUUIDHeaderKey value is used.
// If header doesn't exist, empty value is passed as message UUID.
MessageUUIDHeaderKey string
}
DefaultMarshaler is a modified version of the marshaller in watermill-amqp. This marshaller adds support for dead-letter queue header values and also allows a message's expiration to be set in the header.
func (DefaultMarshaler) Marshal ¶ added in v1.0.0
func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error)
Marshal marshals a message.
type PubSub ¶
PubSub implements a publisher/subscriber that connects to an AMQP-compatible message queue.
func (*PubSub) IsConnected ¶ added in v1.0.0
IsConnected return true if connected to the AMQP server.
func (*PubSub) PublishWithOpts ¶ added in v1.0.0
PublishWithOpts publishes a message to a topic using the supplied options.
func (*PubSub) Subscribe ¶
Subscribe subscribes to a topic and returns the Go channel over which messages are sent. The returned channel will be closed when Close() is called on this struct.
func (*PubSub) SubscribeWithOpts ¶
func (p *PubSub) SubscribeWithOpts(ctx context.Context, topic string, opts ...spi.Option) (<-chan *message.Message, error)
SubscribeWithOpts subscribes to a topic using the given options, and returns the Go channel over which messages are sent. The returned channel will be closed when Close() is called on this struct.