Documentation ¶
Overview ¶
Package pipeline provides all implemented functionality to move data through transporter.
A transporter pipeline consists of a tree of Nodes, with the root Node attached to the source database and each child Node acting as either a data transformer or a database sink. Nodes can be defined like:
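// Source node: reads every namespace from MongoDB.
a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongodb://localhost:27017"})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
source, err := pipeline.NewNodeWithOptions(
	"source", "mongo", "/.*/",
	pipeline.WithClient(a),
	pipeline.WithReader(a),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
// Sink node: writes to stdout. It is linked to source via WithParent,
// so the returned node is not used further here.
f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
_, err = pipeline.NewNodeWithOptions(
	"out", "file", "/.*/",
	pipeline.WithClient(f),
	pipeline.WithWriter(f),
	pipeline.WithParent(source),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}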
Pipelines can be defined like:
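// NewPipeline takes the version string first (see its signature below).
// The variable is named p so it does not shadow the pipeline package.
p, err := pipeline.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
p.Run()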
The event emitters are defined in transporter/events and are used to deliver errors, metrics, and other events about the running process.
Index ¶
- Variables
- type Node
- type OptionFunc
- func WithClient(a adaptor.Adaptor) OptionFunc
- func WithCommitLog(options ...commitlog.OptionFunc) OptionFunc
- func WithCompactionInterval(interval string) OptionFunc
- func WithOffsetManager(om offset.Manager) OptionFunc
- func WithParent(parent *Node) OptionFunc
- func WithReader(a adaptor.Adaptor) OptionFunc
- func WithResumeTimeout(timeout time.Duration) OptionFunc
- func WithTransforms(t []*Transform) OptionFunc
- func WithWriteTimeout(timeout string) OptionFunc
- func WithWriter(a adaptor.Adaptor) OptionFunc
- type Pipeline
- type Transform
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrResumeTimedOut is returned when the resumeTimeout is reached after attempting
	// to check that a sink offset matches the newest offset.
	ErrResumeTimedOut = errors.New("resume timeout reached")

	// ErrResumeStopped is returned when the underlying pipe.Pipe has been stopped while
	// a Node is in the process of resuming.
	ErrResumeStopped = errors.New("pipe has been stopped, canceling resume")

	// ErrConfirmOffset is returned if the underlying OffsetManager fails to commit
	// the offsets.
	ErrConfirmOffset = errors.New("failed to confirm offsets")
)
Functions ¶
This section is empty.
Types ¶
type Node ¶
A Node is the basic building block of transporter pipelines. Nodes are constructed in a tree, with the first node broadcasting data to each of its children.
func NewNodeWithOptions ¶ added in v0.4.0
func NewNodeWithOptions(name, kind, ns string, options ...OptionFunc) (*Node, error)
NewNodeWithOptions initializes a Node with the required parameters and then applies each OptionFunc provided.
func (*Node) Endpoints ¶
Endpoints recurses down the node tree and accumulates a map associating each node name with its node type. This is primarily used with the boot event.
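The recursion described above can be sketched on a toy tree. This is an illustration only: the node type, its fields, and the endpoints method below are hypothetical stand-ins, not the package's implementation.

```go
package main

import "fmt"

// node is a toy stand-in for pipeline.Node with only the fields needed here.
type node struct {
	name, kind string
	children   []*node
}

// endpoints walks the subtree rooted at n and returns a map of
// node name to node type, merging the maps returned by each child.
func (n *node) endpoints() map[string]string {
	m := map[string]string{n.name: n.kind}
	for _, c := range n.children {
		for name, kind := range c.endpoints() {
			m[name] = kind
		}
	}
	return m
}

func main() {
	sink := &node{name: "out", kind: "file"}
	transform := &node{name: "rename", kind: "transformer", children: []*node{sink}}
	source := &node{name: "source", kind: "mongo", children: []*node{transform}}

	eps := source.endpoints()
	fmt.Println(len(eps), eps["source"], eps["out"])
}
```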
func (*Node) Start ¶
Start starts the node's children in a goroutine, and then runs either Start() or Listen() on the node's adaptor. Root nodes (nodes with no parent) run Start() and emit messages to their children. All descendant nodes run Listen() on the adaptor.
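As a rough picture of that start-up order, the sketch below launches every descendant of a toy tree in its own goroutine and then has the root emit messages to its children. Everything here is an assumption made for illustration: the node type, the unbuffered channels, and the start and listen methods are hypothetical and do not reflect how the package wires its adaptors or pipes.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a toy stand-in for pipeline.Node; it is not the package's type.
type node struct {
	name     string
	in       chan string // messages arriving from the parent
	children []*node
	got      []string // messages this node has seen
}

func newNode(name string, children ...*node) *node {
	return &node{name: name, in: make(chan string), children: children}
}

// listen plays the role of a descendant: it records each inbound message,
// forwards it to its own children, and closes them once its input is closed.
func (n *node) listen(wg *sync.WaitGroup) {
	defer wg.Done()
	for msg := range n.in {
		n.got = append(n.got, msg)
		for _, c := range n.children {
			c.in <- msg
		}
	}
	for _, c := range n.children {
		close(c.in)
	}
}

// start plays the role of the root: it launches every descendant in its own
// goroutine, emits msgs to its direct children, then waits for all to drain.
func (n *node) start(msgs []string) {
	var wg sync.WaitGroup
	var launch func(p *node)
	launch = func(p *node) {
		for _, c := range p.children {
			wg.Add(1)
			go c.listen(&wg)
			launch(c)
		}
	}
	launch(n)
	for _, m := range msgs {
		for _, c := range n.children {
			c.in <- m
		}
	}
	for _, c := range n.children {
		close(c.in)
	}
	wg.Wait()
}

func main() {
	sink := newNode("out")
	transform := newNode("transform", sink)
	root := newNode("source", transform, newNode("audit"))
	root.start([]string{"a", "b"})
	fmt.Println(sink.got, len(transform.got))
}
```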
func (*Node) Stop ¶
func (n *Node) Stop()
Stop stops this node's adaptor and sends a stop to each child of this node.
type OptionFunc ¶ added in v0.4.0
OptionFunc is a function that configures a Node. It is used in NewNodeWithOptions.
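For readers unfamiliar with the functional options pattern, here is a minimal standalone sketch of how a constructor can apply such options in order. The node type, the lowercase names, and the error return on the option type are assumptions made for illustration; the underlying type of OptionFunc is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical stand-in for pipeline.Node.
type node struct {
	name, kind, ns string
	parent         *node
	children       []*node
}

// optionFunc configures a node. Returning an error is an assumption.
type optionFunc func(*node) error

// withParent links n under p, loosely mirroring what WithParent is described as doing.
func withParent(p *node) optionFunc {
	return func(n *node) error {
		if p == nil {
			return errors.New("parent must not be nil")
		}
		n.parent = p
		p.children = append(p.children, n)
		return nil
	}
}

// newNode sets the required fields, then applies each option in order,
// stopping at the first option that fails.
func newNode(name, kind, ns string, opts ...optionFunc) (*node, error) {
	n := &node{name: name, kind: kind, ns: ns}
	for _, opt := range opts {
		if err := opt(n); err != nil {
			return nil, err
		}
	}
	return n, nil
}

func main() {
	source, _ := newNode("source", "mongo", "/.*/")
	sink, err := newNode("out", "file", "/.*/", withParent(source))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(sink.parent.name, len(source.children))
}
```

The appeal of this design is that required parameters stay positional while optional behavior is added without changing the constructor's signature.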
func WithClient ¶ added in v0.4.0
func WithClient(a adaptor.Adaptor) OptionFunc
WithClient sets the client.Client to be used for providing a client.Session to the client.Reader/Writer.
func WithCommitLog ¶ added in v0.4.0
func WithCommitLog(options ...commitlog.OptionFunc) OptionFunc
WithCommitLog configures a CommitLog for the reader to persist messages.
func WithCompactionInterval ¶ added in v0.4.0
func WithCompactionInterval(interval string) OptionFunc
WithCompactionInterval configures the duration for running log compaction.
func WithOffsetManager ¶ added in v0.4.0
func WithOffsetManager(om offset.Manager) OptionFunc
WithOffsetManager configures an offset.Manager to track message offsets.
func WithParent ¶ added in v0.4.0
func WithParent(parent *Node) OptionFunc
WithParent sets the parent node and reconfigures the pipe.
func WithReader ¶ added in v0.4.0
func WithReader(a adaptor.Adaptor) OptionFunc
WithReader sets the client.Reader to be used to source data from.
func WithResumeTimeout ¶ added in v0.4.0
func WithResumeTimeout(timeout time.Duration) OptionFunc
WithResumeTimeout configures how long to wait for all sink offsets to match the newest offset.
func WithTransforms ¶ added in v0.4.0
func WithTransforms(t []*Transform) OptionFunc
WithTransforms adds the provided transforms to be applied in the pipeline.
func WithWriteTimeout ¶ added in v0.4.0
func WithWriteTimeout(timeout string) OptionFunc
WithWriteTimeout configures the timeout duration for a writer to return.
func WithWriter ¶ added in v0.4.0
func WithWriter(a adaptor.Adaptor) OptionFunc
WithWriter sets the client.Writer to be used to send data to.
type Pipeline ¶
type Pipeline struct {
// Err is the fatal error that was sent from the adaptor
// that caused us to stop this process. If this is nil, then
// the transporter is running
Err error
// contains filtered or unexported fields
}
A Pipeline is the end-to-end description of a transporter data flow, including the source, sink, and all the transformers along the way.
func NewDefaultPipeline ¶
func NewDefaultPipeline(source *Node, uri, key, pid, version string, interval time.Duration) (*Pipeline, error)
NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and uses the events.HttpPostEmitter to deliver metrics, e.g.:
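// Source node: reads every namespace from MongoDB.
a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongodb://localhost:27017"})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
source, err := pipeline.NewNodeWithOptions(
	"source", "mongo", "/.*/",
	pipeline.WithClient(a),
	pipeline.WithReader(a),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
// Sink node: writes to stdout. It is linked to source via WithParent,
// so the returned node is not used further here.
f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
_, err = pipeline.NewNodeWithOptions(
	"out", "file", "/.*/",
	pipeline.WithClient(f),
	pipeline.WithWriter(f),
	pipeline.WithParent(source),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
// Arguments follow the signature above: source, uri, key, pid, version, interval.
// The "key", "pid", and "version" values are placeholders.
p, err := pipeline.NewDefaultPipeline(source, "http://localhost/endpoint", "key", "pid", "version", 1*time.Second)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
p.Run()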
func NewPipeline ¶
func NewPipeline(version string, source *Node, emit events.EmitFunc, interval time.Duration) (*Pipeline, error)
NewPipeline creates a new Transporter Pipeline using the given tree of nodes and event emitter, e.g.:
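// Source node: reads every namespace from MongoDB.
a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongodb://localhost:27017"})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
source, err := pipeline.NewNodeWithOptions(
	"source", "mongo", "/.*/",
	pipeline.WithClient(a),
	pipeline.WithReader(a),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
// Sink node: writes to stdout. It is linked to source via WithParent,
// so the returned node is not used further here.
f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
_, err = pipeline.NewNodeWithOptions(
	"out", "file", "/.*/",
	pipeline.WithClient(f),
	pipeline.WithWriter(f),
	pipeline.WithParent(source),
)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
// The variable is named p so it does not shadow the pipeline package.
p, err := pipeline.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
if err != nil {
	fmt.Println(err)
	os.Exit(1)
}
p.Run()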