Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( OffsetOldest = sarama.OffsetOldest OffsetNewest = sarama.OffsetNewest )
Initial offset type
var InitialOffset = OffsetNewest
InitialOffset allows to configure global initial offset from which to start consuming partitions which doesn't have offsets stored in the kafka_offsets table
var KafkaConfig *sarama.Config
KafkaConfig global per process Sarama config
var Pipes map[string]constructor
Pipes is the list of registered pipes Plugins insert their constructors into this map
Functions ¶
func CacheDestroy ¶
func CacheDestroy()
CacheDestroy releases all resources associated with cached pipes
Types ¶
type Consumer ¶
type Consumer interface {
Close() error
//CloseOnFailure doesn't save offsets
CloseOnFailure() error
Message() chan interface{}
Error() chan error
FetchNext() (interface{}, error)
//Allows to explicitly persists current consumer position
SaveOffset() error
//SetFormat allow to tell consumer the format of the file when there is no
//header
SetFormat(format string)
}
Consumer consumer interface for the pipe
type Header ¶
type Header struct {
Format string
Filters []string `json:",omitempty"`
Schema []byte `json:",omitempty"`
Delimited bool `json:",omitempty"`
HMAC string `json:"HMAC-SHA256,omitempty"`
IV string `json:"AES256-CFB-IV,omitempty"`
}
Header represent file metadata in the beginning of the file
type KafkaPipe ¶
type KafkaPipe struct {
// contains filtered or unexported fields
}
KafkaPipe is wrapper on top of Sarama library to produce/consume through kafka
- after failure shutdown pipe guarantees to resent last batchSize messages,
meaning batchSize messages may be in flight, reading (batchSize+1)th message automatically acknowledges previous batch.
- producer caches and sends maximum batchSize messages at once
func (*KafkaPipe) Config ¶
func (p *KafkaPipe) Config() *config.PipeConfig
Config returns pipe configuration
func (*KafkaPipe) NewConsumer ¶
NewConsumer registers a new kafka consumer
func (*KafkaPipe) NewProducer ¶
NewProducer registers a new sync producer
type Pipe ¶
type Pipe interface {
NewConsumer(topic string) (Consumer, error)
NewProducer(topic string) (Producer, error)
Type() string
Config() *config.PipeConfig
Close() error
}
Pipe connects named producers and consumers
type Producer ¶
type Producer interface {
Push(data interface{}) error
PushK(key string, data interface{}) error
PushSchema(key string, data []byte) error
//PushBatch queues the messages instead of sending immediately
PushBatch(key string, data interface{}) error
//PushCommit writes out all the messages queued by PushBatch
PushBatchCommit() error
Close() error
CloseOnFailure() error
SetFormat(format string)
PartitionKey(source string, key string) string
}
Producer producer interface for pipe