kitkafka


Documentation

Overview

Package kitkafka provides a kafka transport for go kit.

Introduction

Go kit has some great properties, such as allowing multiple transports to be used simultaneously. Sadly, it limits itself to supporting RPCs only. In real projects with many decoupled components, messaging is a path we inevitably go down.

Go kit models the RPCs as:

func(context.Context, request interface{}) (response interface{}, err error)

Package kitkafka treats messaging as a special case of RPC, where the response is always ignored. By reusing the same model, package kitkafka brings all go kit endpoints into the fold.
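
For example, a plain go kit endpoint can be consumed by this package unchanged. A minimal sketch (the endpoint body is purely illustrative):

var consume endpoint.Endpoint = func(ctx context.Context, request interface{}) (interface{}, error) {
	fmt.Printf("received: %v\n", request)
	return nil, nil // in messaging, the response is discarded
}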

See examples for go kit project with kafka as transport.

Integration

kitkafka exports the configuration in this format:

kafka:
  writer:
    foo:
      brokers:
        - localhost:9092
      topic: foo
  reader:
    bar:
      brokers:
        - localhost:9092
      topic: bar
      groupID: bar-group

For a complete overview of all available options, call the config init command.

To use package kitkafka with package core, add:

var c *core.C = core.New()
c.Provide(kitkafka.Providers())

The reader and writer factories are bundled into that single provider.
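
For example, the WriterMaker can then be injected to build the writer named foo from the configuration above. A sketch:

c.Invoke(func(maker kitkafka.WriterMaker) {
	writer, err := maker.Make("foo")
	if err != nil {
		panic(err)
	}
	defer writer.Close()
	// writer is a ready-to-use *kafka.Writer producing to topic foo.
})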

Standalone Usage

In some scenarios, the whole go kit family might be overkill. To interact with kafka directly, use the factories to make writers and readers. Those writers/readers are provided by github.com/segmentio/kafka-go.

c.Invoke(func(writer *kafka.Writer) {
	writer.WriteMessages(context.Background(), kafka.Message{Value: []byte("hello")})
})
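
The reader side works symmetrically through the ReaderMaker. A sketch, assuming the bar entry from the configuration above:

c.Invoke(func(maker kitkafka.ReaderMaker) {
	reader, err := maker.Make("bar")
	if err != nil {
		panic(err)
	}
	defer reader.Close()
	// ReadMessage blocks until a message arrives or the context is canceled.
	msg, err := reader.ReadMessage(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg.Value))
})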

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeMarshaller

func EncodeMarshaller(ctx context.Context, msg *kafka.Message, request interface{}) error

EncodeMarshaller encodes the user-domain request object into a *kafka.Message. The request object must implement contract.Marshaller. Protobuf objects implement this interface out of the box.

func ErrHandler

func ErrHandler(logger log.Logger) transport.ErrorHandler

ErrHandler is a transport handler that logs the kafka error message at warning level.

func Providers added in v0.2.0

func Providers() []interface{}

Providers is a set of dependencies including ReaderMaker, WriterMaker and exported configs.

Depends On:
	ReaderInterceptor `optional:"true"`
	WriterInterceptor `optional:"true"`
	contract.ConfigAccessor
	log.Logger
Provide:
	ReaderFactory
	WriterFactory
	ReaderMaker
	WriterMaker

Types

type DecodeRequestFunc

type DecodeRequestFunc func(context.Context, *kafka.Message) (request interface{}, err error)

DecodeRequestFunc extracts a user-domain request object from a *kafka.Message. It is designed to be used in Kafka Subscribers.

type DecodeResponseFunc

type DecodeResponseFunc func(context.Context, *kafka.Message) (response interface{}, err error)

DecodeResponseFunc extracts a user-domain response object from a *kafka.Message. It is designed to be used in Kafka Publishers.

type EncodeRequestFunc

type EncodeRequestFunc func(context.Context, *kafka.Message, interface{}) error

EncodeRequestFunc encodes the passed request object into a *kafka.Message. It is designed to be used in Kafka Publishers.

type EncodeResponseFunc

type EncodeResponseFunc func(context.Context, *kafka.Message, interface{}) error

EncodeResponseFunc encodes the passed response object to a *kafka.Message. It is designed to be used in Kafka Subscribers.

type HandleFunc

type HandleFunc func(ctx context.Context, msg kafka.Message) error

HandleFunc is a functional Handler.

func (HandleFunc) Handle

func (h HandleFunc) Handle(ctx context.Context, msg kafka.Message) error

Handle deals with the kafka.Message in some way.

type Handler

type Handler interface {
	Handle(ctx context.Context, msg kafka.Message) error
}

Handler is a symmetric interface for both kafka publication and subscription. As a publisher handler, it is responsible for writing the kafka message to kafka brokers. As a subscriber handler, it is responsible for piping the kafka message to the endpoint layer. In the go kit analogy, this is a go kit transport.
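
For example, an inline Handler can be built from a closure via HandleFunc. A sketch:

var h kitkafka.Handler = kitkafka.HandleFunc(func(ctx context.Context, msg kafka.Message) error {
	fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
	return nil
})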

type KafkaLogAdapter

type KafkaLogAdapter struct {
	Logging log.Logger
}

KafkaLogAdapter is a log adapter bridging kitlog and kafka.

func (KafkaLogAdapter) Printf

func (k KafkaLogAdapter) Printf(s string, i ...interface{})

Printf implements the kafka-go log interface.
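
For example, the adapter can be handed to kafka-go wherever its Logger interface is accepted. A sketch, assuming a kitlog logger named logger (note this is kafka-go's kafka.ReaderConfig, not the kitkafka ReaderConfig documented below):

reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "foo",
	Logger:  kitkafka.KafkaLogAdapter{Logging: logger},
})
defer reader.Close()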

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher wraps a kafka client, and provides a method that implements endpoint.Endpoint.

func NewPublisher

func NewPublisher(
	handler Handler,
	enc EncodeRequestFunc,
	options ...PublisherOption,
) *Publisher

NewPublisher constructs a usable Publisher for a single remote method.
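
A sketch wiring a Publisher on top of the WriterFactory, reusing the foo writer entry from the configuration example:

c.Invoke(func(factory kitkafka.WriterFactory) {
	handler, err := factory.MakeClient("foo")
	if err != nil {
		panic(err)
	}
	pub := kitkafka.NewPublisher(
		handler,
		kitkafka.EncodeMarshaller, // requests must implement contract.Marshaller
		kitkafka.PublisherTimeout(10*time.Second),
	)
	ep := pub.Endpoint() // an ordinary go kit endpoint
	_ = ep
})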

func (Publisher) Endpoint

func (p Publisher) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that invokes the remote endpoint.

type PublisherOpt

type PublisherOpt func(config *publisherConfig)

A PublisherOpt is an option that configures the publisher.

type PublisherOption

type PublisherOption func(*Publisher)

PublisherOption sets an optional parameter for clients.

func PublisherAfter

func PublisherAfter(after ...RequestResponseFunc) PublisherOption

PublisherAfter sets the RequestResponseFuncs applied to the incoming kafka message prior to it being decoded. This is useful for obtaining anything off of the response and adding it onto the context prior to decoding.

func PublisherBefore

func PublisherBefore(before ...RequestResponseFunc) PublisherOption

PublisherBefore sets the RequestResponseFuncs that are applied to the outgoing kafka message before it is sent.

func PublisherTimeout

func PublisherTimeout(timeout time.Duration) PublisherOption

PublisherTimeout sets the available timeout for a kafka request.

type PublisherService

type PublisherService struct {
	// contains filtered or unexported fields
}

PublisherService is a go kit service with one method, Publish.

func MakePublisherService

func MakePublisherService(endpoint endpoint.Endpoint, opt ...PublisherOpt) *PublisherService

MakePublisherService returns a *PublisherService that can publish user-domain messages to kafka brokers. In the go kit analogy, this is a service with one method, Publish.
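
Continuing the Publisher sketch above, the endpoint can be wrapped into a one-method service (request is a placeholder for a user-domain object; it must implement contract.Marshaller when EncodeMarshaller is used):

svc := kitkafka.MakePublisherService(pub.Endpoint())
if err := svc.Publish(context.Background(), request); err != nil {
	panic(err)
}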

func (PublisherService) Publish

func (p PublisherService) Publish(ctx context.Context, request interface{}) error

Publish sends the request to kafka.

type Reader added in v0.2.0

type Reader interface {
	Close() error
	ReadMessage(ctx context.Context) (kafka.Message, error)
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

Reader models a kafka.Reader.

type ReaderConfig

type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string `json:"brokers" yaml:"brokers"`

	// GroupID holds the optional consumer group id.  If GroupID is specified, then
	// Partition should NOT be specified e.g. 0
	GroupID string `json:"groupId" yaml:"groupID"`

	// The topic to read messages from.
	Topic string `json:"topic" yaml:"topic"`

	// Partition to read messages from.  Either Partition or GroupID may
	// be assigned, but not both
	Partition int `json:"partition" yaml:"partition"`

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int `json:"queue_capacity" yaml:"queue_capacity"`

	// Min and max number of bytes to fetch from kafka in each request.
	MinBytes int `json:"minBytes" yaml:"minBytes"`
	MaxBytes int `json:"maxBytes" yaml:"maxBytes"`

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	MaxWait time.Duration `json:"maxWait" yaml:"maxWait"`

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration `json:"readLagInterval" yaml:"readLagInterval"`

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration `json:"heartbeatInterval" yaml:"heartbeatInterval"`

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker.  If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration `json:"commitInterval" yaml:"commitInterval"`

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration `json:"partitionWatchInterval" yaml:"partitionWatchInterval"`

	// WatchForPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool `json:"watchPartitionChanges" yaml:"watchPartitionChanges"`

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration `json:"sessionTimeout" yaml:"sessionTimeout"`

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance.  For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration `json:"rebalanceTimeout" yaml:"rebalanceTimeout"`

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration `json:"joinGroupBackoff" yaml:"joinGroupBackoff"`

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration `json:"retentionTime" yaml:"retentionTime"`

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64 `json:"startOffset" yaml:"startOffset"`

	// BackoffDelayMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 100ms
	ReadBackoffMin time.Duration `json:"readBackoffMin" yaml:"readBackoffMin"`

	// BackoffDelayMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 1s
	ReadBackoffMax time.Duration `json:"readBackoffMax" yaml:"readBackoffMax"`

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`
}

ReaderConfig is a configuration object used to create new instances of Reader.

type ReaderFactory

type ReaderFactory struct {
	*di.Factory
}

ReaderFactory is a *di.Factory that creates *kafka.Reader.

Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use the topic name as the identifier of the kafka config rather than an opaque name such as default.

func (ReaderFactory) Make

func (k ReaderFactory) Make(name string) (*kafka.Reader, error)

Make returns a *kafka.Reader under the provided configuration entry.

func (ReaderFactory) MakeSubscriberServer

func (k ReaderFactory) MakeSubscriberServer(name string, subscriber Handler, opt ...ReaderOpt) (*SubscriberServer, error)

MakeSubscriberServer creates a *SubscriberServer.

name: the key of the configuration entry.
subscriber: the Handler (go kit transport layer)
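
A sketch tying the pieces together (myEndpoint and myDecoder are hypothetical; the bar entry is the reader shown in the configuration example):

c.Invoke(func(factory kitkafka.ReaderFactory) {
	sub := kitkafka.NewSubscriber(myEndpoint, myDecoder)
	server, err := factory.MakeSubscriberServer("bar", sub, kitkafka.WithParallelism(4))
	if err != nil {
		panic(err)
	}
	_ = server.Serve(context.Background()) // blocks until the context is canceled
})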

type ReaderInterceptor

type ReaderInterceptor func(name string, reader *kafka.ReaderConfig)

ReaderInterceptor is an interceptor that makes last-minute changes to a *kafka.ReaderConfig during the kafka.Reader's creation.
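
For example, an interceptor that raises the queue capacity of the bar reader might look like the sketch below; note the interceptor must be made available to the dependency container for Providers to pick it up:

var interceptor kitkafka.ReaderInterceptor = func(name string, config *kafka.ReaderConfig) {
	if name == "bar" {
		config.QueueCapacity = 500
	}
}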

type ReaderMaker

type ReaderMaker interface {
	Make(name string) (*kafka.Reader, error)
}

ReaderMaker models a ReaderFactory.

type ReaderOpt

type ReaderOpt func(config *subscriberConfig)

ReaderOpt is an option that configures the kafka reader.

func WithParallelism

func WithParallelism(parallelism int) ReaderOpt

WithParallelism configures the parallelism of fan out workers.

func WithSyncCommit

func WithSyncCommit() ReaderOpt

WithSyncCommit is a kafka option that, when enabled, commits the message synchronously only if no error is returned from the endpoint.

type RequestResponseFunc

type RequestResponseFunc func(context.Context, *kafka.Message) context.Context

RequestResponseFunc may take information from a publisher request and put it into a request context. In Subscribers, RequestResponseFuncs are executed prior to invoking the endpoint.

func ContextToKafka

func ContextToKafka(tracer opentracing.Tracer, logger log.Logger) RequestResponseFunc

ContextToKafka returns a kafka RequestResponseFunc that injects an OpenTracing Span found in `ctx` into the kafka message headers. If no such Span can be found, the RequestResponseFunc is a noop.

func KafkaToContext

func KafkaToContext(tracer opentracing.Tracer, operationName string, logger log.Logger) RequestResponseFunc

KafkaToContext returns a kafka RequestResponseFunc that tries to join with an OpenTracing trace found in the incoming kafka message and starts a new Span called `operationName` accordingly. If no trace can be found, the Span will be a trace root. The Span is incorporated into the returned Context and can be retrieved with opentracing.SpanFromContext(ctx).
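
A sketch wiring both tracing helpers in (tracer, logger, ep, dec, handler and enc are placeholders):

sub := kitkafka.NewSubscriber(ep, dec,
	kitkafka.SubscriberBefore(kitkafka.KafkaToContext(tracer, "consume", logger)),
)

pub := kitkafka.NewPublisher(handler, enc,
	kitkafka.PublisherBefore(kitkafka.ContextToKafka(tracer, logger)),
)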

type Server

type Server interface {
	Serve(ctx context.Context) error
}

Server models a kafka server. It blocks until the context is canceled. Servers usually start serving when the application boots up.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber is a go kit transport layer that wraps an endpoint. It is a handler for SubscriberServer.

func NewSubscriber

func NewSubscriber(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	options ...SubscriberOption,
) *Subscriber

NewSubscriber constructs a new subscriber, which provides a handler for Kafka messages.
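
For example, a JSON decoder paired with a subscriber (orderCreated and myEndpoint are hypothetical user-domain names):

dec := func(ctx context.Context, msg *kafka.Message) (interface{}, error) {
	var req orderCreated
	if err := json.Unmarshal(msg.Value, &req); err != nil {
		return nil, err
	}
	return req, nil
}
sub := kitkafka.NewSubscriber(myEndpoint, dec)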

func (Subscriber) Handle

func (s Subscriber) Handle(ctx context.Context, incoming kafka.Message) error

Handle handles kafka messages.

type SubscriberClientMux

type SubscriberClientMux struct {
	// contains filtered or unexported fields
}

SubscriberClientMux is a group of kafka Servers. It is useful when consuming from multiple topics.

func NewMux

func NewMux(servers ...Server) SubscriberClientMux

NewMux creates a SubscriberClientMux, which is a group of kafka servers.
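
A sketch, assuming fooServer and barServer were built with MakeSubscriberServer:

mux := kitkafka.NewMux(fooServer, barServer)
if err := mux.Serve(context.Background()); err != nil {
	// one of the servers has returned; decide whether to restart or shut down
}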

func (SubscriberClientMux) Serve

func (m SubscriberClientMux) Serve(ctx context.Context) error

Serve calls the Serve method in parallel for each server in the SubscriberClientMux. It blocks until any of the servers returns.

type SubscriberOption

type SubscriberOption func(*Subscriber)

SubscriberOption sets an optional parameter for subscribers.

func SubscriberAfter

func SubscriberAfter(after ...RequestResponseFunc) SubscriberOption

SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked, but before anything is published to the reply.

func SubscriberBefore

func SubscriberBefore(before ...RequestResponseFunc) SubscriberOption

SubscriberBefore functions are executed on the incoming kafka message before the request is decoded.

func SubscriberErrorHandler

func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption

SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.

func SubscriberErrorLogger

func SubscriberErrorLogger(logger log.Logger) SubscriberOption

SubscriberErrorLogger is used to log non-terminal errors. By default, no errors are logged. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.

Deprecated: Use SubscriberErrorHandler instead.

type SubscriberServer

type SubscriberServer struct {
	// contains filtered or unexported fields
}

SubscriberServer is a kafka server that continuously consumes messages from kafka. It implements Server. The SubscriberServer internally uses a fan-out model, where only one goroutine communicates with kafka but distributes messages to many parallel worker goroutines. This means manual offset commits are impossible, making it unsuitable for tasks that demand strict consistency. For such high-consistency tasks, the WithSyncCommit option is provided. In sync commit mode, the Server synchronously commits the offset to kafka when the error returned by the Handler is nil.

func (*SubscriberServer) Serve

func (s *SubscriberServer) Serve(ctx context.Context) error

Serve starts the Server. *SubscriberServer connects to kafka immediately and continuously consumes messages from it. Note that Serve uses consumer groups, so it can be called on multiple nodes for the same topic without manually balancing partitions.

type WriterConfig

type WriterConfig struct {
	// The list of brokers used to discover the partitions available on the
	// kafka cluster.
	//
	// This field is required, attempting to create a writer with an empty list
	// of brokers will panic.
	Brokers []string `json:"brokers" yaml:"brokers"`

	// The topic that the writer will produce messages to.
	//
	// If provided, this will be used to set the topic for all produced messages.
	// If not provided, each Message must specify a topic for itself. This must be
	// mutually exclusive, otherwise the Writer will return an error.
	Topic string `json:"topic" yaml:"topic"`

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int `json:"batchSize" yaml:"batchSize"`

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int `json:"batchBytes" yaml:"batchBytes"`

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration `json:"batchTimeout" yaml:"batchTimeout"`

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout"`

	// Timeout for write operation performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout"`

	// DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
	// of the topic layout. With the change to use a transport to manage connections,
	// the responsibility of syncing the cluster layout has been delegated to the
	// transport.
	RebalanceInterval time.Duration `json:"rebalanceInterval" yaml:"rebalanceInterval"`

	// Number of acknowledges from partition replicas required before receiving
	// a response to a produce request. The default is -1, which means to wait for
	// all replicas, and a value above 0 is required to indicate how many replicas
	// should acknowledge a message to be considered successful.
	//
	// This version of kafka-go (v0.3) does not support 0 required acks, due to
	// some internal complexity implementing this with the Kafka protocol. If you
	// need that functionality specifically, you'll need to upgrade to v0.4.
	RequiredAcks int `json:"requiredAcks" yaml:"requiredAcks"`

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool `json:"async" yaml:"async"`
}

WriterConfig is a configuration type used to create new instances of Writer.

type WriterFactory

type WriterFactory struct {
	*di.Factory
}

WriterFactory is a *di.Factory that creates *kafka.Writer.

Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use the topic name as the identifier of the kafka config rather than an opaque name such as default.

func (WriterFactory) Make

func (k WriterFactory) Make(name string) (*kafka.Writer, error)

Make returns a *kafka.Writer under the provided configuration entry.

func (WriterFactory) MakeClient

func (k WriterFactory) MakeClient(name string) (*writerHandle, error)

MakeClient creates a Handler. This handler can write *kafka.Message to the kafka broker. The Handler is meant to be consumed by NewPublisher.

type WriterInterceptor

type WriterInterceptor func(name string, writer *kafka.Writer)

WriterInterceptor is an interceptor that makes last-minute changes to a *kafka.Writer during its creation.

type WriterMaker

type WriterMaker interface {
	Make(name string) (*kafka.Writer, error)
}

WriterMaker models a WriterFactory.
