selina

v0.2.0
Published: Apr 29, 2020 License: MIT Imports: 6 Imported by: 0

README

selina

Simple pipeline for Go, inspired by ratchet: https://github.com/dailyburn/ratchet

Documentation

Constants

This section is empty.

Variables

var ErrAlreadyStarted = errors.New("node already started")

ErrAlreadyStarted is returned if the Start method is called more than once.

var ErrStopNotStarted = errors.New("stopping a not started worker")

ErrStopNotStarted is returned when Stop is called before the Start method.

Functions

func ATPipelineContextCancel

func ATPipelineContextCancel(p Pipeliner, t *testing.T)

ATPipelineContextCancel verifies that the context is propagated to all Nodes.

func ATPipelineStartAll

func ATPipelineStartAll(p Pipeliner, t *testing.T)

ATPipelineStartAll verifies that all Nodes in a pipeline are started when pipeline.Start is called.

func ChannelAsSlice

func ChannelAsSlice(in <-chan []byte) []string

ChannelAsSlice reads from the in channel until it is closed and returns a slice with all messages received.

func SliceAsChannel

func SliceAsChannel(data []string, autoClose bool) chan []byte

SliceAsChannel returns a channel that reads from a slice. If autoClose is true, the channel is closed after the last message is consumed.

func SliceAsChannelRaw

func SliceAsChannelRaw(data [][]byte, autoClose bool) chan []byte

SliceAsChannelRaw is the same as SliceAsChannel, but takes its data as [][]byte.
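
The following is a minimal sketch of how helpers with the behavior documented above might look and how they round-trip data in a test. `sliceAsChannel` and `channelAsSlice` are local stand-ins written for illustration; they are assumptions, not the package's source.

```go
package main

import "fmt"

// sliceAsChannel is a hypothetical stand-in for SliceAsChannel: it returns a
// channel fed from data and, if autoClose is true, closes it after the last
// message has been sent.
func sliceAsChannel(data []string, autoClose bool) chan []byte {
	out := make(chan []byte)
	go func() {
		for _, s := range data {
			out <- []byte(s)
		}
		if autoClose {
			close(out)
		}
	}()
	return out
}

// channelAsSlice is a hypothetical stand-in for ChannelAsSlice: it drains in
// until the channel is closed and returns every message as a string.
func channelAsSlice(in <-chan []byte) []string {
	var out []string
	for msg := range in {
		out = append(out, string(msg))
	}
	return out
}

func main() {
	got := channelAsSlice(sliceAsChannel([]string{"a", "b", "c"}, true))
	fmt.Println(got) // prints [a b c]
}
```

Note that with autoClose set to false the reader in this sketch would block forever, because nothing closes the channel.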

Types

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster allows writing the same value to multiple goroutines.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(input <-chan []byte)

Broadcast reads values from input and sends them to the output channels.

func (*Broadcaster) Client

func (b *Broadcaster) Client() <-chan []byte

Client creates an output channel; it returns an error if Broadcast has already been called.
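
To illustrate the fan-out idea behind Broadcaster, here is a rough sketch built only on the standard library. `fanOut` and `countAll` are hypothetical helpers, not part of selina, and the sketch assumes all output channels are created before any message is sent.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical helper, not part of selina: it copies every
// message from input to each of n output channels and closes the outputs
// once input is closed.
func fanOut(input <-chan []byte, n int) []<-chan []byte {
	outs := make([]chan []byte, n)
	readOnly := make([]<-chan []byte, n)
	for i := range outs {
		outs[i] = make(chan []byte)
		readOnly[i] = outs[i]
	}
	go func() {
		for msg := range input {
			for _, o := range outs {
				o <- msg
			}
		}
		for _, o := range outs {
			close(o)
		}
	}()
	return readOnly
}

// countAll starts one consumer per channel and returns how many messages
// each consumer received.
func countAll(chans []<-chan []byte) []int {
	counts := make([]int, len(chans))
	var wg sync.WaitGroup
	for i, c := range chans {
		wg.Add(1)
		go func(i int, c <-chan []byte) {
			defer wg.Done()
			for range c {
				counts[i]++
			}
		}(i, c)
	}
	wg.Wait()
	return counts
}

func main() {
	in := make(chan []byte)
	clients := fanOut(in, 3)
	go func() {
		in <- []byte("one")
		in <- []byte("two")
		close(in)
	}()
	fmt.Println(countAll(clients)) // prints [2 2 2]
}
```

Because the outputs are unbuffered, one slow consumer stalls the others in this sketch; whether Broadcaster behaves the same way is not stated in the documentation.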

type MultiError

type MultiError struct {
	InnerErrors map[string]error
}

MultiError is an error that contains all of the pipeline's Node.Start errors.

func (*MultiError) Error

func (e *MultiError) Error() string

Error implements the "error" interface.
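
As a sketch of how a caller might inspect an aggregated error of this shape, the example below defines a local `multiError` type that mirrors the documented InnerErrors field. The message format produced by its Error method, the `sampleError` fixture, and the `failedNodes` helper are all assumptions made for illustration, not the package's actual behavior.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// multiError mirrors the documented shape of MultiError. The Error format
// below is an assumption, not the package's actual output.
type multiError struct {
	InnerErrors map[string]error
}

// sortedNames returns the keys of InnerErrors in sorted order, since map
// iteration order is random.
func (e *multiError) sortedNames() []string {
	names := make([]string, 0, len(e.InnerErrors))
	for name := range e.InnerErrors {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func (e *multiError) Error() string {
	var parts []string
	for _, name := range e.sortedNames() {
		parts = append(parts, name+": "+e.InnerErrors[name].Error())
	}
	return strings.Join(parts, "; ")
}

// failedNodes returns the sorted node names recorded in err, or nil if err
// is not (and does not wrap) a *multiError.
func failedNodes(err error) []string {
	var me *multiError
	if !errors.As(err, &me) {
		return nil
	}
	return me.sortedNames()
}

// sampleError builds a hypothetical error with two failed nodes.
func sampleError() error {
	return &multiError{InnerErrors: map[string]error{
		"reader": errors.New("file not found"),
		"writer": errors.New("disk full"),
	}}
}

func main() {
	err := sampleError()
	fmt.Println(err)              // prints reader: file not found; writer: disk full
	fmt.Println(failedNodes(err)) // prints [reader writer]
}
```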

type Node

type Node struct {
	Name string
	// contains filtered or unexported fields
}

Node is a node that can send and receive data.

func NewNode

func NewNode(name string, w Worker) *Node

NewNode creates a new node that wraps a Worker.

func (*Node) Chain

func (n *Node) Chain(next *Node) *Node

Chain sends messages emitted by the worker to the next node. It returns the next node so that calls can be chained.

func (*Node) Running

func (n *Node) Running() bool

Running returns true if the Start() method was called.

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

Start initializes the worker. worker.Process should be called multiple times until the Node is stopped or worker.Process returns an error.

func (*Node) Stop

func (n *Node) Stop() error

Stop stops the worker in the node. It must be called after Start.
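
The start/stop rules above, together with the two sentinel errors, can be pictured with a small guard like the one below. The `lifecycle` type is a hypothetical illustration, not selina's Node; in particular, whether a Node can be started again after Stop is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Sentinel errors mirroring the documented package variables.
var (
	errAlreadyStarted = errors.New("node already started")
	errStopNotStarted = errors.New("stopping a not started worker")
)

// lifecycle is a hypothetical start/stop guard, not selina's Node.
type lifecycle struct {
	mu      sync.Mutex
	running bool
}

// Start marks the guard as running, or fails if it already is.
func (l *lifecycle) Start() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return errAlreadyStarted
	}
	l.running = true
	return nil
}

// Stop clears the running flag, or fails if Start was never called.
// Allowing a later restart is an assumption of this sketch.
func (l *lifecycle) Stop() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.running {
		return errStopNotStarted
	}
	l.running = false
	return nil
}

// Running reports whether Start has been called without a matching Stop.
func (l *lifecycle) Running() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.running
}

func main() {
	var l lifecycle
	fmt.Println(l.Stop())    // prints stopping a not started worker
	fmt.Println(l.Start())   // prints <nil>
	fmt.Println(l.Start())   // prints node already started
	fmt.Println(l.Running()) // prints true
	fmt.Println(l.Stop())    // prints <nil>
}
```

Since the package exports its sentinels as variables, callers can compare a returned error against them with errors.Is.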

type Pipeliner

type Pipeliner interface {
	Run(context.Context) error
	Stats() map[string]string
	Nodes() []*Node
}

Pipeliner implementations must meet the following conditions:

Run must call Node.Start on all Nodes.
The context passed to Run must be propagated to all Node.Start methods.
Nodes() must return a slice with all instances of *Node.

func NewSimplePipeline

func NewSimplePipeline(n ...*Node) Pipeliner

NewSimplePipeline creates a linear pipeline.
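
To make "linear pipeline" concrete, here is a sketch in which each stage reads from the previous stage's output and every stage receives the same context. The `stage` type, `transform`, and `runLinear` are hypothetical names for this illustration; this is not how SimplePipeline is implemented.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
)

// stage has the same signature as the documented Worker.Process method.
type stage func(ctx context.Context, input <-chan []byte, output chan<- []byte) error

// transform builds a stage that applies f to each message. It closes its
// output when the input is closed or the context is canceled.
func transform(f func([]byte) []byte) stage {
	return func(ctx context.Context, input <-chan []byte, output chan<- []byte) error {
		defer close(output)
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case msg, ok := <-input:
				if !ok {
					return nil
				}
				select {
				case output <- f(msg):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
	}
}

// runLinear wires the stages in order, starts each one with the same
// context, feeds msgs into the first stage and returns what the last emits.
func runLinear(msgs []string, stages ...stage) []string {
	ctx := context.Background()
	first := make(chan []byte)
	var in <-chan []byte = first
	for _, s := range stages {
		out := make(chan []byte)
		go s(ctx, in, out) // stage errors are ignored in this sketch
		in = out
	}
	go func() {
		defer close(first)
		for _, m := range msgs {
			first <- []byte(m)
		}
	}()
	var got []string
	for m := range in {
		got = append(got, string(m))
	}
	return got
}

func main() {
	got := runLinear([]string{" a ", " b "}, transform(bytes.TrimSpace), transform(bytes.ToUpper))
	fmt.Println(got) // prints [A B]
}
```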

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver joins multiple channels into a single output channel. This allows adding new channels after Receive is called.

func (*Receiver) Receive

func (r *Receiver) Receive() <-chan []byte

Receive listens to all channels configured with Watch. When all channels are closed, the output channel is closed too. If there are no channels in the watch list, this method returns a nil channel.

func (*Receiver) Watch

func (r *Receiver) Watch(input <-chan []byte)

Watch adds a new channel to be joined. Calling Watch after Receive causes a panic.
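
The fan-in behavior described for Receiver can be sketched with the standard library as follows. `merge`, `produce`, and `countMerged` are hypothetical helpers, not selina's implementation; the nil return for an empty input list mirrors what the documentation states for Receive.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical fan-in helper, not selina's Receiver: it forwards
// every message from inputs to one output and closes the output once all
// inputs are closed. With no inputs it returns a nil channel.
func merge(inputs ...<-chan []byte) <-chan []byte {
	if len(inputs) == 0 {
		return nil
	}
	out := make(chan []byte)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan []byte) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// produce returns a channel that emits n messages and is then closed.
func produce(n int) <-chan []byte {
	c := make(chan []byte)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- []byte("msg")
		}
	}()
	return c
}

// countMerged merges the inputs and returns the number of messages received.
// It must be given at least one input: ranging over a nil channel blocks forever.
func countMerged(inputs ...<-chan []byte) int {
	total := 0
	for range merge(inputs...) {
		total++
	}
	return total
}

func main() {
	fmt.Println(countMerged(produce(2), produce(3))) // prints 5
	fmt.Println(merge() == nil)                      // prints true
}
```

A receive from a nil channel never completes, so a caller that might have no inputs should check for nil before reading.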

type SimplePipeline

type SimplePipeline struct {
	// contains filtered or unexported fields
}

The default value of SimplePipeline is unusable; you must create it with NewSimplePipeline.

func (*SimplePipeline) Nodes

func (p *SimplePipeline) Nodes() []*Node

Nodes returns all instances of *Node.

func (*SimplePipeline) Run

func (p *SimplePipeline) Run(ctx context.Context) error

Run starts pipeline processing and returns a non-nil error if any Node fails.

func (*SimplePipeline) Stats

func (p *SimplePipeline) Stats() map[string]string

Stats TODO: implement

type Worker

type Worker interface {
	// Process must close the write-only output channel.
	Process(ctx context.Context, input <-chan []byte, output chan<- []byte) error
}

Worker is the standard interface implemented by processors and is used to build pipeline nodes. All Worker implementations must meet the following conditions:

When the input channel is closed, Process must finish its work gracefully and return nil.
On context cancellation, Process must finish as soon as possible and return context.Canceled.
On finish, Process must close the output channel and return an error or nil.
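
A Worker that follows the three conditions above could look roughly like this. The Worker interface is copied from the documentation so the sketch compiles on its own; `upperWorker` and the `run` harness are hypothetical names introduced for illustration.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
)

// Worker is copied from the documented interface so this sketch is self-contained.
type Worker interface {
	Process(ctx context.Context, input <-chan []byte, output chan<- []byte) error
}

// upperWorker is a hypothetical Worker that upper-cases each message.
type upperWorker struct{}

func (upperWorker) Process(ctx context.Context, input <-chan []byte, output chan<- []byte) error {
	defer close(output) // on finish, always close the output channel
	for {
		// Check cancellation first so a canceled context wins over pending input.
		if err := ctx.Err(); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-input:
			if !ok {
				return nil // input closed: finish gracefully
			}
			select {
			case output <- bytes.ToUpper(msg):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// run feeds msgs through w and returns the collected output and the error
// from Process. If cancelFirst is true, the context is canceled before
// Process starts.
func run(w Worker, cancelFirst bool, msgs ...string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	in := make(chan []byte)
	out := make(chan []byte)
	errc := make(chan error, 1)
	go func() { errc <- w.Process(ctx, in, out) }()
	go func() {
		defer close(in)
		for _, m := range msgs {
			select {
			case in <- []byte(m):
			case <-ctx.Done():
				return
			}
		}
	}()
	var got []string
	for m := range out {
		got = append(got, string(m))
	}
	return got, <-errc
}

func main() {
	got, err := run(upperWorker{}, false, "a", "b")
	fmt.Println(got, err) // prints [A B] <nil>

	_, err = run(upperWorker{}, true, "a")
	fmt.Println(err) // prints context canceled
}
```

Guarding the send on output with ctx.Done() keeps Process from blocking forever when the downstream reader has already stopped.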

Directories

Path Synopsis
sql
