Documentation
¶
Index ¶
- type BranchProcessor
- type ErrorFunc
- type FilterProcessor
- type FlatMapProcessor
- type FlatMapper
- type MapProcessor
- type Mapper
- type MergeProcessor
- type Message
- type Node
- type NodeMessage
- type Pipe
- type Predicate
- type PrintProcessor
- type Processor
- type ProcessorNode
- type ProcessorPipe
- type Source
- type SourceNode
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Task
- type Topology
- type TopologyBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams
based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*BranchProcessor) WithPipe ¶
func (p *BranchProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FilterProcessor) WithPipe ¶
func (p *FilterProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapProcessor ¶
type FlatMapProcessor struct {
// contains filtered or unexported fields
}
FlatMapProcessor is a processor that maps a stream using a flat mapping function.
func (*FlatMapProcessor) Close ¶
func (p *FlatMapProcessor) Close() error
Close closes the processor.
func (*FlatMapProcessor) Process ¶
func (p *FlatMapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FlatMapProcessor) WithPipe ¶
func (p *FlatMapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapper ¶
FlatMapper represents a mapping function that returns multiple messages.
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MapProcessor) WithPipe ¶
func (p *MapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type MergeProcessor ¶
type MergeProcessor struct {
// contains filtered or unexported fields
}
MergeProcessor is a processor that passes the message on, keeping track of seen metadata.
func (*MergeProcessor) Process ¶
func (p *MergeProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MergeProcessor) WithPipe ¶
func (p *MergeProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Message ¶
type Message struct {
Ctx context.Context
Key interface{}
Value interface{}
// contains filtered or unexported fields
}
func NewMessage ¶
func NewMessage(k, v interface{}) *Message
func NewMessageWithContext ¶
func (*Message) WithMetadata ¶
type NodeMessage ¶
type Pipe ¶
type Pipe interface {
// Queue gets the queued Messages for each Node.
//
// This method should not be used by Processors.
Queue() []NodeMessage
// Forward queues the data to all processor children in the topology.
Forward(*Message) error
// ForwardToChild queues the data to the given child processor in the topology.
ForwardToChild(*Message, int) error
// Commit commits the current state in the sources.
Commit(*Message) error
}
Pipe allows messages to flow through the processors.
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*PrintProcessor) WithPipe ¶
func (p *PrintProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Processor ¶
type Processor interface {
// WithPipe sets the pipe on the Processor.
WithPipe(Pipe)
// Process processes the stream Message.
Process(*Message) error
// Close closes the processor.
Close() error
}
Processor represents a stream processor.
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewFlatMapProcessor ¶
func NewFlatMapProcessor(fn FlatMapper) Processor
NewFlatMapProcessor creates a new FlatMapProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewMergeProcessor ¶
func NewMergeProcessor() Processor
NewMergeProcessor creates a new MergeProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
func NewProcessorNode ¶
func NewProcessorNode(name string, p Processor) *ProcessorNode
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
func (*ProcessorNode) Close ¶
func (n *ProcessorNode) Close() error
func (*ProcessorNode) Name ¶
func (n *ProcessorNode) Name() string
func (*ProcessorNode) Process ¶
func (n *ProcessorNode) Process(msg *Message) ([]NodeMessage, error)
func (*ProcessorNode) WithPipe ¶
func (n *ProcessorNode) WithPipe(pipe Pipe)
type ProcessorPipe ¶
type ProcessorPipe struct {
// contains filtered or unexported fields
}
ProcessorPipe represents the pipe for processors.
func NewProcessorPipe ¶
func NewProcessorPipe(node Node) *ProcessorPipe
NewProcessorPipe creates a new ProcessorPipe instance.
func (*ProcessorPipe) Commit ¶
func (p *ProcessorPipe) Commit(msg *Message) error
Commit commits the current state in the sources.
func (*ProcessorPipe) Forward ¶
func (p *ProcessorPipe) Forward(msg *Message) error
Forward queues the data to all processor children in the topology.
func (*ProcessorPipe) ForwardToChild ¶
func (p *ProcessorPipe) ForwardToChild(msg *Message, index int) error
ForwardToChild queues the data to the given child processor in the topology.
func (*ProcessorPipe) Queue ¶
func (p *ProcessorPipe) Queue() []NodeMessage
Queue gets the queued Messages for each Node.
Reading the node message queue will reset the queue.
type Source ¶
type Source interface {
// Consume gets the next Message from the Source.
Consume() (*Message, error)
// Commit marks the consumed Message as processed.
Commit(interface{}) error
// Close closes the Source.
Close() error
}
Source represents a stream source.
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
func NewSourceNode ¶
func NewSourceNode(name string) *SourceNode
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
func (*SourceNode) Close ¶
func (n *SourceNode) Close() error
func (*SourceNode) Name ¶
func (n *SourceNode) Name() string
func (*SourceNode) Process ¶
func (n *SourceNode) Process(msg *Message) ([]NodeMessage, error)
func (*SourceNode) WithPipe ¶
func (n *SourceNode) WithPipe(pipe Pipe)
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() *Topology
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
func (Topology) Processors ¶
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() *Topology