Documentation
¶
Index ¶
- Constants
- Variables
- func NewError(errorMessage string, errorCode grpc.ErrorCode) *grpc.Error
- func WithConcurrentReplyConsumer(number int) func(options *Options)
- func WithLogger(logger rabbitmq.Logger) func(options *Options)
- func WithMultiplePublishers(number int) func(options *Options)
- func WithPublisherHeader(header []HeaderValue) func(options *PublisherOptions)
- func WithPublisherTracing(tracing bool) func(options *PublisherOptions)
- type Configuration
- type ConsumerFactory
- type ConsumerOpt
- type ConsumerOpts
- type Exchange
- type HandlerFunc
- type Header
- type HeaderKey
- type HeaderReplyType
- type HeaderValue
- type Options
- type PublishOpt
- type PublishRequest
- type Publisher
- type PublisherOptions
- type PublisherPool
- func (pp *PublisherPool) Publish(ctx context.Context, topic Topic, message proto.Message, options ...PublishOpt) error
- func (pp *PublisherPool) PublishRPC(ctx context.Context, topic Topic, message proto.Message, options ...PublishOpt) ([]byte, error)
- func (pp *PublisherPool) PublishRPCWithMultipleResponses(ctx context.Context, topic Topic, message proto.Message, nrResponses int, ...) (chan ReplyResponse, error)
- func (pp *PublisherPool) Respond(ctx context.Context, correlationID string, topic Topic, message proto.Message, ...) error
- func (pp *PublisherPool) RespondWithError(ctx context.Context, correlationID string, topic Topic, message *grpc.Error, ...) error
- type Rabbit
- type ReplyPool
- type ReplyRequest
- type ReplyResponse
- type Topic
- type TopicBuilder
- type TopicWord
- type TraceCarrier
Constants ¶
const ( TopicExchange exchangeType = "topic" DirectExchange exchangeType = "direct" FanoutExchange exchangeType = "fanout" )
Variables ¶
var ErrResponse = errors.New("responded with an error")
Functions ¶
func WithConcurrentReplyConsumer ¶
WithConcurrentReplyConsumer sets the number of goroutines that will handle the reply queue.
func WithLogger ¶
func WithMultiplePublishers ¶
WithMultiplePublishers sets the number of routines running a publisher; each has its own TCP connection.
func WithPublisherHeader ¶ added in v0.2.4
func WithPublisherHeader(header []HeaderValue) func(options *PublisherOptions)
func WithPublisherTracing ¶ added in v0.2.4
func WithPublisherTracing(tracing bool) func(options *PublisherOptions)
Types ¶
type Configuration ¶
type Configuration struct {
// URL is the address for connecting to the RabbitMQ instance
URL string `json:"address" yaml:"address" validate:"required"`
}
Configuration is the basic AMQP configuration for the message bus.
type ConsumerFactory ¶ added in v0.2.4
type ConsumerFactory struct {
// contains filtered or unexported fields
}
func NewConsumerFactory ¶ added in v0.2.4
func NewConsumerFactory(connection *rabbitmq.Conn, exchange Exchange, metrics rabbitMetrics, obs observability.Observability, opts ...ConsumerOpt) ConsumerFactory
NewConsumerFactory creates a new consumer factory for the given exchange. Opts will be applied to every consumer created by this factory and can be overridden by the consumer itself.
func (*ConsumerFactory) NewConsumer ¶ added in v0.2.4
func (cm *ConsumerFactory) NewConsumer(exchange Exchange, topic Topic, queueName string, handler HandlerFunc, durable bool, opts ...ConsumerOpt) (*rabbitmq.Consumer, error)
NewConsumer creates a new consumer for the given exchange, topic and handler function. The returned consumer should only be used for disconnecting.
type ConsumerOpt ¶ added in v0.2.4
type ConsumerOpt func(*ConsumerOpts)
func WithDefaultTimeout ¶ added in v0.2.4
func WithDefaultTimeout(timeout time.Duration) ConsumerOpt
func WithRoutines ¶ added in v0.2.4
func WithRoutines(routines int) ConsumerOpt
type ConsumerOpts ¶ added in v0.2.4
type ConsumerOpts struct {
// contains filtered or unexported fields
}
type HandlerFunc ¶
type Header ¶
type Header struct {
// contains filtered or unexported fields
}
func (*Header) Build ¶
func (rh *Header) Build() []HeaderValue
func (*Header) WithField ¶
When adding custom fields, make sure the value is supported: https://pkg.go.dev/github.com/wagslane/go-rabbitmq#Table
func (*Header) WithMethod ¶
type HeaderReplyType ¶
type HeaderReplyType string
const ( HeaderReplyTypeAcknowledge HeaderReplyType = "acknowledge" HeaderReplyTypeInterceptedMessage HeaderReplyType = "intercepted_message" )
type HeaderValue ¶
type PublishOpt ¶ added in v0.2.4
type PublishOpt func(*PublisherOptions)
type PublishRequest ¶
type PublisherOptions ¶
type PublisherOptions struct {
// contains filtered or unexported fields
}
type PublisherPool ¶
type PublisherPool struct {
// contains filtered or unexported fields
}
func (*PublisherPool) Publish ¶
func (pp *PublisherPool) Publish(ctx context.Context, topic Topic, message proto.Message, options ...PublishOpt) error
Publish publishes a rabbit message. Returns an error (only initialize it if needed); the error is already logged.
func (*PublisherPool) PublishRPC ¶
func (pp *PublisherPool) PublishRPC(ctx context.Context, topic Topic, message proto.Message, options ...PublishOpt) ([]byte, error)
PublishRPC publishes an RPC message and waits for the reply.
func (*PublisherPool) PublishRPCWithMultipleResponses ¶
func (pp *PublisherPool) PublishRPCWithMultipleResponses(ctx context.Context, topic Topic, message proto.Message, nrResponses int, options ...PublishOpt) (chan ReplyResponse, error)
func (*PublisherPool) Respond ¶
func (pp *PublisherPool) Respond(ctx context.Context, correlationID string, topic Topic, message proto.Message, options ...PublishOpt) error
Respond publishes a response to a rabbit message. Set isError to true if the reply is an error; otherwise pass false to indicate a valid response. Returns an error, only initialize it if necessary.
func (*PublisherPool) RespondWithError ¶
func (pp *PublisherPool) RespondWithError(ctx context.Context, correlationID string, topic Topic, message *grpc.Error, options ...PublishOpt) error
type Rabbit ¶
type Rabbit struct {
ConsumerFactory ConsumerFactory
Publisher PublisherPool
Exchange Exchange
// contains filtered or unexported fields
}
func New ¶
func New(configuration Configuration, serviceExchange Exchange, obs observability.Observability, opts ...func(*Options)) (*Rabbit, error)
New creates and returns a new rabbit client with the given configuration.
func (*Rabbit) Disconnect ¶
Disconnect disconnects all rabbit connections
type ReplyPool ¶
type ReplyPool struct {
Request chan ReplyRequest
Response chan ReplyResponse
Cancel chan string
Clients map[string]*client
}
func NewReplyPool ¶
NewReplyPool creates and returns a new ReplyPool
type ReplyRequest ¶
type ReplyRequest struct {
CorrelationId string
RequestChan chan ReplyResponse
// ExpectedResponsesNr represents the number of responses
// expected for this client
ExpectedResponsesNr int
}
type ReplyResponse ¶
type TopicBuilder ¶
type TopicBuilder struct {
// contains filtered or unexported fields
}
func NewTopic ¶
func NewTopic(service Exchange) *TopicBuilder
NewTopic creates a new topic that starts with the exchange of the receiving service
func (*TopicBuilder) AddWord ¶
func (rh *TopicBuilder) AddWord(word TopicWord) *TopicBuilder
AddWord adds a new word to the topic
type TraceCarrier ¶ added in v0.2.4
func (TraceCarrier) Get ¶ added in v0.2.4
func (c TraceCarrier) Get(key string) string
func (TraceCarrier) Keys ¶ added in v0.2.4
func (c TraceCarrier) Keys() []string
func (TraceCarrier) Set ¶ added in v0.2.4
func (c TraceCarrier) Set(key string, value string)