Documentation ¶
Index ¶
- func Close() error
- func Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)
- func NewURL(hostname string, port int, username, password string, vhost ...string) string
- func Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error
- func RegisterBatchHandler(queue string, handlerFunc pool.BatchHandlerFunc, ...) *pool.BatchHandler
- func RegisterHandler(queue string, handlerFunc pool.HandlerFunc, option ...types.ConsumeOptions) *pool.Handler
- func RegisterTopologyCreator(topology TopologyFunc)
- func RegisterTopologyDeleter(finalizer TopologyFunc)
- func Reset() error
- func Start(ctx context.Context, connectUrl string, options ...Option) (err error)
- type AMQPX
- func (a *AMQPX) Close() error
- func (a *AMQPX) Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)
- func (a *AMQPX) Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error
- func (a *AMQPX) RegisterBatchHandler(queue string, handlerFunc pool.BatchHandlerFunc, ...) *pool.BatchHandler
- func (a *AMQPX) RegisterHandler(queue string, handlerFunc pool.HandlerFunc, option ...types.ConsumeOptions) *pool.Handler
- func (a *AMQPX) RegisterTopologyCreator(topology TopologyFunc)
- func (a *AMQPX) RegisterTopologyDeleter(finalizer TopologyFunc)
- func (a *AMQPX) Reset() error
- func (a *AMQPX) Start(ctx context.Context, connectUrl string, options ...Option) (err error)
- type Option
- func WithBufferCapacity(capacity int) Option
- func WithCloseTimeout(timeout time.Duration) Option
- func WithConfirms(requirePublishConfirms bool) Option
- func WithConnectionTimeout(timeout time.Duration) Option
- func WithHeartbeatInterval(interval time.Duration) Option
- func WithLogger(logger *slog.Logger) Option
- func WithName(name string) Option
- func WithPoolOption(po pool.Option) Option
- func WithPublisherConnections(connections int) Option
- func WithPublisherSessions(sessions int) Option
- func WithSubscriberConnections(connections int) Option
- func WithTLS(config *tls.Config) Option
- type TopologyFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Get ¶
Get is intended for testing only. Do not use Get to poll broker queues.
func NewURL ¶
NewURL creates a new connection string for the NewSessionFactory.
hostname: e.g. localhost
port: e.g. 5672
username: e.g. username
password: e.g. password
vhost: e.g. "" or "/"
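The parameters above map onto the parts of an AMQP connection string. The following is a minimal sketch of how such a string could be assembled with the standard library. buildAMQPURL is a hypothetical helper, not the package's NewURL, and its handling of the vhost (percent-encoding it as a path segment, omitting it when empty) is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// buildAMQPURL is a hypothetical stand-in for NewURL. It assembles
// amqp://username:password@hostname:port and, if a non-empty vhost is
// given, appends it as a percent-encoded path segment (assumption).
func buildAMQPURL(hostname string, port int, username, password string, vhost ...string) string {
	// Userinfo.String escapes characters such as '@' in the credentials.
	userinfo := url.UserPassword(username, password).String()
	hostport := net.JoinHostPort(hostname, strconv.Itoa(port))
	s := fmt.Sprintf("amqp://%s@%s", userinfo, hostport)
	if len(vhost) > 0 && vhost[0] != "" {
		// PathEscape turns "/" into "%2F", so the vhost stays one segment.
		s += "/" + url.PathEscape(vhost[0])
	}
	return s
}

func main() {
	fmt.Println(buildAMQPURL("localhost", 5672, "username", "password"))
	// prints amqp://username:password@localhost:5672
	fmt.Println(buildAMQPURL("localhost", 5672, "username", "password", "/"))
	// prints amqp://username:password@localhost:5672/%2F
}
```

Escaping the credentials matters when a password contains characters such as '@', which would otherwise be read as the start of the host part.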
func Publish ¶
Publish a message to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.
func RegisterBatchHandler ¶ added in v0.7.0
func RegisterBatchHandler(queue string, handlerFunc pool.BatchHandlerFunc, option ...pool.BatchHandlerOption) *pool.BatchHandler
RegisterBatchHandler registers a handler function for a specific queue that processes messages in batches. A unique consumer name may be set; if it is left empty, a unique name is generated.
func RegisterHandler ¶
func RegisterHandler(queue string, handlerFunc pool.HandlerFunc, option ...types.ConsumeOptions) *pool.Handler
RegisterHandler registers a handler function for a specific queue. A unique consumer name may be set; if it is left empty, a unique name is generated. The returned handler can be used to pause and resume message processing. Processing must have been started with Start before it can be paused or resumed.
func RegisterTopologyCreator ¶
func RegisterTopologyCreator(topology TopologyFunc)
RegisterTopologyCreator registers a topology-creating function that is called upon Start. Creating the topology is the first step, before any publisher or subscriber is started.
func RegisterTopologyDeleter ¶
func RegisterTopologyDeleter(finalizer TopologyFunc)
RegisterTopologyDeleter registers a topology finalizer that is executed at the end of amqpx.Close().
func Reset ¶
func Reset() error
Reset closes the current package and resets its state to what it was before it was initialized and started.
func Start ¶
Start starts the subscriber and publisher pools. If no handlers were registered, no subscriber pool is started.
connectUrl has the form: amqp://username:password@localhost:5672
options are optional pool connection options. They may also contain publisher-specific settings such as the number of pooled publisher sessions (channels), publish confirmations, or a custom context that can signal an application shutdown. This custom context does not replace the Close() call. Always defer a Close() call.
Start is a non-blocking operation. The startup context may differ from the cancellation context provided via the options.
Types ¶
type AMQPX ¶ added in v0.3.0
type AMQPX struct {
// contains filtered or unexported fields
}
func (*AMQPX) Get ¶ added in v0.3.0
func (a *AMQPX) Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)
Get is intended for testing only. Do not use Get to poll broker queues.
func (*AMQPX) Publish ¶ added in v0.3.0
func (a *AMQPX) Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error
Publish a message to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.
func (*AMQPX) RegisterBatchHandler ¶ added in v0.6.0
func (a *AMQPX) RegisterBatchHandler(queue string, handlerFunc pool.BatchHandlerFunc, option ...pool.BatchHandlerOption) *pool.BatchHandler
RegisterBatchHandler registers a handler function for a specific queue that processes messages in batches. A unique consumer name may be set; if it is left empty, a unique name is generated.
func (*AMQPX) RegisterHandler ¶ added in v0.3.0
func (a *AMQPX) RegisterHandler(queue string, handlerFunc pool.HandlerFunc, option ...types.ConsumeOptions) *pool.Handler
RegisterHandler registers a handler function for a specific queue. A unique consumer name may be set; if it is left empty, a unique name is generated.
func (*AMQPX) RegisterTopologyCreator ¶ added in v0.3.0
func (a *AMQPX) RegisterTopologyCreator(topology TopologyFunc)
RegisterTopologyCreator registers a topology-creating function that is called upon Start. Creating the topology is the first step, before any publisher or subscriber is started.
func (*AMQPX) RegisterTopologyDeleter ¶ added in v0.3.0
func (a *AMQPX) RegisterTopologyDeleter(finalizer TopologyFunc)
RegisterTopologyDeleter registers a topology finalizer that is executed at the end of amqpx.Close().
func (*AMQPX) Reset ¶ added in v0.3.0
Reset closes the current package and resets its state to what it was before it was initialized and started.
func (*AMQPX) Start ¶ added in v0.3.0
Start starts the subscriber and publisher pools. If no handlers were registered, no subscriber pool is started.
connectUrl has the form: amqp://username:password@localhost:5672
options are optional pool connection options. They may also contain publisher-specific settings such as the number of pooled publisher sessions (channels), publish confirmations, or a custom context that can signal an application shutdown. This custom context does not replace the Close() call. Always defer a Close() call.
Start is a non-blocking operation.
type Option ¶
type Option func(*option)
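Option follows the functional options pattern: each With... function returns a closure that mutates an unexported settings struct. The sketch below illustrates the pattern in isolation. The settings struct, its fields, its default values, and the lowercase with... helpers are all hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// settings stands in for the package's unexported option struct.
// The fields and their defaults are made up for illustration.
type settings struct {
	name              string
	publisherSessions int
	closeTimeout      time.Duration
}

// Option has the documented shape: a function that mutates the settings.
type Option func(*settings)

// withName returns an Option that overrides the name.
func withName(name string) Option {
	return func(s *settings) { s.name = name }
}

// withPublisherSessions returns an Option that overrides the session count.
func withPublisherSessions(n int) Option {
	return func(s *settings) { s.publisherSessions = n }
}

// apply starts from defaults and runs every Option in order,
// so later options win over earlier ones.
func apply(opts ...Option) settings {
	s := settings{name: "default", publisherSessions: 1, closeTimeout: 5 * time.Second}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := apply(withName("orders"), withPublisherSessions(4))
	fmt.Println(s.name, s.publisherSessions, s.closeTimeout)
}
```

The pattern lets a variadic parameter such as Start's options stay backwards compatible: new settings can be added as new With... functions without changing the function signature.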
func WithBufferCapacity ¶ added in v0.8.0
WithBufferCapacity configures the size of the confirmation, error, and blocker buffers of all sessions.
func WithCloseTimeout ¶ added in v0.7.0
WithCloseTimeout limits how long the topology deleter functions are allowed to take when deleting topologies. This timeout is especially relevant in containerized environments, where containers may be killed after a specific timeout, so deletion operations should be cancelled before that hard kill comes into play.
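The idea of bounding a cleanup step with a timeout can be sketched with a context deadline. This is an illustration of the general technique, not the package's implementation: the deleter type, runWithCloseTimeout, slowDelete, and timedOut are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// deleter is a hypothetical stand-in for a topology deleter function.
type deleter func(ctx context.Context) error

// runWithCloseTimeout calls del with a context that is cancelled once
// timeout has elapsed, so a slow deletion cannot run indefinitely.
func runWithCloseTimeout(timeout time.Duration, del deleter) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return del(ctx)
}

// slowDelete simulates a deletion that takes d to finish, unless the
// context ends first, in which case it returns the context's error.
func slowDelete(d time.Duration) deleter {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// The deletion needs 2s but only 20ms are allowed.
	err := runWithCloseTimeout(20*time.Millisecond, slowDelete(2*time.Second))
	fmt.Println("timed out:", timedOut(err))
}
```

In a container setting, the timeout would typically be chosen below the orchestrator's termination grace period, so that the process stops its cleanup on its own terms.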
func WithConfirms ¶
WithConfirms requires all messages from sessions to be acked. This affects publishers.
func WithConnectionTimeout ¶
WithConnectionTimeout sets a custom connection timeout, which MUST be >= 1 * time.Second.
func WithHeartbeatInterval ¶
WithHeartbeatInterval sets a custom heartbeat interval, which MUST be >= 1 * time.Second.
func WithLogger ¶
WithLogger sets a custom logger for the connection AND session pool.
func WithPoolOption ¶
WithPoolOption is a function that allows you to directly manipulate the options of the underlying pool. DO NOT USE this option unless you have read enough of the source code to understand which configuration options influence which behavior. This might make sense if you want to change the pool name prefix or suffix.
func WithPublisherConnections ¶
WithPublisherConnections defines the number of tcp connections of the publisher.
func WithPublisherSessions ¶
WithPublisherSessions defines the number of multiplexed sessions for all connections. Meaning, if you have one connection and two sessions, that connection has two sessions. If you have two connections and two sessions, every connection gets one session. Sessions are assigned to connections in a round-robin manner.
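The round-robin assignment described above can be sketched as a simple counting loop. sessionsPerConnection is a hypothetical helper written for illustration; it only models how many sessions end up on each connection and does not reflect the package's internals.

```go
package main

import "fmt"

// sessionsPerConnection hands out sessions to connections one at a
// time in round-robin order and returns how many sessions each
// connection ends up with. It returns nil if connections < 1.
func sessionsPerConnection(connections, sessions int) []int {
	if connections < 1 {
		return nil
	}
	counts := make([]int, connections)
	for i := 0; i < sessions; i++ {
		counts[i%connections]++
	}
	return counts
}

func main() {
	fmt.Println(sessionsPerConnection(1, 2)) // prints [2]
	fmt.Println(sessionsPerConnection(2, 2)) // prints [1 1]
	fmt.Println(sessionsPerConnection(2, 5)) // prints [3 2]
}
```

The first two calls reproduce the two cases from the description; the third shows that an uneven count leaves the earlier connections with one session more.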
func WithSubscriberConnections ¶
WithSubscriberConnections defines the number of connections that all consumer sessions share. Meaning, if you have registered 10 handlers and define 5 connections, every connection has two sessions that are multiplexed over it. If you have one connection, all consumers derive their sessions from that connection in order to consume from the specified queue. You cannot have fewer than one connection, nor can you have more connections than handlers, as there can be at most one (TCP) connection with one session per handler.
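The constraint and the arithmetic above can be written down as two small helpers. Both names are hypothetical, and the even spreading of handler sessions across connections is an assumption; what the package does with an out-of-range value is not stated here.

```go
package main

import "fmt"

// validSubscriberConnections reports whether the documented constraint
// 1 <= connections <= handlers holds.
func validSubscriberConnections(connections, handlers int) bool {
	return connections >= 1 && connections <= handlers
}

// maxSessionsPerConnection returns the largest number of handler
// sessions multiplexed over a single connection, assuming sessions are
// spread as evenly as possible. It returns 0 for invalid input.
func maxSessionsPerConnection(connections, handlers int) int {
	if !validSubscriberConnections(connections, handlers) {
		return 0
	}
	// Ceiling division: some connections carry one session more
	// when handlers is not a multiple of connections.
	return (handlers + connections - 1) / connections
}

func main() {
	fmt.Println(maxSessionsPerConnection(5, 10))    // prints 2
	fmt.Println(maxSessionsPerConnection(1, 10))    // prints 10
	fmt.Println(validSubscriberConnections(11, 10)) // prints false
}
```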