Documentation ¶
Index ¶
- Constants
- type BuilderOption
- func WithBackendBuilder(builder backend.Builder) BuilderOption
- func WithChangelogBuilder(builder changelog.Builder) BuilderOption
- func WithConsumerBuilder(builder consumer.Builder) BuilderOption
- func WithKafkaAdmin(kafkaAdmin admin.KafkaAdmin) BuilderOption
- func WithOffsetManager(offsetManager offsets.Manager) BuilderOption
- func WithPartitionConsumerBuilder(builder consumer.PartitionConsumerBuilder) BuilderOption
- func WithProducerBuilder(builder producer.Builder) BuilderOption
- func WithStateStoreBuilder(builder store.StateStoreBuilder) BuilderOption
- func WithStoreBuilder(builder store.Builder) BuilderOption
- type DefaultBuilders
- type GlobalTable
- type GlobalTableOffset
- type GlobalTableOption
- type GlobalTableStreamConfig
- type Instances
- type InstancesOptions
- type KSink
- func (s *KSink) AddChild(node topology.Node)
- func (s *KSink) AddChildBuilder(builder topology.NodeBuilder)
- func (s *KSink) Build() (topology.Node, error)
- func (s *KSink) ChildBuilders() []topology.NodeBuilder
- func (s *KSink) Childs() []topology.Node
- func (s *KSink) Close() error
- func (s *KSink) ID() int32
- func (s *KSink) Info() map[string]string
- func (s *KSink) Name() string
- func (*KSink) Next() bool
- func (s *KSink) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
- func (s *KSink) SinkType() string
- func (*KSink) Type() topology.Type
- type Option
- type Repartition
- type RepartitionOption
- type RepartitionOptions
- type RepartitionTopic
- type Side
- type SinkOption
- func SinkWithProducer(p producer.Builder) SinkOption
- func SinkWithRecordHeaderExtractor(...) SinkOption
- func SinkWithTombstoneFilter(f func(ctx context.Context, in SinkRecord) (tombstone bool)) SinkOption
- func WithCustomRecord(f func(ctx context.Context, in SinkRecord) (out SinkRecord, err error)) SinkOption (deprecated)
- func WithProducer(p producer.Builder) SinkOption (deprecated)
- type SinkRecord
- type SourceNode
- func (sn *SourceNode) AddChild(node topology.Node)
- func (sn *SourceNode) AddChildBuilder(builder topology.NodeBuilder)
- func (sn *SourceNode) Build() (topology.Node, error)
- func (sn *SourceNode) ChildBuilders() []topology.NodeBuilder
- func (sn *SourceNode) Childs() []topology.Node
- func (sn *SourceNode) Close()
- func (sn *SourceNode) Name() string
- func (sn *SourceNode) Next() bool
- func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (sn *SourceNode) Type() topology.Type
- type StoreWriter
- type Stream
- type StreamBuilder
- func (b *StreamBuilder) Build(streams ...Stream) error
- func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) GlobalTable
- func (b *StreamBuilder) StoreRegistry() store.Registry
- func (b *StreamBuilder) Stream(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) Stream
- type StreamBuilderConfig
- type StreamConfigs
- type StreamInstance
Constants ¶
const (
	LeftJoin join.Type = iota
	InnerJoin
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BuilderOption ¶
type BuilderOption func(*DefaultBuilders)
func WithBackendBuilder ¶
func WithBackendBuilder(builder backend.Builder) BuilderOption
func WithChangelogBuilder ¶
func WithChangelogBuilder(builder changelog.Builder) BuilderOption
func WithConsumerBuilder ¶
func WithConsumerBuilder(builder consumer.Builder) BuilderOption
func WithKafkaAdmin ¶
func WithKafkaAdmin(kafkaAdmin admin.KafkaAdmin) BuilderOption
func WithOffsetManager ¶
func WithOffsetManager(offsetManager offsets.Manager) BuilderOption
func WithPartitionConsumerBuilder ¶
func WithPartitionConsumerBuilder(builder consumer.PartitionConsumerBuilder) BuilderOption
func WithProducerBuilder ¶
func WithProducerBuilder(builder producer.Builder) BuilderOption
func WithStateStoreBuilder ¶
func WithStateStoreBuilder(builder store.StateStoreBuilder) BuilderOption
func WithStoreBuilder ¶
func WithStoreBuilder(builder store.Builder) BuilderOption
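A minimal sketch of how these options are applied, assuming config is a *StreamBuilderConfig prepared with NewStreamBuilderConfig and that myBackendBuilder and myProducerBuilder are hypothetical builder values:

builder := NewStreamBuilder(config,
	WithBackendBuilder(myBackendBuilder),   // hypothetical backend.Builder
	WithProducerBuilder(myProducerBuilder), // hypothetical producer.Builder
)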
type DefaultBuilders ¶
type DefaultBuilders struct {
Producer producer.Builder
Consumer consumer.Builder
PartitionConsumer consumer.PartitionConsumerBuilder
Store store.Builder
IndexedStore store.IndexedStoreBuilder
Backend backend.Builder
StateStore store.StateStoreBuilder
OffsetManager offsets.Manager
KafkaAdmin admin.KafkaAdmin
// contains filtered or unexported fields
}
type GlobalTable ¶
type GlobalTable interface {
Stream
}
type GlobalTableOffset ¶
type GlobalTableOffset int64
Starting offset for the global table partition.
const GlobalTableOffsetDefault GlobalTableOffset = 0
GlobalTableOffsetDefault defines the default starting offset used when GlobalTable stream syncing starts.
const GlobalTableOffsetLatest GlobalTableOffset = -1
GlobalTableOffsetLatest starts syncing from the latest offset of the partition. Suitable for topics with a delete retention policy, since the topic can contain historical data.
type GlobalTableOption ¶
type GlobalTableOption func(options *globalTableOptions)
func GlobalTableWithBackendWriter ¶
func GlobalTableWithBackendWriter(writer StoreWriter) GlobalTableOption
GlobalTableWithBackendWriter overrides the persisting behavior of the GlobalTable, e.g.:
func(r *data.Record, store store.Store) error {
	// tombstone handling
	if r.Value == nil {
		if err := store.Backend().Delete(r.Key); err != nil {
			return err
		}
		return nil
	}
	return store.Backend().Set(r.Key, r.Value, 0)
}
func GlobalTableWithLogger ¶
func GlobalTableWithLogger(logger log.Logger) GlobalTableOption
GlobalTableWithLogger overrides the default logger for the GlobalTable (default is NoopLogger).
func GlobalTableWithOffset ¶
func GlobalTableWithOffset(offset GlobalTableOffset) GlobalTableOption
GlobalTableWithOffset overrides the default starting offset used when GlobalTable syncing starts.
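A minimal sketch of combining these options when declaring a GlobalTable; the topic, store name, encoders and logger are hypothetical placeholders:

table := builder.GlobalTable(
	`account-details`,      // hypothetical topic
	keyEncoder, valEncoder, // placeholder encoding.Builder values
	`account-details-store`,
	GlobalTableWithOffset(GlobalTableOffsetLatest),
	GlobalTableWithLogger(logger), // placeholder log.Logger
)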
type GlobalTableStreamConfig ¶
type Instances ¶
type Instances struct {
// contains filtered or unexported fields
}
func NewStreams ¶
func NewStreams(builder *StreamBuilder, options ...InstancesOptions) *Instances
type InstancesOptions ¶
type InstancesOptions func(config *instancesOptions)
func NotifyOnStart ¶
func NotifyOnStart(c chan bool) InstancesOptions
func WithConsumerOptions ¶ added in v1.2.0
func WithConsumerOptions(opt consumer.Option) InstancesOptions
func WithReBalanceHandler ¶
func WithReBalanceHandler(h consumer.ReBalanceHandler) InstancesOptions
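A minimal sketch of wiring these options into NewStreams; builder is assumed to be an already configured *StreamBuilder and rebalanceHandler a placeholder consumer.ReBalanceHandler:

started := make(chan bool, 1)
instances := NewStreams(builder,
	NotifyOnStart(started),                 // start-notification channel
	WithReBalanceHandler(rebalanceHandler), // placeholder consumer.ReBalanceHandler
)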
type KSink ¶
type KSink struct {
Id int32
KeyEncoder encoding.Encoder
ValEncoder encoding.Encoder
Producer producer.Producer
ProducerBuilder producer.Builder
TopicPrefix string
Repartitioned bool
KeyEncoderBuilder encoding.Builder
ValEncoderBuilder encoding.Builder
// contains filtered or unexported fields
}
func NewKSinkBuilder ¶
func (*KSink) AddChildBuilder ¶
func (s *KSink) AddChildBuilder(builder topology.NodeBuilder)
func (*KSink) ChildBuilders ¶
func (s *KSink) ChildBuilders() []topology.NodeBuilder
type Option ¶
type Option func(*kStreamOptions)
func WithConfig ¶
func WithConfig(configs StreamConfigs) Option
func WithLogger ¶
func WithWorkerPoolOptions ¶
func WithWorkerPoolOptions(poolConfig *worker_pool.PoolConfig) Option
type Repartition ¶
type Repartition struct {
Enable bool
StreamSide Side
KeyEncoder encoding.Builder
ValueEncoder encoding.Builder
Topic RepartitionTopic
}
func (Repartition) Validate ¶
func (r Repartition) Validate(s Side) error
type RepartitionOption ¶
type RepartitionOption func(sink *RepartitionOptions)
func RepartitionLeftStream ¶
func RepartitionLeftStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
func RepartitionRightStream ¶
func RepartitionRightStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
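As a hedged sketch of where these options are consumed: a stream-stream join (JoinStream, see the Stream interface below) accepts them as trailing arguments. leftStream, rightStream, the encoder builders and valueMapper are hypothetical placeholders:

joined := leftStream.JoinStream(rightStream,
	valueMapper, // placeholder join.ValueMapper
	RepartitionLeftStream(keyEncoderBuilder, valueEncoderBuilder),
)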
type RepartitionOptions ¶
type RepartitionOptions struct {
LeftTopic topic
RightTopic topic
LeftRepartition Repartition
RightRepartition Repartition
}
func (*RepartitionOptions) Apply ¶
func (iOpts *RepartitionOptions) Apply(options ...RepartitionOption)
type RepartitionTopic ¶
type SinkOption ¶
type SinkOption func(sink *KSink)
func SinkWithProducer ¶ added in v1.2.0
func SinkWithProducer(p producer.Builder) SinkOption
func SinkWithRecordHeaderExtractor ¶ added in v1.2.0
func SinkWithRecordHeaderExtractor(f func(ctx context.Context, in SinkRecord) (headers data.RecordHeaders, err error)) SinkOption
func SinkWithTombstoneFilter ¶ added in v1.2.0
func SinkWithTombstoneFilter(f func(ctx context.Context, in SinkRecord) (tombstone bool)) SinkOption
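A minimal sketch of both v1.2.0 sink options in use; the callbacks below are illustrative only:

SinkWithTombstoneFilter(func(ctx context.Context, in SinkRecord) bool {
	// treat a nil value as a tombstone
	return in.Value == nil
})

SinkWithRecordHeaderExtractor(func(ctx context.Context, in SinkRecord) (data.RecordHeaders, error) {
	// forward the incoming record headers unchanged
	return in.Headers, nil
})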
func WithCustomRecord (deprecated) ¶
func WithCustomRecord(f func(ctx context.Context, in SinkRecord) (out SinkRecord, err error)) SinkOption
Deprecated: Please use SinkWithRecordHeaderExtractor instead
func WithProducer (deprecated) ¶
func WithProducer(p producer.Builder) SinkOption
Deprecated: Please use SinkWithProducer instead
type SinkRecord ¶
type SinkRecord struct {
Key, Value interface{}
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
Headers data.RecordHeaders // only set if kafka is version 0.11+
}
type SourceNode ¶
type SourceNode struct {
Id int32
// contains filtered or unexported fields
}
func (*SourceNode) AddChild ¶
func (sn *SourceNode) AddChild(node topology.Node)
func (*SourceNode) AddChildBuilder ¶
func (sn *SourceNode) AddChildBuilder(builder topology.NodeBuilder)
func (*SourceNode) ChildBuilders ¶
func (sn *SourceNode) ChildBuilders() []topology.NodeBuilder
func (*SourceNode) Childs ¶
func (sn *SourceNode) Childs() []topology.Node
func (*SourceNode) Close ¶
func (sn *SourceNode) Close()
func (*SourceNode) Name ¶
func (sn *SourceNode) Name() string
func (*SourceNode) Next ¶
func (sn *SourceNode) Next() bool
func (*SourceNode) Run ¶
func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*SourceNode) Type ¶
func (sn *SourceNode) Type() topology.Type
type Stream ¶
type Stream interface {
Branch(branches []branch.Details, opts ...Option) []Stream
SelectKey(selectKeyFunc processors.SelectKeyFunc) Stream
TransformValue(valueTransformFunc processors.ValueTransformFunc) Stream
Transform(transformer processors.TransFunc) Stream
Filter(filter processors.FilterFunc) Stream
Process(processor processors.ProcessFunc) Stream
JoinGlobalTable(table Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper, typ join.Type) Stream
JoinKTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
JoinStream(stream Stream, valMapper join.ValueMapper, opts ...RepartitionOption) Stream
//LeftJoin(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
Through(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) Stream
To(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption)
}
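A minimal sketch of chaining these methods into a simple topology; the topics and encoders are hypothetical, and the filter callback assumes processors.FilterFunc has the form func(ctx, key, value) (bool, error):

stream := builder.Stream(`orders`, keyEncoder, valEncoder)
stream.
	Filter(func(ctx context.Context, key, value interface{}) (bool, error) {
		return value != nil, nil // drop tombstones
	}).
	To(`orders-filtered`, keyEncoder, valEncoder)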
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(config *StreamBuilderConfig, options ...BuilderOption) *StreamBuilder
func (*StreamBuilder) Build ¶
func (b *StreamBuilder) Build(streams ...Stream) error
func (*StreamBuilder) GlobalTable ¶
func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, store string, options ...GlobalTableOption) GlobalTable
func (*StreamBuilder) StoreRegistry ¶
func (b *StreamBuilder) StoreRegistry() store.Registry
type StreamBuilderConfig ¶
type StreamBuilderConfig struct {
ApplicationId string
AsyncProcessing bool
BootstrapServers []string // kafka Brokers
WorkerPool *worker_pool.PoolConfig
Store struct {
BackendBuilder backend.Builder
ChangeLog struct {
MinInSycReplicas int // minimum number of in-sync replicas on other nodes
ReplicationFactor int
Suffix string
Buffered bool
BufferedSize int
}
Http struct {
Enabled bool
Host string
}
}
DLQ struct {
Enabled bool
BootstrapServers []string
TopicFormat string
//Type dlq.DqlType // G, T
Topic string // if global
}
Host string
ChangeLog struct {
Enabled bool
Replicated bool
MinInSycReplicas int // minimum number of in-sync replicas on other nodes
ReplicationFactor int
Suffix string
Buffer struct {
Enabled bool
Size int
FlushInterval time.Duration
}
}
Consumer *consumer.Config
ConsumerCount int
*sarama.Config
Producer *producer.Config
MetricsReporter metrics.Reporter
Logger log.Logger
DefaultBuilders *DefaultBuilders
}
func NewStreamBuilderConfig ¶
func NewStreamBuilderConfig() *StreamBuilderConfig
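A minimal sketch of preparing a config before handing it to NewStreamBuilder; all field values are illustrative only:

config := NewStreamBuilderConfig()
config.ApplicationId = `orders-processor`
config.BootstrapServers = []string{`localhost:9092`}
config.ConsumerCount = 1
builder := NewStreamBuilder(config)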
func (*StreamBuilderConfig) String ¶
func (c *StreamBuilderConfig) String(b *StreamBuilder) string
type StreamConfigs ¶
type StreamConfigs map[string]interface{}
type StreamInstance ¶
type StreamInstance struct {
// contains filtered or unexported fields
}
func (*StreamInstance) Start ¶
func (s *StreamInstance) Start(wg *sync.WaitGroup) error
Start starts the high-level consumer for all streams.
func (*StreamInstance) Stop ¶
func (s *StreamInstance) Stop()
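StreamInstance values are created internally by the library; as a lifecycle sketch only (instance is assumed to be such a value):

wg := &sync.WaitGroup{}
if err := instance.Start(wg); err != nil {
	// handle startup failure
	panic(err)
}
// ... later, on shutdown
instance.Stop()
wg.Wait()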