Documentation
¶
Index ¶
- Constants
- Variables
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- type AMQPChanOps
- type AMQPChanSetup
- type AMQPClient
- type AMQPConnectFunc
- type AMQPConsumer
- type AMQPMessage
- type AMQPMessageHandler
- type AMQPProducer
- type BindingArgs
- type ConsumeOptions
- type Handler
- type RawStreamOptions
- type SimpleProducer
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
// Package-level tuning constants.
//
// NOTE(review): the per-constant descriptions below are inferred from the
// names only; the code that consumes them is not visible here. Confirm
// against usage before relying on them.
const (
	// PublishChannelSize is 1024; presumably the capacity of the
	// producer's buffered publish channel.
	PublishChannelSize = 1024

	// RetryMinDelay is 5 seconds (a time.Duration); presumably the
	// minimum wait between retry attempts.
	RetryMinDelay = 5 * time.Second

	// PublishLogSampleRate is 0.1; presumably the fraction of publishes
	// that get logged.
	PublishLogSampleRate = 0.1

	// MaxRetries is 3; presumably the maximum number of retry attempts.
	MaxRetries = 3
)
Variables ¶
View Source
// Sentinel errors for the AMQP producer. Compare with errors.Is rather than
// matching on the message text.
var (
	// ErrProducerShuttingDown carries the message
	// "amqp: producer shutting down".
	// NOTE(review): presumably returned when publishing while a shutdown
	// is in progress — confirm against the producer implementation.
	ErrProducerShuttingDown = errors.New("amqp: producer shutting down")

	// ErrProducerClosed carries the message "amqp: producer closed".
	// NOTE(review): presumably returned when publishing after shutdown
	// has completed — confirm against the producer implementation.
	ErrProducerClosed = errors.New("amqp: producer closed")
)
View Source
// ByteCapacity is a zero-value stream.ByteCapacity.
// NOTE(review): presumably exported so callers can reach the stream
// package's capacity helpers without importing it directly — confirm.
var ByteCapacity = stream.ByteCapacity{}
View Source
// ErrConsumerClosed is a sentinel error carrying the message
// "amqp: consumer closed". Compare with errors.Is.
// NOTE(review): presumably returned by consumer operations after shutdown —
// confirm against the consumer implementation.
var ErrConsumerClosed = errors.New("amqp: consumer closed")
View Source
// OffsetSpec is a zero-value stream.OffsetSpecification.
// NOTE(review): presumably exported so callers can build offset
// specifications via its methods without importing the stream package —
// confirm.
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
Types ¶
type AMQPChanOps ¶ added in v0.4.12
type AMQPChanSetup ¶ added in v0.4.12
// AMQPChanSetup is the set of exchange and queue topology operations
// (declare, passive declare, bind, unbind, delete, inspect). It is the type
// handed to the setup callback accepted by NewAMQPConnectFunc.
//
// NOTE(review): the method names and signatures look the same as the
// corresponding methods on the AMQP client library's Channel type; confirm
// that a channel is what satisfies this interface in practice.
type AMQPChanSetup interface {
	// Exchange operations.
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeDelete(name string, ifUnused, noWait bool) error
	ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
	// Queue operations.
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	// QueueDelete returns an int alongside the error.
	// NOTE(review): presumably the number of purged messages — confirm.
	QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
	QueueInspect(name string) (amqp.Queue, error)
	// QueueUnbind, unlike QueueBind, has no noWait parameter.
	QueueUnbind(name, key, exchange string, args amqp.Table) error
}
type AMQPClient ¶ added in v0.4.12
// AMQPClient combines AMQPProducer and AMQPConsumer by embedding both, so a
// single value can Publish and Consume.
//
// Both embedded interfaces declare Shutdown(context.Context) error with the
// same signature. Embedding interfaces with overlapping, identical methods
// requires Go 1.14 or newer.
type AMQPClient interface {
	AMQPProducer
	AMQPConsumer
}
func NewAMQPClient ¶ added in v0.4.12
func NewAMQPClient(uri string, connectFn AMQPConnectFunc) (AMQPClient, error)
type AMQPConnectFunc ¶ added in v0.3.0
type AMQPConnectFunc func(ctx context.Context, uri string, confirms chan amqp.Confirmation, closed chan *amqp.Error) (AMQPChanOps, error)
func NewAMQPConnectFunc ¶ added in v0.3.0
func NewAMQPConnectFunc(setup func(c AMQPChanSetup) error) AMQPConnectFunc
type AMQPConsumer ¶ added in v0.4.12
// AMQPConsumer is the consuming side of an AMQP connection.
type AMQPConsumer interface {
	// Consume takes a queue name, a concurrency level and the handler to
	// invoke for messages.
	// NOTE(review): presumably concurrency is the number of handler
	// invocations allowed to run in parallel; whether the call blocks is
	// not visible here — confirm.
	Consume(queue string, concurrency int, handler AMQPMessageHandler) error
	// Shutdown stops the consumer.
	// NOTE(review): presumably the context bounds how long shutdown may
	// wait for in-flight work — confirm.
	Shutdown(context.Context) error
}
func NewAMQPConsumer ¶ added in v0.4.12
func NewAMQPConsumer(uri string, connectFn AMQPConnectFunc) (AMQPConsumer, error)
type AMQPMessage ¶ added in v0.3.0
type AMQPMessageHandler ¶ added in v0.4.12
type AMQPProducer ¶ added in v0.3.0
// AMQPProducer is the publishing side of an AMQP connection.
type AMQPProducer interface {
	// Publish sends msg and returns an error on failure.
	// NOTE(review): whether the call waits for a broker confirmation is
	// not visible here — confirm against the implementation.
	Publish(ctx context.Context, msg AMQPMessage) error
	// Shutdown stops the producer.
	// NOTE(review): presumably the context bounds how long shutdown may
	// wait for pending publishes — confirm.
	Shutdown(context.Context) error
}
func NewAMQPProducer ¶ added in v0.3.0
func NewAMQPProducer(uri string, connectFn AMQPConnectFunc) (AMQPProducer, error)
type ConsumeOptions ¶
// ConsumeOptions is the options value passed to StreamConsumer.Consume and
// StreamConsumer.ConsumeChan.
//
// Both option structs are embedded as pointers: their fields are promoted
// onto ConsumeOptions, but reading a promoted field while the embedded
// pointer is nil panics, so set them (or nil-check) before use.
type ConsumeOptions struct {
	// Stream is presumably the name of the stream to consume — confirm.
	Stream string
	// Embedded package-local stream options (stream.StreamOptions plus
	// Bindings).
	*StreamOptions
	// Embedded consumer options from the stream client library.
	*stream.ConsumerOptions
	// NOTE(review): presumably enables storing the consumed offset so a
	// restart can resume from it — confirm against the implementation.
	MemorizeOffset bool
}
type Handler ¶
// Handler is the callback interface accepted by StreamConsumer.Consume.
type Handler interface {
	// HandleMessage receives one StreamMessage. It has no return value,
	// so an implementation cannot report a failure to the caller through
	// this method.
	HandleMessage(msg StreamMessage)
}
type RawStreamOptions ¶
type SimpleProducer ¶ added in v0.4.12
// SimpleProducer is the single-method publishing interface returned by
// NewAMQPExchangeProducer and NewAMQPQueueProducer.
type SimpleProducer interface {
	// Publish sends body under key.
	// NOTE(review): how body is encoded, how key maps to a routing key or
	// queue, and whether persistent selects the AMQP persistent delivery
	// mode are not visible here — confirm against the implementation.
	Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
func NewAMQPExchangeProducer ¶ added in v0.0.3
func NewAMQPExchangeProducer(ctx context.Context, uri, exchange, keyNs string) (SimpleProducer, error)
func NewAMQPQueueProducer ¶ added in v0.0.3
func NewAMQPQueueProducer(ctx context.Context, uri, queue string) (SimpleProducer, error)
type StreamConsumer ¶
// StreamConsumer consumes messages from a stream, either through a channel
// or through a Handler callback. NewStreamConsumer returns one.
type StreamConsumer interface {
	// ConsumeChan returns a receive-only channel of StreamMessage values
	// for the given options.
	// NOTE(review): presumably the channel is closed when ctx is
	// cancelled or the consumer is closed — confirm.
	ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)
	// Consume delivers messages for the given options to handler.
	// NOTE(review): whether this call blocks until ctx is done is not
	// visible here — confirm.
	Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error
	// CheckConnection reports a connection problem as an error.
	CheckConnection() error
	// Close releases the consumer and returns any error from doing so.
	Close() error
}
func NewStreamConsumer ¶
func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
type StreamMessage ¶
// StreamMessage is one delivered stream message together with its consumer
// context. The fields and methods of both embedded types are promoted.
type StreamMessage struct {
	// Embedded by value.
	stream.ConsumerContext
	// Embedded by pointer: accessing a promoted field while this is nil
	// panics.
	*streamAmqp.Message
}
type StreamOptions ¶
// StreamOptions extends the stream client library's stream.StreamOptions
// (embedded by value) with a list of bindings.
//
// This type shares its name with stream.StreamOptions but is distinct from
// it. ParseStreamOptions returns *stream.StreamOptions, not this type.
type StreamOptions struct {
	stream.StreamOptions
	// NOTE(review): presumably the bindings to create for the stream —
	// confirm against the implementation.
	Bindings []BindingArgs
}
Click to show internal directories.
Click to hide internal directories.