Documentation
¶
Index ¶
- Constants
- Variables
- func IsUnprocessableMessageErr(err error) bool
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- func UnprocessableError(err error) error
- type AMQPChanOps
- type AMQPChanSetup
- type AMQPClient
- type AMQPConnectFunc
- type AMQPConsumer
- type AMQPMessage
- type AMQPMessageHandler
- type AMQPProducer
- type BindingArgs
- type ConsumeOptions
- type Handler
- type PublishResult
- type RawStreamOptions
- type SimpleProducer
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const ( PublishChannelSize = 1024 RetryMinDelay = 5 * time.Second PublishLogSampleRate = 0.1 MaxRetries = 3 )
Variables ¶
View Source
var ( ErrProducerShuttingDown = errors.New("amqp: producer shutting down") ErrProducerClosed = errors.New("amqp: producer closed") ErrMaxRetriesReached = errors.New("amqp: publish max retries reached") ErrRetryQueueFull = errors.New("amqp: retry queue full") )
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var ErrConsumerClosed = errors.New("amqp: consumer closed")
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func IsUnprocessableMessageErr ¶ added in v0.5.3
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
func UnprocessableError ¶ added in v0.5.3
If error is not nil, wraps it in an unprocessable message error so the consumer does not requeue the message.
Types ¶
type AMQPChanOps ¶ added in v0.4.12
type AMQPChanSetup ¶ added in v0.4.12
type AMQPChanSetup interface {
ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
ExchangeDelete(name string, ifUnused, noWait bool) error
ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
QueueInspect(name string) (amqp.Queue, error)
QueueUnbind(name, key, exchange string, args amqp.Table) error
Qos(prefetchCount, prefetchSize int, global bool) error
}
type AMQPClient ¶ added in v0.4.12
type AMQPClient interface {
AMQPProducer
AMQPConsumer
}
func NewAMQPClient ¶ added in v0.4.12
func NewAMQPClient(uri string, connectFn AMQPConnectFunc) (AMQPClient, error)
type AMQPConnectFunc ¶ added in v0.3.0
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanOps, error)
func NewAMQPConnectFunc ¶ added in v0.3.0
func NewAMQPConnectFunc(setup func(c AMQPChanSetup) error) AMQPConnectFunc
type AMQPConsumer ¶ added in v0.4.12
type AMQPConsumer interface {
Consume(queue string, concurrency int, handler AMQPMessageHandler) error
Shutdown(context.Context) error
}
func NewAMQPConsumer ¶ added in v0.4.12
func NewAMQPConsumer(uri string, connectFn AMQPConnectFunc) (AMQPConsumer, error)
type AMQPMessage ¶ added in v0.3.0
type AMQPMessage struct {
// Exchange and Key of message in the AMQP protocol.
Exchange, Key string
// Body is the payload of the message.
Body interface{}
// Persistent means whether this message should be persisted in durable
// storage not to be lost on broker restarts.
Persistent bool
// ResultChan receives the result message from the publish operation. Used
// to guarantee delivery of messages to the broker through confirmation.
ResultChan chan<- PublishResult
// WaitResult simplifies waiting for the result of a publish operation. If
// true, `Publish` will only return after confirmation has been received for
// the specific message. Cannot be specified together with a `ResultChan`.
WaitResult bool
}
type AMQPMessageHandler ¶ added in v0.4.12
AMQPMessageHandler is a function that will be called for each message received.
type AMQPProducer ¶ added in v0.3.0
type AMQPProducer interface {
Publish(ctx context.Context, msg AMQPMessage) error
Shutdown(context.Context) error
}
func NewAMQPProducer ¶ added in v0.3.0
func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (AMQPProducer, error)
type ConsumeOptions ¶
type ConsumeOptions struct {
Stream string
*StreamOptions
*stream.ConsumerOptions
// Whether to memorize the message offset in the stream and use it on
// re-connections to continue from the last read message.
MemorizeOffset bool
}
type Handler ¶
type Handler interface {
HandleMessage(msg StreamMessage)
}
type PublishResult ¶ added in v0.4.13
type PublishResult struct {
Message AMQPMessage
Error error
}
type RawStreamOptions ¶
type SimpleProducer ¶ added in v0.4.12
type SimpleProducer interface {
Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (SimpleProducer, error)
func NewAMQPQueueProducer ¶ added in v0.0.3
func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (SimpleProducer, error)
type StreamConsumer ¶
type StreamConsumer interface {
ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)
Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error
CheckConnection() error
Close() error
}
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct {
stream.ConsumerContext
*streamAmqp.Message
}
type StreamOptions ¶
type StreamOptions struct {
stream.StreamOptions
Bindings []BindingArgs
}
Click to show internal directories.
Click to hide internal directories.