flow

package
v1.0.3

This package is not in the latest version of its module.
Published: Apr 4, 2026 License: MPL-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package flow provides a pipeline-style request processing block for go-code-blocks. It composes independent steps — validation, enrichment, CEL decisions, transformation and response — into a single server.Handler that is plugged directly into any server block (HTTP, Lambda, TCP).

A Flow replaces the imperative code inside a route handler with a declarative sequence of named, testable steps:

f := flow.New("order-flow",
    flow.NewStep("validate-customer",
        flow.Validate(validator, "is-pj", func(req *server.Request, _ *flow.State) map[string]any {
            return map[string]any{"customer_type": req.Header("X-Customer-Type")}
        })),

    flow.NewStep("load-customer",
        flow.Enrich(func(ctx context.Context, req *server.Request, _ *flow.State) (any, error) {
            return customersDB.GetItem(ctx, req.PathParam("id"), nil)
        })),

    flow.NewStep("check-limit",
        flow.Decide(validator, func(req *server.Request, s *flow.State) map[string]any {
            var c Customer
            s.Bind("load-customer", &c)
            return map[string]any{"amount": c.CreditLimit}
        })),

    flow.NewStep("respond",
        flow.Respond(func(ctx context.Context, req *server.Request, s *flow.State) (*server.Response, error) {
            var c Customer
            s.Bind("load-customer", &c)
            return server.JSON(200, map[string]any{
                "customer": c,
                "approved": s.Passed("check-limit", "high-value"),
            }), nil
        })),
)

app.MustRegister(f)
router.POST("/orders/:id", f.Handler())

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block is a flow pipeline block. It implements core.Block and produces a server.Handler via Handler().

func New

func New(name string, steps ...Step) *Block

New creates a new flow Block with the given ordered steps.

func (*Block) Handler

func (b *Block) Handler() server.Handler

Handler returns a server.Handler that executes the flow for every request. The returned handler is safe to register on multiple routes.

router.POST("/orders/:id", orderFlow.Handler())
router.PUT("/orders/:id",  orderFlow.Handler())

func (*Block) Init

func (b *Block) Init(_ context.Context) error

Init implements core.Block. Flow blocks have no I/O resources to open; Init validates that at least one step is configured.

func (*Block) Name

func (b *Block) Name() string

Name implements core.Block.

func (*Block) Shutdown

func (b *Block) Shutdown(_ context.Context) error

Shutdown implements core.Block. No-op for flows.

type State

type State struct {
	// contains filtered or unexported fields
}

State is the shared execution context threaded through every step of a Flow.

It accumulates enrichment data, CEL decision results and the final response, allowing each step to build on the work of previous steps without coupling them directly to each other.

State is intended for use by a single goroutine while the steps of a flow execute sequentially. Do not share a State across goroutines.

func (*State) Abort

func (s *State) Abort(resp *server.Response)

Abort short-circuits the flow immediately. The provided response is returned to the caller; no further steps are executed. Calling Abort from within a step function stops the pipeline cleanly — it is not an error.

// Inside a Validate step:
state.Abort(server.Error(http.StatusForbidden, "access denied"))

func (*State) Bind

func (s *State) Bind(key string, dest any) error

Bind unmarshals the value stored under key into dest via JSON roundtrip. This allows retrieving typed structs from enrichment results without a manual type assertion.

var user User
if err := state.Bind("load-user", &user); err != nil { ... }
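The JSON round trip behind Bind can be sketched with the standard library alone. The helper name bind and the User fields below are hypothetical, chosen only for illustration; this is not the package's implementation, just a minimal sketch of the technique the description names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bind is a hypothetical helper: it copies src into dest by marshalling
// src to JSON and unmarshalling the bytes into dest (which must be a pointer).
func bind(src, dest any) error {
	raw, err := json.Marshal(src)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}
	if err := json.Unmarshal(raw, dest); err != nil {
		return fmt.Errorf("unmarshal: %w", err)
	}
	return nil
}

// User is an illustrative destination type.
type User struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

func main() {
	// A loosely typed value, as an enrichment step might store it.
	stored := map[string]any{"name": "Ana", "age": 31, "extra": true}

	var u User
	if err := bind(stored, &u); err != nil {
		fmt.Println("bind failed:", err)
		return
	}
	fmt.Printf("%+v\n", u) // prints {Name:Ana Age:31}
}
```

A round trip of this kind follows the json struct tags, silently drops keys the destination does not declare, and returns an error when a stored value cannot be decoded into the destination field's type, which is a good reason to check the error Bind returns.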

func (*State) Decision

func (s *State) Decision(key string) *decision.Result

Decision returns the full *decision.Result stored under key, or nil if no decision is stored there. Key matches the name passed to flow.NewStep (or flow.DecideStep) when using a Decide step.

func (*State) Failed

func (s *State) Failed(decisionKey, ruleName string) bool

Failed is the inverse of Passed.

func (*State) Get

func (s *State) Get(key string) any

Get returns the value stored under key, or nil if not present.

func (*State) Has

func (s *State) Has(key string) bool

Has reports whether a value exists for the given key.

func (*State) Keys

func (s *State) Keys() []string

Keys returns the names of all enrichment values currently stored.

func (*State) Passed

func (s *State) Passed(decisionKey, ruleName string) bool

Passed reports whether the named rule passed in the decision stored under decisionKey. Returns false when the decision or rule is not found.

// In a Transform or Respond step:
if state.Passed("route-decision", "is-pj") { ... }
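One consequence of the documented behaviour is worth spelling out: if Failed is a plain negation of Passed, a misspelled rule or decision key reads as "failed". The sketch below assumes exactly that; the result and state types are hypothetical stand-ins (a map of rule name to outcome), not the real *decision.Result or flow.State.

```go
package main

import "fmt"

// result is a hypothetical stand-in for *decision.Result: rule name -> outcome.
type result map[string]bool

// state is a hypothetical stand-in for flow.State, holding decisions by key.
type state struct {
	decisions map[string]result
}

// Passed returns false when the decision or the rule is not found.
func (s *state) Passed(decisionKey, ruleName string) bool {
	d, ok := s.decisions[decisionKey]
	if !ok {
		return false
	}
	return d[ruleName] // a missing rule yields the zero value, false
}

// Failed is assumed here to be the plain negation of Passed.
func (s *state) Failed(decisionKey, ruleName string) bool {
	return !s.Passed(decisionKey, ruleName)
}

func main() {
	s := &state{decisions: map[string]result{
		"check-limit": {"approved": true, "high-value": false},
	}}
	fmt.Println(s.Passed("check-limit", "approved"))   // true
	fmt.Println(s.Failed("check-limit", "high-value")) // true
	fmt.Println(s.Failed("check-limit", "aproved"))    // true: misspelled rule
	fmt.Println(s.Passed("no-such-step", "approved"))  // false: unknown decision
}
```

Under this assumption, a gate built on Failed aborts on a typo instead of letting the request through, which is the safer direction, but it can hide a naming mistake. Checking Decision(key) for nil first distinguishes "rule failed" from "decision never ran".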

func (*State) Respond

func (s *State) Respond(resp *server.Response)

Respond sets the final HTTP response that will be returned after all steps complete. Unlike Abort, it does not stop subsequent steps from running. The last call to Respond wins.

state.Respond(server.JSON(http.StatusOK, result))

func (*State) Set

func (s *State) Set(key string, value any)

Set stores a value produced by an Enrich step under the given key. Typically called by step constructors, not by user code directly.

type Step

type Step struct {
	// contains filtered or unexported fields
}

Step associates a name with a StepFn. The name is used for logging, error messages, and as the automatic storage key for Enrich and Decide steps.

func DecideFromStep

func DecideFromStep(name string, d *decision.Block, inputFn func(req *server.Request, state *State) any) Step

DecideFromStep creates a Step that evaluates all CEL rules against a struct with `decision:` tags (see DecideFrom) and stores the result under name.

func DecideStep

func DecideStep(name string, d *decision.Block, inputFn func(req *server.Request, state *State) map[string]any) Step

DecideStep creates a Step that evaluates all CEL rules and stores the result under name. This is the preferred form because it captures the step name automatically.

flow.DecideStep("route-decision", router, func(req *server.Request, _ *flow.State) map[string]any {
    return map[string]any{"customer_type": req.Header("X-Type")}
})

func EnrichStep

func EnrichStep(name string, fn func(ctx context.Context, req *server.Request, state *State) (any, error)) Step

EnrichStep creates a Step that fetches data and stores it under name. This is the preferred form because it captures the step name automatically.

flow.EnrichStep("load-user", func(ctx context.Context, req *server.Request, _ *flow.State) (any, error) {
    return usersDB.GetItem(ctx, req.PathParam("id"), nil)
})

func NewStep

func NewStep(name string, fn StepFn) Step

NewStep creates a named Step. It is the primary way to build generic steps inline within flow.New. For typed steps prefer EnrichStep and DecideStep, which capture the name automatically as the state storage key.

flow.New("checkout-flow",
    flow.NewStep("validate", flow.Validate(...)),
    flow.EnrichStep("load-user", fn),
    flow.DecideStep("check-limit", rules, fn),
    flow.NewStep("respond", flow.Respond(...)),
)

type StepFn

type StepFn func(ctx context.Context, req *server.Request, state *State) error

StepFn is the function signature for every flow step.

It receives the incoming request and the shared State accumulated so far. A step can:

  • Read enriched data: state.Get("step-name") / state.Bind("step-name", &dest)
  • Store enriched data: state.Set("key", value)
  • Read CEL results: state.Decision("step-name") / state.Passed("step", "rule")
  • Abort the flow: state.Abort(server.Error(422, "invalid"))
  • Set final response: state.Respond(server.JSON(200, result))
  • Return a fatal error: return err → caller receives HTTP 500

func AbortIf

func AbortIf(
	condition func(state *State) bool,
	respFn func(state *State) *server.Response,
) StepFn

AbortIf conditionally aborts the flow based on the current state. If condition returns true, the response from respFn is set and the flow stops. No-op when condition returns false.

// Abort if the decision said "not approved"
flow.NewStep("gate-approval",
    flow.AbortIf(
        func(s *flow.State) bool { return s.Failed("check-limit", "approved") },
        func(s *flow.State) *server.Response {
            return server.Error(http.StatusForbidden, "credit limit not approved")
        },
    ))
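A combinator like AbortIf is a small closure over the two callbacks. The sketch below uses hypothetical stand-in types (response, state with an approved flag, stepFn) rather than the real package types, and shows one plausible shape: respFn is only invoked when the condition holds.

```go
package main

import "fmt"

// response and state are hypothetical stand-ins for server.Response and flow.State.
type response struct {
	Status int
	Body   string
}

type state struct {
	approved bool // illustrative field standing in for a stored decision
	resp     *response
	aborted  bool
}

func (s *state) Abort(r *response) { s.resp = r; s.aborted = true }

type stepFn func(s *state) error

// abortIf returns a step that aborts with respFn's response when cond is true
// and does nothing otherwise. It never returns an error itself.
func abortIf(cond func(*state) bool, respFn func(*state) *response) stepFn {
	return func(s *state) error {
		if cond(s) {
			s.Abort(respFn(s))
		}
		return nil
	}
}

func main() {
	gate := abortIf(
		func(s *state) bool { return !s.approved },
		func(s *state) *response {
			return &response{Status: 403, Body: "credit limit not approved"}
		},
	)

	s := &state{approved: false}
	_ = gate(s)
	fmt.Println(s.aborted, s.resp.Status) // prints true 403
}
```

Building the response lazily keeps the passing path cheap and lets respFn read state that only makes sense when the condition is true.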

func Decide

func Decide(
	d *decision.Block,
	inputFn func(req *server.Request, state *State) map[string]any,
) StepFn

Decide runs all CEL rules in the decision block and stores the *decision.Result in state under the step name. Unlike Validate, a failing rule does NOT abort the flow — it is recorded for later steps to inspect.

flow.NewStep("route-decision",
    flow.Decide(router, func(req *server.Request, s *flow.State) map[string]any {
        var customer Customer
        s.Bind("load-customer", &customer)
        return map[string]any{"customer_type": customer.Type, "amount": customer.Total}
    }))

In a later step:

if state.Passed("route-decision", "is-pj") { ... }
if state.Decision("route-decision").Any() { ... }

func DecideFrom

func DecideFrom(
	d *decision.Block,
	inputFn func(req *server.Request, state *State) any,
) StepFn

DecideFrom runs all CEL rules against a struct with `decision:` tags. The result is stored under the step name.

func Enrich

func Enrich(fn func(ctx context.Context, req *server.Request, state *State) (any, error)) StepFn

Enrich calls an external source (database, REST API, cache, etc.) and stores the result in state under the step name. Subsequent steps retrieve it via state.Get("step-name") or state.Bind("step-name", &dest).

If fn returns an error, the flow aborts with HTTP 500. If fn returns nil as the value, nothing is stored (step is a no-op).

flow.NewStep("load-user",
    flow.Enrich(func(ctx context.Context, req *server.Request, _ *flow.State) (any, error) {
        return usersDB.GetItem(ctx, req.PathParam("id"), nil)
    }))

The result is stored under "load-user" and retrieved downstream:

var u User
state.Bind("load-user", &u)

func Respond

func Respond(fn func(ctx context.Context, req *server.Request, state *State) (*server.Response, error)) StepFn

Respond builds and sets the final HTTP response for the flow. Subsequent steps still run after a Respond step, and any of them can replace the response by calling state.Respond again. Use state.Abort if you want to stop all further processing.

flow.NewStep("send-response",
    flow.Respond(func(ctx context.Context, req *server.Request, s *flow.State) (*server.Response, error) {
        var payload map[string]any
        s.Bind("payload", &payload)
        return server.JSON(http.StatusOK, payload), nil
    }))

func Transform

func Transform(fn func(ctx context.Context, req *server.Request, state *State) error) StepFn

Transform applies a pure data transformation to state without making any external calls. Use it to map, rename, filter or compute values from previously enriched data before building the response.

flow.NewStep("build-payload",
    flow.Transform(func(ctx context.Context, req *server.Request, s *flow.State) error {
        var user User
        s.Bind("load-user", &user)
        var order Order
        s.Bind("load-order", &order)
        s.Set("payload", map[string]any{
            "user_name":    user.Name,
            "order_total":  order.Total,
            "order_status": order.Status,
        })
        return nil
    }))

func Validate

func Validate(
	d *decision.Block,
	ruleName string,
	inputFn func(req *server.Request, state *State) map[string]any,
	opts ...ValidateOption,
) StepFn

Validate runs a single CEL rule from the decision block. If the rule fails, the flow is aborted with HTTP 422 (or the status set via WithFailStatus). If it passes, the flow continues normally.

inputFn builds the map of CEL variables from the current request and state.

flow.NewStep("check-type",
    flow.Validate(validator, "is-pj",
        func(req *server.Request, _ *flow.State) map[string]any {
            return map[string]any{"customer_type": req.Header("X-Customer-Type")}
        },
        flow.WithFailStatus(http.StatusForbidden),
    ))

func ValidateFrom

func ValidateFrom(
	d *decision.Block,
	ruleName string,
	inputFn func(req *server.Request, state *State) any,
	opts ...ValidateOption,
) StepFn

ValidateFrom runs a single CEL rule against a struct with `decision:` tags. The struct is built from the current request and state via inputFn.

flow.NewStep("check-customer",
    flow.ValidateFrom(validator, "is-pj",
        func(req *server.Request, s *flow.State) any {
            var c Customer
            s.Bind("load-customer", &c)
            return c
        }))

type ValidateOption

type ValidateOption func(*validateConfig)

ValidateOption configures the behaviour of a Validate step.

func WithFailMessage

func WithFailMessage(fn func(ruleName string) string) ValidateOption

WithFailMessage sets a custom message formatter for validation failures. The rule name that failed is passed as the argument.

flow.WithFailMessage(func(rule string) string {
    return fmt.Sprintf("business rule %q not satisfied", rule)
})

func WithFailStatus

func WithFailStatus(code int) ValidateOption

WithFailStatus overrides the HTTP status code returned when validation fails. Defaults to 422 Unprocessable Entity.
