streams

package module
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2018 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BranchProcessor

type BranchProcessor struct {
	// contains filtered or unexported fields
}

BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.

func (*BranchProcessor) Close

func (p *BranchProcessor) Close() error

Close closes the processor.

func (*BranchProcessor) Process

func (p *BranchProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*BranchProcessor) WithContext

func (p *BranchProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Context

// Context is the value handed to a Processor or Node through WithContext.
// ProcessorContext provides the same method set.
type Context interface {
	// Forward hands the key/value pair on and returns any resulting error.
	// NOTE(review): presumably forwards to every child of the current
	// node — confirm against the implementation.
	Forward(key, value interface{}) error
	// ForwardToChild hands the key/value pair on to a single child selected
	// by the index child and returns any resulting error.
	// NOTE(review): presumably child indexes into the current node's
	// Children(); out-of-range behavior is not visible here — confirm.
	ForwardToChild(key, value interface{}, child int) error
	// Commit returns an error on failure.
	// NOTE(review): presumably delegates to the owning Task's Commit — confirm.
	Commit() error

	// Logger returns the log.Logger available to the processor.
	Logger() log.Logger
	// Stats returns the stats.Stats available to the processor.
	Stats() stats.Stats
}

type ErrorFunc

// ErrorFunc is a callback that receives an error. Task.OnError accepts one.
type ErrorFunc func(error)

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor is a processor that filters a stream using a predicate function.

func (*FilterProcessor) Close

func (p *FilterProcessor) Close() error

Close closes the processor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*FilterProcessor) WithContext

func (p *FilterProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

MapProcessor is a processor that maps a stream using a mapping function.

func (*MapProcessor) Close

func (p *MapProcessor) Close() error

Close closes the processor.

func (*MapProcessor) Process

func (p *MapProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*MapProcessor) WithContext

func (p *MapProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Mapper

type Mapper func(key, value interface{}) (interface{}, interface{}, error)

Mapper represents a mapping function.

type MergeProcessor

type MergeProcessor struct {
	// contains filtered or unexported fields
}

MergeProcessor is a processor that merges multiple streams.

func (*MergeProcessor) Close

func (p *MergeProcessor) Close() error

Close closes the processor.

func (*MergeProcessor) Process

func (p *MergeProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*MergeProcessor) WithContext

func (p *MergeProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Node

// Node is an element of a Topology. ProcessorNode and SourceNode provide
// this method set, and Topology.Processors and Topology.Sources return Nodes.
type Node interface {
	// WithContext sets the context on the node.
	WithContext(ctx Context)
	// AddChild registers n as a child of this node.
	// NOTE(review): presumably children receive forwarded records — confirm.
	AddChild(n Node)
	// Children returns the node's children.
	// NOTE(review): presumably in the order they were added via AddChild — confirm.
	Children() []Node
	// Process processes the stream record.
	Process(key, value interface{}) error
	// Close closes the node.
	Close() error
}

type Predicate

type Predicate func(k, v interface{}) (bool, error)

Predicate represents a stream filter function.

type PrintProcessor

type PrintProcessor struct {
	// contains filtered or unexported fields
}

PrintProcessor is a processor that prints the stream to stdout.

func (*PrintProcessor) Close

func (p *PrintProcessor) Close() error

Close closes the processor.

func (*PrintProcessor) Process

func (p *PrintProcessor) Process(key, value interface{}) error

Process processes the stream record.

func (*PrintProcessor) WithContext

func (p *PrintProcessor) WithContext(ctx Context)

WithContext sets the context on the Processor.

type Processor

type Processor interface {
	// WithContext sets the context on the Processor.
	WithContext(ctx Context)
	// Process processes the stream record.
	Process(key, value interface{}) error
	// Close closes the processor.
	Close() error
}

Processor represents a stream processor.

func NewBranchProcessor

func NewBranchProcessor(fns []Predicate) Processor

NewBranchProcessor creates a new BranchProcessor instance.

func NewFilterProcessor

func NewFilterProcessor(fn Predicate) Processor

NewFilterProcessor creates a new FilterProcessor instance.

func NewMapProcessor

func NewMapProcessor(fn Mapper) Processor

NewMapProcessor creates a new MapProcessor instance.

func NewMergeProcessor

func NewMergeProcessor() Processor

NewMergeProcessor creates a new MergeProcessor instance.

func NewPrintProcessor

func NewPrintProcessor() Processor

NewPrintProcessor creates a new PrintProcessor instance.

type ProcessorContext

type ProcessorContext struct {
	// contains filtered or unexported fields
}

func NewProcessorContext

func NewProcessorContext(t Task, l log.Logger, s stats.Stats) *ProcessorContext

func (*ProcessorContext) Commit

func (c *ProcessorContext) Commit() error

func (*ProcessorContext) Forward

func (c *ProcessorContext) Forward(key, value interface{}) error

func (*ProcessorContext) ForwardToChild

func (c *ProcessorContext) ForwardToChild(key, value interface{}, index int) error

func (*ProcessorContext) Logger

func (c *ProcessorContext) Logger() log.Logger

func (*ProcessorContext) Stats

func (c *ProcessorContext) Stats() stats.Stats

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

func (*ProcessorNode) AddChild

func (n *ProcessorNode) AddChild(node Node)

func (*ProcessorNode) Children

func (n *ProcessorNode) Children() []Node

func (*ProcessorNode) Close

func (n *ProcessorNode) Close() error

func (*ProcessorNode) Process

func (n *ProcessorNode) Process(key, value interface{}) error

func (*ProcessorNode) WithContext

func (n *ProcessorNode) WithContext(ctx Context)

type Source

// Source is the input of a stream. StreamBuilder.Source and
// TopologyBuilder.AddSource each accept one.
type Source interface {
	// Consume returns a key/value pair, or a non-nil err on failure.
	// NOTE(review): whether this blocks, and what is returned when no record
	// is available, is not visible here — confirm against implementations.
	Consume() (key, value interface{}, err error)
	// Commit returns an error on failure.
	// NOTE(review): presumably acknowledges the records consumed so far — confirm.
	Commit() error
	// Close returns an error on failure.
	// NOTE(review): presumably releases the source's resources — confirm.
	Close() error
}

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

func (*SourceNode) AddChild

func (n *SourceNode) AddChild(node Node)

func (*SourceNode) Children

func (n *SourceNode) Children() []Node

func (*SourceNode) Close

func (n *SourceNode) Close() error

func (*SourceNode) Process

func (n *SourceNode) Process(key, value interface{}) error

func (*SourceNode) WithContext

func (n *SourceNode) WithContext(ctx Context)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Branch

func (s *Stream) Branch(name string, preds ...Predicate) []*Stream

func (*Stream) Filter

func (s *Stream) Filter(name string, pred Predicate) *Stream

func (*Stream) Map

func (s *Stream) Map(name string, mapper Mapper) *Stream

func (*Stream) Merge

func (s *Stream) Merge(name string, streams ...*Stream) *Stream

func (*Stream) Print

func (s *Stream) Print(name string) *Stream

func (*Stream) Process

func (s *Stream) Process(name string, p Processor) *Stream

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

func (*StreamBuilder) Build

func (sb *StreamBuilder) Build() *Topology

func (*StreamBuilder) Source

func (sb *StreamBuilder) Source(name string, source Source) *Stream

type Task

// Task is the handle returned by NewTask for a Topology.
type Task interface {
	// Start returns nothing.
	// NOTE(review): presumably begins consuming from the topology's sources,
	// with failures surfaced through the OnError callback — confirm.
	Start()
	// Commit returns an error on failure.
	// NOTE(review): presumably commits the topology's sources — confirm.
	Commit() error
	// OnError registers fn as the task's ErrorFunc.
	// NOTE(review): whether a later call replaces or adds to an earlier
	// callback is not visible here — confirm.
	OnError(fn ErrorFunc)
	// Close returns an error on failure.
	// NOTE(review): presumably stops the task and closes its nodes — confirm.
	Close() error
}

func NewTask

func NewTask(topology *Topology, opts ...TaskFunc) Task

type TaskFunc

// TaskFunc is an option accepted by NewTask; WithLogger and WithStats each
// return one. Its parameter type, streamTask, is unexported, so code outside
// this package cannot define a TaskFunc that touches the task's fields.
type TaskFunc func(*streamTask)

func WithLogger

func WithLogger(logger log.Logger) TaskFunc

func WithStats

func WithStats(stats stats.Stats) TaskFunc

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

func (Topology) Processors

func (t Topology) Processors() []Node

func (Topology) Sources

func (t Topology) Sources() map[Source]Node

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

func (*TopologyBuilder) AddProcessor

func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node

func (*TopologyBuilder) AddSource

func (tb *TopologyBuilder) AddSource(name string, source Source) Node

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() *Topology

Directories

Path Synopsis
example
branch command
kafka command
merge command
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL