streams

package module
v0.2.0

Note: This package is not in the latest version of its module.
Published: Jun 8, 2018 License: MIT Imports: 6 Imported by: 0

README

Streams

Streams is a lightweight, simple stream processing library. While Kafka is the main use case for Streams, it is flexible enough to be used for any form of processing from any source.

Note: This is currently a work in progress.

Installation

You can install streams using go get:

go get github.com/msales/streams

Concepts

Streams breaks processing into three basic parts.

  • Source reads from a data source and tracks the read position.

  • Processor processes the data, optionally passing it on or marking the source's position. A sink is just a processor that does not forward the data on.

  • Context gives processors an abstract view of the current state.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BranchProcessor

type BranchProcessor struct {
	// contains filtered or unexported fields
}

BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.
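The branching idea can be sketched without the library. The example below is a minimal illustration, not the package's implementation: Predicate is redeclared locally with the documented signature, and branch, isEven and isLarge are hypothetical helpers. It assumes a record is routed to every branch whose predicate returns true; the page does not state whether the real processor stops at the first match.

```go
package main

import "fmt"

// Predicate is a local copy of the signature documented for this package.
type Predicate func(k, v interface{}) (bool, error)

// branch is a hypothetical helper (not part of the package). It returns the
// index of every predicate that accepts the record. Routing to all matching
// branches is an assumption made for this sketch.
func branch(preds []Predicate, k, v interface{}) ([]int, error) {
	var matched []int
	for i, pred := range preds {
		ok, err := pred(k, v)
		if err != nil {
			return nil, err
		}
		if ok {
			matched = append(matched, i)
		}
	}
	return matched, nil
}

// isEven accepts records whose value is an even int.
func isEven(k, v interface{}) (bool, error) {
	n, ok := v.(int)
	if !ok {
		return false, fmt.Errorf("expected int value, got %T", v)
	}
	return n%2 == 0, nil
}

// isLarge accepts records whose value is an int greater than 10.
func isLarge(k, v interface{}) (bool, error) {
	n, ok := v.(int)
	if !ok {
		return false, fmt.Errorf("expected int value, got %T", v)
	}
	return n > 10, nil
}

func main() {
	preds := []Predicate{isEven, isLarge}
	for _, v := range []int{3, 4, 12} {
		idx, err := branch(preds, "key", v)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(v, "->", idx)
	}
}
```

In the package itself, the index of a matching predicate would presumably correspond to the child index passed to Context.ForwardToChild, but that mapping is not confirmed by this page.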

func (*BranchProcessor) Close

func (p *BranchProcessor) Close() error

Close closes the processor.

func (*BranchProcessor) Process

func (p *BranchProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*BranchProcessor) WithContext

func (p *BranchProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Context

type Context interface {
	Forward(key, value interface{}) error
	ForwardToChild(key, value interface{}, child int) error
	Commit() error

	Logger() log.Logger
	Stats() stats.Stats
}

type ErrorFunc

type ErrorFunc func(error)

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor is a processor that filters a stream using a predicate function.

func (*FilterProcessor) Close

func (p *FilterProcessor) Close() error

Close closes the processor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*FilterProcessor) WithContext

func (p *FilterProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

MapProcessor is a processor that maps a stream using a mapping function.

func (*MapProcessor) Close

func (p *MapProcessor) Close() error

Close closes the processor.

func (*MapProcessor) Process

func (p *MapProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*MapProcessor) WithContext

func (p *MapProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Mapper

type Mapper func(key, value interface{}) (interface{}, interface{}, error)

Mapper represents a mapping function.
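As an illustration, a function with this signature might look like the sketch below. Mapper is redeclared locally so the example runs on its own, and upperValue is a hypothetical mapper that keeps the key and upper-cases a string value; returning an error for unexpected value types is a design choice of the sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// Mapper is a local copy of the signature documented for this package.
type Mapper func(key, value interface{}) (interface{}, interface{}, error)

// upperValue is a hypothetical Mapper: it leaves the key untouched and
// upper-cases the value, which it expects to be a string.
func upperValue(key, value interface{}) (interface{}, interface{}, error) {
	s, ok := value.(string)
	if !ok {
		return nil, nil, fmt.Errorf("expected string value, got %T", value)
	}
	return key, strings.ToUpper(s), nil
}

func main() {
	var m Mapper = upperValue
	k, v, err := m("id-1", "hello")
	fmt.Println(k, v, err)
}
```

Because keys and values are passed as interface{}, each mapper has to assert the concrete types it expects.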

type MergeProcessor

type MergeProcessor struct {
	// contains filtered or unexported fields
}

MergeProcessor is a processor that merges multiple streams.

func (*MergeProcessor) Close

func (p *MergeProcessor) Close() error

Close closes the processor.

func (*MergeProcessor) Process

func (p *MergeProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*MergeProcessor) WithContext

func (p *MergeProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Node

type Node interface {
	WithContext(ctx Context)
	AddChild(n Node)
	Children() []Node
	Process(key, value interface{}) error
	Close() error
}

type Predicate

type Predicate func(k, v interface{}) (bool, error)

Predicate represents a stream filter function.
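A predicate can be exercised in isolation, as in the following sketch. Predicate is redeclared locally with the documented signature; nonEmpty and filter are hypothetical names introduced for the example, and filter only approximates what a filtering processor might do with the predicate's result.

```go
package main

import "fmt"

// Predicate is a local copy of the signature documented for this package.
type Predicate func(k, v interface{}) (bool, error)

// nonEmpty is a hypothetical Predicate that keeps non-empty string values.
func nonEmpty(k, v interface{}) (bool, error) {
	s, ok := v.(string)
	if !ok {
		return false, fmt.Errorf("expected string value, got %T", v)
	}
	return s != "", nil
}

// filter is a hypothetical helper that applies pred to each value and keeps
// the ones it accepts. It stops at the first error.
func filter(pred Predicate, values []interface{}) ([]interface{}, error) {
	var kept []interface{}
	for _, v := range values {
		ok, err := pred(nil, v)
		if err != nil {
			return nil, err
		}
		if ok {
			kept = append(kept, v)
		}
	}
	return kept, nil
}

func main() {
	kept, err := filter(nonEmpty, []interface{}{"a", "", "b"})
	fmt.Println(kept, err)
}
```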

type PrintProcessor

type PrintProcessor struct {
	// contains filtered or unexported fields
}

PrintProcessor is a processor that prints the stream to stdout.

func (*PrintProcessor) Close

func (p *PrintProcessor) Close() error

Close closes the processor.

func (*PrintProcessor) Process

func (p *PrintProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*PrintProcessor) WithContext

func (p *PrintProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Processor

type Processor interface {
	// WithContext sets the context on the Processor.
	WithContext(ctx Context)
	// Process processes the stream record.
	Process(key, value interface{}) error
	// Close closes the processor.
	Close() error
}

Processor represents a stream processor.
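A custom processor only needs these three methods. The sketch below is self-contained and makes several assumptions: Context is a reduced local stand-in with just Forward and Commit (the documented interface also has ForwardToChild, Logger and Stats), and countingProcessor and recordingContext are hypothetical types written for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Context is a reduced local stand-in for the package's Context interface.
type Context interface {
	Forward(key, value interface{}) error
	Commit() error
}

// Processor mirrors the documented interface, using the local Context.
type Processor interface {
	WithContext(ctx Context)
	Process(key, value interface{}) error
	Close() error
}

// countingProcessor is a hypothetical processor that forwards every record
// and commits after each group of batch records.
type countingProcessor struct {
	ctx   Context
	batch int
	seen  int
}

func (p *countingProcessor) WithContext(ctx Context) { p.ctx = ctx }

func (p *countingProcessor) Process(key, value interface{}) error {
	if p.ctx == nil {
		return errors.New("Process called before WithContext")
	}
	p.seen++
	if err := p.ctx.Forward(key, value); err != nil {
		return err
	}
	if p.batch > 0 && p.seen%p.batch == 0 {
		return p.ctx.Commit()
	}
	return nil
}

func (p *countingProcessor) Close() error { return nil }

// recordingContext is a hypothetical Context that records calls so the
// sketch can run without the library.
type recordingContext struct {
	forwarded []interface{}
	commits   int
}

func (c *recordingContext) Forward(key, value interface{}) error {
	c.forwarded = append(c.forwarded, value)
	return nil
}

func (c *recordingContext) Commit() error {
	c.commits++
	return nil
}

func main() {
	ctx := &recordingContext{}
	var p Processor = &countingProcessor{batch: 2}
	p.WithContext(ctx)
	for i := 1; i <= 5; i++ {
		if err := p.Process("k", i); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println(len(ctx.forwarded), ctx.commits)
}
```

A sink, in the sense used in the README, would be the same shape with the call to Forward left out.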

func NewBranchProcessor

func NewBranchProcessor(fns []Predicate) Processor

NewBranchProcessor creates a new BranchProcessor instance.

func NewFilterProcessor

func NewFilterProcessor(fn Predicate) Processor

NewFilterProcessor creates a new FilterProcessor instance.

func NewMapProcessor

func NewMapProcessor(fn Mapper) Processor

NewMapProcessor creates a new MapProcessor instance.

func NewMergeProcessor

func NewMergeProcessor() Processor

NewMergeProcessor creates a new MergeProcessor instance.

func NewPrintProcessor

func NewPrintProcessor() Processor

NewPrintProcessor creates a new PrintProcessor instance.

type ProcessorContext

type ProcessorContext struct {
	// contains filtered or unexported fields
}

func NewProcessorContext

func NewProcessorContext(t Task, l log.Logger, s stats.Stats) *ProcessorContext

func (*ProcessorContext) Commit

func (c *ProcessorContext) Commit() error

func (*ProcessorContext) Forward

func (c *ProcessorContext) Forward(key, value interface{}) error

func (*ProcessorContext) ForwardToChild

func (c *ProcessorContext) ForwardToChild(key, value interface{}, index int) error

func (*ProcessorContext) Logger

func (c *ProcessorContext) Logger() log.Logger

func (*ProcessorContext) Stats

func (c *ProcessorContext) Stats() stats.Stats

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

func (*ProcessorNode) AddChild

func (n *ProcessorNode) AddChild(node Node)

func (*ProcessorNode) Children

func (n *ProcessorNode) Children() []Node

func (*ProcessorNode) Close

func (n *ProcessorNode) Close() error

func (*ProcessorNode) Process

func (n *ProcessorNode) Process(key, value interface{}) error

func (*ProcessorNode) WithContext

func (n *ProcessorNode) WithContext(ctx Context)

type Source

type Source interface {
	Consume() (key, value interface{}, err error)
	Commit() error
	Close() error
}
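To make the interface concrete, here is a minimal in-memory source. It is a sketch under stated assumptions: Source is redeclared locally, sliceSource and errClosed are hypothetical, and returning nil key and value when nothing is left to read is an assumed convention that this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// Source is a local copy of the interface documented for this package.
type Source interface {
	Consume() (key, value interface{}, err error)
	Commit() error
	Close() error
}

// errClosed is a hypothetical error returned after Close.
var errClosed = errors.New("source closed")

// sliceSource is a hypothetical Source backed by a slice of strings.
type sliceSource struct {
	values    []string
	next      int // index of the next value to read
	committed int // position confirmed by the last Commit
	closed    bool
}

// Consume returns the next value with its index as the key. When the slice
// is exhausted it returns nil, nil, nil (an assumed convention).
func (s *sliceSource) Consume() (interface{}, interface{}, error) {
	if s.closed {
		return nil, nil, errClosed
	}
	if s.next >= len(s.values) {
		return nil, nil, nil
	}
	k, v := s.next, s.values[s.next]
	s.next++
	return k, v, nil
}

// Commit marks everything consumed so far as processed.
func (s *sliceSource) Commit() error {
	if s.closed {
		return errClosed
	}
	s.committed = s.next
	return nil
}

// Close marks the source as closed; later calls return errClosed.
func (s *sliceSource) Close() error {
	s.closed = true
	return nil
}

func main() {
	var src Source = &sliceSource{values: []string{"a", "b", "c"}}
	defer src.Close()
	for i := 0; i < 2; i++ {
		k, v, err := src.Consume()
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(k, v)
	}
	if err := src.Commit(); err != nil {
		fmt.Println("error:", err)
	}
}
```

Keeping the read position and the committed position separate is what lets a consumer resume from the last commit after a failure; a Kafka-backed source would track offsets in a similar spirit.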

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

func (*SourceNode) AddChild

func (n *SourceNode) AddChild(node Node)

func (*SourceNode) Children

func (n *SourceNode) Children() []Node

func (*SourceNode) Close

func (n *SourceNode) Close() error

func (*SourceNode) Process

func (n *SourceNode) Process(key, value interface{}) error

func (*SourceNode) WithContext

func (n *SourceNode) WithContext(ctx Context)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Branch

func (s *Stream) Branch(name string, preds ...Predicate) []*Stream

func (*Stream) Filter

func (s *Stream) Filter(name string, pred Predicate) *Stream

func (*Stream) Map

func (s *Stream) Map(name string, mapper Mapper) *Stream

func (*Stream) Merge

func (s *Stream) Merge(name string, streams ...*Stream) *Stream

func (*Stream) Print

func (s *Stream) Print(name string) *Stream

func (*Stream) Process

func (s *Stream) Process(name string, p Processor) *Stream

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

func (*StreamBuilder) Build

func (sb *StreamBuilder) Build() *Topology

func (*StreamBuilder) Source

func (sb *StreamBuilder) Source(name string, source Source) *Stream

type Task

type Task interface {
	Start()
	Commit() error
	OnError(fn ErrorFunc)
	Close() error
}

func NewTask

func NewTask(topology *Topology, opts ...TaskFunc) Task

type TaskFunc

type TaskFunc func(*streamTask)

func WithLogger

func WithLogger(logger log.Logger) TaskFunc

func WithStats

func WithStats(stats stats.Stats) TaskFunc

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

func (Topology) Processors

func (t Topology) Processors() []Node

func (Topology) Sources

func (t Topology) Sources() map[Source]Node

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

func (*TopologyBuilder) AddProcessor

func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node

func (*TopologyBuilder) AddSource

func (tb *TopologyBuilder) AddSource(name string, source Source) Node

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() *Topology

Directories

Path Synopsis
example/branch (command)
example/kafka (command)
example/merge (command)
example/simple (command)
