Documentation
¶
Index ¶
- Variables
- type AcknowledgementType
- type ClientConfig
- type Consumer
- type ConsumerConfig
- type ExchangeConfig
- type Handler
- type HandlerAcknowledgement
- type Message
- type MessageArgs
- type Metric
- type NullMetric
- func (n *NullMetric) ObserveAck(success bool)
- func (n *NullMetric) ObserveMsgDelivered()
- func (n *NullMetric) ObserveMsgPublish(success bool)
- func (n *NullMetric) ObserveNack(success bool)
- func (n *NullMetric) ObserveRabbitMQChanelConnection()
- func (n *NullMetric) ObserveRabbitMQChanelConnectionFailed()
- func (n *NullMetric) ObserveRabbitMQChanelConnectionRetry()
- func (n *NullMetric) ObserveRabbitMQConnection()
- func (n *NullMetric) ObserveRabbitMQConnectionFailed()
- func (n *NullMetric) ObserveRabbitMQConnectionRetry()
- func (n *NullMetric) ObserveReject(success bool)
- type Producer
- type QueueBindConfig
- type QueueConfig
- type RabbitMQClient
- type RabbitMQClientInterface
- type RetryableConsumer
- type RetryableConsumerConfig
- type RetryableProducer
- type RetryableProducerConfig
- type Setup
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrProducerConnection = errors.New("RMQ producer has already closed the connection")
Functions ¶
This section is empty.
Types ¶
type AcknowledgementType ¶
type AcknowledgementType int
const (
	Ack AcknowledgementType = iota
	Nack
	Reject
)
type ClientConfig ¶
type ClientConfig struct {
// ConnectionURI is the string used to connect to RabbitMQ, e.g. `amqp://...`
ConnectionURI string
// Metric is an interface used to collect metrics about the client and consumer.
// Use the NullMetric struct if you want to skip them.
Metric Metric
// ConnectRetryAttempts is the number of attempts made to dial RabbitMQ and create a channel.
ConnectRetryAttempts int
// InitialReconnectDelay is the delay between each attempt.
InitialReconnectDelay time.Duration
}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer( client RabbitMQClientInterface, handler Handler, logger logger.StructuredLogger, metric Metric, cfg ConsumerConfig, ) *Consumer
type ConsumerConfig ¶
type ConsumerConfig struct {
// PrefetchCount configures how many in-flight "deliveries" are available to the consumer to ack/nack.
// ref: https://www.rabbitmq.com/consumer-prefetch.html
// There is no default value because it is very easy to misuse RMQ and have multiple consumers
// with too many deliveries in flight, which results in a badly distributed workload and a high memory
// footprint for the consumers.
PrefetchCount int
}
type ExchangeConfig ¶
type Handler ¶
type Handler interface {
GetQueueName() string
GetConsumerTag() string
QueueAutoAck() bool
ExclusiveConsumer() bool
MustStopOnAckError() bool
MustStopOnNAckError() bool
MustStopOnRejectError() bool
WaitToConsumeInflight() bool
ReceiveMessage(ctx context.Context, msg *Message) (acknowledgement HandlerAcknowledgement, err error)
}
type HandlerAcknowledgement ¶
type HandlerAcknowledgement struct {
Acknowledgement AcknowledgementType
Requeue bool
}
type Message ¶
type Message struct {
// The application specific payload of the message
Body []byte
// Correlation identifier
CorrelationID string
// Message headers
Headers map[string]interface{}
}
Message contains data that is specific to the consumed RabbitMQ message.
type MessageArgs ¶
type MessageArgs struct {
// Application or exchange specific fields,
// the headers exchange will inspect this field.
Headers amqp.Table
// Correlation identifier
CorrelationID string
}
MessageArgs captures the fields related to the message sent to the server.
type Metric ¶
type Metric interface {
ObserveRabbitMQConnectionFailed()
ObserveRabbitMQConnectionRetry()
ObserveRabbitMQConnection()
ObserveRabbitMQChanelConnectionFailed()
ObserveRabbitMQChanelConnectionRetry()
ObserveRabbitMQChanelConnection()
ObserveMsgDelivered()
ObserveAck(success bool)
ObserveNack(success bool)
ObserveReject(success bool)
ObserveMsgPublish(success bool)
}
type NullMetric ¶
type NullMetric struct{}
func (*NullMetric) ObserveAck ¶
func (n *NullMetric) ObserveAck(success bool)
func (*NullMetric) ObserveMsgDelivered ¶
func (n *NullMetric) ObserveMsgDelivered()
func (*NullMetric) ObserveMsgPublish ¶
func (n *NullMetric) ObserveMsgPublish(success bool)
func (*NullMetric) ObserveNack ¶
func (n *NullMetric) ObserveNack(success bool)
func (*NullMetric) ObserveRabbitMQChanelConnection ¶
func (n *NullMetric) ObserveRabbitMQChanelConnection()
func (*NullMetric) ObserveRabbitMQChanelConnectionFailed ¶
func (n *NullMetric) ObserveRabbitMQChanelConnectionFailed()
func (*NullMetric) ObserveRabbitMQChanelConnectionRetry ¶
func (n *NullMetric) ObserveRabbitMQChanelConnectionRetry()
func (*NullMetric) ObserveRabbitMQConnection ¶
func (n *NullMetric) ObserveRabbitMQConnection()
func (*NullMetric) ObserveRabbitMQConnectionFailed ¶
func (n *NullMetric) ObserveRabbitMQConnectionFailed()
func (*NullMetric) ObserveRabbitMQConnectionRetry ¶
func (n *NullMetric) ObserveRabbitMQConnectionRetry()
func (*NullMetric) ObserveReject ¶
func (n *NullMetric) ObserveReject(success bool)
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(client RabbitMQClientInterface, logger logger.StructuredLogger, metric Metric) (*Producer, error)
type QueueBindConfig ¶
type QueueConfig ¶
type RabbitMQClient ¶
type RabbitMQClient struct {
// contains filtered or unexported fields
}
A simple client that tries to connect to rabbitmq and create a channel.
Does not attempt to reconnect if the connection drops.
func (*RabbitMQClient) Close ¶
func (c *RabbitMQClient) Close() error
func (*RabbitMQClient) CreateChannel ¶
type RabbitMQClientInterface ¶
type RabbitMQClientInterface interface {
CreateChannel(ctx context.Context) (*amqp.Channel, error)
Setup(ctx context.Context, setup *Setup) error
Close() error
}
func NewRabbitMQClient ¶
func NewRabbitMQClient(ctx context.Context, cfg *ClientConfig) (RabbitMQClientInterface, error)
type RetryableConsumer ¶
type RetryableConsumer struct {
// contains filtered or unexported fields
}
func NewRetryableConsumer ¶
func NewRetryableConsumer( newClientFactory func(ctx context.Context, config *ClientConfig) (RabbitMQClientInterface, error), config RetryableConsumerConfig, logger logger.StructuredLogger, metric Metric, handler Handler, ) *RetryableConsumer
type RetryableConsumerConfig ¶
type RetryableConsumerConfig struct {
MaxRetryAttempts int
// HealthCheckFactor is the number N by which backoffConfig.Max is multiplied to get the minimum time
// a block of code must run w/o returning an error to be considered healthy.
// E.g. backoffConfig.Max = 1min, HealthCheckFactor = 2, means that the code needs to run for at least 2min
// to be healthy, and it is retried starting from backoffConfig.Base the next time it has an error.
HealthCheckFactor int
BackoffConfig *backoff.Config
ConsumerConfig ConsumerConfig
RabbitClientConfig *ClientConfig
}
type RetryableProducer ¶
type RetryableProducer struct {
// contains filtered or unexported fields
}
func NewRetryableProducer ¶
func NewRetryableProducer( newClientFactory func(ctx context.Context, config *ClientConfig) (RabbitMQClientInterface, error), config RetryableProducerConfig, logger logger.StructuredLogger, metric Metric, ) *RetryableProducer
func (*RetryableProducer) Close ¶
func (p *RetryableProducer) Close()
func (*RetryableProducer) Publish ¶
func (p *RetryableProducer) Publish( exchange, key string, mandatory, immediate bool, expiration string, body []byte, args MessageArgs, ) error
type RetryableProducerConfig ¶
type RetryableProducerConfig struct {
MaxRetryAttempts int
// HealthCheckFactor is the number N by which backoffConfig.Max is multiplied to get the minimum time
// a block of code must run w/o returning an error to be considered healthy.
// E.g. backoffConfig.Max = 1min, HealthCheckFactor = 2, means that the code needs to run for at least 2min
// to be healthy, and it is retried starting from backoffConfig.Base the next time it has an error.
HealthCheckFactor int
BackoffConfig *backoff.Config
RabbitClientConfig *ClientConfig
}
type Setup ¶
type Setup struct {
Exchanges []ExchangeConfig
Queues []QueueConfig
QueueBindings []QueueBindConfig
}
Click to show internal directories.
Click to hide internal directories.