Documentation ¶
Index ¶
- func HeadersFromContext(ctx context.Context, headers ...kafka.Header) ([]kafka.Header, bool)
- func NewHeadersContext(ctx context.Context, headers ...kafka.Header) context.Context
- func WithSync() queue.CallOptions
- type Conf
- type HeaderKey
- type Headers
- type PushOption
- type Pusher
- type QueueOption
- type Queues
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
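func HeadersFromContext ¶
func HeadersFromContext(ctx context.Context, headers ...kafka.Header) ([]kafka.Header, bool)
func NewHeadersContext ¶
func NewHeadersContext(ctx context.Context, headers ...kafka.Header) context.Context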
func WithSync ¶
func WithSync() queue.CallOptions
Types ¶
type Conf ¶
// Conf is the configuration passed to MustNewQueue. The struct tags carry
// option and default directives for the configuration loader; the defaults
// quoted below are read directly from those tags.
type Conf struct {
// Brokers lists the brokers to connect to.
// NOTE(review): presumably Kafka "host:port" addresses — confirm.
Brokers []string
// Group is presumably the consumer group name — confirm against the consumer setup.
Group string
// Topic is presumably the topic the queue reads from — confirm.
Topic string
// Offset accepts "first" or "last" (tag options) and defaults to "last".
// NOTE(review): presumably selects the starting position when no offset
// has been committed — confirm.
Offset string `json:",options=first|last,default=last"`
// Conns defaults to 1.
// NOTE(review): presumably the number of broker connections/readers — confirm.
Conns int `json:",default=1"`
// Consumers defaults to 8.
// NOTE(review): presumably the number of goroutines fetching messages — confirm.
Consumers int `json:",default=8"`
// Processors defaults to 8.
// NOTE(review): presumably the number of goroutines invoking the handler — confirm.
Processors int `json:",default=8"`
// MinBytes defaults to 10240.
// NOTE(review): presumably the minimum fetch size in bytes — confirm.
MinBytes int `json:",default=10240"` // 10 KiB
// MaxBytes defaults to 10485760.
// NOTE(review): presumably the maximum fetch size in bytes — confirm.
MaxBytes int `json:",default=10485760"` // 10 MiB
}
type PushOption ¶
type PushOption func(options *pushOptions)
func WithChunkSize ¶
func WithChunkSize(chunkSize int) PushOption
func WithCompletion ¶ added in v0.0.12
func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption
func WithFlushInterval ¶
func WithFlushInterval(interval time.Duration) PushOption
type QueueOption ¶
type QueueOption func(*queueOptions)
func WithCommitInterval ¶
func WithCommitInterval(interval time.Duration) QueueOption
func WithMaxWait ¶
func WithMaxWait(wait time.Duration) QueueOption
func WithMetrics ¶
func WithMetrics(metrics *stat.Metrics) QueueOption
func WithQueueCapacity ¶
func WithQueueCapacity(queueCapacity int) QueueOption
type Queues ¶
type Queues struct {
// contains filtered or unexported fields
}
func MustNewQueue ¶
func MustNewQueue(c Conf, handler queue.Consumer, opts ...QueueOption) *Queues
Click to show internal directories.
Click to hide internal directories.