operations

package
v0.52.0
Published: Sep 30, 2025 License: MIT Imports: 15 Imported by: 73

Documentation

Overview

Package operations provides the Operations API for managing and executing deployment operations in a structured, reliable, and traceable manner.

Operations API

The Operations API enables:

  • Defining reusable deployment operations with versioning
  • Executing operations with retry logic and error handling
  • Tracking operation results and generating reports
  • Sequencing multiple operations with dependencies

Core Components

Operation:

  • Defines a single deployment operation with inputs, dependencies, and outputs
  • Includes versioning, validation, and execution logic
  • Supports generic typing for type-safe operation definitions

Registry:

  • Stores and retrieves operations by ID and version
  • Enables operation lookup and reuse across deployments
  • Provides centralized operation management

Executor:

  • Executes operations with configurable retry policies
  • Handles operation failures and recovery strategies
  • Supports input hooks for dynamic parameter adjustment

Sequence:

  • Orchestrates multiple operations in dependency order
  • Manages operation execution flow and error propagation
  • Provides sequence-level reporting and validation

Reporter:

  • Tracks operation execution results and metadata
  • Generates detailed reports for audit and debugging
  • Supports custom reporting formats and outputs

Basic Usage

// Define an operation (ID, version, description, handler)
op := operations.NewOperation(
	"deploy-contract",
	semver.MustParse("1.0.0"),
	"Deploys a contract",
	handler,
)

// Execute the operation; dependencies come before the input
bundle := operations.NewBundle(context.Background, logger, reporter)
report, err := operations.ExecuteOperation(bundle, op, deps, input)

Variables

var ErrNotSerializable = errors.New("data cannot be safely written to disk without data lost, " +
	"avoid type that can't be serialized")
var ErrOperationNotFound = errors.New("operation not found in registry")
var ErrReportNotFound = errors.New("report not found")

Functions

func IsSerializable

func IsSerializable(lggr logger.Logger, v any) bool

IsSerializable returns true if the value can be marshaled and unmarshaled without losing information, false otherwise. For idempotency and reporting purposes, we need to ensure that the value can be marshaled and unmarshaled without losing information. If the value implements json.Marshaler and json.Unmarshaler, it is assumed to be serializable.
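The round-trip idea described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: roundTrips and both struct types are hypothetical names, and comparing with reflect.DeepEqual is an assumption about what "without losing information" means.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// roundTrips is a hypothetical helper (not the package's IsSerializable).
// It reports whether v survives a JSON marshal/unmarshal cycle unchanged.
func roundTrips[T any](v T) bool {
	data, err := json.Marshal(v)
	if err != nil {
		return false // e.g. channels and funcs cannot be marshaled
	}
	var back T
	if err := json.Unmarshal(data, &back); err != nil {
		return false
	}
	return reflect.DeepEqual(v, back)
}

// exported keeps its data because the field is exported.
type exported struct {
	Name string `json:"name"`
}

// hidden loses its data because encoding/json skips unexported fields.
type hidden struct {
	name string
}

func main() {
	fmt.Println(roundTrips(exported{Name: "a"})) // true
	fmt.Println(roundTrips(hidden{name: "a"}))   // false
}
```

The second case is the kind of silent data loss that matters for idempotency: the value marshals without error, but the decoded copy no longer matches the original.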

func NewUnrecoverableError

func NewUnrecoverableError(err error) error

NewUnrecoverableError creates an error that indicates an unrecoverable error. If this error is returned inside an operation, the operation will no longer retry. This allows the operation to fail fast if it encounters an unrecoverable error.
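To show the fail-fast behavior in isolation, here is a small sketch of a retry loop that stops as soon as it sees a marker error. The unrecoverable type and the retry function are hypothetical stand-ins; the docs do not show how the package represents or detects the error internally.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient stands in for a failure that is worth retrying.
var errTransient = errors.New("transient failure")

// unrecoverable is a hypothetical marker type wrapping the original error.
type unrecoverable struct{ err error }

func (u unrecoverable) Error() string { return u.err.Error() }
func (u unrecoverable) Unwrap() error { return u.err }

// retry calls fn up to maxAttempts times and returns the number of attempts
// made. It stops early on success or when fn returns an unrecoverable error.
func retry(maxAttempts uint, fn func() error) (uint, error) {
	var err error
	for attempt := uint(1); attempt <= maxAttempts; attempt++ {
		err = fn()
		if err == nil {
			return attempt, nil
		}
		var u unrecoverable
		if errors.As(err, &u) {
			return attempt, err // fail fast: no further attempts
		}
	}
	return maxAttempts, err
}

func main() {
	n, err := retry(10, func() error { return unrecoverable{errTransient} })
	fmt.Println(n, err) // 1 transient failure

	n, err = retry(10, func() error { return errTransient })
	fmt.Println(n, err) // 10 transient failure
}
```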

func RegisterOperation added in v0.13.0

func RegisterOperation[D, I, O any](r *OperationRegistry, op ...*Operation[D, I, O])

RegisterOperation registers new operations in the registry. To register operations with different input, output, and dependency types, call RegisterOperation multiple times with different type parameters. If the same operation is registered multiple times, it will overwrite the previous one.

Types

type Bundle

type Bundle struct {
	Logger     logger.Logger
	GetContext func() context.Context

	OperationRegistry *OperationRegistry
	// contains filtered or unexported fields
}

Bundle contains the dependencies required by Operations API and is passed to the OperationHandler and SequenceHandler. It contains the Logger, Reporter and the context. Use NewBundle to create a new Bundle.

func NewBundle

func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter, opts ...BundleOption) Bundle

NewBundle creates and returns a new Bundle.

type BundleOption added in v0.13.0

type BundleOption func(*Bundle)

BundleOption is a functional option for configuring a Bundle

func WithOperationRegistry added in v0.13.0

func WithOperationRegistry(registry *OperationRegistry) BundleOption

WithOperationRegistry sets a custom OperationRegistry for the Bundle
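BundleOption follows the functional options pattern. The sketch below shows the pattern with hypothetical lowercase types; the assumption that a default registry is created when no option is passed is for illustration only and is not stated in the docs.

```go
package main

import "fmt"

// registry and bundle are simplified stand-ins for the package's types.
type registry struct{ name string }

type bundle struct {
	registry *registry
}

// bundleOption mutates a bundle during construction.
type bundleOption func(*bundle)

// withRegistry returns an option that replaces the bundle's registry.
func withRegistry(r *registry) bundleOption {
	return func(b *bundle) { b.registry = r }
}

// newBundle applies each option in order on top of the defaults.
func newBundle(opts ...bundleOption) bundle {
	b := bundle{registry: &registry{name: "default"}} // assumed default
	for _, opt := range opts {
		opt(&b)
	}
	return b
}

func main() {
	fmt.Println(newBundle().registry.name)                                        // default
	fmt.Println(newBundle(withRegistry(&registry{name: "custom"})).registry.name) // custom
}
```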

type Definition

type Definition struct {
	ID          string          `json:"id"`
	Version     *semver.Version `json:"version"`
	Description string          `json:"description"`
}

Definition is the metadata for a sequence or an operation. It contains the ID, version and description. This definition and the OperationHandler together form the composite key for an Operation. Two Operations are considered the same if they have the same Definition and OperationHandler.

type EmptyInput

type EmptyInput struct{}

EmptyInput is a placeholder for operations that do not require input.

type ExecuteConfig

type ExecuteConfig[IN, DEP any] struct {
	// contains filtered or unexported fields
}

ExecuteConfig is the configuration for the ExecuteOperation function.

type ExecuteOption

type ExecuteOption[IN, DEP any] func(*ExecuteConfig[IN, DEP])

func WithRetry added in v0.0.14

func WithRetry[IN, DEP any]() ExecuteOption[IN, DEP]

WithRetry is an ExecuteOption that enables the default retry for the operation.

func WithRetryConfig

func WithRetryConfig[IN, DEP any](config RetryConfig[IN, DEP]) ExecuteOption[IN, DEP]

WithRetryConfig is an ExecuteOption that sets the retry configuration. This provides a way to customize the retry behavior specific to the needs of the operation. Use this for the most flexibility and control over the retry behavior.

func WithRetryInput added in v0.0.14

func WithRetryInput[IN, DEP any](inputHookFunc func(uint, error, IN, DEP) IN) ExecuteOption[IN, DEP]

WithRetryInput is an ExecuteOption that enables the default retry and provides an input transform function that modifies the input on each retry attempt.
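The hook has the shape func(uint, error, IN, DEP) IN, which makes it a natural fit for cases such as raising a gas limit between attempts. The sketch below is self-contained and does not call the package: runWithHook, txInput, noDeps, send and bumpGas are all hypothetical, and the loop only approximates how a retrying executor might apply the hook.

```go
package main

import (
	"errors"
	"fmt"
)

type txInput struct{ GasLimit uint64 }
type noDeps struct{}

var errOutOfGas = errors.New("out of gas")

// runWithHook runs op up to maxAttempts times. After each failure it passes
// the attempt number, error, input and deps to hook and retries with the
// input that hook returns.
func runWithHook[IN, DEP any](
	maxAttempts uint, deps DEP, input IN,
	op func(DEP, IN) error,
	hook func(attempt uint, err error, input IN, deps DEP) IN,
) (IN, error) {
	var err error
	for attempt := uint(1); attempt <= maxAttempts; attempt++ {
		if err = op(deps, input); err == nil {
			return input, nil
		}
		input = hook(attempt, err, input, deps)
	}
	return input, err
}

// send fails until the gas limit reaches 400,000.
func send(_ noDeps, in txInput) error {
	if in.GasLimit < 400_000 {
		return errOutOfGas
	}
	return nil
}

// bumpGas doubles the gas limit before the next attempt.
func bumpGas(_ uint, _ error, in txInput, _ noDeps) txInput {
	in.GasLimit *= 2
	return in
}

func main() {
	final, err := runWithHook(5, noDeps{}, txInput{GasLimit: 100_000}, send, bumpGas)
	fmt.Println(final.GasLimit, err) // 400000 <nil>
}
```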

type ExecutionSeries added in v0.12.0

type ExecutionSeries struct {
	// ID is a unique identifier for an execution series.
	ID string `json:"id"`
	// Order is the execution order in which the operation was executed as part of the execution series
	Order uint `json:"order"`
}

ExecutionSeries is used to track the execution of an operation that was executed multiple times. It contains the unique ID for the ExecutionSeries and the order in which it was executed. Executions of the same Operation that share an executionSeriesID are treated as part of the same group.

type MemoryReporter

type MemoryReporter struct {
	// contains filtered or unexported fields
}

MemoryReporter stores reports in memory. This is thread-safe and can be used in a multi-threaded environment.

func NewMemoryReporter

func NewMemoryReporter(options ...MemoryReporterOption) *MemoryReporter

NewMemoryReporter creates a new MemoryReporter. It can be initialized with a list of reports using the WithReports option.

func (*MemoryReporter) AddReport

func (e *MemoryReporter) AddReport(report Report[any, any]) error

AddReport adds a report to the memory reporter.

func (*MemoryReporter) GetExecutionReports

func (e *MemoryReporter) GetExecutionReports(seqID string) ([]Report[any, any], error)

GetExecutionReports returns all the reports that were executed as part of a sequence, including the sequence's own report. It does this by recursively fetching all the child reports. Useful when returning all the reports in a sequence to the changeset output.
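The recursive fetch can be pictured as a depth-first walk over child report IDs. This is a rough sketch with a hypothetical report type and a plain map as the store; the ordering of the result (parent first, then children) is an assumption, and cycles are not handled.

```go
package main

import "fmt"

// report is a simplified stand-in that keeps only the ID and child IDs.
type report struct {
	ID       string
	Children []string
}

// collect returns the report with the given ID followed by all of its
// descendants, visiting children depth-first in their listed order.
func collect(store map[string]report, id string) ([]report, error) {
	r, ok := store[id]
	if !ok {
		return nil, fmt.Errorf("report %q not found", id)
	}
	out := []report{r}
	for _, childID := range r.Children {
		sub, err := collect(store, childID)
		if err != nil {
			return nil, err
		}
		out = append(out, sub...)
	}
	return out, nil
}

func main() {
	store := map[string]report{
		"seq": {ID: "seq", Children: []string{"op1", "sub"}},
		"op1": {ID: "op1"},
		"sub": {ID: "sub", Children: []string{"op2"}},
		"op2": {ID: "op2"},
	}
	all, err := collect(store, "seq")
	fmt.Println(len(all), err) // 4 <nil>
}
```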

func (*MemoryReporter) GetReport

func (e *MemoryReporter) GetReport(id string) (Report[any, any], error)

GetReport returns a report by ID. Returns ErrReportNotFound if the report is not found.

func (*MemoryReporter) GetReports

func (e *MemoryReporter) GetReports() ([]Report[any, any], error)

GetReports returns all reports.

type MemoryReporterOption

type MemoryReporterOption func(*MemoryReporter)

func WithReports

func WithReports(reports []Report[any, any]) MemoryReporterOption

WithReports is an option to initialize the MemoryReporter with a list of reports.

type Operation

type Operation[IN, OUT, DEP any] struct {
	// contains filtered or unexported fields
}

Operation is the low-level building block of the Operations API. Developers define their own operations with custom input and output types. Each operation should perform at most one side effect (e.g. send a transaction, post a job spec). Use NewOperation to create a new operation.

func NewOperation

func NewOperation[IN, OUT, DEP any](
	id string, version *semver.Version, description string, handler OperationHandler[IN, OUT, DEP],
) *Operation[IN, OUT, DEP]

NewOperation creates a new operation. Version can be created using semver.MustParse("1.0.0") or semver.New("1.0.0"). Note: The handler should perform at most one side effect.

func (*Operation[IN, OUT, DEP]) AsUntyped added in v0.13.0

func (o *Operation[IN, OUT, DEP]) AsUntyped() *Operation[any, any, any]

AsUntyped converts the operation to an untyped operation. This is useful for storing operations in a slice or passing them around without type constraints. Warning: The input and output types will be converted to `any`, so type safety is lost.
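The trade-off in that warning can be seen in a small type-erasure sketch. Here erase and strLen are hypothetical; the wrapper moves the type check from compile time to a runtime assertion, which is one plausible way such a conversion could work and not necessarily how AsUntyped is implemented.

```go
package main

import "fmt"

// erase wraps a typed handler in one that accepts and returns any.
// A mismatched input type becomes a runtime error instead of a compile error.
func erase[IN, OUT any](h func(IN) (OUT, error)) func(any) (any, error) {
	return func(input any) (any, error) {
		typed, ok := input.(IN)
		if !ok {
			var want IN
			return nil, fmt.Errorf("input has type %T, want %T", input, want)
		}
		out, err := h(typed)
		return out, err
	}
}

// strLen is a typed handler used for the demonstration.
func strLen(s string) (int, error) { return len(s), nil }

func main() {
	// Untyped handlers with different underlying types can share one slice.
	handlers := []func(any) (any, error){erase(strLen)}

	out, err := handlers[0]("hello")
	fmt.Println(out, err) // 5 <nil>

	_, err = handlers[0](42)
	fmt.Println(err != nil) // true
}
```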

func (*Operation[IN, OUT, DEP]) Def added in v0.13.0

func (o *Operation[IN, OUT, DEP]) Def() Definition

Def returns the operation definition.

func (*Operation[IN, OUT, DEP]) Description

func (o *Operation[IN, OUT, DEP]) Description() string

Description returns the operation description.

func (*Operation[IN, OUT, DEP]) ID

func (o *Operation[IN, OUT, DEP]) ID() string

ID returns the operation ID.

func (*Operation[IN, OUT, DEP]) Version

func (o *Operation[IN, OUT, DEP]) Version() string

Version returns the operation semver version in string.

type OperationHandler

type OperationHandler[IN, OUT, DEP any] func(e Bundle, deps DEP, input IN) (output OUT, err error)

OperationHandler is the function signature of an operation handler.

type OperationRegistry added in v0.13.0

type OperationRegistry struct {
	// contains filtered or unexported fields
}

OperationRegistry is a store for operations that allows retrieval based on their definitions.

Example

ExampleOperationRegistry demonstrates how to create and use an OperationRegistry with operations being executed dynamically with different input/output types.

// example dependencies for operations
type Deps1 struct{}
type Deps2 struct{}

// Create operations with different input/output types
stringOp := NewOperation(
	"string-op",
	semver.MustParse("1.0.0"),
	"Echo string operation",
	func(e Bundle, deps Deps1, input string) (string, error) {
		return input, nil
	},
)

intOp := NewOperation(
	"int-op",
	semver.MustParse("1.0.0"),
	"Echo integer operation",
	func(e Bundle, deps Deps2, input int) (int, error) {
		return input, nil
	},
)
// Create registry with untyped operations by providing optional initial operation
registry := NewOperationRegistry(stringOp.AsUntyped())

// An alternative way to register additional operations without calling AsUntyped()
RegisterOperation(registry, intOp)

// Create execution environment
b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry))

// Define inputs and dependencies for operations
// inputs[0] is for stringOp, inputs[1] is for intOp
// deps[0] is for stringOp, deps[1] is for intOp
inputs := []any{"input1", 42}
deps := []any{Deps1{}, Deps2{}}
defs := []Definition{
	stringOp.Def(),
	intOp.Def(),
}

// dynamically retrieve and execute operations on different inputs
for i, def := range defs {
	retrievedOp, err := registry.Retrieve(def)
	if err != nil {
		fmt.Println("error retrieving operation:", err)
		continue
	}

	report, err := ExecuteOperation(b, retrievedOp, deps[i], inputs[i])
	if err != nil {
		fmt.Println("error executing operation:", err)
		continue
	}

	fmt.Println("operation output:", report.Output)
}
Output:

operation output: input1
operation output: 42

func NewOperationRegistry added in v0.13.0

func NewOperationRegistry(ops ...*Operation[any, any, any]) *OperationRegistry

NewOperationRegistry creates a new OperationRegistry with the provided untyped operations.

func (OperationRegistry) Retrieve added in v0.13.0

func (s OperationRegistry) Retrieve(def Definition) (*Operation[any, any, any], error)

Retrieve retrieves an operation from the store based on its definition. It returns an error if the operation is not found. The definition must match the operation's ID and version. Description of the definition is not used for retrieval, only ID and Version. This allows for simplicity in retrieving operations with the same ID and version only without having to provide the description. This is useful when definition has to be provided via manual input.
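A lookup keyed on ID and version only, with later registrations overwriting earlier ones, might look like the sketch below. The map-based storage, the string version (the real Definition uses *semver.Version) and every lowercase name are simplifying assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// definition mirrors the shape of Definition with a plain string version.
type definition struct {
	ID, Version, Description string
}

var errNotFound = errors.New("operation not found in registry")

type registry struct {
	ops map[string]definition
}

// key ignores Description so callers can look up by ID and version alone.
func key(d definition) string { return d.ID + "@" + d.Version }

// register stores d, overwriting any entry with the same ID and version.
func (r *registry) register(d definition) {
	if r.ops == nil {
		r.ops = map[string]definition{}
	}
	r.ops[key(d)] = d
}

// retrieve looks up the stored entry matching d's ID and version.
func (r *registry) retrieve(d definition) (definition, error) {
	found, ok := r.ops[key(d)]
	if !ok {
		return definition{}, errNotFound
	}
	return found, nil
}

func main() {
	var r registry
	r.register(definition{ID: "deploy", Version: "1.0.0", Description: "Deploys a contract"})

	// The description is omitted, as it might be with manual input.
	got, err := r.retrieve(definition{ID: "deploy", Version: "1.0.0"})
	fmt.Println(got.Description, err) // Deploys a contract <nil>

	_, err = r.retrieve(definition{ID: "deploy", Version: "2.0.0"})
	fmt.Println(err) // operation not found in registry
}
```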

type RecentReporter

type RecentReporter struct {
	Reporter
	// contains filtered or unexported fields
}

RecentReporter is a wrapper around a Reporter that keeps track of the most recent reports. Useful when trying to get a list of reports that were recently added in a sequence. It is thread-safe and can be used in a multi-threaded environment.

func NewRecentMemoryReporter

func NewRecentMemoryReporter(reporter Reporter) *RecentReporter

NewRecentMemoryReporter creates a new RecentReporter.

func (*RecentReporter) AddReport

func (e *RecentReporter) AddReport(report Report[any, any]) error

AddReport adds a report to the recent reporter.

func (*RecentReporter) GetRecentReports

func (e *RecentReporter) GetRecentReports() []Report[any, any]

GetRecentReports returns all the reports that were added since the construction of the RecentReporter.
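The wrapper idea can be sketched with an embedded interface and an overriding AddReport. The reporter interface here is reduced to a single method on string IDs, and memory and recent are hypothetical types, so this shows the decorator shape only, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// reporter is a one-method stand-in for the Reporter interface.
type reporter interface {
	AddReport(id string) error
}

// memory is a minimal thread-safe store of report IDs.
type memory struct {
	mu  sync.Mutex
	ids []string
}

func (m *memory) AddReport(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.ids = append(m.ids, id)
	return nil
}

// recent embeds a reporter and also remembers what was added through it.
type recent struct {
	reporter
	mu    sync.Mutex
	added []string
}

// AddReport forwards to the wrapped reporter, then records the ID.
func (r *recent) AddReport(id string) error {
	if err := r.reporter.AddReport(id); err != nil {
		return err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.added = append(r.added, id)
	return nil
}

// recentReports returns a copy of the IDs added since construction.
func (r *recent) recentReports() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]string(nil), r.added...)
}

func main() {
	base := &memory{ids: []string{"old"}}
	r := &recent{reporter: base}
	_ = r.AddReport("new")
	fmt.Println(r.recentReports(), len(base.ids)) // [new] 2
}
```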

type Report

type Report[IN, OUT any] struct {
	ID        string       `json:"id"`
	Def       Definition   `json:"definition"`
	Output    OUT          `json:"output"`
	Input     IN           `json:"input"`
	Timestamp *time.Time   `json:"timestamp"`
	Err       *ReportError `json:"error"`
	// ChildOperationReports stores the IDs of the reports for operations that were executed as part of a sequence.
	ChildOperationReports []string `json:"childOperationReports"`
	// ExecutionSeries is used to track the execution of an operation that was executed multiple times
	ExecutionSeries *ExecutionSeries `json:"executionSeries,omitempty"`
}

Report is the result of an operation. It contains the inputs and other metadata that was used to execute the operation.

func ExecuteOperation

func ExecuteOperation[IN, OUT, DEP any](
	b Bundle,
	operation *Operation[IN, OUT, DEP],
	deps DEP,
	input IN,
	opts ...ExecuteOption[IN, DEP],
) (Report[IN, OUT], error)

ExecuteOperation executes an operation with the given input and dependencies. If a previous successful run is found in the Reports, execution is skipped and the previous result is returned. If only a previous unsuccessful execution is found, the operation is executed again.

Note: Operations that were skipped will not be added to the reporter.

Retry: By default, it retries the operation up to 10 times with exponential backoff if it fails. Use WithRetryConfig to customize the retry behavior. To cancel the retry early, return an error with NewUnrecoverableError.

Input & Output: The input and output must be JSON serializable. If the input is not serializable, it will return an error. To be serializable, the input and output must be json.marshalable, or it must implement json.Marshaler and json.Unmarshaler. IsSerializable can be used to check if the input or output is serializable.
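The skip-on-previous-success behavior can be illustrated with a toy executor. Everything here is an assumption made for the sketch: the docs do not say how a previous run is matched, so the example keys on the operation ID plus the JSON-encoded input, records failed runs too, and uses hypothetical names throughout.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// report is a simplified record of one execution.
type report struct {
	Key    string // operation ID plus JSON-encoded input (assumed matching rule)
	Output int
	Err    string // empty on success
}

// execute returns an earlier successful report for the same key if one
// exists (skipped == true). Otherwise it calls run and records the result.
func execute(reports *[]report, opID string, input any, run func() (int, error)) (report, bool, error) {
	raw, err := json.Marshal(input)
	if err != nil {
		return report{}, false, err // input must be JSON serializable
	}
	key := opID + ":" + string(raw)
	for _, r := range *reports {
		if r.Key == key && r.Err == "" {
			return r, true, nil // reuse the earlier success, add nothing
		}
	}
	out, runErr := run()
	r := report{Key: key, Output: out}
	if runErr != nil {
		r.Err = runErr.Error()
	}
	*reports = append(*reports, r)
	return r, false, runErr
}

func main() {
	var reports []report
	calls := 0
	deploy := func() (int, error) { calls++; return 7, nil }

	_, skipped1, _ := execute(&reports, "deploy", "0xabc", deploy)
	_, skipped2, _ := execute(&reports, "deploy", "0xabc", deploy)
	fmt.Println(skipped1, skipped2, calls, len(reports)) // false true 1 1
}
```

Because the second call is served from the stored report, the handler runs once and no new report is appended, which matches the note that skipped operations are not added to the reporter.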

func ExecuteOperationN added in v0.12.0

func ExecuteOperationN[IN, OUT, DEP any](
	b Bundle, operation *Operation[IN, OUT, DEP], deps DEP, input IN, seriesID string, n uint,
	opts ...ExecuteOption[IN, DEP],
) ([]Report[IN, OUT], error)

ExecuteOperationN executes the given operation n times with the given input and dependencies. Execution will return the previous successful execution results and skip execution if there were previous successful runs found in the Reports. The seriesID argument (the executionSeriesID) identifies the multiple executions as a single unit. It is important to use a unique seriesID for each distinct set of executions.

func NewReport

func NewReport[IN, OUT any](
	def Definition, input IN, output OUT, err error, childReportsID ...string,
) Report[IN, OUT]

NewReport creates a new report. ChildOperationReports is applicable only for Sequence.

func (Report[IN, OUT]) ToGenericReport

func (r Report[IN, OUT]) ToGenericReport() Report[any, any]

ToGenericReport converts the Report to a generic Report. This is useful when we want to return the report as a generic type in the changeset.output.

type ReportError

type ReportError struct {
	Message string `json:"message"`
}

ReportError represents an error in the Report. Its purpose is to have an exported field `Message` for marshalling, as the native error cannot be marshaled to JSON.
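A short standard-library demonstration shows why the exported field is needed. The reportError type below copies the documented shape of ReportError; envelope and encode are hypothetical helpers added for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errNative is an ordinary error whose message lives in an unexported field.
var errNative = errors.New("tx reverted")

// reportError mirrors the documented ReportError shape.
type reportError struct {
	Message string `json:"message"`
}

func (e reportError) Error() string { return e.Message }

// envelope holds an error value under an "error" key.
type envelope struct {
	Err error `json:"error"`
}

// encode marshals err inside an envelope and returns the JSON text.
func encode(err error) string {
	data, mErr := json.Marshal(envelope{Err: err})
	if mErr != nil {
		return "marshal failed: " + mErr.Error()
	}
	return string(data)
}

func main() {
	fmt.Println(encode(errNative))                                 // {"error":{}}
	fmt.Println(encode(reportError{Message: errNative.Error()})) // {"error":{"message":"tx reverted"}}
}
```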

func (ReportError) Error

func (o ReportError) Error() string

Error implements the error interface.

type Reporter

type Reporter interface {
	GetReport(id string) (Report[any, any], error)
	GetReports() ([]Report[any, any], error)
	AddReport(report Report[any, any]) error
	GetExecutionReports(reportID string) ([]Report[any, any], error)
}

Reporter manages reports. It can store them in memory, in the FS, etc.

type RetryConfig

type RetryConfig[IN, DEP any] struct {
	// Enabled determines if the retry is enabled for the operation.
	Enabled bool

	// Policy is the retry policy to control the behavior of the retry.
	Policy RetryPolicy

	// InputHook is a function that returns an updated input before retrying the operation.
	// The operation when retried will use the input returned by this function.
	// This is useful for scenarios like updating the gas limit.
	InputHook func(attempt uint, err error, input IN, deps DEP) IN
}

type RetryPolicy added in v0.0.14

type RetryPolicy struct {
	MaxAttempts uint
}

RetryPolicy defines the arguments to control the retry behavior.

type Sequence

type Sequence[IN, OUT, DEP any] struct {
	// contains filtered or unexported fields
}

func NewSequence

func NewSequence[IN, OUT, DEP any](
	id string, version *semver.Version, description string, handler SequenceHandler[IN, OUT, DEP],
) *Sequence[IN, OUT, DEP]

NewSequence creates a new sequence. Useful for logically grouping a list of operations and child sequences together.

func (*Sequence[IN, OUT, DEP]) Description

func (o *Sequence[IN, OUT, DEP]) Description() string

Description returns the sequence description.

func (*Sequence[IN, OUT, DEP]) ID

func (o *Sequence[IN, OUT, DEP]) ID() string

ID returns the sequence ID.

func (*Sequence[IN, OUT, DEP]) Version

func (o *Sequence[IN, OUT, DEP]) Version() string

Version returns the sequence semver version in string.

type SequenceHandler

type SequenceHandler[IN, OUT, DEP any] func(b Bundle, deps DEP, input IN) (output OUT, err error)

SequenceHandler is the function signature of a sequence handler. A sequence handler is a function that calls one or more operations or child sequences.

type SequenceReport

type SequenceReport[IN, OUT any] struct {
	Report[IN, OUT]

	// ExecutionReports is a list of reports for all the operations and sequences that were executed as part of this sequence.
	ExecutionReports []Report[any, any]
}

SequenceReport is a report for a sequence. It contains a report for the sequence itself and also a list of reports for all the operations executed as part of the sequence. The latter is useful when we want to return all the reports of the operations executed as part of the sequence in changeset output.

func ExecuteSequence

func ExecuteSequence[IN, OUT, DEP any](
	b Bundle, sequence *Sequence[IN, OUT, DEP], deps DEP, input IN,
) (SequenceReport[IN, OUT], error)

ExecuteSequence executes a Sequence and returns a SequenceReport. The SequenceReport contains a report for the Sequence itself and the execution reports for all the operations that were executed as part of this sequence. The latter is useful when we want to return all the executed reports to the changeset output. If a previous successful run is found in the Reports, execution is skipped and the previous result is returned. If only a previous unsuccessful execution is found, the sequence is executed again.

Note: Sequences or Operations that were skipped will not be added to the reporter. The ExecutionReports do not include Sequences or Operations that were skipped.

Input & Output: The input and output must be JSON serializable. If the input is not serializable, it will return an error. To be serializable, the input and output must be json.marshalable, or it must implement json.Marshaler and json.Unmarshaler. IsSerializable can be used to check if the input or output is serializable.

func (SequenceReport[IN, OUT]) ToGenericSequenceReport

func (r SequenceReport[IN, OUT]) ToGenericSequenceReport() SequenceReport[any, any]

ToGenericSequenceReport converts the SequenceReport to a generic SequenceReport. This is useful when we want to return the report as a generic type in the changeset.output.

Directories

Path Synopsis
Package optest provides utilities for operations testing.