Versions in this module Expand all Collapse all v2 v2.6.0 Jan 29, 2019 Changes in this version + var ErrAlreadyRunning = errors.New("streams: supervisor already running") + var ErrNotRunning = errors.New("streams: supervisor not running") + var ErrUnknownPump = errors.New("streams: encountered an unknown pump") + type BranchProcessor struct + func (p *BranchProcessor) Close() error + func (p *BranchProcessor) Process(msg *Message) error + func (p *BranchProcessor) WithPipe(pipe Pipe) + type Committer interface + Commit func() error + type ErrorFunc func(error) + type FilterProcessor struct + func (p *FilterProcessor) Close() error + func (p *FilterProcessor) Process(msg *Message) error + func (p *FilterProcessor) WithPipe(pipe Pipe) + type FlatMapProcessor struct + func (p *FlatMapProcessor) Close() error + func (p *FlatMapProcessor) Process(msg *Message) error + func (p *FlatMapProcessor) WithPipe(pipe Pipe) + type FlatMapper interface + FlatMap func(*Message) ([]*Message, error) + type FlatMapperFunc func(*Message) ([]*Message, error) + func (fn FlatMapperFunc) FlatMap(msg *Message) ([]*Message, error) + type MapProcessor struct + func (p *MapProcessor) Close() error + func (p *MapProcessor) Process(msg *Message) error + func (p *MapProcessor) WithPipe(pipe Pipe) + type Mapper interface + Map func(*Message) (*Message, error) + type MapperFunc func(*Message) (*Message, error) + func (fn MapperFunc) Map(msg *Message) (*Message, error) + type MergeProcessor struct + func (p *MergeProcessor) Close() error + func (p *MergeProcessor) Process(msg *Message) error + func (p *MergeProcessor) WithPipe(pipe Pipe) + type Message struct + Ctx context.Context + Key interface{} + Value interface{} + func NewMessage(k, v interface{}) *Message + func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message + func (m *Message) Metadata() (Source, Metadata) + func (m *Message) WithMetadata(s Source, v Metadata) *Message + func (m Message) Empty() bool + type Metadata interface + Merge func(Metadata, MetadataStrategy) Metadata + WithOrigin func(MetadataOrigin) + type MetadataOrigin uint8 + const CommitterOrigin + const ProcessorOrigin + type MetadataStrategy uint8 + const Dupless + const Lossless + type Metaitem struct + Metadata Metadata + Source Source + type Metaitems []*Metaitem + func (m Metaitems) Merge(items Metaitems, strategy MetadataStrategy) Metaitems + type Metastore interface + Mark func(Processor, Source, Metadata) error + Pull func(Processor) (Metaitems, error) + PullAll func() (map[Processor]Metaitems, error) + func NewMetastore() Metastore + type Node interface + AddChild func(n Node) + Children func() []Node + Name func() string + Processor func() Processor + type Pipe interface + Commit func(*Message) error + Forward func(*Message) error + ForwardToChild func(*Message, int) error + Mark func(*Message) error + func NewPipe(store Metastore, supervisor Supervisor, proc Processor, children []Pump) Pipe + type Predicate interface + Assert func(*Message) (bool, error) + type PredicateFunc func(*Message) (bool, error) + func (fn PredicateFunc) Assert(msg *Message) (bool, error) + type PrintProcessor struct + func (p *PrintProcessor) Close() error + func (p *PrintProcessor) Process(msg *Message) error + func (p *PrintProcessor) WithPipe(pipe Pipe) + type Processor interface + Close func() error + Process func(*Message) error + WithPipe func(Pipe) + func NewBranchProcessor(preds []Predicate) Processor + func NewFilterProcessor(pred Predicate) Processor + func NewFlatMapProcessor(mapper FlatMapper) Processor + func NewMapProcessor(mapper Mapper) Processor + func NewMergeProcessor() Processor + func NewPrintProcessor() Processor + type ProcessorNode struct + func NewProcessorNode(name string, p Processor) *ProcessorNode + func (n *ProcessorNode) AddChild(node Node) + func (n *ProcessorNode) Children() []Node + func (n *ProcessorNode) Name() string + func (n *ProcessorNode) Processor() Processor + type Pump interface + Accept func(*Message) error + Close func() error + Stop func() + func NewAsyncPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump + func NewSyncPump(node Node, pipe TimedPipe) Pump + type Source interface + Close func() error + Commit func(interface{}) error + Consume func() (*Message, error) + type SourceNode struct + func NewSourceNode(name string) *SourceNode + func (n *SourceNode) AddChild(node Node) + func (n *SourceNode) Children() []Node + func (n *SourceNode) Name() string + func (n *SourceNode) Processor() Processor + type SourcePump interface + Close func() error + Stop func() + func NewSourcePump(name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump + type SourcePumps []SourcePump + func (p SourcePumps) StopAll() + type Stream struct + func (s *Stream) Branch(name string, preds ...Predicate) []*Stream + func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream + func (s *Stream) Filter(name string, pred Predicate) *Stream + func (s *Stream) FilterFunc(name string, pred PredicateFunc) *Stream + func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream + func (s *Stream) FlatMapFunc(name string, mapper FlatMapperFunc) *Stream + func (s *Stream) Map(name string, mapper Mapper) *Stream + func (s *Stream) MapFunc(name string, mapper MapperFunc) *Stream + func (s *Stream) Merge(name string, streams ...*Stream) *Stream + func (s *Stream) Print(name string) *Stream + func (s *Stream) Process(name string, p Processor) *Stream + type StreamBuilder struct + func NewStreamBuilder() *StreamBuilder + func (sb *StreamBuilder) Build() (*Topology, []error) + func (sb *StreamBuilder) Source(name string, source Source) *Stream + type Supervisor interface + Commit func(Processor) error + Start func() error + WithPumps func(pumps map[Node]Pump) + func NewSupervisor(store Metastore, strategy MetadataStrategy) Supervisor + func NewTimedSupervisor(inner Supervisor, d time.Duration, errFn ErrorFunc) Supervisor + type Task interface + Close func() error + OnError func(fn ErrorFunc) + Start func() error + func NewTask(topology *Topology, opts ...TaskOptFunc) Task + type TaskMode int8 + const Async + const Sync + type TaskOptFunc func(t *streamTask) + func WithCommitInterval(d time.Duration) TaskOptFunc + func WithMetadataStrategy(strategy MetadataStrategy) TaskOptFunc + func WithMode(m TaskMode) TaskOptFunc + type Tasks []Task + func (tasks Tasks) Close() error + func (tasks Tasks) OnError(fn ErrorFunc) + func (tasks Tasks) Start() error + type TimedPipe interface + Duration func() time.Duration + Reset func() + type Topology struct + func (t Topology) Processors() []Node + func (t Topology) Sources() map[Source]Node + type TopologyBuilder struct + func NewTopologyBuilder() *TopologyBuilder + func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node + func (tb *TopologyBuilder) AddSource(name string, source Source) Node + func (tb *TopologyBuilder) Build() (*Topology, []error) Other modules containing this package github.com/msales/streams github.com/msales/streams/v4 github.com/msales/streams/v6