Documentation
¶
Index ¶
- type BranchProcessor
- type ErrorFunc
- type FilterProcessor
- type FlatMapProcessor
- type FlatMapper
- type MapProcessor
- type Mapper
- type Message
- type Node
- type PassThroughProcessor
- type Pipe
- type Predicate
- type PrintProcessor
- type Processor
- type ProcessorNode
- type ProcessorPipe
- type Source
- type SourceNode
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Task
- type Topology
- type TopologyBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams
based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(msg *Message) error
Process processes the stream nodeMessage.
func (*BranchProcessor) WithPipe ¶ added in v0.3.0
func (p *BranchProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FilterProcessor) WithPipe ¶ added in v0.3.0
func (p *FilterProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapProcessor ¶ added in v0.4.0
type FlatMapProcessor struct {
// contains filtered or unexported fields
}
FlatMapProcessor is a processor that maps a stream using a flat mapping function.
func (*FlatMapProcessor) Close ¶ added in v0.4.0
func (p *FlatMapProcessor) Close() error
Close closes the processor.
func (*FlatMapProcessor) Process ¶ added in v0.4.0
func (p *FlatMapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FlatMapProcessor) WithPipe ¶ added in v0.4.0
func (p *FlatMapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapper ¶ added in v0.4.0
FlatMapper represents a mapping function that return multiple messages.
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MapProcessor) WithPipe ¶ added in v0.3.0
func (p *MapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Message ¶ added in v0.3.0
func NewMessage ¶ added in v0.3.0
func NewMessage(k, v interface{}) *Message
func NewMessageWithContext ¶ added in v0.3.0
type PassThroughProcessor ¶ added in v0.4.0
type PassThroughProcessor struct {
// contains filtered or unexported fields
}
PassThroughProcessor is a processor that passes the message on.
func (*PassThroughProcessor) Close ¶ added in v0.4.0
func (p *PassThroughProcessor) Close() error
Close closes the processor.
func (*PassThroughProcessor) Process ¶ added in v0.4.0
func (p *PassThroughProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*PassThroughProcessor) WithPipe ¶ added in v0.4.0
func (p *PassThroughProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*PrintProcessor) WithPipe ¶ added in v0.3.0
func (p *PrintProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Processor ¶
type Processor interface {
// WithPipe sets the pipe on the Processor.
WithPipe(Pipe)
// Process processes the stream Message.
Process(*Message) error
// Close closes the processor.
Close() error
}
Processor represents a stream processor.
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewFlatMapProcessor ¶ added in v0.4.0
func NewFlatMapProcessor(fn FlatMapper) Processor
NewFlatMapProcessor creates a new FlatMapProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewPassThroughProcessor ¶ added in v0.4.0
func NewPassThroughProcessor() Processor
NewPassThroughProcessor creates a new PassThroughProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
func NewProcessorNode ¶ added in v0.3.0
func NewProcessorNode(name string, p Processor) *ProcessorNode
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
func (*ProcessorNode) Close ¶
func (n *ProcessorNode) Close() error
func (*ProcessorNode) Name ¶ added in v0.3.0
func (n *ProcessorNode) Name() string
func (*ProcessorNode) Process ¶
func (n *ProcessorNode) Process(msg *Message) error
func (*ProcessorNode) WithPipe ¶ added in v0.3.0
func (n *ProcessorNode) WithPipe(pipe Pipe)
type ProcessorPipe ¶ added in v0.3.0
type ProcessorPipe struct {
// contains filtered or unexported fields
}
ProcessorPipe represents the pipe for processors.
func NewProcessorPipe ¶ added in v0.3.0
func NewProcessorPipe(t Task) *ProcessorPipe
NewProcessorPipe create a new ProcessorPipe instance.
func (*ProcessorPipe) Commit ¶ added in v0.3.0
func (p *ProcessorPipe) Commit() error
Commit commits the current state in the sources.
func (*ProcessorPipe) Forward ¶ added in v0.3.0
func (p *ProcessorPipe) Forward(msg *Message) error
Forward passes the data to all processor children in the topology.
func (*ProcessorPipe) ForwardToChild ¶ added in v0.3.0
func (p *ProcessorPipe) ForwardToChild(msg *Message, index int) error
Forward passes the data to the the given processor(s) child in the topology.
func (*ProcessorPipe) SetNode ¶ added in v0.3.0
func (p *ProcessorPipe) SetNode(n Node)
SetNode sets the topology node that is being processed.
The is only needed by the task and should not be used directly. Doing so can have some unexpected results.
type Source ¶
type Source interface {
// Consume gets the next Message from the Source.
Consume() (*Message, error)
// Commit marks the consumed Message as processed.
Commit() error
// Close closes the Source.
Close() error
}
Source represents a stream source.
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
func NewSourceNode ¶ added in v0.3.0
func NewSourceNode(name string) *SourceNode
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
func (*SourceNode) Close ¶
func (n *SourceNode) Close() error
func (*SourceNode) Name ¶ added in v0.3.0
func (n *SourceNode) Name() string
func (*SourceNode) Process ¶
func (n *SourceNode) Process(msg *Message) error
func (*SourceNode) WithPipe ¶ added in v0.3.0
func (n *SourceNode) WithPipe(pipe Pipe)
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() *Topology
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
func (Topology) Processors ¶
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() *Topology