Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MustNewQueue ¶
func MustNewQueue(c KafkaConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
func NewQueue ¶
func NewQueue(c KafkaConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)
Types ¶
type ConsumeHandle ¶
type ConsumeHandler ¶
func WithHandle ¶
func WithHandle(handle ConsumeHandle) ConsumeHandler
type KafkaConf ¶
// KafkaConf is the configuration struct accepted by NewQueue and MustNewQueue.
// It embeds service.ServiceConf. The struct tags carry per-field defaults,
// allowed options, and optional markers.
type KafkaConf struct {
service.ServiceConf
// Brokers is the list of broker entries.
// NOTE(review): presumably host:port addresses — confirm against the caller.
Brokers []string
// Group has no default or optional marker in its tag.
// NOTE(review): presumably the Kafka consumer group ID — confirm.
Group string
// Topic has no default or optional marker in its tag.
// NOTE(review): presumably the Kafka topic to consume — confirm.
Topic string
// Offset: the tag lists first|last as the allowed options and last as the default.
Offset string `json:",options=first|last,default=last"`
// Conns defaults to 1.
Conns int `json:",default=1"`
// Consumers defaults to 8.
Consumers int `json:",default=8"`
// Processors defaults to 8.
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10 KiB
MaxBytes int `json:",default=10485760"` // 10 MiB
// Username is optional.
Username string `json:",optional"`
// Password is optional.
Password string `json:",optional"`
// ForceCommit defaults to true.
ForceCommit bool `json:",default=true"`
// MaxWait is the maximum amount of time to wait for new data to arrive when
// fetching batches. Defaults to 10.
// NOTE(review): the time unit of this int is not visible here — confirm.
MaxWait int `json:",default=10"`
// ReadBatchTimeout is the amount of time to wait when fetching a message
// from a Kafka message batch. Defaults to 10.
// NOTE(review): the time unit of this int is not visible here — confirm.
ReadBatchTimeout int `json:",default=10"`
// MaxAttempts limits how many attempts are made before the error is
// delivered. Defaults to 3.
MaxAttempts int `json:",default=3"`
}
type PushOption ¶
// PushOption is a function that receives a *chunkOptions. Values of this type
// are returned by WithChunkSize and WithFlushInterval.
type PushOption func(options *chunkOptions)
func WithChunkSize ¶
func WithChunkSize(chunkSize int) PushOption
func WithFlushInterval ¶
func WithFlushInterval(interval time.Duration) PushOption
type QueueOption ¶
// QueueOption is a function that receives a *queueOptions. NewQueue and
// MustNewQueue accept a variadic list of these; values are returned by
// WithCommitInterval, WithMaxWait, WithMetrics, and WithQueueCapacity.
type QueueOption func(options *queueOptions)
func WithCommitInterval ¶
func WithCommitInterval(interval time.Duration) QueueOption
func WithMaxWait ¶
func WithMaxWait(wait time.Duration) QueueOption
func WithMetrics ¶
func WithMetrics(metrics *stat.Metrics) QueueOption
func WithQueueCapacity ¶
func WithQueueCapacity(queueCapacity int) QueueOption
Click to show internal directories.
Click to hide internal directories.