Versions in this module Expand all Collapse all v1 v1.0.0 Mar 18, 2020 Changes in this version + const InnerJoin + const LeftJoin + type BuilderOption func(*DefaultBuilders) + func WithBackendBuilder(builder backend.Builder) BuilderOption + func WithChangelogBuilder(builder changelog.Builder) BuilderOption + func WithConsumerBuilder(builder consumer.Builder) BuilderOption + func WithKafkaAdmin(kafkaAdmin admin.KafkaAdmin) BuilderOption + func WithOffsetManager(offsetManager offsets.Manager) BuilderOption + func WithPartitionConsumerBuilder(builder consumer.PartitionConsumerBuilder) BuilderOption + func WithProducerBuilder(builder producer.Builder) BuilderOption + func WithStateStoreBuilder(builder store.StateStoreBuilder) BuilderOption + func WithStoreBuilder(builder store.Builder) BuilderOption + type DefaultBuilders struct + Backend backend.Builder + Consumer consumer.Builder + KafkaAdmin admin.KafkaAdmin + OffsetManager offsets.Manager + PartitionConsumer consumer.PartitionConsumerBuilder + Producer producer.Builder + StateStore store.StateStoreBuilder + Store store.Builder + type GlobalTable interface + type GlobalTableOffset int64 + const GlobalTableOffsetDefault + const GlobalTableOffsetLatest + type GlobalTableOption func(options *globalTableOptions) + func GlobalTableWithBackendWriter(writer StoreWriter) GlobalTableOption + func GlobalTableWithLogger(logger log.Logger) GlobalTableOption + func GlobalTableWithOffset(offset GlobalTableOffset) GlobalTableOption + type GlobalTableStreamConfig struct + BackendBuilder backend.Builder + ConsumerBuilder consumer.PartitionConsumerBuilder + KafkaAdmin admin.KafkaAdmin + Logger log.Logger + Metrics metrics.Reporter + OffsetManager offsets.Manager + type Instances struct + func NewStreams(builder *StreamBuilder, options ...InstancesOptions) *Instances + func (ins *Instances) Start() (err error) + func (ins *Instances) Stop() + type InstancesOptions func(config *instancesOptions) + func NotifyOnStart(c chan bool) 
InstancesOptions + func WithReBalanceHandler(h consumer.ReBalanceHandler) InstancesOptions + type KSink struct + Id int32 + KeyEncoder encoding.Encoder + KeyEncoderBuilder encoding.Builder + Producer producer.Producer + ProducerBuilder producer.Builder + Repartitioned bool + TopicPrefix string + ValEncoder encoding.Encoder + ValEncoderBuilder encoding.Builder + func NewKSinkBuilder(name string, id int32, topic topic, keyEncoder encoding.Builder, ...) *KSink + func (*KSink) Next() bool + func (*KSink) Type() topology.Type + func (s *KSink) AddChild(node topology.Node) + func (s *KSink) AddChildBuilder(builder topology.NodeBuilder) + func (s *KSink) Build() (topology.Node, error) + func (s *KSink) ChildBuilders() []topology.NodeBuilder + func (s *KSink) Childs() []topology.Node + func (s *KSink) Close() error + func (s *KSink) ID() int32 + func (s *KSink) Info() map[string]string + func (s *KSink) Name() string + func (s *KSink) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error) + func (s *KSink) SinkType() string + type Option func(*kStreamOptions) + func WithConfig(configs StreamConfigs) Option + func WithLogger(logger log.Logger) Option + func WithWorkerPoolOptions(poolConfig *worker_pool.PoolConfig) Option + type Repartition struct + Enable bool + KeyEncoder encoding.Builder + StreamSide Side + Topic RepartitionTopic + ValueEncoder encoding.Builder + func (r Repartition) Validate(s Side) error + type RepartitionOption func(sink *RepartitionOptions) + func RepartitionLeftStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption + func RepartitionRightStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption + type RepartitionOptions struct + LeftRepartition Repartition + LeftTopic topic + RightRepartition Repartition + RightTopic topic + func (iOpts *RepartitionOptions) Apply(options ...RepartitionOption) + type RepartitionTopic struct + MinInSycReplicas int + Name 
string + NumOfPartitions int + ReplicationFactor int + Suffix string + type Side int + const LeftSide + const RightSide + type SinkOption func(sink *KSink) + func WithCustomRecord(f func(ctx context.Context, in SinkRecord) (out SinkRecord, err error)) SinkOption + func WithProducer(p producer.Builder) SinkOption + type SinkRecord struct + Headers []*sarama.RecordHeader + Key interface{} + Timestamp time.Time + Value interface{} + type SourceNode struct + Id int32 + func (sn *SourceNode) AddChild(node topology.Node) + func (sn *SourceNode) AddChildBuilder(builder topology.NodeBuilder) + func (sn *SourceNode) Build() (topology.Node, error) + func (sn *SourceNode) ChildBuilders() []topology.NodeBuilder + func (sn *SourceNode) Childs() []topology.Node + func (sn *SourceNode) Close() + func (sn *SourceNode) Name() string + func (sn *SourceNode) Next() bool + func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error) + func (sn *SourceNode) Type() topology.Type + type StoreWriter func(r *data.Record, store store.Store) error + type Stream interface + Branch func(branches []branch.Details, opts ...Option) []Stream + Filter func(filter processors.FilterFunc) Stream + JoinGlobalTable func(table Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper, ...) Stream + JoinKTable func(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream + JoinStream func(stream Stream, valMapper join.ValueMapper, opts ...RepartitionOption) Stream + Process func(processor processors.ProcessFunc) Stream + SelectKey func(selectKeyFunc processors.SelectKeyFunc) Stream + Through func(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) Stream + To func(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) 
+ Transform func(transformer processors.TransFunc) Stream + TransformValue func(valueTransformFunc processors.ValueTransformFunc) Stream + type StreamBuilder struct + func NewStreamBuilder(config *StreamBuilderConfig, options ...BuilderOption) *StreamBuilder + func (b *StreamBuilder) Build(streams ...Stream) error + func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) GlobalTable + func (b *StreamBuilder) StoreRegistry() store.Registry + func (b *StreamBuilder) Stream(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) Stream + type StreamBuilderConfig struct + ApplicationId string + AsyncProcessing bool + BootstrapServers []string + ChangeLog struct{ ... } + Consumer *consumer.Config + ConsumerCount int + DLQ struct{ ... } + DefaultBuilders *DefaultBuilders + Host string + Logger log.Logger + MetricsReporter metrics.Reporter + Producer *producer.Config + Store struct{ ... } + WorkerPool *worker_pool.PoolConfig + func NewStreamBuilderConfig() *StreamBuilderConfig + func (c *StreamBuilderConfig) String(b *StreamBuilder) string + type StreamConfigs map[string]interface{} + type StreamInstance struct + func (s *StreamInstance) Start(wg *sync.WaitGroup) error + func (s *StreamInstance) Stop()