rabbit

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TopicExchange  exchangeType = "topic"
	DirectExchange exchangeType = "direct"
	FanoutExchange exchangeType = "fanout"
)

Variables

View Source
var ErrResponse = errors.New("responded with an error")

Functions

func NewError

func NewError(errorMessage string, errorCode grpc.ErrorCode) *grpc.Error

func WithConcurrentReplyConsumer

func WithConcurrentReplyConsumer(number int) func(options *Options)

WithConcurrentReplyConsumer sets the number of goroutines that will handle the reply queue.

func WithLogger

func WithLogger(logger rabbitmq.Logger) func(options *Options)

func WithMultiplePublishers

func WithMultiplePublishers(number int) func(options *Options)

WithMultiplePublishers sets the number of routines running a publisher; each has its own TCP connection.

func WithPublisherHeader added in v0.2.4

func WithPublisherHeader(header []HeaderValue) func(options *PublisherOptions)

func WithPublisherTracing added in v0.2.4

func WithPublisherTracing(tracing bool) func(options *PublisherOptions)

Types

type Configuration

type Configuration struct {
	// URL is the address for connecting to the RabbitMQ instance
	URL string `json:"address" yaml:"address" validate:"required"`
}

Configuration is the basic AMQP configuration for the message bus.

type ConsumerFactory added in v0.2.4

type ConsumerFactory struct {
	// contains filtered or unexported fields
}

func NewConsumerFactory added in v0.2.4

func NewConsumerFactory(connection *rabbitmq.Conn, exchange Exchange, metrics rabbitMetrics, obs observability.Observability, opts ...ConsumerOpt) ConsumerFactory

NewConsumerFactory creates a new consumer factory for the given exchange. Opts will be applied to every consumer created by this factory and can be overridden by the consumer itself.

func (*ConsumerFactory) NewConsumer added in v0.2.4

func (cm *ConsumerFactory) NewConsumer(exchange Exchange, topic Topic, queueName string, handler HandlerFunc, durable bool, opts ...ConsumerOpt) (*rabbitmq.Consumer, error)

NewConsumer creates a new consumer for the given exchange, topic and handler function. The returned consumer should only be used for disconnecting.

type ConsumerOpt added in v0.2.4

type ConsumerOpt func(*ConsumerOpts)

func WithDefaultTimeout added in v0.2.4

func WithDefaultTimeout(timeout time.Duration) ConsumerOpt

func WithRoutines added in v0.2.4

func WithRoutines(routines int) ConsumerOpt

type ConsumerOpts added in v0.2.4

type ConsumerOpts struct {
	// contains filtered or unexported fields
}

type Exchange

type Exchange string
const (
	CentralExchange Exchange = "CENTRAL"

	GlobalNotificationExchange Exchange = "NOTIFICATION"
)

Exchanges

func (Exchange) String

func (t Exchange) String() string

type HandlerFunc

type HandlerFunc func(ctx context.Context, d rabbitmq.Delivery) (action rabbitmq.Action)
type Header struct {
	// contains filtered or unexported fields
}

func NewHeader

func NewHeader() *Header

func (*Header) Build

func (rh *Header) Build() []HeaderValue

func (*Header) WithError

func (rh *Header) WithError(err bool) *Header

func (*Header) WithField

func (rh *Header) WithField(key HeaderKey, value any) *Header

When adding custom fields, make sure the value is supported: https://pkg.go.dev/github.com/wagslane/go-rabbitmq#Table

func (*Header) WithMethod

func (rh *Header) WithMethod(method TopicWord) *Header

type HeaderKey

type HeaderKey string
const (
	HeaderKeyError     HeaderKey = "error"
	HeaderKeyMethod    HeaderKey = "method"
	HeaderKeyReplyType HeaderKey = "reply_type"
)

type HeaderReplyType

type HeaderReplyType string
const (
	HeaderReplyTypeAcknowledge        HeaderReplyType = "acknowledge"
	HeaderReplyTypeInterceptedMessage HeaderReplyType = "intercepted_message"
)

type HeaderValue

type HeaderValue struct {
	Key   HeaderKey
	Value any
}

type Options

type Options struct {
	// contains filtered or unexported fields
}

type PublishOpt added in v0.2.4

type PublishOpt func(*PublisherOptions)

type PublishRequest

type PublishRequest struct {
	Ctx             context.Context
	Topic           Topic
	CorrelationId   string
	Message         proto.Message
	Options         []PublishOpt
	ResponseChannel chan error
}

type Publisher

type Publisher struct {
	Publisher *rabbitmq.Publisher
	// contains filtered or unexported fields
}

func (*Publisher) Publish

func (pb *Publisher) Publish(ctx context.Context, topic string, message proto.Message, correlationID string, replyTopic Topic, optionFuncs ...PublishOpt) error

type PublisherOptions

type PublisherOptions struct {
	// contains filtered or unexported fields
}

type PublisherPool

type PublisherPool struct {
	// contains filtered or unexported fields
}

func (*PublisherPool) Publish

func (pp *PublisherPool) Publish(ctx context.Context, topic Topic, message proto.Message, options ...PublishOpt) error

Publish publishes a rabbit message. Returns an error; only initialize it if needed, as the error is already logged.

func (*PublisherPool) PublishRPC

func (pp *PublisherPool) PublishRPC(ctx context.Context, topic Topic, message proto.Message, options ...PublishOpt) ([]byte, error)

PublishRPC publishes an RPC message and waits for the reply.

func (*PublisherPool) PublishRPCWithMultipleResponses

func (pp *PublisherPool) PublishRPCWithMultipleResponses(ctx context.Context, topic Topic, message proto.Message, nrResponses int, options ...PublishOpt) (chan ReplyResponse, error)

func (*PublisherPool) Respond

func (pp *PublisherPool) Respond(ctx context.Context, correlationID string, topic Topic, message proto.Message, options ...PublishOpt) error

Respond publishes a response to a rabbit message. Set isError to true if the reply is an error; otherwise pass false to indicate a valid response. Returns an error; only initialize it if necessary.

func (*PublisherPool) RespondWithError

func (pp *PublisherPool) RespondWithError(ctx context.Context, correlationID string, topic Topic, message *grpc.Error, options ...PublishOpt) error

type Rabbit

type Rabbit struct {
	ConsumerFactory ConsumerFactory
	Publisher       PublisherPool

	Exchange Exchange
	// contains filtered or unexported fields
}

func New

func New(configuration Configuration, serviceExchange Exchange, obs observability.Observability, opts ...func(*Options)) (*Rabbit, error)

New creates and returns a new rabbit client with given configuration

func (*Rabbit) Disconnect

func (c *Rabbit) Disconnect() error

Disconnect disconnects all rabbit connections

func (*Rabbit) Name

func (c *Rabbit) Name() string

func (*Rabbit) Pass

func (c *Rabbit) Pass() bool

type ReplyPool

type ReplyPool struct {
	Request  chan ReplyRequest
	Response chan ReplyResponse
	Cancel   chan string
	Clients  map[string]*client
}

func NewReplyPool

func NewReplyPool(bufferSize int) ReplyPool

NewReplyPool creates and returns a new ReplyPool

type ReplyRequest

type ReplyRequest struct {
	CorrelationId string
	RequestChan   chan ReplyResponse
	// ExpectedResponsesNr represents the number of responses
	// expected for this client
	ExpectedResponsesNr int
}

type ReplyResponse

type ReplyResponse struct {
	CorrelationId string
	Body          []byte
	Error         bool
	Headers       map[string]any
}

type Topic

type Topic string

func (Topic) String

func (t Topic) String() string

type TopicBuilder

type TopicBuilder struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(service Exchange) *TopicBuilder

NewTopic creates a new topic that starts with the exchange of the receiving service

func (*TopicBuilder) AddWord

func (rh *TopicBuilder) AddWord(word TopicWord) *TopicBuilder

AddWord adds a new word to the topic

func (*TopicBuilder) Build

func (rh *TopicBuilder) Build() Topic

Build returns a topic string

type TopicWord

type TopicWord string

func (TopicWord) String

func (t TopicWord) String() string

type TraceCarrier added in v0.2.4

type TraceCarrier rabbitmq.Table

func (TraceCarrier) Get added in v0.2.4

func (c TraceCarrier) Get(key string) string

func (TraceCarrier) Keys added in v0.2.4

func (c TraceCarrier) Keys() []string

func (TraceCarrier) Set added in v0.2.4

func (c TraceCarrier) Set(key string, value string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL