Documentation
¶
Index ¶
- type BranchProcessor
- type Context
- type ErrorFunc
- type FilterProcessor
- type MapProcessor
- type Mapper
- type MergeProcessor
- type Node
- type Predicate
- type PrintProcessor
- type Processor
- type ProcessorContext
- type ProcessorNode
- type Source
- type SourceNode
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Task
- type TaskFunc
- type Topology
- type TopologyBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams
based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(key, value interface{}) error
Process processes the stream record.
func (*BranchProcessor) WithContext ¶
func (p *BranchProcessor) WithContext(ctx Context)
WithContext sets the context on the Processor.
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(key, value interface{}) error
Process processes the stream record.
func (*FilterProcessor) WithContext ¶
func (p *FilterProcessor) WithContext(ctx Context)
WithContext sets the context on the Processor.
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(key, value interface{}) error
Process processes the stream record.
func (*MapProcessor) WithContext ¶
func (p *MapProcessor) WithContext(ctx Context)
WithContext sets the context on the Processor.
type Mapper ¶
type Mapper func(key, value interface{}) (interface{}, interface{}, error)
Mapper represents a mapping function
type MergeProcessor ¶
type MergeProcessor struct {
// contains filtered or unexported fields
}
MergeProcessor is a processor that merges multiple streams.
func (*MergeProcessor) Process ¶
func (p *MergeProcessor) Process(key, value interface{}) error
Process processes the stream record.
func (*MergeProcessor) WithContext ¶
func (p *MergeProcessor) WithContext(ctx Context)
WithContext sets the context on the Processor.
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(key, value interface{}) error
Process processes the stream record.
func (*PrintProcessor) WithContext ¶
func (p *PrintProcessor) WithContext(ctx Context)
WithContext sets the context on the Processor.
type Processor ¶
type Processor interface {
// WithContext sets the context on the Processor.
WithContext(ctx Context)
// Process processes the stream record.
Process(key, value interface{}) error
// Close closes the processor.
Close() error
}
Processor represents a stream processor.
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewMergeProcessor ¶
func NewMergeProcessor() Processor
NewMergeProcessor creates a new MergeProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorContext ¶
type ProcessorContext struct {
// contains filtered or unexported fields
}
func NewProcessorContext ¶
func (*ProcessorContext) Commit ¶
func (c *ProcessorContext) Commit() error
func (*ProcessorContext) Forward ¶
func (c *ProcessorContext) Forward(key, value interface{}) error
func (*ProcessorContext) ForwardToChild ¶
func (c *ProcessorContext) ForwardToChild(key, value interface{}, index int) error
func (*ProcessorContext) Logger ¶
func (c *ProcessorContext) Logger() log.Logger
func (*ProcessorContext) Stats ¶
func (c *ProcessorContext) Stats() stats.Stats
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
func (*ProcessorNode) Close ¶
func (n *ProcessorNode) Close() error
func (*ProcessorNode) Process ¶
func (n *ProcessorNode) Process(key, value interface{}) error
func (*ProcessorNode) WithContext ¶
func (n *ProcessorNode) WithContext(ctx Context)
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
func (*SourceNode) Close ¶
func (n *SourceNode) Close() error
func (*SourceNode) Process ¶
func (n *SourceNode) Process(key, value interface{}) error
func (*SourceNode) WithContext ¶
func (n *SourceNode) WithContext(ctx Context)
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() *Topology
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
func (Topology) Processors ¶
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() *Topology