Documentation
¶
Index ¶
- Constants
- func Base642PEM(base64Key string) []byte
- func Base64ToPEM(base64String string, certType string) []byte
- func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
- func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)
- func VerifyCertPemStr(certStr string) bool
- type ConsumeErrorHandler
- type ConsumeHandle
- type ConsumeHandler
- type KqConf
- type KqPusherConf
- type KqSaslCaConf
- type PushOption
- type Pusher
- type QueueOption
Constants ¶
View Source
const (
	SASL_SCRAM = 2
	SASL_PLAIN = 1
	CA_FILE = 1
	CA_TEXT = 2
)
Variables ¶
This section is empty.
Functions ¶
func Base642PEM ¶ added in v1.1.82
func Base64ToPEM ¶ added in v1.1.82
func MustNewQueue ¶
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
func NewQueue ¶
func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)
func VerifyCertPemStr ¶ added in v1.1.82
Types ¶
type ConsumeErrorHandler ¶
type ConsumeHandle ¶
type ConsumeHandler ¶
func WithHandle ¶
func WithHandle(handle ConsumeHandle) ConsumeHandler
type KqConf ¶
type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
KqSaslCaConf
}
type KqPusherConf ¶ added in v1.1.82
type KqPusherConf struct {
Brokers []string
Topic string
Compression int8 `json:",options=1|2|3|4,default=2"`
KqSaslCaConf
}
type KqSaslCaConf ¶ added in v1.1.82
type KqSaslCaConf struct {
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
SASL_WAY int8 `json:",options=1|2,default=1"`
CA_WAY int8 `json:",options=1|2,default=1"`
CaFile string `json:",optional"`
CertFile string `json:",optional"`
KeyFile string `json:",optional"`
CaPEM string `json:",optional"`
CertPEM string `json:",optional"`
KeyPEM string `json:",optional"`
}
type PushOption ¶
type PushOption func(options *pushOptions)
func WithAllowAutoTopicCreation ¶
func WithAllowAutoTopicCreation() PushOption
WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
func WithChunkSize ¶
func WithChunkSize(chunkSize int) PushOption
WithChunkSize customizes the Pusher with the given chunk size.
func WithFlushInterval ¶
func WithFlushInterval(interval time.Duration) PushOption
WithFlushInterval customizes the Pusher with the given flush interval.
type Pusher ¶
type Pusher struct {
// contains filtered or unexported fields
}
func NewConfPusher ¶ added in v1.1.82
func NewConfPusher(c KqPusherConf, opts ...PushOption) *Pusher
NewConfPusher returns a Pusher built from the given KqPusherConf, which carries the Kafka broker addresses and topic.
func NewPusher ¶
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher
NewPusher returns a Pusher with the given Kafka addresses and topic.
type QueueOption ¶
type QueueOption func(*queueOptions)
func WithCommitInterval ¶
func WithCommitInterval(interval time.Duration) QueueOption
func WithErrorHandler ¶
func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption
func WithMaxWait ¶
func WithMaxWait(wait time.Duration) QueueOption
func WithMetrics ¶
func WithMetrics(metrics *stat.Metrics) QueueOption
func WithQueueCapacity ¶
func WithQueueCapacity(queueCapacity int) QueueOption
Click to show internal directories.
Click to hide internal directories.