Documentation
¶
Index ¶
- Variables
- type BatchHandler
- func (h *BatchHandler) Config() BatchHandlerConfig
- func (h *BatchHandler) ConsumeOptions() types.ConsumeOptions
- func (h *BatchHandler) FlushTimeout() time.Duration
- func (h *BatchHandler) IsActive(ctx context.Context) (active bool, err error)
- func (h *BatchHandler) MaxBatchBytes() int
- func (h *BatchHandler) MaxBatchSize() int
- func (h *BatchHandler) Pause(ctx context.Context) error
- func (h *BatchHandler) Queue() string
- func (h *BatchHandler) QueueConfig() QueueConfig
- func (h *BatchHandler) Resume(ctx context.Context) error
- func (h *BatchHandler) SetConsumeOptions(consumeOpts types.ConsumeOptions)
- func (h *BatchHandler) SetFlushTimeout(flushTimeout time.Duration)
- func (h *BatchHandler) SetHandlerFunc(hf BatchHandlerFunc)
- func (h *BatchHandler) SetMaxBatchBytes(maxBatchBytes int)
- func (h *BatchHandler) SetMaxBatchSize(maxBatchSize int)
- func (h *BatchHandler) SetQueue(queue string)
- type BatchHandlerConfig
- type BatchHandlerFunc
- type BatchHandlerOption
- type ConnectionPool
- func (cp *ConnectionPool) Capacity() int
- func (cp *ConnectionPool) Close()
- func (cp *ConnectionPool) GetConnection(ctx context.Context) (conn *types.Connection, err error)
- func (cp *ConnectionPool) GetTransientConnection(ctx context.Context) (conn *types.Connection, err error)
- func (cp *ConnectionPool) Name() string
- func (cp *ConnectionPool) ReturnConnection(conn *types.Connection, err error)
- func (cp *ConnectionPool) Size() int
- func (cp *ConnectionPool) StatCachedActive() int
- func (cp *ConnectionPool) StatTransientActive() int64
- type ConnectionPoolOption
- func ConnectionPoolWithConnectionTimeout(timeout time.Duration) ConnectionPoolOption
- func ConnectionPoolWithHeartbeatInterval(interval time.Duration) ConnectionPoolOption
- func ConnectionPoolWithLogger(logger *slog.Logger) ConnectionPoolOption
- func ConnectionPoolWithName(name string) ConnectionPoolOption
- func ConnectionPoolWithNamePrefix(prefix string) ConnectionPoolOption
- func ConnectionPoolWithNameSuffix(suffix string) ConnectionPoolOption
- func ConnectionPoolWithRecoverCallback(callback types.ConnectionRecoverCallback) ConnectionPoolOption
- func ConnectionPoolWithTLS(config *tls.Config) ConnectionPoolOption
- type Handler
- func (h *Handler) Config() HandlerConfig
- func (h *Handler) ConsumeOptions() types.ConsumeOptions
- func (h *Handler) IsActive(ctx context.Context) (active bool, err error)
- func (h *Handler) Pause(ctx context.Context) error
- func (h *Handler) Queue() string
- func (h *Handler) QueueConfig() QueueConfig
- func (h *Handler) Resume(ctx context.Context) error
- func (h *Handler) SetConsumeOptions(consumeOpts types.ConsumeOptions)
- func (h *Handler) SetHandlerFunc(hf HandlerFunc)
- func (h *Handler) SetQueue(queue string)
- type HandlerConfig
- type HandlerFunc
- type Option
- func WithBufferCapacity(size int) Option
- func WithConfirms(requirePublishConfirms bool) Option
- func WithConnectionRecoverCallback(callback types.ConnectionRecoverCallback) Option
- func WithConnectionTimeout(timeout time.Duration) Option
- func WithHeartbeatInterval(interval time.Duration) Option
- func WithLogger(logger *slog.Logger) Option
- func WithName(name string) Option
- func WithNamePrefix(prefix string) Option
- func WithNameSuffix(suffix string) Option
- func WithSessionConsumeContextRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionExchangeBindRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionExchangeDeclarePassiveRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionExchangeDeclareRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionExchangeDeleteRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionExchangeUnbindRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionFlowRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionGetRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionPublishRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQoSRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQueueBindRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQueueDeclarePassiveRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQueueDeclareRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQueueDeleteRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQueuePurgeRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionQueueUnbindRetryCallback(callback types.SessionRetryCallback) Option
- func WithSessionRecoverCallback(callback types.SessionRetryCallback) Option
- func WithSessionRetryCallback(callback types.SessionRetryCallback) Option
- func WithTLS(config *tls.Config) Option
- type Pool
- func (p *Pool) Close()
- func (p *Pool) ConnectionPoolCapacity() int
- func (p *Pool) ConnectionPoolSize() int
- func (p *Pool) Context() context.Context
- func (p *Pool) ForceGetSession(ctx context.Context) (*types.Session, error)
- func (p *Pool) GetSession(ctx context.Context) (*types.Session, error)
- func (p *Pool) GetTransientSession(ctx context.Context) (*types.Session, error)
- func (p *Pool) Name() string
- func (p *Pool) ReturnSession(session *types.Session, err error)
- func (p *Pool) SessionPoolCapacity() int
- func (p *Pool) SessionPoolSize() int
- type Publisher
- func (p *Publisher) Close()
- func (p *Publisher) Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)
- func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error
- func (p *Publisher) PublishBatch(ctx context.Context, exchange string, routingKey string, ...) error
- type PublisherOption
- type QueueConfig
- type SessionPool
- func (sp *SessionPool) Capacity() int
- func (sp *SessionPool) Close()
- func (sp *SessionPool) ForceGetSession(ctx context.Context) (s *types.Session, err error)
- func (sp *SessionPool) GetSession(ctx context.Context) (s *types.Session, err error)
- func (sp *SessionPool) GetTransientSession(ctx context.Context) (s *types.Session, err error)
- func (sp *SessionPool) ReturnSession(session *types.Session, err error)
- func (sp *SessionPool) Size() int
- type SessionPoolOption
- func SessionPoolWithAutoCloseConnectionPool(autoClose bool) SessionPoolOption
- func SessionPoolWithBufferCapacity(capacity int) SessionPoolOption
- func SessionPoolWithConfirms(requirePublishConfirms bool) SessionPoolOption
- func SessionPoolWithConsumeContextRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeBindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeDeclarePassiveRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeDeclareRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeDeleteRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeUnbindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithFlowRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithGetRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithLogger(logger *slog.Logger) SessionPoolOption
- func SessionPoolWithPublishRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQoSRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueBindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueDeclarePassiveRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueDeclareRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueDeleteRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueuePurgeRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueUnbindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithRecoverCallback(callback types.SessionRetryCallback) SessionPoolOption
- func SessionPoolWithRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
- type Subscriber
- func (s *Subscriber) Close()
- func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler)
- func (s *Subscriber) RegisterBatchHandlerFunc(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler
- func (s *Subscriber) RegisterHandler(handler *Handler)
- func (s *Subscriber) RegisterHandlerFunc(queue string, hf HandlerFunc, options ...types.ConsumeOptions) *Handler
- func (s *Subscriber) Start(ctx context.Context) (err error)
- func (s *Subscriber) Wait()
- type SubscriberOption
- type Topologer
- func (t *Topologer) ExchangeBind(ctx context.Context, destination string, routingKey string, source string, ...) (err error)
- func (t *Topologer) ExchangeDeclare(ctx context.Context, name string, kind types.ExchangeKind, ...) (err error)
- func (t *Topologer) ExchangeDeclarePassive(ctx context.Context, name string, kind types.ExchangeKind, ...) (err error)
- func (t *Topologer) ExchangeDelete(ctx context.Context, name string, option ...types.ExchangeDeleteOptions) (err error)
- func (t *Topologer) ExchangeUnbind(ctx context.Context, destination string, routingKey string, source string, ...) (err error)
- func (t *Topologer) QueueBind(ctx context.Context, name string, routingKey string, exchange string, ...) (err error)
- func (t *Topologer) QueueDeclare(ctx context.Context, name string, option ...types.QueueDeclareOptions) (queue types.Queue, err error)
- func (t *Topologer) QueueDeclarePassive(ctx context.Context, name string, option ...types.QueueDeclareOptions) (queue types.Queue, err error)
- func (t *Topologer) QueueDelete(ctx context.Context, name string, option ...types.QueueDeleteOptions) (purged int, err error)
- func (t *Topologer) QueuePurge(ctx context.Context, name string, options ...types.QueuePurgeOptions) (int, error)
- func (t *Topologer) QueueUnbind(ctx context.Context, name string, routingKey string, exchange string, ...) (err error)
- type TopologerOption
Constants ¶
This section is empty.
Variables ¶
var ( ErrPoolInitializationFailed = errors.New("pool initialization failed") // ErrPauseFailed is returned by (Batch)Handler.Pause in case that the passed context is canceled ErrPauseFailed = errors.New("failed to pause handler") // ErrResumeFailed is returned by (Batch)Handler.Resume in case that the passed context is canceled ErrResumeFailed = errors.New("failed to resume handler") )
Functions ¶
This section is empty.
Types ¶
type BatchHandler ¶ added in v0.6.0
type BatchHandler struct {
// contains filtered or unexported fields
}
BatchHandler is a struct that contains all parameters needed in order to register a batch handler function.
func NewBatchHandler ¶ added in v0.7.0
func NewBatchHandler(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler
NewBatchHandler creates a new batch handler which is primarily a combination of your passed batch handler function and the queue name from which the handler fetches messages and processes those. Additionally, the handler allows you to pause and resume processing from the provided queue.
func (*BatchHandler) Config ¶ added in v0.7.0
func (h *BatchHandler) Config() BatchHandlerConfig
func (*BatchHandler) ConsumeOptions ¶ added in v0.7.0
func (h *BatchHandler) ConsumeOptions() types.ConsumeOptions
func (*BatchHandler) FlushTimeout ¶ added in v0.6.1
func (h *BatchHandler) FlushTimeout() time.Duration
func (*BatchHandler) IsActive ¶ added in v0.7.0
func (h *BatchHandler) IsActive(ctx context.Context) (active bool, err error)
func (*BatchHandler) MaxBatchBytes ¶ added in v0.7.2
func (h *BatchHandler) MaxBatchBytes() int
func (*BatchHandler) MaxBatchSize ¶ added in v0.6.1
func (h *BatchHandler) MaxBatchSize() int
func (*BatchHandler) Pause ¶ added in v0.7.0
func (h *BatchHandler) Pause(ctx context.Context) error
Pause allows to halt the processing of a queue after the processing has been started by the subscriber.
func (*BatchHandler) Queue ¶ added in v0.6.0
func (h *BatchHandler) Queue() string
func (*BatchHandler) QueueConfig ¶ added in v0.7.1
func (h *BatchHandler) QueueConfig() QueueConfig
func (*BatchHandler) Resume ¶ added in v0.7.0
func (h *BatchHandler) Resume(ctx context.Context) error
Resume allows to continue the processing of a queue after it has been paused using Pause
func (*BatchHandler) SetConsumeOptions ¶ added in v0.7.0
func (h *BatchHandler) SetConsumeOptions(consumeOpts types.ConsumeOptions)
func (*BatchHandler) SetFlushTimeout ¶ added in v0.7.0
func (h *BatchHandler) SetFlushTimeout(flushTimeout time.Duration)
func (*BatchHandler) SetHandlerFunc ¶ added in v0.7.0
func (h *BatchHandler) SetHandlerFunc(hf BatchHandlerFunc)
SetHandlerFunc changes the current handler function to another handler function which processes messages. The actual change is effective after pausing and resuming the handler.
func (*BatchHandler) SetMaxBatchBytes ¶ added in v0.7.2
func (h *BatchHandler) SetMaxBatchBytes(maxBatchBytes int)
func (*BatchHandler) SetMaxBatchSize ¶ added in v0.7.0
func (h *BatchHandler) SetMaxBatchSize(maxBatchSize int)
func (*BatchHandler) SetQueue ¶ added in v0.7.0
func (h *BatchHandler) SetQueue(queue string)
SetQueue changes the current queue to another queue from which the handler consumes messages. The actual change is effective after pausing and resuming the handler.
type BatchHandlerConfig ¶ added in v0.7.0
type BatchHandlerConfig struct {
Queue string
types.ConsumeOptions
HandlerFunc BatchHandlerFunc
// Maximum number of messages
MaxBatchSize int
// Maximum size of a batch in bytes (soft limit which triggers a batch to be processed)
// does not guarantee that the batch size is not exceeded.
MaxBatchBytes int
FlushTimeout time.Duration
}
BatchHandlerConfig is a read only snapshot of the current handler's configuration.
type BatchHandlerFunc ¶ added in v0.6.0
BatchHandlerFunc is a handler for incoming batches of messages/events
type BatchHandlerOption ¶ added in v0.7.0
type BatchHandlerOption func(*BatchHandler)
func WithBatchConsumeOptions ¶ added in v0.7.0
func WithBatchConsumeOptions(opts types.ConsumeOptions) BatchHandlerOption
func WithBatchFlushTimeout ¶ added in v0.7.0
func WithBatchFlushTimeout(d time.Duration) BatchHandlerOption
func WithMaxBatchBytes ¶ added in v0.7.2
func WithMaxBatchBytes(size int) BatchHandlerOption
WithMaxBatchBytes sets the maximum size of a batch in bytes. If the batch size exceeds this limit, the batch is passed to the handler function. If the value is set to 0, the batch size is not limited by bytes.
func WithMaxBatchSize ¶ added in v0.7.0
func WithMaxBatchSize(size int) BatchHandlerOption
WithMaxBatchSize sets the maximum size of a batch. If set to 0 the batch size is not limited. This means that the batch size is only limited by the maximum batch size in bytes.
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool houses the pool of RabbitMQ connections.
func NewConnectionPool ¶
func NewConnectionPool(ctx context.Context, connectUrl string, numConns int, options ...ConnectionPoolOption) (*ConnectionPool, error)
NewConnectionPool creates a new connection pool which has a maximum size it can become and an idle size of connections that are always open.
func (*ConnectionPool) Capacity ¶ added in v0.8.0
func (cp *ConnectionPool) Capacity() int
Capacity is the capacity of the cached connection pool without any transient connections. It is the initial number of connections that were created for this connection pool.
func (*ConnectionPool) Close ¶
func (cp *ConnectionPool) Close()
Close closes the connection pool. Closes all connections and sessions that are currently known to the pool. Any new connections or session requests will return an error. Any returned sessions or connections will be closed properly.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection(ctx context.Context) (conn *types.Connection, err error)
GetConnection only returns an error upon shutdown
func (*ConnectionPool) GetTransientConnection ¶
func (cp *ConnectionPool) GetTransientConnection(ctx context.Context) (conn *types.Connection, err error)
GetTransientConnection may return an error when the context was cancelled before the connection could be obtained. Transient connections may be returned to the pool. They are closed properly upon returning.
func (*ConnectionPool) Name ¶ added in v0.2.0
func (cp *ConnectionPool) Name() string
func (*ConnectionPool) ReturnConnection ¶
func (cp *ConnectionPool) ReturnConnection(conn *types.Connection, err error)
ReturnConnection puts the connection back in the queue and flags it for error. This helps maintain a Round Robin on Connections and their resources. If the connection is flagged, it will be recovered and returned to the pool. If the context is canceled, the connection will be immediately returned to the pool without any recovery attempt.
func (*ConnectionPool) Size ¶ added in v0.5.0
func (cp *ConnectionPool) Size() int
Size returns the number of idle cached connections.
func (*ConnectionPool) StatCachedActive ¶ added in v0.8.0
func (cp *ConnectionPool) StatCachedActive() int
StatCachedActive returns the number of active cached connections.
func (*ConnectionPool) StatTransientActive ¶ added in v0.8.0
func (cp *ConnectionPool) StatTransientActive() int64
StatTransientActive returns the number of active transient connections.
type ConnectionPoolOption ¶
type ConnectionPoolOption func(*connectionPoolOption)
func ConnectionPoolWithConnectionTimeout ¶
func ConnectionPoolWithConnectionTimeout(timeout time.Duration) ConnectionPoolOption
ConnectionPoolWithConnectionTimeout allows to set a custom connection timeout, that MUST be >= 1 * time.Second
func ConnectionPoolWithHeartbeatInterval ¶
func ConnectionPoolWithHeartbeatInterval(interval time.Duration) ConnectionPoolOption
ConnectionPoolWithHeartbeatInterval allows to set a custom heartbeat interval, that MUST be >= 1 * time.Second
func ConnectionPoolWithLogger ¶
func ConnectionPoolWithLogger(logger *slog.Logger) ConnectionPoolOption
ConnectionPoolWithLogger allows to set a custom logger.
func ConnectionPoolWithName ¶
func ConnectionPoolWithName(name string) ConnectionPoolOption
ConnectionPoolWithName gives all of your pooled connections a prefix name
func ConnectionPoolWithNamePrefix ¶ added in v0.2.0
func ConnectionPoolWithNamePrefix(prefix string) ConnectionPoolOption
ConnectionPoolWithNamePrefix adds a prefix to the connection pool name
func ConnectionPoolWithNameSuffix ¶ added in v0.2.0
func ConnectionPoolWithNameSuffix(suffix string) ConnectionPoolOption
ConnectionPoolWithNameSuffix adds a suffix to the connection pool name
func ConnectionPoolWithRecoverCallback ¶ added in v0.8.0
func ConnectionPoolWithRecoverCallback(callback types.ConnectionRecoverCallback) ConnectionPoolOption
ConnectionPoolWithRecoverCallback allows to set a custom recover callback.
func ConnectionPoolWithTLS ¶
func ConnectionPoolWithTLS(config *tls.Config) ConnectionPoolOption
ConnectionPoolWithTLS allows to configure tls connectivity.
type Handler ¶ added in v0.2.0
type Handler struct {
// contains filtered or unexported fields
}
Handler is a struct that contains all parameters needed in order to register a handler function to the provided queue. Additionally, the handler allows you to pause and resume processing of messages.
func NewHandler ¶ added in v0.7.0
func NewHandler(queue string, hf HandlerFunc, option ...types.ConsumeOptions) *Handler
NewHandler creates a new handler which is primarily a combination of your passed handler function and the queue name from which the handler fetches messages and processes those. Additionally, the handler allows you to pause and resume processing from the provided queue.
func (*Handler) Config ¶ added in v0.7.0
func (h *Handler) Config() HandlerConfig
func (*Handler) ConsumeOptions ¶ added in v0.7.0
func (h *Handler) ConsumeOptions() types.ConsumeOptions
func (*Handler) Pause ¶ added in v0.7.0
Pause allows to halt the processing of a queue after the processing has been started by the subscriber.
func (*Handler) QueueConfig ¶ added in v0.7.1
func (h *Handler) QueueConfig() QueueConfig
func (*Handler) Resume ¶ added in v0.7.0
Resume allows to continue the processing of a queue after it has been paused using Pause
func (*Handler) SetConsumeOptions ¶ added in v0.7.0
func (h *Handler) SetConsumeOptions(consumeOpts types.ConsumeOptions)
SetConsumeOptions changes the current consume options to the passed consume options. The actual change is effective after pausing and resuming the handler.
func (*Handler) SetHandlerFunc ¶ added in v0.7.0
func (h *Handler) SetHandlerFunc(hf HandlerFunc)
SetHandlerFunc changes the current handler function to another handler function which processes messages. The actual change is effective after pausing and resuming the handler.
type HandlerConfig ¶ added in v0.7.0
type HandlerConfig struct {
Queue string
types.ConsumeOptions
HandlerFunc HandlerFunc
}
HandlerConfig is a read only snapshot of the current handler's configuration. This internal data structure is used in the corresponding consumer.
type HandlerFunc ¶
HandlerFunc is basically a handler for incoming messages/events.
type Option ¶ added in v0.2.0
type Option func(*poolOption)
func WithBufferCapacity ¶ added in v0.8.0
WithBufferCapacity allows to configure the size of the confirmation, error & blocker buffers of all sessions
func WithConfirms ¶
WithConfirms requires all messages from sessions to be acked.
func WithConnectionRecoverCallback ¶ added in v0.8.0
func WithConnectionRecoverCallback(callback types.ConnectionRecoverCallback) Option
WithConnectionRecoverCallback allows to set a custom connection recovery callback
func WithConnectionTimeout ¶
WithConnectionTimeout allows to set a custom connection timeout, that MUST be >= 1 * time.Second
func WithHeartbeatInterval ¶
WithHeartbeatInterval allows to set a custom heartbeat interval, that MUST be >= 1 * time.Second
func WithLogger ¶
WithLogger allows to set a custom logger for the connection AND session pool
func WithNamePrefix ¶ added in v0.2.0
WithNamePrefix adds a prefix to the connection pool name
func WithNameSuffix ¶ added in v0.2.0
WithNameSuffix adds a suffix to the connection pool name
func WithSessionConsumeContextRetryCallback ¶ added in v0.8.0
func WithSessionConsumeContextRetryCallback(callback types.SessionRetryCallback) Option
WithSessionConsumeContextRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeBindRetryCallback ¶ added in v0.8.0
func WithSessionExchangeBindRetryCallback(callback types.SessionRetryCallback) Option
WithSessionExchangeBindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeDeclarePassiveRetryCallback ¶ added in v0.8.0
func WithSessionExchangeDeclarePassiveRetryCallback(callback types.SessionRetryCallback) Option
WithSessionExchangeDeclarePassiveRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeDeclareRetryCallback ¶ added in v0.8.0
func WithSessionExchangeDeclareRetryCallback(callback types.SessionRetryCallback) Option
WithSessionExchangeDeclareRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeDeleteRetryCallback ¶ added in v0.8.0
func WithSessionExchangeDeleteRetryCallback(callback types.SessionRetryCallback) Option
WithSessionExchangeDeleteRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeUnbindRetryCallback ¶ added in v0.8.0
func WithSessionExchangeUnbindRetryCallback(callback types.SessionRetryCallback) Option
WithSessionExchangeUnbindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionFlowRetryCallback ¶ added in v0.8.0
func WithSessionFlowRetryCallback(callback types.SessionRetryCallback) Option
WithSessionFlowRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionGetRetryCallback ¶ added in v0.8.0
func WithSessionGetRetryCallback(callback types.SessionRetryCallback) Option
WithSessionGetRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionPublishRetryCallback ¶ added in v0.8.0
func WithSessionPublishRetryCallback(callback types.SessionRetryCallback) Option
WithSessionPublishRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQoSRetryCallback ¶ added in v0.8.0
func WithSessionQoSRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQoSRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueBindRetryCallback ¶ added in v0.8.0
func WithSessionQueueBindRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQueueBindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueDeclarePassiveRetryCallback ¶ added in v0.8.0
func WithSessionQueueDeclarePassiveRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQueueDeclarePassiveRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueDeclareRetryCallback ¶ added in v0.8.0
func WithSessionQueueDeclareRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQueueDeclareRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueDeleteRetryCallback ¶ added in v0.8.0
func WithSessionQueueDeleteRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQueueDeleteRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueuePurgeRetryCallback ¶ added in v0.8.0
func WithSessionQueuePurgeRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQueuePurgeRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueUnbindRetryCallback ¶ added in v0.8.0
func WithSessionQueueUnbindRetryCallback(callback types.SessionRetryCallback) Option
WithSessionQueueUnbindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionRecoverCallback ¶ added in v0.8.0
func WithSessionRecoverCallback(callback types.SessionRetryCallback) Option
WithSessionRecoverCallback allows to set a custom session recovery callback
func WithSessionRetryCallback ¶ added in v0.8.0
func WithSessionRetryCallback(callback types.SessionRetryCallback) Option
WithSessionRetryCallback allows to set a custom retry callback for the session pool. This will set the same retry callback for all operations.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
func (*Pool) ConnectionPoolCapacity ¶ added in v0.8.0
ConnectionPoolCapacity returns the capacity of the connection pool.
func (*Pool) ConnectionPoolSize ¶ added in v0.5.0
ConnectionPoolSize returns the number of connections in the pool that are idling.
func (*Pool) ForceGetSession ¶ added in v0.10.1
ForceGetSession returns a new session from the pool, only returns an error upon shutdown or context cancelation. In case that there are no sessions in the session pool, a transient session is created and returned. A transient underlying connection is opened and closed when the session is closed. You can return (close) the session by returning it back to the pool with [ReturnSession(Session, error)].
func (*Pool) GetSession ¶
GetSession returns a new session from the pool, only returns an error upon shutdown.
func (*Pool) GetTransientSession ¶
GetTransientSession returns a new session which is decoupled from any shutdown mechanism, thus requiring a context for timeout handling. The session does also use a transient connection which is closed when the transient session is closed.
func (*Pool) ReturnSession ¶
ReturnSession returns a Session back to the pool. If the session was returned due to an error, err should be set to that error, otherwise err should be nil.
func (*Pool) SessionPoolCapacity ¶ added in v0.8.0
SessionPoolCapacity returns the capacity of the session pool.
func (*Pool) SessionPoolSize ¶ added in v0.5.0
SessionPoolSize returns the number of sessions in the pool that are idling.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(p *Pool, options ...PublisherOption) *Publisher
func (*Publisher) Get ¶ added in v0.2.0
func (p *Publisher) Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)
Get is only supposed to be used for testing, do not use get for polling any broker queues.
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error
Publish a message to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.
func (*Publisher) PublishBatch ¶ added in v0.11.2
func (p *Publisher) PublishBatch(ctx context.Context, exchange string, routingKey string, msgs []types.Publishing) error
PublishBatch publishes a batch of messages to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.
type PublisherOption ¶
type PublisherOption func(*publisherOption)
func PublisherWithAutoClosePool ¶
func PublisherWithAutoClosePool(autoClose bool) PublisherOption
func PublisherWithBackoffPolicy ¶ added in v0.8.1
func PublisherWithBackoffPolicy(backoffFunc types.BackoffFunc) PublisherOption
func PublisherWithContext ¶
func PublisherWithContext(ctx context.Context) PublisherOption
func PublisherWithLogger ¶ added in v0.2.0
func PublisherWithLogger(logger *slog.Logger) PublisherOption
type QueueConfig ¶ added in v0.7.1
type QueueConfig struct {
Queue string
types.ConsumeOptions
}
QueueConfig is a read only snapshot of the current handler's queue configuration. It is the common configuration between the handler and the batch handler.
type SessionPool ¶
type SessionPool struct {
RecoverCallback types.SessionRetryCallback
PublishRetryCallback types.SessionRetryCallback
GetRetryCallback types.SessionRetryCallback
ConsumeContextRetryCallback types.SessionRetryCallback
ExchangeDeclareRetryCallback types.SessionRetryCallback
ExchangeDeclarePassiveRetryCallback types.SessionRetryCallback
ExchangeDeleteRetryCallback types.SessionRetryCallback
QueueDeclareRetryCallback types.SessionRetryCallback
QueueDeclarePassiveRetryCallback types.SessionRetryCallback
QueueDeleteRetryCallback types.SessionRetryCallback
QueueBindRetryCallback types.SessionRetryCallback
QueueUnbindRetryCallback types.SessionRetryCallback
QueuePurgeRetryCallback types.SessionRetryCallback
ExchangeBindRetryCallback types.SessionRetryCallback
ExchangeUnbindRetryCallback types.SessionRetryCallback
QoSRetryCallback types.SessionRetryCallback
FlowRetryCallback types.SessionRetryCallback
// contains filtered or unexported fields
}
func NewSessionPool ¶
func NewSessionPool(pool *ConnectionPool, numSessions int, options ...SessionPoolOption) (*SessionPool, error)
func (*SessionPool) Capacity ¶ added in v0.8.0
func (sp *SessionPool) Capacity() int
Capacity returns the size of the session pool which indicates the number of available cached sessions.
func (*SessionPool) Close ¶
func (sp *SessionPool) Close()
Close closes the session pool with all of its sessions.
func (*SessionPool) ForceGetSession ¶ added in v0.10.1
func (*SessionPool) GetSession ¶
GetSession gets a pooled session. It blocks until a session is acquired from the pool.
func (*SessionPool) GetTransientSession ¶
GetTransientSession returns a transient session. This method may return an error when the context has been closed before a session could be obtained. A transient session creates a transient connection under the hood.
func (*SessionPool) ReturnSession ¶
func (sp *SessionPool) ReturnSession(session *types.Session, err error)
ReturnSession returns a Session to the pool. If Session is not a cached channel, it is simply closed here.
func (*SessionPool) Size ¶
func (sp *SessionPool) Size() int
Size returns the number of available idle sessions in the pool.
type SessionPoolOption ¶
type SessionPoolOption func(*sessionPoolOption)
func SessionPoolWithAutoCloseConnectionPool ¶ added in v0.2.0
func SessionPoolWithAutoCloseConnectionPool(autoClose bool) SessionPoolOption
SessionPoolWithAutoCloseConnectionPool allows the internal connection pool to be closed automatically. This is helpful in case you have a session pool that is the only user of the connection pool. You are basically passing ownership of the connection pool to the session pool with this.
func SessionPoolWithBufferCapacity ¶ added in v0.8.0
func SessionPoolWithBufferCapacity(capacity int) SessionPoolOption
SessionPoolWithBufferCapacity allows to configure the size of the confirmation, error & blocker buffers of all sessions
func SessionPoolWithConfirms ¶
func SessionPoolWithConfirms(requirePublishConfirms bool) SessionPoolOption
SessionPoolWithConfirms requires all messages from sessions to be acked.
func SessionPoolWithConsumeContextRetryCallback ¶ added in v0.8.0
func SessionPoolWithConsumeContextRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithConsumeContextRetryCallback allows to set a custom consume context retry callback for the session pool.
func SessionPoolWithExchangeBindRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeBindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeBindRetryCallback allows to set a custom exchange bind retry callback for the session pool.
func SessionPoolWithExchangeDeclarePassiveRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeDeclarePassiveRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeDeclarePassiveRetryCallback allows to set a custom exchange declare passive retry callback for the session pool.
func SessionPoolWithExchangeDeclareRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeDeclareRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeDeclareRetryCallback allows to set a custom exchange declare retry callback for the session pool.
func SessionPoolWithExchangeDeleteRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeDeleteRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeDeleteRetryCallback allows to set a custom exchange delete retry callback for the session pool.
func SessionPoolWithExchangeUnbindRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeUnbindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeUnbindRetryCallback allows to set a custom exchange unbind retry callback for the session pool.
func SessionPoolWithFlowRetryCallback ¶ added in v0.8.0
func SessionPoolWithFlowRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithFlowRetryCallback allows to set a custom flow retry callback for the session pool.
func SessionPoolWithGetRetryCallback ¶ added in v0.8.0
func SessionPoolWithGetRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithGetRetryCallback allows to set a custom get retry callback for the session pool.
func SessionPoolWithLogger ¶
func SessionPoolWithLogger(logger *slog.Logger) SessionPoolOption
SessionPoolWithLogger allows to set a custom logger
func SessionPoolWithPublishRetryCallback ¶ added in v0.8.0
func SessionPoolWithPublishRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithPublishRetryCallback allows to set a custom publish retry callback for the session pool.
func SessionPoolWithQoSRetryCallback ¶ added in v0.8.0
func SessionPoolWithQoSRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQoSRetryCallback allows to set a custom qos retry callback for the session pool.
func SessionPoolWithQueueBindRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueBindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueBindRetryCallback allows to set a custom queue bind retry callback for the session pool.
func SessionPoolWithQueueDeclarePassiveRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueDeclarePassiveRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueDeclarePassiveRetryCallback allows to set a custom queue declare passive retry callback for the session pool.
func SessionPoolWithQueueDeclareRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueDeclareRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueDeclareRetryCallback allows to set a custom queue declare retry callback for the session pool.
func SessionPoolWithQueueDeleteRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueDeleteRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueDeleteRetryCallback allows to set a custom queue delete retry callback for the session pool.
func SessionPoolWithQueuePurgeRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueuePurgeRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQueuePurgeRetryCallback allows to set a custom queue purge retry callback for the session pool.
func SessionPoolWithQueueUnbindRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueUnbindRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueUnbindRetryCallback allows to set a custom queue unbind retry callback for the session pool.
func SessionPoolWithRecoverCallback ¶ added in v0.8.0
func SessionPoolWithRecoverCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithRecoverCallback allows to set a custom recover callback for the session pool.
func SessionPoolWithRetryCallback ¶ added in v0.8.0
func SessionPoolWithRetryCallback(callback types.SessionRetryCallback) SessionPoolOption
SessionPoolWithRetryCallback allows to set a custom retry callback for the session pool. This will set the same retry callback for all operations.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(p *Pool, options ...SubscriberOption) *Subscriber
func (*Subscriber) Close ¶
func (s *Subscriber) Close()
func (*Subscriber) RegisterBatchHandler ¶ added in v0.6.0
func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler)
RegisterBatchHandler registers a custom handler that MIGHT not be closed in case that the subscriber is closed. The passed batch handler may be derived from a different parent context.
func (*Subscriber) RegisterBatchHandlerFunc ¶ added in v0.6.0
func (s *Subscriber) RegisterBatchHandlerFunc(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler
RegisterBatchHandlerFunc registers a function that is able to process up to `maxBatchSize` messages at the same time. The flushTimeout is the duration to wait before triggering the processing of the messages. For example, if your maxBatchSize is 50 and there are only 20 messages in the queue that you can fetch, you would have to wait indefinitely for those 20 messages to be processed, as it might take a long time for another message to arrive in the queue. This is where your flushTimeout comes into play: the handler waits at most for the period of flushTimeout for a new message to arrive before processing the batch in your handler function.
func (*Subscriber) RegisterHandler ¶
func (s *Subscriber) RegisterHandler(handler *Handler)
func (*Subscriber) RegisterHandlerFunc ¶ added in v0.2.0
func (s *Subscriber) RegisterHandlerFunc(queue string, hf HandlerFunc, options ...types.ConsumeOptions) *Handler
RegisterHandlerFunc registers a consumer function that starts a consumer upon subscriber startup. The consumer is identified by a string that is unique and scoped for all consumers on this channel. An empty string will cause the library to generate a unique identity. The consumer identity will be included in every Delivery in the ConsumerTag field.
When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ. It's advisable to use separate connections for Channel.Publish and Channel.Consume so as not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed. Optional arguments can be provided that have specific semantics for the queue or server.
Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan. When the Channel or Connection is closed, all buffered and inflight messages will be dropped. When the consumer identifier tag is cancelled, all inflight messages will be delivered until the returned chan is closed.
func (*Subscriber) Start ¶
func (s *Subscriber) Start(ctx context.Context) (err error)
Start starts the consumers for all registered handler functions. This method is not blocking. Use Wait() to wait for all routines to shut down via context cancelation (e.g. via a signal).
func (*Subscriber) Wait ¶
func (s *Subscriber) Wait()
Wait waits until all consumers have been closed. The provided context must have been closed in order for Wait to unlock after all consumer goroutines of the subscriber have been closed.
type SubscriberOption ¶
type SubscriberOption func(*subscriberOption)
func SubscriberWithAutoClosePool ¶
func SubscriberWithAutoClosePool(autoClose bool) SubscriberOption
func SubscriberWithContext ¶
func SubscriberWithContext(ctx context.Context) SubscriberOption
func SubscriberWithLogger ¶ added in v0.2.0
func SubscriberWithLogger(logger *slog.Logger) SubscriberOption
type Topologer ¶ added in v0.2.0
type Topologer struct {
// contains filtered or unexported fields
}
func NewTopologer ¶ added in v0.2.0
func NewTopologer(p *Pool, options ...TopologerOption) *Topologer
func (*Topologer) ExchangeBind ¶ added in v0.2.0
func (t *Topologer) ExchangeBind(ctx context.Context, destination string, routingKey string, source string, option ...types.ExchangeBindOptions) (err error)
ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.
Binding two exchanges with identical arguments will not create duplicate bindings.
Binding one exchange to another with multiple bindings will only deliver a message once. For example if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings will match.
Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing key is matched.
ExchangeBind("sell", "MSFT", "trade", false, nil)
ExchangeBind("buy", "AAPL", "trade", false, nil)
Delivery Source Key Destination
example exchange exchange
-----------------------------------------------
key: AAPL --> trade ----> MSFT sell
\---> AAPL --> buy
func (*Topologer) ExchangeDeclare ¶ added in v0.2.0
func (t *Topologer) ExchangeDeclare(ctx context.Context, name string, kind types.ExchangeKind, option ...types.ExchangeDeclareOptions) (err error)
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
Errors returned from this method will close the session. Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consist of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon.
Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers".
func (*Topologer) ExchangeDeclarePassive ¶ added in v0.7.0
func (t *Topologer) ExchangeDeclarePassive(ctx context.Context, name string, kind types.ExchangeKind, option ...types.ExchangeDeclareOptions) (err error)
ExchangeDeclarePassive is functionally and parametrically equivalent to ExchangeDeclare, except that it sets the "passive" attribute to true. A passive exchange is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent exchange will cause RabbitMQ to throw an exception. This function can be used to detect the existence of an exchange.
func (*Topologer) ExchangeDelete ¶ added in v0.2.0
func (t *Topologer) ExchangeDelete(ctx context.Context, name string, option ...types.ExchangeDeleteOptions) (err error)
ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.
func (*Topologer) ExchangeUnbind ¶ added in v0.2.0
func (t *Topologer) ExchangeUnbind(ctx context.Context, destination string, routingKey string, source string, option ...types.ExchangeUnbindOptions) (err error)
ExchangeUnbind unbinds the destination exchange from the source exchange on the server by removing the routing key between them. This is the inverse of ExchangeBind. If the binding does not currently exist, an error will be returned.
func (*Topologer) QueueBind ¶ added in v0.2.0
func (t *Topologer) QueueBind(ctx context.Context, name string, routingKey string, exchange string, option ...types.QueueBindOptions) (err error)
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
QueueBind("pagers", "alert", "log", false, nil)
QueueBind("emails", "info", "log", false, nil)
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> log ----> alert --> pagers
key: info ---> log ----> info ---> emails
key: debug --> log (none) (dropped)
If a binding with the same key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.
In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.
QueueBind("pagers", "alert", "amq.topic", false, nil)
QueueBind("emails", "info", "amq.topic", false, nil)
QueueBind("emails", "#", "amq.topic", false, nil) // match everything
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> amq.topic ----> alert --> pagers
key: info ---> amq.topic ----> # ------> emails
\---> info ---/
key: debug --> amq.topic ----> # ------> emails
func (*Topologer) QueueDeclare ¶ added in v0.2.0
func (t *Topologer) QueueDeclare(ctx context.Context, name string, option ...types.QueueDeclareOptions) (queue types.Queue, err error)
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing key matching the queue's name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
Delivery Exchange Key Queue
-----------------------------------------------
key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct.
func (*Topologer) QueueDeclarePassive ¶ added in v0.7.0
func (t *Topologer) QueueDeclarePassive(ctx context.Context, name string, option ...types.QueueDeclareOptions) (queue types.Queue, err error)
QueueDeclarePassive is functionally and parametrically equivalent to QueueDeclare, except that it sets the "passive" attribute to true. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception. This function can be used to test for the existence of a queue.
func (*Topologer) QueueDelete ¶ added in v0.2.0
func (t *Topologer) QueueDelete(ctx context.Context, name string, option ...types.QueueDeleteOptions) (purged int, err error)
QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.
func (*Topologer) QueuePurge ¶ added in v0.7.0
func (t *Topologer) QueuePurge(ctx context.Context, name string, options ...types.QueuePurgeOptions) (int, error)
QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed. When successful, returns the number of messages purged.
func (*Topologer) QueueUnbind ¶ added in v0.2.0
func (t *Topologer) QueueUnbind(ctx context.Context, name string, routingKey string, exchange string, args ...types.Table) (err error)
It is possible to send an empty string for the exchange name, which means to unbind the queue from the default exchange.
type TopologerOption ¶ added in v0.2.0
type TopologerOption func(*topologerOption)
func TopologerWithContext ¶ added in v0.7.0
func TopologerWithContext(ctx context.Context) TopologerOption
func TopologerWithLogger ¶ added in v0.2.0
func TopologerWithLogger(logger *slog.Logger) TopologerOption
func TopologerWithTransientSessions ¶ added in v0.7.0
func TopologerWithTransientSessions(transientOnly bool) TopologerOption
Source Files
¶
- connection_pool.go
- connection_pool_options.go
- errors.go
- helpers_context.go
- pool.go
- pool_options.go
- publisher.go
- publisher_option.go
- session_pool.go
- session_pool_options.go
- subscriber.go
- subscriber_batch_handler.go
- subscriber_handler.go
- subscriber_handler_options.go
- subscriber_options.go
- topologer.go
- topologer_options.go