kafka

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2019 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	Config
	// contains filtered or unexported fields
}

Client wraps a sarama client and Kafka configuration and can be used to create producers and consumers

func (Client) Close

func (c Client) Close()

Close the underlying Kafka client

func (Client) NewConsumer

func (c Client) NewConsumer() (Consumer, error)

NewConsumer sets up a Kafka consumer

func (Client) NewProducer

func (c Client) NewProducer() (Producer, error)

NewProducer creates a sarama producer from a client

type Config

type Config struct {
	Broker                   string
	ClientID                 string
	TLSCaCrtPath             string
	TLSCrtPath               string
	TLSKeyPath               string
	Handlers                 map[string]MessageHandler
	JSONEnabled              bool
	Verbose                  bool
	KafkaVersion             string
	ProducerCompressionCodec string
	ProducerCompressionLevel int
	SchemaRegistry           *SchemaRegistryConfig
	// contains filtered or unexported fields
}

Config contains connection settings and configuration for communicating with a Kafka cluster

func (Config) NewClient

func (c Config) NewClient(ctx context.Context) (Client, error)

NewClient creates a Kafka client with metrics exporting and optional TLS that can be used to create consumers or producers

func (*Config) RegisterFlags

func (c *Config) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Kafka flags with pflags

type Consumer

type Consumer struct {
	Client
	// contains filtered or unexported fields
}

Consumer contains a sarama client, consumer, and implementation of the MessageUnmarshaler interface

func (Consumer) Close

func (c Consumer) Close()

Close Sarama consumer and client

func (Consumer) ConsumeTopic

func (c Consumer) ConsumeTopic(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	offsets PartitionOffsets,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error

ConsumeTopic consumes a particular Kafka topic from startOffset to endOffset or from startOffset to forever

This function will create consumers for all partitions in a topic and read from the given offset on each partition to the latest offset when the consumer was started, then notify the caller via catchupWg. If exitAfterCaughtUp is true, the consumer will exit after it reads message at the latest offset when it started up. When all partition consumers are closed, it will send the last offset read on each partition through the readResult channel. If exitAfterCaughtUp is true, the consumer will exit after reading to the latest offset.

func (Consumer) ConsumeTopicFromBeginning

func (c Consumer) ConsumeTopicFromBeginning(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error

ConsumeTopicFromBeginning starts Kafka consumers on all partitions in a given topic from the message with the oldest offset.

func (Consumer) ConsumeTopicFromLatest

func (c Consumer) ConsumeTopicFromLatest(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
) error

ConsumeTopicFromLatest starts Kafka consumers on all partitions in a given topic from the message with the latest offset.

type ConsumerIface

type ConsumerIface interface {
	ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error
	Close()
}

ConsumerIface is an interface for consuming messages from a Kafka topic

type MessageHandler

type MessageHandler interface {
	HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler MessageUnmarshaler) error
}

MessageHandler defines an interface for handling new messages received by the Kafka consumer

type MessageUnmarshaler

type MessageUnmarshaler interface {
	UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) error
}

MessageUnmarshaler defines an interface for unmarshaling messages received from Kafka to Go types

type MockKafkaConsumer

type MockKafkaConsumer struct {
	mock.Mock
	sync.Mutex
	// contains filtered or unexported fields
}

MockKafkaConsumer implements KafkaConsumerIface for testing purposes

func (*MockKafkaConsumer) Close

func (m *MockKafkaConsumer) Close()

Close mocks the Kafka consumer Close method

func (*MockKafkaConsumer) ConsumeTopic

func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error

ConsumeTopic mocks the Kafka consumer ConsumeTopic method

func (*MockKafkaConsumer) ConsumeTopicFromBeginning

func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error

ConsumeTopicFromBeginning mocks the Kafka consumer ConsumeTopicFromBeginning method

func (*MockKafkaConsumer) ConsumeTopicFromLatest

func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error

ConsumeTopicFromLatest mocks the Kafka consumer ConsumeTopicFromLatest method

func (*MockKafkaConsumer) EmitReadResult

func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)

EmitReadResult allows tests to send values through the readResult channel passed into the mock consumer.

type PartitionOffsets

type PartitionOffsets map[int32]int64

PartitionOffsets is a mapping of partition ID to an offset to which a consumer read on that partition

type Producer

type Producer struct {
	Client
	// contains filtered or unexported fields
}

Producer contains a sarama client and async producer

func (Producer) RunProducer

func (p Producer) RunProducer(messages <-chan *sarama.ProducerMessage, done chan bool)

RunProducer wraps the sarama AsyncProducer and adds metrics, logging, and a shutdown procedure to the producer. To stop the producer, close the messages channel; when the producer is shutdown a signal will be emitted on the done channel. If the messages channel is unbuffered, each message sent to the producer is guaranteed to at least have been attempted to be produced to Kafka.

type SchemaRegistryConfig

type SchemaRegistryConfig struct {
	SchemaRegistryURL string
	// contains filtered or unexported fields
}

SchemaRegistryConfig defines the necessary configuration for interacting with Schema Registry

func (*SchemaRegistryConfig) RegisterFlags

func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Kafka flags with pflags

func (*SchemaRegistryConfig) UnmarshalMessage

func (src *SchemaRegistryConfig) UnmarshalMessage(
	ctx context.Context,
	msg *sarama.ConsumerMessage,
	target interface{},
) error

UnmarshalMessage Implements the KafkaMessageUnmarshaler interface. Decodes an Avro message into a Go struct type, specifically an Avro message from Kafka. Avro schemas are fetched from Kafka schema registry. To use this function, tag each field of the target struct with a `kafka` tag whose value indicates which key on the Avro message to set as the value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL