streams

v0.4.0
Published: Jun 21, 2018 License: MIT Imports: 6 Imported by: 0

README

Streams

Streams is a lightweight, simple stream processing library. While Kafka is the main use case for Streams, it is flexible enough to be used for any form of processing from any source.

Note: This is currently a work in progress.

Installation

You can install streams using go get:

go get github.com/msales/streams

Concepts

Streams breaks processing into the following basic parts.

  • Message is a message in the system, consisting of a key, a value and a context.

  • Source reads from a data source and handles the position within it.

  • Processor processes the data, optionally passing it on or marking the source's position. A sink is just a processor that does not forward the data on.

  • Pipe gives processors an abstract view of the current state, allowing Messages to flow through the system.
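
The parts listed above can be pictured with a small self-contained sketch. The types here are local stand-ins that only mirror the roles named in the list; they are not the package's own implementation, the context field and the pipe are left out for brevity, and the names sliceSource, upper and run are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// sliceSource is a hypothetical in-memory source that tracks its position.
type sliceSource struct {
	values    []string
	pos       int // index of the next value to read
	committed int // position marked as processed
}

// Consume returns the next message, or nil once the source is drained.
func (s *sliceSource) Consume() *Message {
	if s.pos >= len(s.values) {
		return nil
	}
	m := &Message{Key: s.pos, Value: s.values[s.pos]}
	s.pos++
	return m
}

// Commit marks everything consumed so far as processed.
func (s *sliceSource) Commit() { s.committed = s.pos }

// upper plays the role of a processor: it transforms a message and passes it on.
func upper(m *Message) *Message {
	return &Message{Key: m.Key, Value: strings.ToUpper(m.Value.(string))}
}

// run drains the source through the processor into a sink. The sink keeps the
// values and forwards nothing further. The source position is committed last.
func run(src *sliceSource) []string {
	var sink []string
	for m := src.Consume(); m != nil; m = src.Consume() {
		sink = append(sink, upper(m).Value.(string))
	}
	src.Commit()
	return sink
}

func main() {
	src := &sliceSource{values: []string{"a", "b"}}
	fmt.Println(run(src), src.committed)
}
```

In the real package the wiring between these parts is built with StreamBuilder or TopologyBuilder and driven by a Task, so a hand-written loop like run is only for illustration.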

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BranchProcessor

type BranchProcessor struct {
	// contains filtered or unexported fields
}

BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.

func (*BranchProcessor) Close

func (p *BranchProcessor) Close() error

Close closes the processor.

func (*BranchProcessor) Process

func (p *BranchProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*BranchProcessor) WithPipe added in v0.3.0

func (p *BranchProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type ErrorFunc

type ErrorFunc func(error)

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor is a processor that filters a stream using a predicate function.

func (*FilterProcessor) Close

func (p *FilterProcessor) Close() error

Close closes the processor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*FilterProcessor) WithPipe added in v0.3.0

func (p *FilterProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapProcessor added in v0.4.0

type FlatMapProcessor struct {
	// contains filtered or unexported fields
}

FlatMapProcessor is a processor that maps a stream using a flat mapping function.

func (*FlatMapProcessor) Close added in v0.4.0

func (p *FlatMapProcessor) Close() error

Close closes the processor.

func (*FlatMapProcessor) Process added in v0.4.0

func (p *FlatMapProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*FlatMapProcessor) WithPipe added in v0.4.0

func (p *FlatMapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapper added in v0.4.0

type FlatMapper func(*Message) ([]*Message, error)

FlatMapper represents a mapping function that returns multiple messages.
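
As an illustration of the one-in, many-out shape, here is a minimal sketch of a function with the FlatMapper signature. Message is a local stand-in with the same Key and Value fields as the documented type (the context field is omitted), and splitWords is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// splitWords has the FlatMapper shape: one message in, zero or more out.
// It emits one message per whitespace-separated word and keeps the key.
func splitWords(msg *Message) ([]*Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("splitWords: want string value, got %T", msg.Value)
	}
	words := strings.Fields(s)
	out := make([]*Message, 0, len(words))
	for _, w := range words {
		out = append(out, &Message{Key: msg.Key, Value: w})
	}
	return out, nil
}

func main() {
	msgs, err := splitWords(&Message{Key: "k", Value: "hello stream world"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Println(m.Key, m.Value)
	}
}
```

Written against the package's own Message type, a function of this shape could be passed to Stream.FlatMap or NewFlatMapProcessor.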

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

MapProcessor is a processor that maps a stream using a mapping function.

func (*MapProcessor) Close

func (p *MapProcessor) Close() error

Close closes the processor.

func (*MapProcessor) Process

func (p *MapProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*MapProcessor) WithPipe added in v0.3.0

func (p *MapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Mapper

type Mapper func(*Message) (*Message, error)

Mapper represents a mapping function.
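
A minimal sketch of a function with the Mapper signature follows. Message is a local stand-in for the documented type (context omitted) and parseInt is a hypothetical name; the point is only that a mapper returns exactly one message, or an error, for each input.

```go
package main

import (
	"fmt"
	"strconv"
)

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// parseInt has the Mapper shape: it converts a string value to an int
// and keeps the key unchanged.
func parseInt(msg *Message) (*Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("parseInt: want string value, got %T", msg.Value)
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return nil, err
	}
	return &Message{Key: msg.Key, Value: n}, nil
}

func main() {
	out, err := parseInt(&Message{Key: "user-1", Value: "42"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Key, out.Value)
}
```

Written against the package's own Message type, such a function could be passed to Stream.Map or NewMapProcessor.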

type Message added in v0.3.0

type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
}

func NewMessage added in v0.3.0

func NewMessage(k, v interface{}) *Message

func NewMessageWithContext added in v0.3.0

func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message

func (Message) Empty added in v0.3.0

func (m Message) Empty() bool

type Node

type Node interface {
	Name() string
	WithPipe(Pipe)
	AddChild(n Node)
	Children() []Node
	Process(*Message) error
	Close() error
}

type PassThroughProcessor added in v0.4.0

type PassThroughProcessor struct {
	// contains filtered or unexported fields
}

PassThroughProcessor is a processor that passes the message on.

func (*PassThroughProcessor) Close added in v0.4.0

func (p *PassThroughProcessor) Close() error

Close closes the processor.

func (*PassThroughProcessor) Process added in v0.4.0

func (p *PassThroughProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*PassThroughProcessor) WithPipe added in v0.4.0

func (p *PassThroughProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Pipe added in v0.3.0

type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit() error
}

Pipe allows messages to flow through the processors.
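
Because Pipe is an interface, a processor can be exercised in isolation by handing it a test double. The sketch below is one way to do that, assuming nothing beyond the three documented methods; Message and Pipe are repeated locally so the example compiles on its own, and recordingPipe is a hypothetical name.

```go
package main

import "fmt"

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe repeats the documented interface so the sketch is self-contained.
type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit() error
}

// recordingPipe is a hypothetical test double: it remembers what was
// forwarded and how many times Commit was called.
type recordingPipe struct {
	all     []*Message         // messages sent with Forward
	byChild map[int][]*Message // messages sent with ForwardToChild, per index
	commits int
}

func newRecordingPipe() *recordingPipe {
	return &recordingPipe{byChild: make(map[int][]*Message)}
}

func (p *recordingPipe) Forward(m *Message) error {
	p.all = append(p.all, m)
	return nil
}

// ForwardToChild rejects negative indexes; this check is an assumption of
// the sketch, not documented behaviour.
func (p *recordingPipe) ForwardToChild(m *Message, index int) error {
	if index < 0 {
		return fmt.Errorf("recordingPipe: negative child index %d", index)
	}
	p.byChild[index] = append(p.byChild[index], m)
	return nil
}

func (p *recordingPipe) Commit() error {
	p.commits++
	return nil
}

// Compile-time check that recordingPipe satisfies Pipe.
var _ Pipe = (*recordingPipe)(nil)

func main() {
	pipe := newRecordingPipe()
	msg := &Message{Key: "k", Value: "v"}
	_ = pipe.Forward(msg)
	_ = pipe.ForwardToChild(msg, 1)
	_ = pipe.Commit()
	fmt.Println(len(pipe.all), len(pipe.byChild[1]), pipe.commits)
}
```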

type Predicate

type Predicate func(*Message) (bool, error)

Predicate represents a stream filter function.
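
The following sketch shows a function with the Predicate signature and a small loop that applies it the way a filter might. Message and Predicate are local copies of the documented shapes (context omitted); isPositive and keep are hypothetical names, and keep is not part of the package.

```go
package main

import "fmt"

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate repeats the documented function type so the sketch compiles alone.
type Predicate func(*Message) (bool, error)

// isPositive reports whether an int value is greater than zero.
func isPositive(msg *Message) (bool, error) {
	n, ok := msg.Value.(int)
	if !ok {
		return false, fmt.Errorf("isPositive: want int value, got %T", msg.Value)
	}
	return n > 0, nil
}

// keep is a hypothetical helper: it returns the messages for which pred
// is true and stops at the first error.
func keep(msgs []*Message, pred Predicate) ([]*Message, error) {
	var out []*Message
	for _, m := range msgs {
		ok, err := pred(m)
		if err != nil {
			return nil, err
		}
		if ok {
			out = append(out, m)
		}
	}
	return out, nil
}

func main() {
	msgs := []*Message{{Value: -1}, {Value: 3}, {Value: 0}, {Value: 7}}
	kept, err := keep(msgs, isPositive)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range kept {
		fmt.Println(m.Value)
	}
}
```

Written against the package's own Message type, a predicate like this could be passed to Stream.Filter, or several of them to Stream.Branch.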

type PrintProcessor

type PrintProcessor struct {
	// contains filtered or unexported fields
}

PrintProcessor is a processor that prints the stream to stdout.

func (*PrintProcessor) Close

func (p *PrintProcessor) Close() error

Close closes the processor.

func (*PrintProcessor) Process

func (p *PrintProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*PrintProcessor) WithPipe added in v0.3.0

func (p *PrintProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Processor

type Processor interface {
	// WithPipe sets the pipe on the Processor.
	WithPipe(Pipe)
	// Process processes the stream Message.
	Process(*Message) error
	// Close closes the processor.
	Close() error
}

Processor represents a stream processor.
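
To make the three methods concrete, here is a sketch of a custom processor that forwards every message and commits after every n-th one. The interfaces are repeated locally so the example is self-contained; commitEvery and countingPipe are hypothetical names, and whether periodic commits suit a given source is a separate design question.

```go
package main

import "fmt"

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe and Processor repeat the documented interfaces.
type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit() error
}

type Processor interface {
	WithPipe(Pipe)
	Process(*Message) error
	Close() error
}

// countingPipe is a hypothetical stand-in pipe that only counts calls.
type countingPipe struct {
	forwarded int
	commits   int
}

func (p *countingPipe) Forward(*Message) error             { p.forwarded++; return nil }
func (p *countingPipe) ForwardToChild(*Message, int) error { p.forwarded++; return nil }
func (p *countingPipe) Commit() error                      { p.commits++; return nil }

// commitEvery forwards each message and commits after every n messages.
// n must be greater than zero.
type commitEvery struct {
	pipe Pipe
	n    int
	seen int
}

func (p *commitEvery) WithPipe(pipe Pipe) { p.pipe = pipe }

func (p *commitEvery) Process(msg *Message) error {
	if err := p.pipe.Forward(msg); err != nil {
		return err
	}
	p.seen++
	if p.seen%p.n == 0 {
		return p.pipe.Commit()
	}
	return nil
}

func (p *commitEvery) Close() error { return nil }

// Compile-time check that commitEvery satisfies Processor.
var _ Processor = (*commitEvery)(nil)

func main() {
	pipe := &countingPipe{}
	var proc Processor = &commitEvery{n: 2}
	proc.WithPipe(pipe)
	for i := 0; i < 5; i++ {
		if err := proc.Process(&Message{Key: i, Value: "payload"}); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println(pipe.forwarded, pipe.commits)
}
```

A processor written this way against the package's own types could be attached with Stream.Process or TopologyBuilder.AddProcessor.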

func NewBranchProcessor

func NewBranchProcessor(fns []Predicate) Processor

NewBranchProcessor creates a new BranchProcessor instance.

func NewFilterProcessor

func NewFilterProcessor(fn Predicate) Processor

NewFilterProcessor creates a new FilterProcessor instance.

func NewFlatMapProcessor added in v0.4.0

func NewFlatMapProcessor(fn FlatMapper) Processor

NewFlatMapProcessor creates a new FlatMapProcessor instance.

func NewMapProcessor

func NewMapProcessor(fn Mapper) Processor

NewMapProcessor creates a new MapProcessor instance.

func NewPassThroughProcessor added in v0.4.0

func NewPassThroughProcessor() Processor

NewPassThroughProcessor creates a new PassThroughProcessor instance.

func NewPrintProcessor

func NewPrintProcessor() Processor

NewPrintProcessor creates a new PrintProcessor instance.

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

func NewProcessorNode added in v0.3.0

func NewProcessorNode(name string, p Processor) *ProcessorNode

func (*ProcessorNode) AddChild

func (n *ProcessorNode) AddChild(node Node)

func (*ProcessorNode) Children

func (n *ProcessorNode) Children() []Node

func (*ProcessorNode) Close

func (n *ProcessorNode) Close() error

func (*ProcessorNode) Name added in v0.3.0

func (n *ProcessorNode) Name() string

func (*ProcessorNode) Process

func (n *ProcessorNode) Process(msg *Message) error

func (*ProcessorNode) WithPipe added in v0.3.0

func (n *ProcessorNode) WithPipe(pipe Pipe)

type ProcessorPipe added in v0.3.0

type ProcessorPipe struct {
	// contains filtered or unexported fields
}

ProcessorPipe represents the pipe for processors.

func NewProcessorPipe added in v0.3.0

func NewProcessorPipe(t Task) *ProcessorPipe

NewProcessorPipe creates a new ProcessorPipe instance.

func (*ProcessorPipe) Commit added in v0.3.0

func (p *ProcessorPipe) Commit() error

Commit commits the current state in the sources.

func (*ProcessorPipe) Forward added in v0.3.0

func (p *ProcessorPipe) Forward(msg *Message) error

Forward passes the data to all processor children in the topology.

func (*ProcessorPipe) ForwardToChild added in v0.3.0

func (p *ProcessorPipe) ForwardToChild(msg *Message, index int) error

ForwardToChild passes the data to the given processor child in the topology.
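
The difference between forwarding to all children and forwarding to one child by index can be sketched with a toy node tree. Everything here is a hypothetical stand-in (node, forward, forwardToChild); in particular the bounds check is an assumption of the sketch rather than documented behaviour of ProcessorPipe.

```go
package main

import "fmt"

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// node is a hypothetical stand-in for a topology node.
type node struct {
	name     string
	seen     []interface{} // values this node has processed
	children []*node
}

// process records the message value on the node.
func (n *node) process(m *Message) { n.seen = append(n.seen, m.Value) }

// forward hands the message to every child of n.
func forward(n *node, m *Message) {
	for _, c := range n.children {
		c.process(m)
	}
}

// forwardToChild hands the message only to the child at the given index.
func forwardToChild(n *node, m *Message, index int) error {
	if index < 0 || index >= len(n.children) {
		return fmt.Errorf("node %q has no child at index %d", n.name, index)
	}
	n.children[index].process(m)
	return nil
}

func main() {
	left, right := &node{name: "left"}, &node{name: "right"}
	parent := &node{name: "parent", children: []*node{left, right}}

	forward(parent, &Message{Value: "all"})
	if err := forwardToChild(parent, &Message{Value: "only-right"}, 1); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(left.seen, right.seen)
}
```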

func (*ProcessorPipe) SetNode added in v0.3.0

func (p *ProcessorPipe) SetNode(n Node)

SetNode sets the topology node that is being processed.

This is only needed by the task and should not be used directly. Doing so can have unexpected results.

type Source

type Source interface {
	// Consume gets the next Message from the Source.
	Consume() (*Message, error)
	// Commit marks the consumed Message as processed.
	Commit() error
	// Close closes the Source.
	Close() error
}

Source represents a stream source.
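
A source only needs the three methods above, so non-Kafka inputs are straightforward to adapt. The sketch below wraps a Go channel. Message and Source are repeated locally, chanSource and errDrained are hypothetical names, and returning an error once the channel is closed is an assumption of this sketch: the documentation here does not say what a source should do when no data is available.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the documented Message (context omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Source repeats the documented interface so the sketch is self-contained.
type Source interface {
	Consume() (*Message, error)
	Commit() error
	Close() error
}

// errDrained is returned once the input channel is closed (an assumption).
var errDrained = errors.New("chanSource: input closed")

// chanSource is a hypothetical source that reads string values from a channel.
type chanSource struct {
	in        <-chan string
	consumed  int // number of messages handed out
	committed int // number of messages marked as processed
}

// Consume blocks until a value arrives and wraps it in a Message.
func (s *chanSource) Consume() (*Message, error) {
	v, ok := <-s.in
	if !ok {
		return nil, errDrained
	}
	s.consumed++
	return &Message{Key: s.consumed, Value: v}, nil
}

// Commit marks everything consumed so far as processed.
func (s *chanSource) Commit() error {
	s.committed = s.consumed
	return nil
}

// Close has nothing to release; the sender owns the channel.
func (s *chanSource) Close() error { return nil }

func main() {
	in := make(chan string, 2)
	in <- "a"
	in <- "b"
	close(in)

	var src Source = &chanSource{in: in}
	for {
		msg, err := src.Consume()
		if err != nil {
			break
		}
		fmt.Println(msg.Key, msg.Value)
	}
	if err := src.Commit(); err != nil {
		fmt.Println("commit failed:", err)
	}
}
```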

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

func NewSourceNode added in v0.3.0

func NewSourceNode(name string) *SourceNode

func (*SourceNode) AddChild

func (n *SourceNode) AddChild(node Node)

func (*SourceNode) Children

func (n *SourceNode) Children() []Node

func (*SourceNode) Close

func (n *SourceNode) Close() error

func (*SourceNode) Name added in v0.3.0

func (n *SourceNode) Name() string

func (*SourceNode) Process

func (n *SourceNode) Process(msg *Message) error

func (*SourceNode) WithPipe added in v0.3.0

func (n *SourceNode) WithPipe(pipe Pipe)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Branch

func (s *Stream) Branch(name string, preds ...Predicate) []*Stream

func (*Stream) Filter

func (s *Stream) Filter(name string, pred Predicate) *Stream

func (*Stream) FlatMap added in v0.4.0

func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream

func (*Stream) Map

func (s *Stream) Map(name string, mapper Mapper) *Stream

func (*Stream) Merge

func (s *Stream) Merge(name string, streams ...*Stream) *Stream

func (*Stream) Print

func (s *Stream) Print(name string) *Stream

func (*Stream) Process

func (s *Stream) Process(name string, p Processor) *Stream

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

func (*StreamBuilder) Build

func (sb *StreamBuilder) Build() *Topology

func (*StreamBuilder) Source

func (sb *StreamBuilder) Source(name string, source Source) *Stream

type Task

type Task interface {
	Start()
	Commit() error
	OnError(fn ErrorFunc)
	Close() error
}

func NewTask

func NewTask(topology *Topology) Task

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

func (Topology) Processors

func (t Topology) Processors() []Node

func (Topology) Sources

func (t Topology) Sources() map[Source]Node

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

func (*TopologyBuilder) AddProcessor

func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node

func (*TopologyBuilder) AddSource

func (tb *TopologyBuilder) AddSource(name string, source Source) Node

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() *Topology

Directories

Path
example/branch (command)
example/kafka (command)
example/merge (command)
example/simple (command)
