pipeline

package
v0.19.783 (not in the latest version of its module)
Published: Feb 13, 2026 License: AGPL-3.0 Imports: 6 Imported by: 0

README

Pipeline

Package pipeline exposes a primitive for running pipelines of Go functions: composing steps out of smaller functions, and simplifying the logic of complex chains.

Background + why?

Often, infrastructure code that is properly modularized requires building types that use dependency injection, and then calling a set of side-effecting functions on those types. For a Terraform plan, for example, we need to do at least a handful of things:

  • initialize backend
  • initialize archive client
  • unarchive
  • run plan
  • upload plan
  • etc...

I originally wrote a package for doing this type of thing within the context of a CLI, for powertools, and it worked out really well. A dedicated "pipeline" package lets you keep types simple, exposing primitive functions and then composing them.

Example usage

Without pipeline, you end up with lots of methods that simply compose smaller methods, as the following example demonstrates:


func (w *Workspace) Load(ctx context.Context) error {
  if err := w.loadRoot(ctx); err != nil {
    return fmt.Errorf("unable to load root: %w", err)
  }

  if err := w.loadArchive(ctx); err != nil {
    return fmt.Errorf("unable to load archive: %w", err)
  }

  if err := w.loadVariables(ctx); err != nil {
    return fmt.Errorf("unable to load variables: %w", err)
  }

  if err := w.loadBinary(ctx); err != nil {
    return fmt.Errorf("unable to load binary: %w", err)
  }

  if err := w.loadBackend(ctx); err != nil {
    return fmt.Errorf("unable to load backend: %w", err)
  }

  return nil
}

Without pipeline, you have long methods that run many different things. Adding retry logic, printing failures, supporting dry runs, and more becomes extremely complicated. It also makes testing much harder: you end up with either a ton of additional interfaces, or tests that carry many mocks.

With pipeline:

func (w *Workspace) buildPipeline(ctx context.Context) (*pipeline.Pipeline, error) {
  pipe, err := pipeline.New(w.v)
  if err != nil {
    return nil, fmt.Errorf("unable to create pipeline: %w", err)
  }

  pipe.AddStep(&pipeline.Step{
    Name: "init root",
    ExecFn: w.initRoot,
  })

  pipe.AddStep(&pipeline.Step{
    Name: "init backend",
    ExecFn: execs.MapInit(w.initBackend),
    CallbackFn: callbacks.MapNoop,
  })

  pipe.AddStep(&pipeline.Step{
    Name: "init archive",
    ExecFn: execs.MapInit(w.initArchive),
    CallbackFn: callbacks.MapNoop,
  })

  // ...

  return pipe, nil
}

This has a few benefits: it makes testing easier (you can inspect the pipeline without running it, to verify ordering and step configuration), and it simplifies your code.

Mappers

This package exposes two subpackages of mappers, execs and callbacks, which define adapters for common function signatures.

Generally speaking, if you are composing your pipelines properly, the mapper functions you need should either already exist here or be easy to add. In special cases, you can also implement them inline where you build your pipeline.

Pipelines vs Temporal

Temporal is a tool that we use to execute durable workflows, across different nodes with retries, etc.

This package is designed for more granular, low-level usage. In the future, we may use it to structure large activities, or even to build workflows themselves.

Future use cases + roadmap

We plan on using this to implement different parts of our product, moving forward:

  • executors - most executors run in a single activity
  • helm plugin - our helm build / deploy plugin
  • hooks plugin - helm build / deploy plugin

We plan on adding the following features:

  • ability to share a log session between steps + upload output
  • ability to retry steps
  • custom mappers

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithLogger

func WithLogger(l hclog.Logger) pipelineOption

Types

type CallbackFn

type CallbackFn func(context.Context, hclog.Logger, []byte) error

CallbackFn is a function used to send the outputs of an exec, as a callback

type ExecFn

type ExecFn func(context.Context, hclog.Logger) ([]byte, error)

ExecFn is a function used to execute a step

type Pipeline

type Pipeline struct {
	Steps []*Step `validate_steps:"required,gt=1"`

	Log hclog.Logger `validate:"required"`
	// contains filtered or unexported fields
}

Pipeline is a type used to execute various commands in succession, with fail/retry logic as well as callbacks for sharing and persisting state. It's designed to power workflows such as a Terraform run, which may involve many different steps (and outputs to S3).

This is designed so that types that need to run such workflows can decouple the building of the steps and logic from the actual execution.

func New

func New(v *validator.Validate, opts ...pipelineOption) (*Pipeline, error)

func (*Pipeline) AddStep

func (p *Pipeline) AddStep(step *Step)

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) error

Run runs a pipeline from end to end

type Step

type Step struct {
	Name       string     `validate:"required"`
	ExecFn     ExecFn     `validate:"required" faker:"pipelineExecFn"`
	CallbackFn CallbackFn `validate:"required" faker:"pipelineCallbackFn"`
}

Directories

Path Synopsis
mappers
