Documentation
¶
Index ¶
- type BranchProcessor
- type ErrorFunc
- type FilterProcessor
- type MapProcessor
- type Mapper
- type MergeProcessor
- type Message
- type Node
- type Pipe
- type Predicate
- type PrintProcessor
- type Processor
- type ProcessorNode
- type ProcessorPipe
- type Source
- type SourceNode
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Task
- type Topology
- type TopologyBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams
based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(msg *Message) error
Process processes the stream nodeMessage.
func (*BranchProcessor) WithPipe ¶ added in v0.3.0
func (p *BranchProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FilterProcessor) WithPipe ¶ added in v0.3.0
func (p *FilterProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MapProcessor) WithPipe ¶ added in v0.3.0
func (p *MapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type MergeProcessor ¶
type MergeProcessor struct {
// contains filtered or unexported fields
}
MergeProcessor is a processor that merges multiple streams.
func (*MergeProcessor) Process ¶
func (p *MergeProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MergeProcessor) WithPipe ¶ added in v0.3.0
func (p *MergeProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Message ¶ added in v0.3.0
func NewMessage ¶ added in v0.3.0
func NewMessage(k, v interface{}) *Message
func NewMessageWithContext ¶ added in v0.3.0
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*PrintProcessor) WithPipe ¶ added in v0.3.0
func (p *PrintProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Processor ¶
type Processor interface {
// WithPipe sets the pipe on the Processor.
WithPipe(Pipe)
// Process processes the stream Message.
Process(*Message) error
// Close closes the processor.
Close() error
}
Processor represents a stream processor.
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewMergeProcessor ¶
func NewMergeProcessor() Processor
NewMergeProcessor creates a new MergeProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
func NewProcessorNode ¶ added in v0.3.0
func NewProcessorNode(name string, p Processor) *ProcessorNode
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
func (*ProcessorNode) Close ¶
func (n *ProcessorNode) Close() error
func (*ProcessorNode) Name ¶ added in v0.3.0
func (n *ProcessorNode) Name() string
func (*ProcessorNode) Process ¶
func (n *ProcessorNode) Process(msg *Message) error
func (*ProcessorNode) WithPipe ¶ added in v0.3.0
func (n *ProcessorNode) WithPipe(pipe Pipe)
type ProcessorPipe ¶ added in v0.3.0
type ProcessorPipe struct {
// contains filtered or unexported fields
}
ProcessorPipe represents the pipe for processors.
func NewProcessorPipe ¶ added in v0.3.0
func NewProcessorPipe(t Task) *ProcessorPipe
NewProcessorPipe create a new ProcessorPipe instance.
func (*ProcessorPipe) Commit ¶ added in v0.3.0
func (p *ProcessorPipe) Commit() error
Commit commits the current state in the sources.
func (*ProcessorPipe) Forward ¶ added in v0.3.0
func (p *ProcessorPipe) Forward(msg *Message) error
Forward passes the data to all processor children in the topology.
func (*ProcessorPipe) ForwardToChild ¶ added in v0.3.0
func (p *ProcessorPipe) ForwardToChild(msg *Message, index int) error
Forward passes the data to the the given processor(s) child in the topology.
func (*ProcessorPipe) SetNode ¶ added in v0.3.0
func (p *ProcessorPipe) SetNode(n Node)
SetNode sets the topology node that is being processed.
The is only needed by the task and should not be used directly. Doing so can have some unexpected results.
type Source ¶
type Source interface {
// Consume gets the next Message from the Source.
Consume() (*Message, error)
// Commit marks the consumed Message as processed.
Commit() error
// Close closes the Source.
Close() error
}
Source represents a stream source.
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
func NewSourceNode ¶ added in v0.3.0
func NewSourceNode(name string) *SourceNode
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
func (*SourceNode) Close ¶
func (n *SourceNode) Close() error
func (*SourceNode) Name ¶ added in v0.3.0
func (n *SourceNode) Name() string
func (*SourceNode) Process ¶
func (n *SourceNode) Process(msg *Message) error
func (*SourceNode) WithPipe ¶ added in v0.3.0
func (n *SourceNode) WithPipe(pipe Pipe)
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() *Topology
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
func (Topology) Processors ¶
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() *Topology