Pipeline
Package pipeline exposes a primitive for running pipelines of Go functions: it composes individual functions into ordered steps and simplifies the logic of complex chains.
Background + why?
Oftentimes, properly modularized infrastructure code requires building types that use dependency injection and then calling a set of side-effecting functions on those types. A Terraform plan, for example, needs to initialize at least a handful of things:
- initialize backend
- initialize archive client
- unarchive
- run plan
- upload plan
- etc...
I originally wrote a package for doing this type of thing within the context of a CLI, for powertools, and it worked out really well. A dedicated "pipeline" package lets you keep types simple, exposing primitive functions and then composing them.
Example usage
Without pipeline, you end up with lots of methods that simply compose smaller methods. For example:
func (w *Workspace) Load(ctx context.Context) error {
	if err := w.loadRoot(ctx); err != nil {
		return fmt.Errorf("unable to load root: %w", err)
	}
	if err := w.loadArchive(ctx); err != nil {
		return fmt.Errorf("unable to load archive: %w", err)
	}
	if err := w.loadVariables(ctx); err != nil {
		return fmt.Errorf("unable to load variables: %w", err)
	}
	if err := w.loadBinary(ctx); err != nil {
		return fmt.Errorf("unable to load binary: %w", err)
	}
	if err := w.loadBackend(ctx); err != nil {
		return fmt.Errorf("unable to load backend: %w", err)
	}
	return nil
}
Without pipeline, you have long methods that run many different things. Adding retry logic, printing out failures, supporting dry runs, and more become extremely complicated. Testing also gets much harder: you end up with either a ton of additional interfaces or tests with many mocks.
With pipeline:
func (w *Workspace) buildPipeline(ctx context.Context) (*pipeline.Pipeline, error) {
	pipe, err := pipeline.New(w.v)
	if err != nil {
		return nil, fmt.Errorf("unable to create pipeline: %w", err)
	}
	pipe.AddStep(&pipeline.Step{
		Name:   "init root",
		ExecFn: w.initRoot,
	})
	pipe.AddStep(&pipeline.Step{
		Name:       "init backend",
		ExecFn:     execs.MapInit(w.initBackend),
		CallbackFn: callbacks.MapNoop,
	})
	pipe.AddStep(&pipeline.Step{
		Name:       "init archive",
		ExecFn:     execs.MapInit(w.initArchive),
		CallbackFn: callbacks.MapNoop,
	})
	...
	return pipe, nil
}
Structuring things this way has a few benefits: it makes testing easier (you can inspect the built pipeline without running it, to verify ordering and step configuration), and it simplifies the calling code.
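For example, a test can assert on step ordering without executing anything. A sketch, assuming a hypothetical Steps accessor and newTestWorkspace fixture:

func TestBuildPipeline(t *testing.T) {
	w := newTestWorkspace(t) // hypothetical test fixture
	pipe, err := w.buildPipeline(context.Background())
	if err != nil {
		t.Fatalf("buildPipeline: %v", err)
	}
	// Steps is assumed to return the configured steps in order.
	var got []string
	for _, step := range pipe.Steps() {
		got = append(got, step.Name)
	}
	want := []string{"init root", "init backend", "init archive"}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("steps = %v, want %v", got, want)
	}
}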
Mappers
This package exposes two subpackages with mappers in them: execs and callbacks. Common mappers for common function signatures are defined there.
Generally speaking, if you are composing your pipelines properly, the mapper functions you need should either exist there already or be easy to add. In special cases, you can also implement them where you build your pipeline.
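To illustrate the idea (a sketch of the pattern, not the package's actual implementation), a mapper simply adapts one function signature to the shape a Step expects. Assuming ExecFn expects func(context.Context) error and the wrapped init function takes no context:

// MapInit lifts a context-free init function into the
// func(context.Context) error shape that a Step's ExecFn expects.
// This sketches the pattern; the real execs.MapInit may differ.
func MapInit(fn func() error) func(context.Context) error {
	return func(_ context.Context) error {
		return fn()
	}
}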
Pipelines vs. Temporal
Temporal is a tool we use to execute durable workflows across different nodes, with retries and so on.
This package is designed for more granular, low-level usage. In a future state, we could use it to build out large activities, or even to build workflows themselves.
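For instance, a whole pipeline could back a single Temporal activity, so Temporal's activity-level retries cover the pipeline as a unit. A sketch, reusing the hypothetical Run method from above:

// Activities holds dependencies for activity methods; registering
// methods on a struct is standard practice with the Temporal Go SDK.
type Activities struct {
	w *Workspace
}

// LoadWorkspace runs the entire load pipeline inside one activity
// invocation.
func (a *Activities) LoadWorkspace(ctx context.Context) error {
	pipe, err := a.w.buildPipeline(ctx)
	if err != nil {
		return fmt.Errorf("unable to build pipeline: %w", err)
	}
	return pipe.Run(ctx)
}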
Future use cases + roadmap
Moving forward, we plan on using this to implement different parts of our product:
- executors - most executors run in a single activity
- helm plugin - our helm build / deploy plugin
- hooks plugin
We plan on adding the following features:
- ability to share a log session between steps + upload output
- ability to retry steps (one possible shape is sketched after this list)
- custom mappers
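Until step-level retries land, one possible shape is a decorator over ExecFn. A sketch, assuming ExecFn has the signature func(context.Context) error:

// WithRetry is a sketch of what step retries could look like: it
// wraps an exec function and re-invokes it up to attempts times,
// stopping early on success or a cancelled context.
func WithRetry(fn func(context.Context) error, attempts int) func(context.Context) error {
	return func(ctx context.Context) error {
		var err error
		for i := 0; i < attempts; i++ {
			if err = fn(ctx); err == nil {
				return nil
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
		}
		return fmt.Errorf("after %d attempts: %w", attempts, err)
	}
}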