kafka

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2018 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ByteDecoder

type ByteDecoder struct{}

func (ByteDecoder) Decode

func (d ByteDecoder) Decode(b []byte) (interface{}, error)

type ByteEncoder

type ByteEncoder struct{}

func (ByteEncoder) Encode

func (e ByteEncoder) Encode(v interface{}) ([]byte, error)

type Decoder

type Decoder interface {
	Decode([]byte) (interface{}, error)
}

type Encoder

type Encoder interface {
	Encode(interface{}) ([]byte, error)
}

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

func NewSink

func NewSink(c *SinkConfig) (*Sink, error)

func (*Sink) Close

func (p *Sink) Close() error

Close closes the processor.

func (*Sink) Process

func (p *Sink) Process(key, value interface{}) error

Process processes the stream record.

func (*Sink) WithContext

func (p *Sink) WithContext(ctx streams.Context)

WithContext sets the context on the Processor.

type SinkConfig

type SinkConfig struct {
	sarama.Config

	Brokers []string
	Topic   string

	KeyEncoder   Encoder
	ValueEncoder Encoder

	BatchSize int
}

func NewSinkConfig

func NewSinkConfig() *SinkConfig

func (*SinkConfig) Validate

func (c *SinkConfig) Validate() error

Validate checks a SinkConfig instance. It returns a sarama.ConfigurationError if the specified values don't make sense.

type Source

type Source struct {
	// contains filtered or unexported fields
}

func NewSource

func NewSource(c *SourceConfig) (*Source, error)

func (*Source) Close

func (s *Source) Close() error

func (*Source) Commit

func (s *Source) Commit() error

func (*Source) Consume

func (s *Source) Consume() (key, value interface{}, err error)

type SourceConfig

type SourceConfig struct {
	sarama.Config

	Brokers []string
	Topic   string
	GroupId string

	KeyDecoder   Decoder
	ValueDecoder Decoder

	BufferSize int
}

func NewSourceConfig

func NewSourceConfig() *SourceConfig

func (*SourceConfig) Validate

func (c *SourceConfig) Validate() error

Validate checks a SourceConfig instance. It returns a sarama.ConfigurationError if the specified values don't make sense.

type StringDecoder

type StringDecoder struct{}

func (StringDecoder) Decode

func (d StringDecoder) Decode(b []byte) (interface{}, error)

type StringEncoder

type StringEncoder struct{}

func (StringEncoder) Encode

func (e StringEncoder) Encode(v interface{}) ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL