Documentation ¶
Index ¶
- type Client
- type Config
- type Consumer
- func (c Consumer) Close()
- func (c Consumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (c Consumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (c Consumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, ...) error
- type ConsumerIface
- type MessageHandler
- type MessageUnmarshaler
- type MockKafkaConsumer
- func (m *MockKafkaConsumer) Close()
- func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)
- type PartitionOffsets
- type Producer
- type SchemaRegistryConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
Config
// contains filtered or unexported fields
}
Client wraps a sarama client and Kafka configuration and can be used to create producers and consumers
func (Client) NewConsumer ¶
NewConsumer sets up a Kafka consumer
func (Client) NewProducer ¶
NewProducer creates a sarama producer from a client
type Config ¶
type Config struct {
Broker string
ClientID string
TLSCaCrtPath string
TLSCrtPath string
TLSKeyPath string
Handlers map[string]MessageHandler
JSONEnabled bool
Verbose bool
KafkaVersion string
ProducerCompressionCodec string
ProducerCompressionLevel int
SchemaRegistry *SchemaRegistryConfig
// contains filtered or unexported fields
}
Config contains connection settings and configuration for communicating with a Kafka cluster
func (Config) NewClient ¶
NewClient creates a Kafka client with metrics exporting and optional TLS that can be used to create consumers or producers
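A minimal wiring sketch for the flow Config → Client → Consumer. The import path is a placeholder, and the `(value, error)` return shapes of `NewClient` and `NewConsumer` are assumptions; verify them against the actual signatures:

```go
package main

import (
	"log"

	"github.com/spf13/pflag"

	kafka "example.com/kafka" // placeholder: substitute this package's import path
)

func main() {
	cfg := &kafka.Config{}
	// Register the package's Kafka flags (broker, client ID, TLS paths, ...)
	// on the default pflag set.
	cfg.RegisterFlags(pflag.CommandLine)
	pflag.Parse()

	// NewClient wires up metrics exporting and optional TLS.
	client, err := cfg.NewClient()
	if err != nil {
		log.Fatalf("creating Kafka client: %v", err)
	}

	consumer, err := client.NewConsumer()
	if err != nil {
		log.Fatalf("creating consumer: %v", err)
	}
	defer consumer.Close()
}
```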
func (*Config) RegisterFlags ¶
RegisterFlags registers Kafka flags with pflags
type Consumer ¶
type Consumer struct {
Client
// contains filtered or unexported fields
}
Consumer contains a sarama client, consumer, and implementation of the MessageUnmarshaler interface
func (Consumer) ConsumeTopic ¶
func (c Consumer) ConsumeTopic(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	offsets PartitionOffsets,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error
ConsumeTopic consumes a Kafka topic from the given per-partition offsets, either until it has caught up to the latest offsets or indefinitely.
This function creates a consumer for every partition in the topic and reads each partition from its given offset up to the partition's latest offset at the time the consumer started, then notifies the caller via catchupWg. If exitAfterCaughtUp is true, the consumer exits after reading the message at that startup latest offset. When all partition consumers have closed, the last offset read on each partition is sent through the readResult channel.
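A hedged usage sketch, assuming `consumer` and `handler` are already constructed; whether the caller or the package performs the `catchupWg.Add` bookkeeping is an assumption to verify against the implementation:

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Buffered so the final offsets can be delivered even if we receive late.
readResult := make(chan kafka.PartitionOffsets, 1)
var catchupWg sync.WaitGroup
catchupWg.Add(1) // assumption: the caller Adds before starting; check the package

// Resume from previously persisted offsets (illustrative values).
start := kafka.PartitionOffsets{0: 1200, 1: 980}
if err := consumer.ConsumeTopic(ctx, handler, "events", start, readResult, &catchupWg, false); err != nil {
	log.Fatalf("consuming topic: %v", err)
}

catchupWg.Wait() // unblocks once every partition has read to its startup high-water mark
// ... run until it is time to stop ...
cancel()
final := <-readResult // last offset read per partition, suitable for persisting
log.Printf("stopped at offsets: %v", final)
```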
func (Consumer) ConsumeTopicFromBeginning ¶
func (c Consumer) ConsumeTopicFromBeginning(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error
ConsumeTopicFromBeginning starts Kafka consumers on all partitions in a given topic from the message with the oldest offset.
func (Consumer) ConsumeTopicFromLatest ¶
func (c Consumer) ConsumeTopicFromLatest(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
) error
ConsumeTopicFromLatest starts Kafka consumers on all partitions in a given topic from the message with the latest offset.
type ConsumerIface ¶
type ConsumerIface interface {
ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error
Close()
}
ConsumerIface is an interface for consuming messages from a Kafka topic
type MessageHandler ¶
type MessageHandler interface {
HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler MessageUnmarshaler) error
}
MessageHandler defines an interface for handling new messages received by the Kafka consumer
type MessageUnmarshaler ¶
type MessageUnmarshaler interface {
UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) error
}
MessageUnmarshaler defines an interface for unmarshaling messages received from Kafka to Go types
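The two interfaces typically meet in a handler that delegates decoding to the unmarshaler it is handed. A sketch with illustrative type, field, and tag names:

```go
// event is a hypothetical message payload; the `kafka` tags name Avro keys
// when the schema-registry unmarshaler is in use.
type event struct {
	ID   string `kafka:"id"`
	Name string `kafka:"name"`
}

// eventHandler satisfies MessageHandler.
type eventHandler struct{}

func (eventHandler) HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler kafka.MessageUnmarshaler) error {
	var e event
	if err := unmarshaler.UnmarshalMessage(ctx, msg, &e); err != nil {
		return fmt.Errorf("decoding offset %d on partition %d: %w", msg.Offset, msg.Partition, err)
	}
	log.Printf("got event id=%s name=%s", e.ID, e.Name)
	return nil
}
```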
type MockKafkaConsumer ¶
MockKafkaConsumer implements ConsumerIface for testing purposes
func (*MockKafkaConsumer) Close ¶
func (m *MockKafkaConsumer) Close()
Close mocks the Kafka consumer Close method
func (*MockKafkaConsumer) ConsumeTopic ¶
func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopic mocks the Kafka consumer ConsumeTopic method
func (*MockKafkaConsumer) ConsumeTopicFromBeginning ¶
func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromBeginning mocks the Kafka consumer ConsumeTopicFromBeginning method
func (*MockKafkaConsumer) ConsumeTopicFromLatest ¶
func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error
ConsumeTopicFromLatest mocks the Kafka consumer ConsumeTopicFromLatest method
func (*MockKafkaConsumer) EmitReadResult ¶
func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)
EmitReadResult allows tests to send values through the readResult channel passed into the mock consumer.
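In a test, the mock lets you drive the readResult channel without a live broker. A sketch only; the mock's construction and any expectation API (e.g. testify-style) may differ, so check the package:

```go
func TestConsumerShutdownOffsets(t *testing.T) {
	mock := &kafka.MockKafkaConsumer{} // construction may require a helper; verify
	readResult := make(chan kafka.PartitionOffsets, 1)
	var wg sync.WaitGroup

	if err := mock.ConsumeTopicFromBeginning(
		context.Background(), eventHandler{}, "events", readResult, &wg, true,
	); err != nil {
		t.Fatalf("ConsumeTopicFromBeginning: %v", err)
	}

	// Simulate the consumer reporting its final offsets on shutdown.
	mock.EmitReadResult(kafka.PartitionOffsets{0: 42})

	if got := <-readResult; got[0] != 42 {
		t.Fatalf("want final offset 42 on partition 0, got %d", got[0])
	}
}
```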
type PartitionOffsets ¶
PartitionOffsets maps a partition ID to the last offset a consumer read on that partition
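The concrete declaration is unexported detail aside; assuming it is a map from partition ID to offset (e.g. `map[int32]int64`), callers often fold successive read results into a running high-water mark:

```go
package main

import "fmt"

// PartitionOffsets maps partition ID to the last offset read on that
// partition (assumed shape; see the package for the real declaration).
type PartitionOffsets map[int32]int64

// merge keeps the highest offset seen per partition across read results.
func merge(acc, batch PartitionOffsets) PartitionOffsets {
	for partition, offset := range batch {
		if offset > acc[partition] {
			acc[partition] = offset
		}
	}
	return acc
}

func main() {
	acc := PartitionOffsets{0: 10, 1: 4}
	acc = merge(acc, PartitionOffsets{1: 7, 2: 3})
	fmt.Println(acc) // map[0:10 1:7 2:3]
}
```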
type Producer ¶
type Producer struct {
Client
// contains filtered or unexported fields
}
Producer contains a sarama client and async producer
func (Producer) RunProducer ¶
func (p Producer) RunProducer(messages <-chan *sarama.ProducerMessage, done chan bool)
RunProducer wraps the sarama AsyncProducer, adding metrics, logging, and a shutdown procedure. To stop the producer, close the messages channel; once the producer has shut down, a signal is emitted on the done channel. If the messages channel is unbuffered, every message sent to the producer is guaranteed to have at least been attempted against Kafka.
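A usage sketch following that shutdown procedure, assuming `producer` was built via `Client.NewProducer` (topic and payload are illustrative):

```go
messages := make(chan *sarama.ProducerMessage) // unbuffered: each send is at least attempted
done := make(chan bool)

go producer.RunProducer(messages, done)

messages <- &sarama.ProducerMessage{
	Topic: "events",
	Value: sarama.StringEncoder(`{"id":"123"}`),
}

close(messages) // signal shutdown
<-done          // wait for the producer to drain and exit
```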
type SchemaRegistryConfig ¶
type SchemaRegistryConfig struct {
SchemaRegistryURL string
// contains filtered or unexported fields
}
SchemaRegistryConfig defines the necessary configuration for interacting with Schema Registry
func (*SchemaRegistryConfig) RegisterFlags ¶
func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers schema registry flags with pflags
func (*SchemaRegistryConfig) UnmarshalMessage ¶
func (src *SchemaRegistryConfig) UnmarshalMessage(
	ctx context.Context,
	msg *sarama.ConsumerMessage,
	target interface{},
) error
UnmarshalMessage implements the MessageUnmarshaler interface: it decodes an Avro-encoded Kafka message into a Go struct, fetching the Avro schema from the schema registry. To use this function, tag each field of the target struct with a `kafka` tag whose value names the key on the Avro message to set as that field's value.
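For example, given an Avro schema with `id` and `amount` fields (names illustrative), the target struct and decoding call would look like this, with `src` being the `*SchemaRegistryConfig`:

```go
// payment's `kafka` tags name the Avro message keys to copy into each field.
type payment struct {
	ID     string  `kafka:"id"`
	Amount float64 `kafka:"amount"`
}

// Inside a MessageHandler:
var p payment
if err := src.UnmarshalMessage(ctx, msg, &p); err != nil {
	return fmt.Errorf("decoding payment: %w", err)
}
```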