Documentation
¶
Index ¶
- Constants
- Variables
- type BrokerConfig
- type Consumer
- type ConsumerConfig
- func (c *ConsumerConfig) BuildDeadletterQueue(routes *Routes, ch *amqp.Channel, con *amqp.Connection, ex string) (err error)
- func (c *ConsumerConfig) BuildQueue(queueName string, routes *Routes, ch *amqp.Channel, ex string) (err error)
- func (e *ConsumerConfig) GetArgs() map[string]interface{}
- func (c *ConsumerConfig) GetAutoDelete() bool
- func (e *ConsumerConfig) GetDeadletterName() string
- func (c *ConsumerConfig) GetDurable() bool
- func (c *ConsumerConfig) GetExclusive() bool
- func (e *ConsumerConfig) GetHasDeadletter() bool
- func (c *ConsumerConfig) GetName() string
- func (c *ConsumerConfig) GetNoWait() bool
- func (c *ConsumerConfig) GetPrefetchCount() uint
- func (c *ConsumerConfig) GetPrefetchSize() uint
- type Exchange
- type ExchangeConfig
- func (e *ExchangeConfig) BuildExchange(ch *amqp.Channel) (err error)
- func (e *ExchangeConfig) GetArgs() map[string]interface{}
- func (e *ExchangeConfig) GetAutoDelete() bool
- func (e *ExchangeConfig) GetDurable() bool
- func (e *ExchangeConfig) GetInternal() bool
- func (e *ExchangeConfig) GetName() (string, error)
- func (e *ExchangeConfig) GetType() string
- type HandlerFunc
- type Host
- type HostConfig
- type HostMiddleware
- type KeyHandlerFunc
- type MessageHandler
- type MiddlewareList
- type RabbitHost
- func (h *RabbitHost) AddBroker(ctx context.Context, cfg *ExchangeConfig, consumers []Consumer) error
- func (h *RabbitHost) GetConnectionStatus() bool
- func (h *RabbitHost) Middleware(fn ...HostMiddleware)
- func (h *RabbitHost) Run(ctx context.Context) (err error)
- func (h *RabbitHost) Stop(context.Context) error
- type Routes
Constants ¶
const (
TOPIC_EXCHANGE = "topic"
)
Variables ¶
var (
ERRNAMEREQUIRED = errors.New("name is a required exchange field")
)
Functions ¶
This section is empty.
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
Exchange ExchangeConfig
Consumers map[string]ConsumerConfig
}
type Consumer ¶
type Consumer interface {
// Init can be used to get a custom
// consumer config. If it returns nil a consumer
// with default params will be setup
Init() (*ConsumerConfig, error)
// Prefix defines a common prefix to be added
// to all queue names
Prefix() string
// Middleware can be used to implement custom
// middleware which gets called before messages
// are passed to handlers
Middleware(HandlerFunc) HandlerFunc
// Queues is used to define the queues, keys and handlers
// Config is passed which can be used to set QOS and consumer name
Queues(context.Context) map[string]*Routes
}
Consumer is an interface which can be implemented to create a consumer
The consumer can setup multiple queues, define n routing keys for each queue and in turn assign a handler to manage the messages received
Custom middleware can be added using the Middleware method
Example implementation shown below:
type MyConsumer struct{
}
func NewMyConsumer() events.Consumer {
return &MyConsumer{}
}
func (c *MyConsumer) Prefix() string{
return ""
}
func (c *MyConsumer) Middleware(h events.HandlerFunc) events.HandlerFunc{
return func(m events.BasicMessage) error {
return h(m)
}
}
func (c *MyConsumer) Setup(ctx context.Context) (map[string]*events.ConsumerRoutes, ){
return map[string]*events.ConsumerRoutes{"mark.queue":{
Keys: []string{"test.message", "mark.#"},
Handler:c.TestHandler,
},
}
}
func (c *MyConsumer) TestHandler(m events.BasicMessage) error{
return nil
}
type ConsumerConfig ¶
type ConsumerConfig struct {
Name string
Durable *bool
AutoDelete *bool
NoWait *bool
Exclusive *bool
Ttl *uint
PrefetchCount *uint
PrefetchSize *uint
Args map[string]interface{}
HasDeadletter *bool
DeadletterName *string
}
ConsumerConfig defines the setup of a consumer If this isn't set default values will be used. To set a custom config for a consumer setup a new consumer struct, pass the config and return it in the Init() method
func (*ConsumerConfig) BuildDeadletterQueue ¶
func (c *ConsumerConfig) BuildDeadletterQueue(routes *Routes, ch *amqp.Channel, con *amqp.Connection, ex string) (err error)
func (*ConsumerConfig) BuildQueue ¶
func (*ConsumerConfig) GetArgs ¶
func (e *ConsumerConfig) GetArgs() map[string]interface{}
GetArgs gets a table of arbitrary arguments which are passed to the exchange
func (*ConsumerConfig) GetAutoDelete ¶
func (c *ConsumerConfig) GetAutoDelete() bool
GetAutoDelete determines whether the queue is deleted on server restart, default is false
func (*ConsumerConfig) GetDeadletterName ¶
func (e *ConsumerConfig) GetDeadletterName() string
GetDeadletterName gets the name for the deadletter queue to be setup, if nil then a name of %QueueName%.deadletter is used
func (*ConsumerConfig) GetDurable ¶
func (c *ConsumerConfig) GetDurable() bool
GetDurable returns the type of durability set in config, if nil then it returns a default of true
func (*ConsumerConfig) GetExclusive ¶
func (c *ConsumerConfig) GetExclusive() bool
GetExclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. default is false
func (*ConsumerConfig) GetHasDeadletter ¶
func (e *ConsumerConfig) GetHasDeadletter() bool
GetArgs gets a table of arbitrary arguments which are passed to the exchange
func (*ConsumerConfig) GetName ¶
func (c *ConsumerConfig) GetName() string
GetName returns the consumer name if set in config otherwise it returns a random uuid
func (*ConsumerConfig) GetNoWait ¶
func (c *ConsumerConfig) GetNoWait() bool
GetNoWait When true, the queue will assume to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection. default is false
func (*ConsumerConfig) GetPrefetchCount ¶
func (c *ConsumerConfig) GetPrefetchCount() uint
GetPrefetchCount returns the Qos value for number of messages pulled from the queue at a time default is 0 which will pull the default count for most libs
func (*ConsumerConfig) GetPrefetchSize ¶
func (c *ConsumerConfig) GetPrefetchSize() uint
type ExchangeConfig ¶
type ExchangeConfig struct {
Name string
// contains filtered or unexported fields
}
Exchange config sets up a new exchange with the provided params Defaults are enabled so not all params may need set depending on requirements
func (*ExchangeConfig) BuildExchange ¶
func (e *ExchangeConfig) BuildExchange(ch *amqp.Channel) (err error)
BuildExchange builds an exchange
func (*ExchangeConfig) GetArgs ¶
func (e *ExchangeConfig) GetArgs() map[string]interface{}
GetArgs gets a table of arbitrary arguments which are passed to the exchange
func (*ExchangeConfig) GetAutoDelete ¶
func (e *ExchangeConfig) GetAutoDelete() bool
GetAutoDelete returns the type of deletion policy set in config, if nil then it returns a default of false
func (*ExchangeConfig) GetDurable ¶
func (e *ExchangeConfig) GetDurable() bool
GetDurable returns the type of durability set in config, if nil then it returns a default of true
func (*ExchangeConfig) GetInternal ¶
func (e *ExchangeConfig) GetInternal() bool
GetInternal determines whether this exchange can only be published to from other exchanges default value is false meaning external sources can by default publish to this exchange
func (*ExchangeConfig) GetName ¶
func (e *ExchangeConfig) GetName() (string, error)
GetAutoDelete returns the type of deletion policy set in config, if nil then it returns a default of false
func (*ExchangeConfig) GetType ¶
func (e *ExchangeConfig) GetType() string
GetType returns the type of exchange set in config, if nil then it returns a default of Topic
type HandlerFunc ¶
func JsonHandler ¶
func JsonHandler(h HandlerFunc) HandlerFunc
func MessageDump ¶
func MessageDump(h HandlerFunc) HandlerFunc
MessageDump will output the entire amqp message with the body converted to a string Handy for debugging
func (HandlerFunc) HandleMessage ¶
func (f HandlerFunc) HandleMessage(ctx context.Context, m amqp.Delivery)
type Host ¶
type Host interface {
// AddBroker will register an exchange and n consumers
// which will consume from that exchange
AddBroker(context.Context, *ExchangeConfig, []Consumer) error
// Start will setup all queues and routing keys
// assigned to each consumer and then in turn start them
Run(context.Context) (err error)
// Middleware can be used to implement custom
// middleware which gets called before messages
// are passed to handlers
Middleware(...HostMiddleware)
// Stop can be called when you wish to shut down the host
Stop(context.Context) error
GetConnectionStatus() bool
}
Host is the container which is used to host all consumers that are registered. It is responsible for the amqp connection starting & gracefully stopping all running consumers h := NewRabbitHost().Init(cfg.Host) h.AddBroker(NewBroker(cfg.Exchange, [])
func NewConsumerHost ¶
func NewConsumerHost(cfg *HostConfig) Host
Init sets up the initial connection & quality of service to be used by all registered consumers
type HostConfig ¶
type HostConfig struct {
Address string
}
HostConfig contains global config used for the rabbit connection
type HostMiddleware ¶
type HostMiddleware func(handler HandlerFunc) HandlerFunc
type MessageHandler ¶
MessageHandler works in the same way httpHandlers do allowing middleware etc to be used on consumers
type MiddlewareList ¶
type MiddlewareList []HostMiddleware
type RabbitHost ¶
type RabbitHost struct {
// contains filtered or unexported fields
}
func (*RabbitHost) AddBroker ¶
func (h *RabbitHost) AddBroker(ctx context.Context, cfg *ExchangeConfig, consumers []Consumer) error
AddBroker will register an exchange and n consumers which will consume from that exchange
func (*RabbitHost) GetConnectionStatus ¶
func (h *RabbitHost) GetConnectionStatus() bool
func (*RabbitHost) Middleware ¶
func (h *RabbitHost) Middleware(fn ...HostMiddleware)
type Routes ¶
type Routes struct {
Keys []string
DeliveryFunc KeyHandlerFunc
}
Routes contains a set of routing keys and a handlerFunc that will be used to process messages meeting the routing keys