Documentation
Index ¶
- type BranchProcessor
- type ErrorFunc
- type FilterProcessor
- type FlatMapProcessor
- type FlatMapper
- type MapProcessor
- type Mapper
- type MergeProcessor
- type Message
- type Node
- type Pipe
- type Predicate
- type PrintProcessor
- type Processor
- type ProcessorNode
- type Pump
- type Source
- type SourceNode
- type SourcePump
- type SourcePumps
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Task
- type TimedPipe
- type Topology
- type TopologyBuilder
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams
based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*BranchProcessor) WithPipe ¶ added in v0.3.0
func (p *BranchProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
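The branching behavior described for BranchProcessor can be sketched roughly as follows. This is an illustration only, not the library's implementation: the `Predicate` signature, the `recordingPipe` test double, and the `branch` helper are all assumptions made for this example, and it assumes a message is forwarded to child i whenever predicate i matches.

```go
package main

import "fmt"

// Message is a local stand-in for the package's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate is an assumed signature; the real Predicate type is not shown in this reference.
type Predicate func(*Message) (bool, error)

// recordingPipe is a hypothetical test double that records messages per child index.
type recordingPipe struct {
	byChild map[int][]*Message
}

// ForwardToChild mirrors the documented Pipe method of the same name.
func (p *recordingPipe) ForwardToChild(msg *Message, index int) error {
	p.byChild[index] = append(p.byChild[index], msg)
	return nil
}

// branch forwards msg to child i for every predicate i that matches.
func branch(pipe *recordingPipe, preds []Predicate, msg *Message) error {
	for i, pred := range preds {
		ok, err := pred(msg)
		if err != nil {
			return err
		}
		if !ok {
			continue
		}
		if err := pipe.ForwardToChild(msg, i); err != nil {
			return err
		}
	}
	return nil
}

func isEven(m *Message) (bool, error) { return m.Value.(int)%2 == 0, nil }
func isBig(m *Message) (bool, error)  { return m.Value.(int) > 10, nil }

func main() {
	pipe := &recordingPipe{byChild: map[int][]*Message{}}
	preds := []Predicate{isEven, isBig}
	for _, v := range []int{4, 11, 12} {
		if err := branch(pipe, preds, &Message{Value: v}); err != nil {
			fmt.Println("error:", err)
		}
	}
	// Number of messages routed to the "even" and "big" children.
	fmt.Println(len(pipe.byChild[0]), len(pipe.byChild[1]))
}
```

In this sketch the value 12 matches both predicates, so it reaches both children. Whether the real processor stops at the first match is not stated in this reference.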
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FilterProcessor) WithPipe ¶ added in v0.3.0
func (p *FilterProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapProcessor ¶ added in v0.4.0
type FlatMapProcessor struct {
// contains filtered or unexported fields
}
FlatMapProcessor is a processor that maps a stream using a flat mapping function.
func (*FlatMapProcessor) Close ¶ added in v0.4.0
func (p *FlatMapProcessor) Close() error
Close closes the processor.
func (*FlatMapProcessor) Process ¶ added in v0.4.0
func (p *FlatMapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FlatMapProcessor) WithPipe ¶ added in v0.4.0
func (p *FlatMapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapper ¶ added in v0.4.0
FlatMapper represents a mapping function that returns multiple messages.
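As a rough illustration of a flat mapping function, the sketch below splits one message into several. The `FlatMapper` signature used here is an assumption (this reference does not show the type's definition), and `Message` is a local stand-in rather than the package type.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the package's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// FlatMapper is an assumed signature: one message in, zero or more messages out.
type FlatMapper func(*Message) ([]*Message, error)

// splitWords emits one message per whitespace-separated word, keeping the original key.
func splitWords(msg *Message) ([]*Message, error) {
	text, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("expected string value, got %T", msg.Value)
	}
	words := strings.Fields(text)
	out := make([]*Message, 0, len(words))
	for _, w := range words {
		out = append(out, &Message{Key: msg.Key, Value: w})
	}
	return out, nil
}

func main() {
	var mapper FlatMapper = splitWords
	msgs, err := mapper(&Message{Key: "line-1", Value: "to be or not"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Println(m.Key, m.Value)
	}
}
```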
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MapProcessor) WithPipe ¶ added in v0.3.0
func (p *MapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type MergeProcessor ¶
type MergeProcessor struct {
// contains filtered or unexported fields
}
MergeProcessor is a processor that passes the message on, keeping track of seen metadata.
func (*MergeProcessor) Process ¶
func (p *MergeProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MergeProcessor) WithPipe ¶ added in v0.3.0
func (p *MergeProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Message ¶ added in v0.3.0
type Message struct {
Ctx context.Context
Key interface{}
Value interface{}
// contains filtered or unexported fields
}
Message represents data that flows through the stream.
func NewMessage ¶ added in v0.3.0
func NewMessage(k, v interface{}) *Message
NewMessage creates a Message.
func NewMessageWithContext ¶ added in v0.3.0
NewMessageWithContext creates a Message with the given context.
func (*Message) WithMetadata ¶ added in v0.5.0
WithMetadata adds metadata to the Message from a Source.
type Node ¶
type Node interface {
// Name gets the node name.
Name() string
// AddChild adds a child node to the node.
AddChild(n Node)
// Children gets the node's children.
Children() []Node
// Processor gets the node's processor.
Processor() Processor
}
Node represents a topology node.
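Because each node exposes its children, a topology can be traversed depth-first. The sketch below assumes a reduced `Node` interface (the `Processor()` accessor is omitted) and a hypothetical `simpleNode` implementation, both defined locally for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Node mirrors the name and child accessors of the documented interface.
// Processor() is left out to keep the sketch short.
type Node interface {
	Name() string
	AddChild(n Node)
	Children() []Node
}

// simpleNode is a hypothetical in-memory Node used only for this example.
type simpleNode struct {
	name     string
	children []Node
}

func (n *simpleNode) Name() string     { return n.name }
func (n *simpleNode) AddChild(c Node)  { n.children = append(n.children, c) }
func (n *simpleNode) Children() []Node { return n.children }

// walk visits n and then each of its descendants, depth-first.
func walk(n Node, depth int, visit func(Node, int)) {
	visit(n, depth)
	for _, c := range n.Children() {
		walk(c, depth+1, visit)
	}
}

// names returns the node names in visit order, indented by depth.
func names(root Node) []string {
	var out []string
	walk(root, 0, func(n Node, depth int) {
		out = append(out, strings.Repeat("  ", depth)+n.Name())
	})
	return out
}

func main() {
	src := &simpleNode{name: "source"}
	filter := &simpleNode{name: "filter"}
	src.AddChild(filter)
	filter.AddChild(&simpleNode{name: "print"})
	for _, line := range names(src) {
		fmt.Println(line)
	}
}
```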
type Pipe ¶ added in v0.3.0
type Pipe interface {
// Forward queues the data to all processor children in the topology.
Forward(*Message) error
// ForwardToChild queues the data to the given child processor in the topology.
ForwardToChild(*Message, int) error
// Commit commits the current state in the sources.
Commit(*Message) error
}
Pipe allows messages to flow through the processors.
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*PrintProcessor) WithPipe ¶ added in v0.3.0
func (p *PrintProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Processor ¶
type Processor interface {
// WithPipe sets the pipe on the Processor.
WithPipe(Pipe)
// Process processes the stream Message.
Process(*Message) error
// Close closes the processor.
Close() error
}
Processor represents a stream processor.
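A custom processor only needs to satisfy the three methods above. The following sketch redeclares `Message`, `Pipe`, and `Processor` locally so it compiles on its own. `upperProcessor` and `capturePipe` are hypothetical names introduced for this example, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins mirroring the documented types.
type Message struct{ Key, Value interface{} }

type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit(*Message) error
}

type Processor interface {
	WithPipe(Pipe)
	Process(*Message) error
	Close() error
}

// upperProcessor upper-cases string values and forwards them downstream.
type upperProcessor struct{ pipe Pipe }

func (p *upperProcessor) WithPipe(pipe Pipe) { p.pipe = pipe }

func (p *upperProcessor) Process(msg *Message) error {
	s, ok := msg.Value.(string)
	if !ok {
		return fmt.Errorf("expected string value, got %T", msg.Value)
	}
	return p.pipe.Forward(&Message{Key: msg.Key, Value: strings.ToUpper(s)})
}

func (p *upperProcessor) Close() error { return nil }

// capturePipe is a test double that records everything forwarded to it.
type capturePipe struct {
	forwarded []*Message
	commits   int
}

func (c *capturePipe) Forward(m *Message) error {
	c.forwarded = append(c.forwarded, m)
	return nil
}
func (c *capturePipe) ForwardToChild(m *Message, _ int) error { return c.Forward(m) }
func (c *capturePipe) Commit(*Message) error                  { c.commits++; return nil }

func main() {
	pipe := &capturePipe{}
	var p Processor = &upperProcessor{}
	p.WithPipe(pipe)
	if err := p.Process(&Message{Key: "k", Value: "hello"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(pipe.forwarded[0].Value)
	_ = p.Close()
}
```

Substituting a recording pipe like this is one way to unit test a processor without building a full topology.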
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewFlatMapProcessor ¶ added in v0.4.0
func NewFlatMapProcessor(fn FlatMapper) Processor
NewFlatMapProcessor creates a new FlatMapProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewMergeProcessor ¶
func NewMergeProcessor() Processor
NewMergeProcessor creates a new MergeProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
ProcessorNode represents the topology node for a processor.
func NewProcessorNode ¶ added in v0.3.0
func NewProcessorNode(name string, p Processor) *ProcessorNode
NewProcessorNode creates a new ProcessorNode.
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
AddChild adds a child node to the node.
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
Children gets the node's children.
func (*ProcessorNode) Name ¶ added in v0.3.0
func (n *ProcessorNode) Name() string
Name gets the node name.
func (*ProcessorNode) Processor ¶ added in v1.1.0
func (n *ProcessorNode) Processor() Processor
Processor gets the node's processor.
type Pump ¶ added in v1.1.0
type Pump interface {
// Process processes a message in the Pump.
Process(*Message) error
// Close closes the pump.
Close() error
}
Pump represents a Message pump.
type Source ¶
type Source interface {
// Consume gets the next Message from the Source.
Consume() (*Message, error)
// Commit marks the consumed Message as processed.
Commit(interface{}) error
// Close closes the Source.
Close() error
}
Source represents a stream source.
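To make the Source contract concrete, here is a minimal in-memory sketch. The types are local stand-ins, and several behaviors are choices made for this example rather than documented facts: `Consume` returns a nil message when the slice is exhausted, the message key doubles as an offset, and `Commit` expects that offset as its argument.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type Message struct{ Key, Value interface{} }

type Source interface {
	Consume() (*Message, error)
	Commit(interface{}) error
	Close() error
}

var errClosed = errors.New("source closed")

// sliceSource is a hypothetical Source backed by a fixed slice of strings.
type sliceSource struct {
	values    []string
	next      int
	committed int // offset of the last committed message, -1 if none
	closed    bool
}

func newSliceSource(values ...string) *sliceSource {
	return &sliceSource{values: values, committed: -1}
}

// Consume returns the next value keyed by its offset, or nil when exhausted.
func (s *sliceSource) Consume() (*Message, error) {
	if s.closed {
		return nil, errClosed
	}
	if s.next >= len(s.values) {
		return nil, nil
	}
	msg := &Message{Key: s.next, Value: s.values[s.next]}
	s.next++
	return msg, nil
}

// Commit records the offset of the last processed message.
func (s *sliceSource) Commit(v interface{}) error {
	offset, ok := v.(int)
	if !ok {
		return fmt.Errorf("unexpected commit value %T", v)
	}
	s.committed = offset
	return nil
}

func (s *sliceSource) Close() error {
	s.closed = true
	return nil
}

func main() {
	var src Source = newSliceSource("a", "b", "c")
	defer src.Close()
	for {
		msg, err := src.Consume()
		if err != nil {
			fmt.Println("consume error:", err)
			return
		}
		if msg == nil {
			break // nothing left in this sketch
		}
		fmt.Println(msg.Key, msg.Value)
		if err := src.Commit(msg.Key); err != nil {
			fmt.Println("commit error:", err)
			return
		}
	}
}
```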
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
SourceNode represents a node between the source and the rest of the node tree.
func NewSourceNode ¶ added in v0.3.0
func NewSourceNode(name string) *SourceNode
NewSourceNode creates a new SourceNode.
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
AddChild adds a child node to the node.
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
Children gets the node's children.
func (*SourceNode) Name ¶ added in v0.3.0
func (n *SourceNode) Name() string
Name gets the node name.
func (*SourceNode) Processor ¶ added in v1.1.0
func (n *SourceNode) Processor() Processor
Processor gets the node's processor.
type SourcePump ¶ added in v1.1.0
type SourcePump interface {
// Stop stops the source pump from running.
Stop()
// Close closes the source pump.
Close() error
}
SourcePump represents a Message pump for sources.
func NewSourcePump ¶ added in v1.1.0
func NewSourcePump(name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump
NewSourcePump creates a new SourcePump.
type SourcePumps ¶ added in v1.1.0
type SourcePumps []SourcePump
SourcePumps represents a set of source pumps.
func (SourcePumps) StopAll ¶ added in v1.1.0
func (p SourcePumps) StopAll()
StopAll stops all source pumps.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a stream of data.
func (*Stream) FlatMap ¶ added in v0.4.0
func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
FlatMap runs a flat mapper on the stream.
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
StreamBuilder represents a stream builder.
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder.
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() *Topology
Build builds the stream Topology.
type Task ¶
type Task interface {
// Start starts the stream processors.
Start() error
// OnError sets the error handler.
OnError(fn ErrorFunc)
// Close stops and closes the stream processors.
Close() error
}
Task represents a streams task.
type TimedPipe ¶ added in v1.1.0
type TimedPipe interface {
// Reset resets the accumulated pipe duration.
Reset()
// Duration returns the accumulated pipe duration.
Duration() time.Duration
}
TimedPipe represents a pipe that can accumulate execution time.
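One way to picture a pipe that accumulates execution time is a wrapper that measures each call and adds the elapsed time to a running total. The sketch below is an assumption about how such accumulation could work, not the package's implementation. `timedPipe`, `observe`, and `slowStep` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// TimedPipe mirrors the documented interface.
type TimedPipe interface {
	Reset()
	Duration() time.Duration
}

// timedPipe is a hypothetical implementation that sums elapsed time.
type timedPipe struct {
	total time.Duration
}

// observe runs fn and adds its elapsed time to the running total.
func (p *timedPipe) observe(fn func() error) error {
	start := time.Now()
	err := fn()
	p.total += time.Since(start)
	return err
}

func (p *timedPipe) Reset()                  { p.total = 0 }
func (p *timedPipe) Duration() time.Duration { return p.total }

// slowStep simulates downstream work.
func slowStep() error {
	time.Sleep(2 * time.Millisecond)
	return nil
}

func main() {
	p := &timedPipe{}
	var tp TimedPipe = p
	for i := 0; i < 3; i++ {
		if err := p.observe(slowStep); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println("accumulated:", tp.Duration())
	tp.Reset()
	fmt.Println("after reset:", tp.Duration())
}
```

This sketch is not safe for concurrent use. A shared pipe would need a mutex or atomic counter around the total.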
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
Topology represents the streams topology.
func (Topology) Processors ¶
Processors gets the topology Processors.
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
TopologyBuilder represents a topology builder.
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
NewTopologyBuilder creates a new TopologyBuilder.
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
AddProcessor adds a Processor to the builder, returning the created Node.
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
AddSource adds a Source to the builder, returning the created Node.
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() *Topology
Build creates an immutable Topology.