pipeline

package
v0.5.2
Published: Dec 12, 2017 License: BSD-3-Clause Imports: 20 Imported by: 3

Documentation

Overview

Package pipeline provides all implemented functionality to move data through transporter.

A transporter pipeline consists of a tree of Nodes, with the root Node attached to the source database and each child node being either a data transformer or a database sink. Nodes can be defined like this:

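a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongo://localhost:27017"})
if err != nil {
  fmt.Println(err)
  os.Exit(1)
}
// NewNodeWithOptions returns (*Node, error), so the error is checked too.
source, err := pipeline.NewNodeWithOptions(
  "source", "mongo", "/.*/",
  pipeline.WithClient(a),
  pipeline.WithReader(a),
)
if err != nil {
  fmt.Println(err)
  os.Exit(1)
}
f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
if err != nil {
  fmt.Println(err)
  os.Exit(1)
}
// The sink is attached to source through WithParent; only the error is needed here.
_, err = pipeline.NewNodeWithOptions(
  "out", "file", "/.*/",
  pipeline.WithClient(f),
  pipeline.WithWriter(f),
  pipeline.WithParent(source),
)
if err != nil {
  fmt.Println(err)
  os.Exit(1)
}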

and pipelines can be defined like this:

// NewPipeline takes the version string as its first argument.
p, err := pipeline.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
if err != nil {
  fmt.Println(err)
  os.Exit(1)
}
p.Run()

The event emitters are defined in transporter/events and are used to deliver errors, metrics, and other information about the running process.

Constants

This section is empty.

Variables

var (
	// ErrResumeTimedOut is returned when the resumeTimeout is reached after attempting
	// to check that a sink offset matches the newest offset.
	ErrResumeTimedOut = errors.New("resume timeout reached")

	// ErrResumeStopped is returned when the underlying pipe.Pipe has been stopped while
	// a Node is in the process of resuming.
	ErrResumeStopped = errors.New("pipe has been stopped, canceling resume")

	// ErrConfirmOffset is returned if the underlying OffsetManager fails to commit
	// the offsets.
	ErrConfirmOffset = errors.New("failed to confirm offsets")
)

Functions

This section is empty.

Types

type Node

type Node struct {
	Name string
	Type string
	// contains filtered or unexported fields
}

A Node is the basic building block of transporter pipelines. Nodes are constructed in a tree, with the first node broadcasting data to each of its children.

func NewNodeWithOptions added in v0.4.0

func NewNodeWithOptions(name, kind, ns string, options ...OptionFunc) (*Node, error)

NewNodeWithOptions initializes a Node with the required parameters and then applies each OptionFunc provided.

func (*Node) Endpoints

func (n *Node) Endpoints() map[string]string

Endpoints recurses down the node tree and accumulates a map associating node name with node type. This is primarily used with the boot event.
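
The recursion can be sketched with a simplified stand-in type. The node struct and its children field below are assumptions for illustration only; the real Node keeps its tree in unexported fields.

```go
package main

import "fmt"

// node is a simplified stand-in for pipeline.Node; children is an assumed field.
type node struct {
	Name     string
	Type     string
	children []*node
}

// endpoints walks the tree depth-first and records each node's name and type.
func (n *node) endpoints() map[string]string {
	m := map[string]string{n.Name: n.Type}
	for _, c := range n.children {
		for name, kind := range c.endpoints() {
			m[name] = kind
		}
	}
	return m
}

func main() {
	sink := &node{Name: "out", Type: "file"}
	source := &node{Name: "source", Type: "mongo", children: []*node{sink}}
	fmt.Println(source.endpoints())
}
```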

func (*Node) Start

func (n *Node) Start() error

Start starts the node's children in a goroutine, and then runs either Start() or Listen() on the node's adaptor. Root nodes (nodes with no parent) run Start() and emit messages to their children; all descendant nodes run Listen() on the adaptor.
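
The start-up order described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the node and recorder types are hypothetical, and the adaptor calls are replaced by recording which method each node would invoke.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// node is a hypothetical stand-in for a pipeline node.
type node struct {
	name     string
	parent   *node
	children []*node
}

// recorder collects which adaptor method each node would call.
type recorder struct {
	mu    sync.Mutex
	calls []string
}

func (r *recorder) add(s string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.calls = append(r.calls, s)
}

// start launches each child in its own goroutine, then picks the
// adaptor method based on whether the node has a parent.
func (n *node) start(r *recorder, wg *sync.WaitGroup) {
	for _, c := range n.children {
		wg.Add(1)
		go func(c *node) {
			defer wg.Done()
			c.start(r, wg)
		}(c)
	}
	if n.parent == nil {
		r.add(n.name + ":Start")
	} else {
		r.add(n.name + ":Listen")
	}
}

// run starts the whole tree and returns the recorded calls, sorted.
func run(root *node) []string {
	r := &recorder{}
	var wg sync.WaitGroup
	root.start(r, &wg)
	wg.Wait()
	sort.Strings(r.calls)
	return r.calls
}

func main() {
	root := &node{name: "source"}
	sink := &node{name: "out", parent: root}
	root.children = []*node{sink}
	fmt.Println(run(root))
}
```

In the sketch every call returns immediately; a real adaptor would be expected to keep running until it is stopped.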

func (*Node) Stop

func (n *Node) Stop()

Stop stops this node's adaptor and sends a stop to each child of this node.

func (*Node) String

func (n *Node) String() string

func (*Node) Validate

func (n *Node) Validate() bool

Validate ensures that the node tree conforms to a proper structure. Node trees must have at least one source and one sink; dangling transformers are forbidden. Validate only knows about the default adaptors in the adaptor package, so it can't validate custom adaptors.
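
One way to read these rules is "the root is a source and every leaf is a sink". The sketch below encodes that reading with a hypothetical role field; how the real Validate classifies nodes is not shown here, so treat this as an assumption rather than the package's logic.

```go
package main

import "fmt"

// role is a hypothetical classification of a node.
type role int

const (
	source role = iota
	transformer
	sink
)

type node struct {
	role     role
	children []*node
}

// valid reports whether root is a source with at least one child
// and every branch below it ends in a sink.
func valid(root *node) bool {
	if root == nil || root.role != source || len(root.children) == 0 {
		return false
	}
	for _, c := range root.children {
		if !endsInSink(c) {
			return false
		}
	}
	return true
}

// endsInSink rejects nested sources and leaves that are not sinks,
// which covers the "dangling transformer" case.
func endsInSink(n *node) bool {
	if n.role == source {
		return false
	}
	if len(n.children) == 0 {
		return n.role == sink
	}
	for _, c := range n.children {
		if !endsInSink(c) {
			return false
		}
	}
	return true
}

func main() {
	ok := &node{role: source, children: []*node{{role: transformer, children: []*node{{role: sink}}}}}
	dangling := &node{role: source, children: []*node{{role: transformer}}}
	fmt.Println(valid(ok), valid(dangling))
}
```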

type OptionFunc added in v0.4.0

type OptionFunc func(*Node) error

OptionFunc is a function that configures a Node. It is used in NewNodeWithOptions.

func WithClient added in v0.4.0

func WithClient(a adaptor.Adaptor) OptionFunc

WithClient sets the client.Client to be used for providing a client.Session to the client.Reader/Writer.

func WithCommitLog added in v0.4.0

func WithCommitLog(options ...commitlog.OptionFunc) OptionFunc

WithCommitLog configures a CommitLog for the reader to persist messages.

func WithCompactionInterval added in v0.4.0

func WithCompactionInterval(interval string) OptionFunc

WithCompactionInterval configures the duration for running log compaction.

func WithOffsetManager added in v0.4.0

func WithOffsetManager(om offset.Manager) OptionFunc

WithOffsetManager configures an offset.Manager to track message offsets.

func WithParent added in v0.4.0

func WithParent(parent *Node) OptionFunc

WithParent sets the parent node and reconfigures the pipe.

func WithReader added in v0.4.0

func WithReader(a adaptor.Adaptor) OptionFunc

WithReader sets the client.Reader to be used to source data from.

func WithResumeTimeout added in v0.4.0

func WithResumeTimeout(timeout time.Duration) OptionFunc

WithResumeTimeout configures how long to wait for all sink offsets to match the newest offset.

func WithTransforms added in v0.4.0

func WithTransforms(t []*Transform) OptionFunc

WithTransforms adds the provided transforms to be applied in the pipeline.

func WithWriteTimeout added in v0.4.0

func WithWriteTimeout(timeout string) OptionFunc

WithWriteTimeout configures the timeout duration for a writer to return.

func WithWriter added in v0.4.0

func WithWriter(a adaptor.Adaptor) OptionFunc

WithWriter sets the client.Writer to be used to send data to.

type Pipeline

type Pipeline struct {

	// Err is the fatal error that was sent from the adaptor
	// that caused us to stop this process.  If this is nil, then
	// the transporter is running
	Err error
	// contains filtered or unexported fields
}

A Pipeline is the end-to-end description of a transporter data flow, including the source, sink, and all the transformers along the way.

func NewDefaultPipeline

func NewDefaultPipeline(source *Node, uri, key, pid, version string, interval time.Duration) (*Pipeline, error)

NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and uses the events.HttpPostEmitter to deliver metrics, e.g.:

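  a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongo://localhost:27017"})
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  // NewNodeWithOptions returns (*Node, error), so the error is checked too.
  source, err := pipeline.NewNodeWithOptions(
    "source", "mongo", "/.*/",
    pipeline.WithClient(a),
    pipeline.WithReader(a),
  )
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  // The sink is attached to source through WithParent; only the error is needed here.
  _, err = pipeline.NewNodeWithOptions(
    "out", "file", "/.*/",
    pipeline.WithClient(f),
    pipeline.WithWriter(f),
    pipeline.WithParent(source),
  )
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  // Arguments follow the signature (source, uri, key, pid, version, interval);
  // "key", "pid" and "version" are placeholder values.
  p, err := pipeline.NewDefaultPipeline(source, "http://localhost/endpoint", "key", "pid", "version", 1*time.Second)
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  p.Run()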

func NewPipeline

func NewPipeline(version string, source *Node, emit events.EmitFunc, interval time.Duration) (*Pipeline, error)

NewPipeline creates a new Transporter Pipeline using the given tree of nodes and event emitter, e.g.:

  a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongo://localhost:27017"})
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  // NewNodeWithOptions returns (*Node, error), so the error is checked too.
  source, err := pipeline.NewNodeWithOptions(
    "source", "mongo", "/.*/",
    pipeline.WithClient(a),
    pipeline.WithReader(a),
  )
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  // The sink is attached to source through WithParent; only the error is needed here.
  _, err = pipeline.NewNodeWithOptions(
    "out", "file", "/.*/",
    pipeline.WithClient(f),
    pipeline.WithWriter(f),
    pipeline.WithParent(source),
  )
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  p, err := pipeline.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }
  p.Run()

func (*Pipeline) Run

func (pipeline *Pipeline) Run() error

Run runs the pipeline.

func (*Pipeline) Stop

func (pipeline *Pipeline) Stop()

Stop sends a stop signal to the emitter and all the nodes, whether they are running or not. The nodes' database adaptors are expected to clean up after themselves, and Stop will block until all nodes have stopped successfully.

func (*Pipeline) String

func (pipeline *Pipeline) String() string

type Transform added in v0.3.0

type Transform struct {
	Name     string
	Fn       function.Function
	NsFilter *regexp.Regexp
}

Transform defines the struct for including a native function in the pipeline.
