Documentation
¶
Overview ¶
Package otkafka contains the opentracing integrated a kafka transport for package Core. The underlying kafka library is kafka-go: https://github.com/segmentio/kafka-go.
Integration ¶
otkafka exports the configuration in this format:
kafka: writer: foo: brokers: - localhost:9092 topic: foo reader: bar: brokers: - localhost:9092 topic: bar groupID: bar-group
For a complete overview of all available options, call the config init command.
To use package otkafka with package core, add:
var c *core.C = core.New() c.Provide(otkafka.Providers())
The reader and writer factories are bundled into that single provider.
Standalone Usage ¶
in some scenarios, the whole go kit family might be overkill. To directly interact with kafka, use the factory to make writers and readers. Those writers/readers are provided by github.com/segmentio/kafka-go.
c.Invoke(func(writer *kafka.Writer) {
writer.WriteMessage(kafka.Message{})
})
Example (Reader) ¶
var config = `
log:
level: none
kafka:
reader:
default:
brokers:
- localhost:9092
topic:
example
writer:
default:
brokers:
- localhost:9092
topic:
example
`
c := core.Default(core.WithConfigStack(rawbytes.Provider([]byte(config)), yaml.Parser()))
c.Provide(otkafka.Providers())
c.Invoke(func(writer *kafka.Writer) {
err := writer.WriteMessages(context.Background(), kafka.Message{Value: []byte(`hello`)})
if err != nil {
panic(err)
}
})
c.Invoke(func(reader *kafka.Reader) {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
panic(err)
}
fmt.Println(string(msg.Value))
})
Output: hello
Index ¶
- func Providers() []interface{}
- func SpanFromMessage(ctx context.Context, tracer opentracing.Tracer, message *kafka.Message) (opentracing.Span, context.Context, error)
- type KafkaLogAdapter
- type ReaderConfig
- type ReaderFactory
- type ReaderInterceptor
- type ReaderMaker
- type Transport
- type Writer
- type WriterConfig
- type WriterFactory
- type WriterInterceptor
- type WriterMaker
- type WriterOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Providers ¶
func Providers() []interface{}
Providers is a set of dependencies including ReaderMaker, WriterMaker and exported configs.
Depends On: ReaderInterceptor `optional:"true"` WriterInterceptor `optional:"true"` contract.ConfigAccessor log.Logger Provide: ReaderFactory WriterFactory ReaderMaker WriterMaker
func SpanFromMessage ¶
func SpanFromMessage(ctx context.Context, tracer opentracing.Tracer, message *kafka.Message) (opentracing.Span, context.Context, error)
SpanFromMessage reads the message
Types ¶
type KafkaLogAdapter ¶
KafkaLogAdapter is an log adapter bridging kitlog and kafka.
func (KafkaLogAdapter) Printf ¶
func (k KafkaLogAdapter) Printf(s string, i ...interface{})
Printf implements kafka log interface.
type ReaderConfig ¶
type ReaderConfig struct {
// The list of broker addresses used to connect to the kafka cluster.
Brokers []string `json:"brokers" yaml:"brokers"`
// GroupID holds the optional consumer group id. If GroupID is specified, then
// Partition should NOT be specified e.g. 0
GroupID string `json:"groupId" yaml:"groupID"`
// The topic to read messages from.
Topic string `json:"topic" yaml:"topic"`
// Partition to read messages from. Either Partition or GroupID may
// be assigned, but not both
Partition int `json:"partition" yaml:"partition"`
// The capacity of the internal message queue, defaults to 100 if none is
// set.
QueueCapacity int `json:"queue_capacity" yaml:"queue_capacity"`
// Min and max number of bytes to fetch from kafka in each request.
MinBytes int `json:"minBytes" yaml:"minBytes"`
MaxBytes int `json:"maxBytes" yaml:"maxBytes"`
// Maximum amount of time to wait for new data to come when fetching batches
// of messages from kafka.
MaxWait time.Duration `json:"maxWait" yaml:"maxWait"`
// ReadLagInterval sets the frequency at which the reader lag is updated.
// Setting this field to a negative value disables lag reporting.
ReadLagInterval time.Duration `json:"readLagInterval" yaml:"readLagInterval"`
// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
// group heartbeat update.
//
// Default: 3s
//
// Only used when GroupID is set
HeartbeatInterval time.Duration `json:"heartbeatInterval" yaml:"heartbeatInterval"`
// CommitInterval indicates the interval at which offsets are committed to
// the broker. If 0, commits will be handled synchronously.
//
// Default: 0
//
// Only used when GroupID is set
CommitInterval time.Duration `json:"commitInterval" yaml:"commitInterval"`
// PartitionWatchInterval indicates how often a reader checks for partition changes.
// If a reader sees a partition change (such as a partition add) it will rebalance the group
// picking up new partitions.
//
// Default: 5s
//
// Only used when GroupID is set and WatchPartitionChanges is set.
PartitionWatchInterval time.Duration `json:"partitionWatchInterval" yaml:"partitionWatchInterval"`
// WatchForPartitionChanges is used to inform kafka-go that a consumer group should be
// polling the brokers and rebalancing if any partition changes happen to the topic.
WatchPartitionChanges bool `json:"watchPartitionChanges" yaml:"watchPartitionChanges"`
// SessionTimeout optionally sets the length of time that may pass without a heartbeat
// before the coordinator considers the consumer dead and initiates a rebalance.
//
// Default: 30s
//
// Only used when GroupID is set
SessionTimeout time.Duration `json:"sessionTimeout" yaml:"sessionTimeout"`
// RebalanceTimeout optionally sets the length of time the coordinator will wait
// for members to join as part of a rebalance. For kafka servers under higher
// load, it may be useful to set this value higher.
//
// Default: 30s
//
// Only used when GroupID is set
RebalanceTimeout time.Duration `json:"rebalanceTimeout" yaml:"rebalanceTimeout"`
// JoinGroupBackoff optionally sets the length of time to wait between re-joining
// the consumer group after an error.
//
// Default: 5s
JoinGroupBackoff time.Duration `json:"joinGroupBackoff" yaml:"joinGroupBackoff"`
// RetentionTime optionally sets the length of time the consumer group will be saved
// by the broker
//
// Default: 24h
//
// Only used when GroupID is set
RetentionTime time.Duration `json:"retentionTime" yaml:"retentionTime"`
// StartOffset determines from whence the consumer group should begin
// consuming when it finds a partition without a committed offset. If
// non-zero, it must be set to one of FirstOffset or LastOffset.
//
// Default: FirstOffset
//
// Only used when GroupID is set
StartOffset int64 `json:"startOffset" yaml:"startOffset"`
// BackoffDelayMin optionally sets the smallest amount of time the reader will wait before
// polling for new messages
//
// Default: 100ms
ReadBackoffMin time.Duration `json:"readBackoffMin" yaml:"readBackoffMin"`
// BackoffDelayMax optionally sets the maximum amount of time the reader will wait before
// polling for new messages
//
// Default: 1s
ReadBackoffMax time.Duration `json:"readBackoffMax" yaml:"readBackoffMax"`
// Limit of how many attempts will be made before delivering the error.
//
// The default is to try 3 times.
MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`
}
ReaderConfig is a configuration object used to create new instances of Reader.
type ReaderFactory ¶
ReaderFactory is a *di.Factory that creates *kafka.Reader.
Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.
type ReaderInterceptor ¶
type ReaderInterceptor func(name string, reader *kafka.ReaderConfig)
ReaderInterceptor is an interceptor that makes last minute change to a *kafka.ReaderConfig during kafka.Reader's creation
type ReaderMaker ¶
ReaderMaker models a ReaderFactory
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is a type which traces the interacting with kafka brokers.
func NewTransport ¶
func NewTransport(underlying kafka.RoundTripper, tracer opentracing.Tracer) *Transport
NewTransport creates a new kafka transport
type Writer ¶
Writer is a decorator around kafka.Writer that provides tracing capabilities.
func Trace ¶
func Trace(writer *kafka.Writer, tracer opentracing.Tracer, opts ...WriterOption) *Writer
Trace takes a kafka.Writer and returns a decorated Writer.
func (*Writer) WriteMessages ¶
WriteMessages writes a batch of messages to the kafka topic configured on this writer. Each message written has been injected tracing headers. The upstream consumer can extract tracing spans from kafka headers, forming a distributed tracing via messaging.
type WriterConfig ¶
type WriterConfig struct {
// The list of brokers used to discover the partitions available on the
// kafka cluster.
//
// This field is required, attempting to create a writer with an empty list
// of brokers will panic.
Brokers []string `json:"brokers" yaml:"brokers"`
// The topic that the writer will produce messages to.
//
// If provided, this will be used to set the topic for all produced messages.
// If not provided, each Message must specify a topic for itself. This must be
// mutually exclusive, otherwise the Writer will return an error.
Topic string `json:"topic" yaml:"topic"`
// Limit on how many attempts will be made to deliver a message.
//
// The default is to try at most 10 times.
MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`
// Limit on how many messages will be buffered before being sent to a
// partition.
//
// The default is to use a target batch size of 100 messages.
BatchSize int `json:"batchSize" yaml:"batchSize"`
// Limit the maximum size of a request in bytes before being sent to
// a partition.
//
// The default is to use a kafka default value of 1048576.
BatchBytes int `json:"batchBytes" yaml:"batchBytes"`
// Time limit on how often incomplete message batches will be flushed to
// kafka.
//
// The default is to flush at least every second.
BatchTimeout time.Duration `json:"batchTimeout" yaml:"batchTimeout"`
// Timeout for read operations performed by the Writer.
//
// Defaults to 10 seconds.
ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout"`
// Timeout for write operation performed by the Writer.
//
// Defaults to 10 seconds.
WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout"`
// DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
// the topic layout. With the change to use a transport to manage connections,
// the responsibility of syncing the cluster layout has been delegated to the
// transport.
RebalanceInterval time.Duration `json:"rebalanceInterval" yaml:"rebalanceInterval"`
// Number of acknowledges from partition replicas required before receiving
// a response to a produce request. The default is -1, which means to wait for
// all replicas, and a value above 0 is required to indicate how many replicas
// should acknowledge a message to be considered successful.
//
// This version of kafka-go (v0.3) does not support 0 required acks, due to
// some internal complexity implementing this with the Kafka protocol. If you
// need that functionality specifically, you'll need to upgrade to v0.4.
RequiredAcks int `json:"requiredAcks" yaml:"requiredAcks"`
// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.
Async bool `json:"async" yaml:"async"`
}
WriterConfig is a configuration type used to create new instances of Writer.
type WriterFactory ¶
WriterFactory is a *di.Factory that creates *kafka.Writer.
Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.
type WriterInterceptor ¶
WriterInterceptor is an interceptor that makes last minute change to a *kafka.Writer during its creation
type WriterMaker ¶
WriterMaker models a WriterFactory
type WriterOption ¶
type WriterOption func(writer *Writer)
WriterOption is type that configures the Writer.
func WithLogger ¶
func WithLogger(logger log.Logger) WriterOption
WithLogger is an option that provides logging to writer.