streams

Version: v1.0.0. This package is not in the latest version of its module.

Published: Aug 31, 2018 License: MIT Imports: 7 Imported by: 18

README

Streams

Streams is a lightweight, simple stream processing library. While Kafka is the main use case for Streams, it is flexible enough to be used for any form of processing from any source.

Note: This is currently a work in progress.

Installation

You can install streams using go get:

go get github.com/msales/streams

Concepts

Streams breaks processing into the following basic parts.

  • Message is a message in the system, consisting of a key, a value, and a context.

  • Source reads from a data source and manages the read position within it.

  • Processor processes the data, optionally passing it on or marking the source's position. A sink is just a processor that does not forward the data on.

  • Pipe gives processors an abstract view of the current state, allowing Messages to flow through the system.
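The way these parts fit together can be pictured with a small stand-alone sketch. It does not use the library: message, source, processor, run, and upper are hypothetical stand-ins introduced only to illustrate the flow from a source, through a processor, into a sink.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a local stand-in for the library's Message: a key and a value.
type message struct {
	Key   interface{}
	Value interface{}
}

// source is a stand-in for a Source: it hands out messages and tracks a position.
type source struct {
	msgs []message
	pos  int // index of the next message to read
}

// next returns the next message, or false once the source is exhausted.
func (s *source) next() (message, bool) {
	if s.pos >= len(s.msgs) {
		return message{}, false
	}
	m := s.msgs[s.pos]
	s.pos++
	return m, true
}

// processor handles one message and may pass results on through forward.
type processor func(m message, forward func(message))

// run pulls every message from src, hands it to p, and collects what p forwards.
// The collecting closure plays the role of a sink.
func run(src *source, p processor) []message {
	var out []message
	for {
		m, ok := src.next()
		if !ok {
			return out
		}
		p(m, func(fm message) { out = append(out, fm) })
	}
}

// upper forwards each message with its string value upper-cased.
func upper(m message, forward func(message)) {
	if s, ok := m.Value.(string); ok {
		m.Value = strings.ToUpper(s)
	}
	forward(m)
}

func main() {
	src := &source{msgs: []message{{Key: 1, Value: "a"}, {Key: 2, Value: "b"}}}
	for _, m := range run(src, upper) {
		fmt.Println(m.Key, m.Value)
	}
}
```

In the library itself the forwarding step is mediated by a Pipe rather than a closure; the closure here only keeps the sketch short.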

Documentation

Types

type BranchProcessor

type BranchProcessor struct {
	// contains filtered or unexported fields
}

BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.
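Branching on predicates can be sketched without the library as follows. Message and Predicate are local copies of the documented shapes; branch, isEven, and isPositive are hypothetical helpers. The sketch assumes a message goes to every branch whose predicate matches, which the documentation here does not confirm (it could equally stop at the first match).

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate mirrors the documented signature.
type Predicate func(*Message) (bool, error)

// branch returns the indexes of the predicates that accept msg.
// Assumption: every matching branch receives the message.
func branch(msg *Message, preds []Predicate) ([]int, error) {
	var matched []int
	for i, pred := range preds {
		ok, err := pred(msg)
		if err != nil {
			return nil, err
		}
		if ok {
			matched = append(matched, i)
		}
	}
	return matched, nil
}

// isEven accepts messages whose int value is even.
func isEven(m *Message) (bool, error) {
	n, ok := m.Value.(int)
	if !ok {
		return false, fmt.Errorf("isEven: want int value, got %T", m.Value)
	}
	return n%2 == 0, nil
}

// isPositive accepts messages whose int value is greater than zero.
func isPositive(m *Message) (bool, error) {
	n, ok := m.Value.(int)
	if !ok {
		return false, fmt.Errorf("isPositive: want int value, got %T", m.Value)
	}
	return n > 0, nil
}

func main() {
	preds := []Predicate{isEven, isPositive}
	for _, v := range []int{4, 3, -2, -1} {
		idx, err := branch(&Message{Value: v}, preds)
		fmt.Println(v, idx, err)
	}
}
```

The returned indexes correspond to the position of each predicate in the slice, in the same spirit as the int argument of ForwardToChild on Pipe.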

func (*BranchProcessor) Close

func (p *BranchProcessor) Close() error

Close closes the processor.

func (*BranchProcessor) Process

func (p *BranchProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*BranchProcessor) WithPipe

func (p *BranchProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type ErrorFunc

type ErrorFunc func(error)

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor is a processor that filters a stream using a predicate function.

func (*FilterProcessor) Close

func (p *FilterProcessor) Close() error

Close closes the processor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*FilterProcessor) WithPipe

func (p *FilterProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapProcessor

type FlatMapProcessor struct {
	// contains filtered or unexported fields
}

FlatMapProcessor is a processor that maps a stream using a flat mapping function.

func (*FlatMapProcessor) Close

func (p *FlatMapProcessor) Close() error

Close closes the processor.

func (*FlatMapProcessor) Process

func (p *FlatMapProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*FlatMapProcessor) WithPipe

func (p *FlatMapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapper

type FlatMapper func(*Message) ([]*Message, error)

FlatMapper represents a mapping function that returns multiple messages.
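As an illustration, a function with the FlatMapper shape might split one message into several. The sketch below declares local copies of Message and FlatMapper so it runs on its own; splitWords is a hypothetical example and not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// FlatMapper mirrors the documented signature.
type FlatMapper func(*Message) ([]*Message, error)

// splitWords emits one message per whitespace-separated word, keeping the key.
func splitWords(msg *Message) ([]*Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("splitWords: want string value, got %T", msg.Value)
	}
	words := strings.Fields(s)
	out := make([]*Message, 0, len(words))
	for _, w := range words {
		out = append(out, &Message{Key: msg.Key, Value: w})
	}
	return out, nil
}

// Compile-time check that splitWords has the FlatMapper shape.
var _ FlatMapper = splitWords

func main() {
	out, err := splitWords(&Message{Key: "line-1", Value: "to be or not"})
	if err != nil {
		fmt.Println("splitWords:", err)
		return
	}
	for _, m := range out {
		fmt.Println(m.Key, m.Value)
	}
}
```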

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

MapProcessor is a processor that maps a stream using a mapping function.

func (*MapProcessor) Close

func (p *MapProcessor) Close() error

Close closes the processor.

func (*MapProcessor) Process

func (p *MapProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*MapProcessor) WithPipe

func (p *MapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Mapper

type Mapper func(*Message) (*Message, error)

Mapper represents a mapping function.
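A function with the Mapper shape takes one message and returns one message. The sketch below uses local copies of Message and Mapper so it is self-contained; double is a hypothetical example, and returning a fresh Message rather than mutating the input is a choice made here, not something the documentation prescribes.

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Mapper mirrors the documented signature.
type Mapper func(*Message) (*Message, error)

// double returns a new message whose int value is twice the input value.
func double(msg *Message) (*Message, error) {
	n, ok := msg.Value.(int)
	if !ok {
		return nil, fmt.Errorf("double: want int value, got %T", msg.Value)
	}
	return &Message{Key: msg.Key, Value: n * 2}, nil
}

// Compile-time check that double has the Mapper shape.
var _ Mapper = double

func main() {
	out, err := double(&Message{Key: "a", Value: 21})
	if err != nil {
		fmt.Println("double:", err)
		return
	}
	fmt.Println(out.Key, out.Value)
}
```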

type MergeProcessor

type MergeProcessor struct {
	// contains filtered or unexported fields
}

MergeProcessor is a processor that passes the message on, keeping track of seen metadata.

func (*MergeProcessor) Close

func (p *MergeProcessor) Close() error

Close closes the processor.

func (*MergeProcessor) Process

func (p *MergeProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*MergeProcessor) WithPipe

func (p *MergeProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Message

type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(k, v interface{}) *Message

func NewMessageWithContext

func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message

func (Message) Empty

func (m Message) Empty() bool

func (*Message) Metadata

func (m *Message) Metadata() map[Source]interface{}

func (*Message) WithMetadata

func (m *Message) WithMetadata(s Source, v interface{}) *Message

type Node

type Node interface {
	Name() string
	WithPipe(Pipe)
	AddChild(n Node)
	Children() []Node
	Process(*Message) ([]NodeMessage, error)
	Close() error
}

type NodeMessage

type NodeMessage struct {
	Node Node
	Msg  *Message
}

type Pipe

type Pipe interface {
	// Queue gets the queued Messages for each Node.
	//
	// This method should not be used by Processors.
	Queue() []NodeMessage
	// Forward queues the data to all processor children in the topology.
	Forward(*Message) error
	// ForwardToChild queues the data to the child processor at the given index in the topology.
	ForwardToChild(*Message, int) error
	// Commit commits the current state in the sources.
	Commit(*Message) error
}

Pipe allows messages to flow through the processors.

type Predicate

type Predicate func(*Message) (bool, error)

Predicate represents a stream filter function.
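A predicate is easiest to understand applied to a handful of messages. In the sketch below Message and Predicate are local copies of the documented shapes, while hasPrefix and filter are hypothetical helpers; stopping at the first predicate error is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate mirrors the documented signature.
type Predicate func(*Message) (bool, error)

// hasPrefix builds a Predicate that accepts string values starting with prefix.
func hasPrefix(prefix string) Predicate {
	return func(m *Message) (bool, error) {
		s, ok := m.Value.(string)
		if !ok {
			return false, fmt.Errorf("hasPrefix: want string value, got %T", m.Value)
		}
		return strings.HasPrefix(s, prefix), nil
	}
}

// filter keeps the messages accepted by pred and stops at the first error.
func filter(msgs []*Message, pred Predicate) ([]*Message, error) {
	var kept []*Message
	for _, m := range msgs {
		ok, err := pred(m)
		if err != nil {
			return nil, err
		}
		if ok {
			kept = append(kept, m)
		}
	}
	return kept, nil
}

func main() {
	msgs := []*Message{{Value: "order.created"}, {Value: "user.created"}, {Value: "order.paid"}}
	kept, err := filter(msgs, hasPrefix("order."))
	if err != nil {
		fmt.Println("filter:", err)
		return
	}
	for _, m := range kept {
		fmt.Println(m.Value)
	}
}
```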

type PrintProcessor

type PrintProcessor struct {
	// contains filtered or unexported fields
}

PrintProcessor is a processor that prints the stream to stdout.

func (*PrintProcessor) Close

func (p *PrintProcessor) Close() error

Close closes the processor.

func (*PrintProcessor) Process

func (p *PrintProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*PrintProcessor) WithPipe

func (p *PrintProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Processor

type Processor interface {
	// WithPipe sets the pipe on the Processor.
	WithPipe(Pipe)
	// Process processes the stream Message.
	Process(*Message) error
	// Close closes the processor.
	Close() error
}

Processor represents a stream processor.
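A custom processor only needs the three methods listed above. The following sketch is self-contained: Processor is copied from the interface above, Pipe is a reduced stand-in with only Forward and Commit, and commitEvery and recordingPipe are hypothetical types used to show how a processor might forward each message and commit periodically.

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe is a reduced stand-in for the documented Pipe interface
// (Queue and ForwardToChild are omitted to keep the sketch short).
type Pipe interface {
	Forward(*Message) error
	Commit(*Message) error
}

// Processor mirrors the documented Processor interface.
type Processor interface {
	WithPipe(Pipe)
	Process(*Message) error
	Close() error
}

// commitEvery forwards every message and commits after each batch of n.
// n must be positive.
type commitEvery struct {
	pipe Pipe
	n    int
	seen int
}

func (p *commitEvery) WithPipe(pipe Pipe) { p.pipe = pipe }

func (p *commitEvery) Process(msg *Message) error {
	if err := p.pipe.Forward(msg); err != nil {
		return err
	}
	p.seen++
	if p.seen%p.n == 0 {
		return p.pipe.Commit(msg)
	}
	return nil
}

func (p *commitEvery) Close() error { return nil }

// recordingPipe is a hypothetical test double that counts calls.
type recordingPipe struct {
	forwarded int
	committed int
}

func (r *recordingPipe) Forward(*Message) error { r.forwarded++; return nil }
func (r *recordingPipe) Commit(*Message) error  { r.committed++; return nil }

func main() {
	pipe := &recordingPipe{}
	var p Processor = &commitEvery{n: 2}
	p.WithPipe(pipe)
	for i := 0; i < 5; i++ {
		if err := p.Process(&Message{Value: i}); err != nil {
			fmt.Println("process:", err)
			return
		}
	}
	fmt.Println("forwarded:", pipe.forwarded, "committed:", pipe.committed)
}
```

A processor that never calls Forward acts as a sink in the sense used in the Concepts section.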

func NewBranchProcessor

func NewBranchProcessor(fns []Predicate) Processor

NewBranchProcessor creates a new BranchProcessor instance.

func NewFilterProcessor

func NewFilterProcessor(fn Predicate) Processor

NewFilterProcessor creates a new FilterProcessor instance.

func NewFlatMapProcessor

func NewFlatMapProcessor(fn FlatMapper) Processor

NewFlatMapProcessor creates a new FlatMapProcessor instance.

func NewMapProcessor

func NewMapProcessor(fn Mapper) Processor

NewMapProcessor creates a new MapProcessor instance.

func NewMergeProcessor

func NewMergeProcessor() Processor

NewMergeProcessor creates a new MergeProcessor instance.

func NewPrintProcessor

func NewPrintProcessor() Processor

NewPrintProcessor creates a new PrintProcessor instance.

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

func NewProcessorNode

func NewProcessorNode(name string, p Processor) *ProcessorNode

func (*ProcessorNode) AddChild

func (n *ProcessorNode) AddChild(node Node)

func (*ProcessorNode) Children

func (n *ProcessorNode) Children() []Node

func (*ProcessorNode) Close

func (n *ProcessorNode) Close() error

func (*ProcessorNode) Name

func (n *ProcessorNode) Name() string

func (*ProcessorNode) Process

func (n *ProcessorNode) Process(msg *Message) ([]NodeMessage, error)

func (*ProcessorNode) WithPipe

func (n *ProcessorNode) WithPipe(pipe Pipe)

type ProcessorPipe

type ProcessorPipe struct {
	// contains filtered or unexported fields
}

ProcessorPipe represents the pipe for processors.

func NewProcessorPipe

func NewProcessorPipe(node Node) *ProcessorPipe

NewProcessorPipe creates a new ProcessorPipe instance.

func (*ProcessorPipe) Commit

func (p *ProcessorPipe) Commit(msg *Message) error

Commit commits the current state in the sources.

func (*ProcessorPipe) Forward

func (p *ProcessorPipe) Forward(msg *Message) error

Forward queues the data to all processor children in the topology.

func (*ProcessorPipe) ForwardToChild

func (p *ProcessorPipe) ForwardToChild(msg *Message, index int) error

ForwardToChild queues the data to the child processor at the given index in the topology.

func (*ProcessorPipe) Queue

func (p *ProcessorPipe) Queue() []NodeMessage

Queue gets the queued Messages for each Node.

Reading the node message queue will reset the queue.
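The reset-on-read behavior can be pictured as a drain operation. This is a minimal sketch of that idea, not the ProcessorPipe implementation: queue, push, and drain are hypothetical names, and NodeMessage is simplified to carry a node name instead of a Node.

```go
package main

import "fmt"

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// NodeMessage is simplified here: the node is identified by name only.
type NodeMessage struct {
	Node string
	Msg  *Message
}

// queue collects node messages until they are read.
type queue struct {
	items []NodeMessage
}

// push appends a node message to the queue.
func (q *queue) push(nm NodeMessage) {
	q.items = append(q.items, nm)
}

// drain returns everything queued so far and leaves the queue empty,
// so a second call without new pushes returns nothing.
func (q *queue) drain() []NodeMessage {
	items := q.items
	q.items = nil
	return items
}

func main() {
	q := &queue{}
	q.push(NodeMessage{Node: "map", Msg: &Message{Value: 1}})
	q.push(NodeMessage{Node: "print", Msg: &Message{Value: 2}})

	fmt.Println("first read:", len(q.drain()))
	fmt.Println("second read:", len(q.drain()))
}
```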

type Source

type Source interface {
	// Consume gets the next Message from the Source.
	Consume() (*Message, error)
	// Commit marks the consumed Message as processed.
	Commit(interface{}) error
	// Close closes the Source.
	Close() error
}

Source represents a stream source.
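A source backed by an in-memory slice shows the shape of the three methods. The sketch is self-contained and rests on several assumptions that the documentation does not settle: sliceSource and errDrained are hypothetical, an exhausted source is signalled with an error, and the value passed to Commit is treated as an int offset.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the library's Message type.
type Message struct {
	Key   interface{}
	Value interface{}
}

// Source mirrors the documented Source interface.
type Source interface {
	Consume() (*Message, error)
	Commit(interface{}) error
	Close() error
}

// errDrained signals that no messages are left (an assumption of this sketch).
var errDrained = errors.New("sliceSource: no more messages")

// sliceSource serves messages from a slice and remembers the committed offset.
type sliceSource struct {
	msgs      []*Message
	next      int // index of the next message to hand out
	committed int // number of messages marked as processed
	closed    bool
}

func (s *sliceSource) Consume() (*Message, error) {
	if s.closed {
		return nil, errors.New("sliceSource: closed")
	}
	if s.next >= len(s.msgs) {
		return nil, errDrained
	}
	m := s.msgs[s.next]
	s.next++
	return m, nil
}

// Commit treats v as an int offset; that reading of the argument is an assumption.
func (s *sliceSource) Commit(v interface{}) error {
	off, ok := v.(int)
	if !ok {
		return fmt.Errorf("sliceSource: want int offset, got %T", v)
	}
	if off < 0 || off > s.next {
		return fmt.Errorf("sliceSource: offset %d out of range", off)
	}
	s.committed = off
	return nil
}

func (s *sliceSource) Close() error {
	s.closed = true
	return nil
}

// Compile-time check that sliceSource satisfies Source.
var _ Source = (*sliceSource)(nil)

func main() {
	src := &sliceSource{msgs: []*Message{{Key: 0, Value: "a"}, {Key: 1, Value: "b"}}}
	for {
		m, err := src.Consume()
		if err != nil {
			fmt.Println("stopped:", err)
			break
		}
		fmt.Println("consumed", m.Value)
	}
	fmt.Println("commit error:", src.Commit(2))
	fmt.Println("close error:", src.Close())
}
```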

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

func NewSourceNode

func NewSourceNode(name string) *SourceNode

func (*SourceNode) AddChild

func (n *SourceNode) AddChild(node Node)

func (*SourceNode) Children

func (n *SourceNode) Children() []Node

func (*SourceNode) Close

func (n *SourceNode) Close() error

func (*SourceNode) Name

func (n *SourceNode) Name() string

func (*SourceNode) Process

func (n *SourceNode) Process(msg *Message) ([]NodeMessage, error)

func (*SourceNode) WithPipe

func (n *SourceNode) WithPipe(pipe Pipe)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Branch

func (s *Stream) Branch(name string, preds ...Predicate) []*Stream

func (*Stream) Filter

func (s *Stream) Filter(name string, pred Predicate) *Stream

func (*Stream) FlatMap

func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream

func (*Stream) Map

func (s *Stream) Map(name string, mapper Mapper) *Stream

func (*Stream) Merge

func (s *Stream) Merge(name string, streams ...*Stream) *Stream

func (*Stream) Print

func (s *Stream) Print(name string) *Stream

func (*Stream) Process

func (s *Stream) Process(name string, p Processor) *Stream

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

func (*StreamBuilder) Build

func (sb *StreamBuilder) Build() *Topology

func (*StreamBuilder) Source

func (sb *StreamBuilder) Source(name string, source Source) *Stream

type Task

type Task interface {
	Start() error
	OnError(fn ErrorFunc)
	Close() error
}

func NewTask

func NewTask(topology *Topology) Task

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

func (Topology) Processors

func (t Topology) Processors() []Node

func (Topology) Sources

func (t Topology) Sources() map[Source]Node

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

func (*TopologyBuilder) AddProcessor

func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node

func (*TopologyBuilder) AddSource

func (tb *TopologyBuilder) AddSource(name string, source Source) Node

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() *Topology

Directories

Path
example/branch (command)
example/kafka (command)
example/merge (command)
example/simple (command)
