Documentation
¶
Index ¶
- Constants
- Variables
- func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
- func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
- func TimestampOffset(t time.Time) stream.OffsetSpecification
- type BindingArgs
- type ConsumeOptions
- type Handler
- type Producer
- type RawStreamOptions
- type StreamConsumer
- type StreamMessage
- type StreamOptions
Constants ¶
View Source
const ( PublishChannelSize = 100 RetryMinDelay = 5 * time.Second PublishLogSampleRate = 1 MaxRetries = 3 )
Variables ¶
View Source
var ByteCapacity = stream.ByteCapacity{}
View Source
var OffsetSpec = stream.OffsetSpecification{}
Functions ¶
func NewConsumerOptions ¶
func NewConsumerOptions(name string, offset stream.OffsetSpecification) *stream.ConsumerOptions
func ParseStreamOptions ¶
func ParseStreamOptions(raw RawStreamOptions) (*stream.StreamOptions, error)
func TimestampOffset ¶
func TimestampOffset(t time.Time) stream.OffsetSpecification
Types ¶
type ConsumeOptions ¶
type ConsumeOptions struct {
Stream string
*StreamOptions
*stream.ConsumerOptions
MemorizeOffset bool
}
type Handler ¶
type Handler interface {
HandleMessage(msg StreamMessage)
}
type Producer ¶
type Producer interface {
Publish(ctx context.Context, key string, body interface{}, persistent bool) error
}
type RawStreamOptions ¶
type StreamConsumer ¶
type StreamConsumer interface {
ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error)
Consume(ctx context.Context, opts ConsumeOptions, handler Handler) error
Close() error
}
func NewStreamConsumer ¶
func NewStreamConsumer(streamUri, amqpUri string) (StreamConsumer, error)
type StreamMessage ¶
type StreamMessage struct {
stream.ConsumerContext
*streamAmqp.Message
}
type StreamOptions ¶
type StreamOptions struct {
stream.StreamOptions
Bindings []BindingArgs
}
Click to show internal directories.
Click to hide internal directories.