Documentation ¶
Overview ¶
Package operations provides the Operations API for managing and executing deployment operations in a structured, reliable, and traceable manner.
Operations API ¶
The Operations API enables:
- Defining reusable deployment operations with versioning
- Executing operations with retry logic and error handling
- Tracking operation results and generating reports
- Sequencing multiple operations with dependencies
Core Components ¶
Operation:
- Defines a single deployment operation with inputs, dependencies, and outputs
- Includes versioning, validation, and execution logic
- Supports generic typing for type-safe operation definitions
Registry:
- Stores and retrieves operations by ID and version
- Enables operation lookup and reuse across deployments
- Provides centralized operation management
Executor:
- Executes operations with configurable retry policies
- Handles operation failures and recovery strategies
- Supports input hooks for dynamic parameter adjustment
Sequence:
- Orchestrates multiple operations in dependency order
- Manages operation execution flow and error propagation
- Provides sequence-level reporting and validation
Reporter:
- Tracks operation execution results and metadata
- Generates detailed reports for audit and debugging
- Supports custom reporting formats and outputs
Basic Usage ¶
// Define an operation
op := operations.NewOperation(
	"deploy-contract",
	semver.MustParse("1.0.0"),
	"Deploys a contract",
	handler,
)

// Execute the operation (dependencies are passed before the input)
bundle := operations.NewBundle(context.Background, logger, reporter)
result, err := operations.ExecuteOperation(bundle, op, deps, input)
Index ¶
- Variables
- func IsSerializable(lggr logger.Logger, v any) bool
- func NewUnrecoverableError(err error) error
- func RegisterOperation[D, I, O any](r *OperationRegistry, op ...*Operation[D, I, O])
- type Bundle
- type BundleOption
- type Definition
- type EmptyInput
- type ExecuteConfig
- type ExecuteOption
- type ExecutionSeries
- type MemoryReporter
- type MemoryReporterOption
- type Operation
- type OperationHandler
- type OperationRegistry
- type RecentReporter
- type Report
- func ExecuteOperation[IN, OUT, DEP any](b Bundle, operation *Operation[IN, OUT, DEP], deps DEP, input IN, ...) (Report[IN, OUT], error)
- func ExecuteOperationN[IN, OUT, DEP any](b Bundle, operation *Operation[IN, OUT, DEP], deps DEP, input IN, ...) ([]Report[IN, OUT], error)
- func NewReport[IN, OUT any](def Definition, input IN, output OUT, err error, childReportsID ...string) Report[IN, OUT]
- type ReportError
- type Reporter
- type RetryConfig
- type RetryPolicy
- type Sequence
- type SequenceHandler
- type SequenceReport
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrNotSerializable = errors.New("data cannot be safely written to disk without data lost, " +
"avoid type that can't be serialized")
var ErrOperationNotFound = errors.New("operation not found in registry")
var ErrReportNotFound = errors.New("report not found")
Functions ¶
func IsSerializable ¶
func IsSerializable(lggr logger.Logger, v any) bool
IsSerializable reports whether the value can be marshaled and unmarshaled without losing information. This guarantee is required for idempotency and reporting. If the value implements json.Marshaler and json.Unmarshaler, it is assumed to be serializable.
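The round-trip idea described above can be illustrated with a minimal sketch built only on the standard library. The helper roundTrips and the two struct types are hypothetical and are not the package's implementation; in particular, the real IsSerializable also takes a logger and may apply different rules.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// roundTrips reports whether v survives a JSON marshal/unmarshal cycle
// unchanged. Hypothetical helper, not the package's IsSerializable.
func roundTrips(v any) bool {
	if v == nil {
		return true // JSON null decodes back to nil
	}
	data, err := json.Marshal(v)
	if err != nil {
		return false // e.g. channels and funcs cannot be marshaled
	}
	// Allocate a fresh value of the same dynamic type to decode into.
	fresh := reflect.New(reflect.TypeOf(v))
	if err := json.Unmarshal(data, fresh.Interface()); err != nil {
		return false
	}
	return reflect.DeepEqual(v, fresh.Elem().Interface())
}

type exported struct {
	Name string `json:"name"`
}

type unexported struct {
	name string // skipped by encoding/json, so the value is lost
}

func main() {
	fmt.Println(roundTrips(exported{Name: "a"}))   // true
	fmt.Println(roundTrips(unexported{name: "a"})) // false
	fmt.Println(roundTrips(make(chan int)))        // false
}
```

Structs with unexported fields marshal without error but silently drop data, which is why a comparison after decoding is needed rather than only checking the marshal error.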
func NewUnrecoverableError ¶
func NewUnrecoverableError(err error) error
NewUnrecoverableError creates an error that marks a failure as unrecoverable. If an operation returns this error, the operation is not retried, which lets it fail fast.
func RegisterOperation ¶ added in v0.13.0
func RegisterOperation[D, I, O any](r *OperationRegistry, op ...*Operation[D, I, O])
RegisterOperation registers new operations in the registry. To register operations with different input, output, and dependency types, call RegisterOperation multiple times with different type parameters. If the same operation is registered multiple times, it will overwrite the previous one.
Types ¶
type Bundle ¶
type Bundle struct {
	Logger            logger.Logger
	GetContext        func() context.Context
	OperationRegistry *OperationRegistry
	// contains filtered or unexported fields
}
Bundle contains the dependencies required by Operations API and is passed to the OperationHandler and SequenceHandler. It contains the Logger, Reporter and the context. Use NewBundle to create a new Bundle.
type BundleOption ¶ added in v0.13.0
type BundleOption func(*Bundle)
BundleOption is a functional option for configuring a Bundle
func WithOperationRegistry ¶ added in v0.13.0
func WithOperationRegistry(registry *OperationRegistry) BundleOption
WithOperationRegistry sets a custom OperationRegistry for the Bundle
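BundleOption follows the functional-options pattern. The sketch below shows the general shape of that pattern with simplified, hypothetical types (registry, bundle, option, withRegistry, newBundle). Whether the real NewBundle creates a default registry when no option is given is an assumption made for the example.

```go
package main

import "fmt"

// registry and bundle are simplified stand-ins for OperationRegistry and Bundle.
type registry struct{ name string }

type bundle struct {
	registry *registry
}

// option mirrors the shape of BundleOption: a function that mutates the bundle.
type option func(*bundle)

// withRegistry returns an option that replaces the bundle's registry.
func withRegistry(r *registry) option {
	return func(b *bundle) { b.registry = r }
}

// newBundle starts from defaults (an assumption), then applies each option in order.
func newBundle(opts ...option) bundle {
	b := bundle{registry: &registry{name: "default"}}
	for _, opt := range opts {
		opt(&b)
	}
	return b
}

func main() {
	fmt.Println(newBundle().registry.name)                                        // default
	fmt.Println(newBundle(withRegistry(&registry{name: "custom"})).registry.name) // custom
}
```

The benefit of this design is that new configuration knobs can be added later without changing the constructor's signature.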
type Definition ¶
type Definition struct {
	ID          string          `json:"id"`
	Version     *semver.Version `json:"version"`
	Description string          `json:"description"`
}
Definition is the metadata for a sequence or an operation. It contains the ID, version and description. The Definition and the OperationHandler together form the composite key for an Operation. Two Operations are considered the same if they have the same Definition and OperationHandler.
type EmptyInput ¶
type EmptyInput struct{}
EmptyInput is a placeholder for operations that do not require input.
type ExecuteConfig ¶
type ExecuteConfig[IN, DEP any] struct {
	// contains filtered or unexported fields
}
ExecuteConfig is the configuration for the ExecuteOperation function.
type ExecuteOption ¶
type ExecuteOption[IN, DEP any] func(*ExecuteConfig[IN, DEP])
func WithRetry ¶ added in v0.0.14
func WithRetry[IN, DEP any]() ExecuteOption[IN, DEP]
WithRetry is an ExecuteOption that enables the default retry for the operation.
func WithRetryConfig ¶
func WithRetryConfig[IN, DEP any](config RetryConfig[IN, DEP]) ExecuteOption[IN, DEP]
WithRetryConfig is an ExecuteOption that sets the retry configuration. This provides a way to customize the retry behavior specific to the needs of the operation. Use this for the most flexibility and control over the retry behavior.
func WithRetryInput ¶ added in v0.0.14
func WithRetryInput[IN, DEP any](inputHookFunc func(uint, error, IN, DEP) IN) ExecuteOption[IN, DEP]
WithRetryInput is an ExecuteOption that enables the default retry and provides an input transform function, which modifies the input on each retry attempt.
type ExecutionSeries ¶ added in v0.12.0
type ExecutionSeries struct {
	// ID is a unique identifier for an execution series.
	ID string `json:"id"`
	// Order is the execution order in which the operation was executed as part of the execution series
	Order uint `json:"order"`
}
ExecutionSeries tracks an operation that was executed multiple times. It contains the unique ID of the execution series and the order in which this execution ran. Executions of the same Operation that share an ExecutionSeries ID belong to the same group.
type MemoryReporter ¶
type MemoryReporter struct {
// contains filtered or unexported fields
}
MemoryReporter stores reports in memory. This is thread-safe and can be used in a multi-threaded environment.
func NewMemoryReporter ¶
func NewMemoryReporter(options ...MemoryReporterOption) *MemoryReporter
NewMemoryReporter creates a new MemoryReporter. It can be initialized with a list of reports using the WithReports option.
func (*MemoryReporter) AddReport ¶
func (e *MemoryReporter) AddReport(report Report[any, any]) error
AddReport adds a report to the memory reporter.
func (*MemoryReporter) GetExecutionReports ¶
GetExecutionReports returns all the reports that were executed as part of a sequence, including the report of the sequence itself. It does this by recursively fetching all the child reports. This is useful for returning all the reports in a sequence to the changeset output.
func (*MemoryReporter) GetReport ¶
GetReport returns a report by ID. Returns ErrReportNotFound if the report is not found.
func (*MemoryReporter) GetReports ¶
func (e *MemoryReporter) GetReports() ([]Report[any, any], error)
GetReports returns all reports.
type MemoryReporterOption ¶
type MemoryReporterOption func(*MemoryReporter)
func WithReports ¶
func WithReports(reports []Report[any, any]) MemoryReporterOption
WithReports is an option to initialize the MemoryReporter with a list of reports.
type Operation ¶
type Operation[IN, OUT, DEP any] struct {
	// contains filtered or unexported fields
}
Operation is the low-level building block of the Operations API. Developers define their own operations with custom input and output types. Each operation should perform at most one side effect (e.g. send a transaction, post a job spec). Use NewOperation to create a new operation.
func NewOperation ¶
func NewOperation[IN, OUT, DEP any](
	id string,
	version *semver.Version,
	description string,
	handler OperationHandler[IN, OUT, DEP],
) *Operation[IN, OUT, DEP]
NewOperation creates a new operation. Version can be created using semver.MustParse("1.0.0") or semver.New("1.0.0"). Note: The handler should perform at most one side effect.
func (*Operation[IN, OUT, DEP]) AsUntyped ¶ added in v0.13.0
AsUntyped converts the operation to an untyped operation. This is useful for storing operations in a slice or passing them around without type constraints. Warning: The input and output types will be converted to `any`, so type safety is lost.
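The trade-off described above (losing compile-time type safety in exchange for a uniform type) can be seen in a small sketch of type erasure for generic handlers. The handler type and the untyped function are hypothetical and much simpler than the real OperationHandler, which also receives a Bundle and dependencies.

```go
package main

import "fmt"

// handler is a simplified typed handler used only in this sketch.
type handler[IN, OUT any] func(IN) (OUT, error)

// untyped wraps a typed handler so that it accepts and returns `any`.
// The type check moves from compile time to run time.
func untyped[IN, OUT any](h handler[IN, OUT]) handler[any, any] {
	return func(input any) (any, error) {
		in, ok := input.(IN)
		if !ok {
			var want IN
			return nil, fmt.Errorf("input type mismatch: got %T, want %T", input, want)
		}
		out, err := h(in)
		return out, err
	}
}

func main() {
	double := handler[int, int](func(n int) (int, error) { return n * 2, nil })
	length := handler[string, int](func(s string) (int, error) { return len(s), nil })

	// Handlers with different type parameters can now share one slice.
	handlers := []handler[any, any]{untyped(double), untyped(length)}

	fmt.Println(handlers[0](21))     // 42 <nil>
	fmt.Println(handlers[1]("abc"))  // 3 <nil>
	fmt.Println(handlers[0]("oops")) // <nil> input type mismatch: got string, want int
}
```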
func (*Operation[IN, OUT, DEP]) Def ¶ added in v0.13.0
func (o *Operation[IN, OUT, DEP]) Def() Definition
Def returns the operation definition.
func (*Operation[IN, OUT, DEP]) Description ¶
Description returns the operation description.
type OperationHandler ¶
OperationHandler is the function signature of an operation handler.
type OperationRegistry ¶ added in v0.13.0
type OperationRegistry struct {
// contains filtered or unexported fields
}
OperationRegistry is a store for operations that allows retrieval based on their definitions.
Example ¶
ExampleOperationRegistry demonstrates how to create and use an OperationRegistry with operations being executed dynamically with different input/output types.
// example dependencies for operations
type Deps1 struct{}
type Deps2 struct{}

// Create operations with different input/output types
stringOp := NewOperation(
	"string-op",
	semver.MustParse("1.0.0"),
	"Echo string operation",
	func(e Bundle, deps Deps1, input string) (string, error) {
		return input, nil
	},
)

intOp := NewOperation(
	"int-op",
	semver.MustParse("1.0.0"),
	"Echo integer operation",
	func(e Bundle, deps Deps2, input int) (int, error) {
		return input, nil
	},
)

// Create registry with untyped operations by providing optional initial operation
registry := NewOperationRegistry(stringOp.AsUntyped())

// An alternative way to register additional operations without calling AsUntyped()
RegisterOperation(registry, intOp)

// Create execution environment
b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry))

// Define inputs and dependencies for operations
// inputs[0] is for stringOp, inputs[1] is for intOp
// deps[0] is for stringOp, deps[1] is for intOp
inputs := []any{"input1", 42}
deps := []any{Deps1{}, Deps2{}}
defs := []Definition{
	stringOp.Def(),
	intOp.Def(),
}

// dynamically retrieve and execute operations on different inputs
for i, def := range defs {
	retrievedOp, err := registry.Retrieve(def)
	if err != nil {
		fmt.Println("error retrieving operation:", err)
		continue
	}

	report, err := ExecuteOperation(b, retrievedOp, deps[i], inputs[i])
	if err != nil {
		fmt.Println("error executing operation:", err)
		continue
	}

	fmt.Println("operation output:", report.Output)
}
Output:
operation output: input1
operation output: 42
func NewOperationRegistry ¶ added in v0.13.0
func NewOperationRegistry(ops ...*Operation[any, any, any]) *OperationRegistry
NewOperationRegistry creates a new OperationRegistry with the provided untyped operations.
func (OperationRegistry) Retrieve ¶ added in v0.13.0
func (s OperationRegistry) Retrieve(def Definition) (*Operation[any, any, any], error)
Retrieve returns the operation that matches the given definition, or an error if no such operation is found. Only the ID and Version of the definition are used for matching; the Description is ignored. This keeps retrieval simple when the definition has to be provided via manual input, because the caller does not need to supply the description.
type RecentReporter ¶
type RecentReporter struct {
	Reporter
	// contains filtered or unexported fields
}
RecentReporter is a wrapper around a Reporter that keeps track of the most recent reports. It is useful for getting the list of reports that were recently added in a sequence. It is thread-safe and can be used in a multi-threaded environment.
func NewRecentMemoryReporter ¶
func NewRecentMemoryReporter(reporter Reporter) *RecentReporter
NewRecentMemoryReporter creates a new RecentReporter.
func (*RecentReporter) AddReport ¶
func (e *RecentReporter) AddReport(report Report[any, any]) error
AddReport adds a report to the recent reporter.
func (*RecentReporter) GetRecentReports ¶
func (e *RecentReporter) GetRecentReports() []Report[any, any]
GetRecentReports returns all the reports that were added since the construction of the RecentReporter.
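The wrapper idea (embed a reporter, intercept AddReport, remember what passed through) can be sketched as follows. The types report, reporter, memory, and recent are hypothetical stand-ins, and recording a report only after the underlying add succeeds is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type report struct{ ID string }

// reporter is a one-method stand-in for the Reporter interface.
type reporter interface {
	AddReport(r report) error
}

// memory is a minimal mutex-protected backing store.
type memory struct {
	mu      sync.Mutex
	reports []report
}

func (m *memory) AddReport(r report) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.reports = append(m.reports, r)
	return nil
}

// recent embeds a reporter and additionally remembers every report added
// through it since construction.
type recent struct {
	reporter
	mu    sync.Mutex
	added []report
}

// AddReport forwards to the wrapped reporter, then records the report.
func (r *recent) AddReport(rep report) error {
	if err := r.reporter.AddReport(rep); err != nil {
		return err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.added = append(r.added, rep)
	return nil
}

// recentReports returns a copy so callers cannot mutate internal state.
func (r *recent) recentReports() []report {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]report(nil), r.added...)
}

func main() {
	store := &memory{reports: []report{{ID: "old"}}}
	rr := &recent{reporter: store}
	_ = rr.AddReport(report{ID: "new"})

	fmt.Println(len(store.reports), len(rr.recentReports())) // 2 1
}
```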
type Report ¶
type Report[IN, OUT any] struct {
	ID        string       `json:"id"`
	Def       Definition   `json:"definition"`
	Output    OUT          `json:"output"`
	Input     IN           `json:"input"`
	Timestamp *time.Time   `json:"timestamp"`
	Err       *ReportError `json:"error"`
	// stores a list of report ID for an operation that was executed as part of a sequence.
	ChildOperationReports []string `json:"childOperationReports"`
	// ExecutionSeries is used to track the execution of an operation that was executed multiple times
	ExecutionSeries *ExecutionSeries `json:"executionSeries,omitempty"`
}
Report is the result of an operation. It contains the inputs and other metadata that was used to execute the operation.
func ExecuteOperation ¶
func ExecuteOperation[IN, OUT, DEP any](
	b Bundle,
	operation *Operation[IN, OUT, DEP],
	deps DEP,
	input IN,
	opts ...ExecuteOption[IN, DEP],
) (Report[IN, OUT], error)
ExecuteOperation executes an operation with the given input and dependencies. If a previous successful run is found in the Reports, execution is skipped and the previous result is returned. If only a previous unsuccessful run is found, the operation is executed again.
Note: Operations that were skipped will not be added to the reporter.
Retry: By default, it retries the operation up to 10 times with exponential backoff if it fails. Use WithRetryConfig to customize the retry behavior. To cancel the retry early, return an error with NewUnrecoverableError.
Input & Output: The input and output must be JSON serializable. If the input is not serializable, ExecuteOperation returns an error. To be serializable, the input and output must either be marshalable by encoding/json or implement json.Marshaler and json.Unmarshaler. IsSerializable can be used to check whether the input or output is serializable.
func ExecuteOperationN ¶ added in v0.12.0
func ExecuteOperationN[IN, OUT, DEP any](
	b Bundle,
	operation *Operation[IN, OUT, DEP],
	deps DEP,
	input IN,
	seriesID string,
	n uint,
	opts ...ExecuteOption[IN, DEP],
) ([]Report[IN, OUT], error)
ExecuteOperationN executes the given operation n times with the given input and dependencies. If previous successful runs are found in the Reports, execution is skipped and the previous results are returned. seriesID (the execution series ID) identifies the n executions as a single unit. Use a unique seriesID for each distinct set of executions.
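One way to picture the role of the series ID is a loop that tags each run with the series and its order, and reuses runs already stored for that series. This is only a sketch under assumptions: seriesReport and executeN are hypothetical, the store is assumed to hold only successful runs in order, and the package's real matching logic may differ.

```go
package main

import "fmt"

// seriesReport is a stand-in for a Report carrying an ExecutionSeries.
type seriesReport struct {
	SeriesID string
	Order    uint
	Output   string
}

// executeN runs handler until n reports exist for seriesID. Reports
// already stored for that series are reused instead of being re-run.
func executeN(store *[]seriesReport, seriesID string, n uint, handler func() (string, error)) ([]seriesReport, error) {
	var series []seriesReport
	for _, r := range *store {
		if r.SeriesID == seriesID {
			series = append(series, r)
		}
	}
	if uint(len(series)) > n {
		series = series[:n]
	}
	for order := uint(len(series)); order < n; order++ {
		out, err := handler()
		if err != nil {
			return series, err
		}
		r := seriesReport{SeriesID: seriesID, Order: order, Output: out}
		*store = append(*store, r)
		series = append(series, r)
	}
	return series, nil
}

func main() {
	var store []seriesReport
	calls := 0
	mint := func() (string, error) {
		calls++
		return fmt.Sprintf("tx-%d", calls), nil
	}
	first, _ := executeN(&store, "mint-batch", 3, mint)
	again, _ := executeN(&store, "mint-batch", 3, mint) // fully served from the store
	other, _ := executeN(&store, "burn-batch", 1, mint) // different series, runs again

	fmt.Println(len(first), len(again), len(other), calls) // 3 3 1 4
}
```

Reusing a series ID for an unrelated batch would cause the second batch to be treated as already done, which is why distinct sets of executions need distinct IDs.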
type ReportError ¶
type ReportError struct {
Message string `json:"message"`
}
ReportError represents an error in the Report. Its purpose is to provide an exported field `Message` for marshaling, because a native error cannot be marshaled to JSON.
func (ReportError) Error ¶
func (o ReportError) Error() string
Error implements the error interface.
type Reporter ¶
type Reporter interface {
	GetReport(id string) (Report[any, any], error)
	GetReports() ([]Report[any, any], error)
	AddReport(report Report[any, any]) error
	GetExecutionReports(reportID string) ([]Report[any, any], error)
}
Reporter manages reports. Implementations can store them in memory, on the file system, etc.
type RetryConfig ¶
type RetryConfig[IN, DEP any] struct {
	// Enabled determines if the retry is enabled for the operation.
	Enabled bool
	// Policy is the retry policy to control the behavior of the retry.
	Policy RetryPolicy
	// InputHook is a function that returns an updated input before retrying the operation.
	// The operation when retried will use the input returned by this function.
	// This is useful for scenarios like updating the gas limit.
	InputHook func(attempt uint, err error, input IN, deps DEP) IN
}
type RetryPolicy ¶ added in v0.0.14
type RetryPolicy struct {
MaxAttempts uint
}
RetryPolicy defines the arguments to control the retry behavior.
type Sequence ¶
type Sequence[IN, OUT, DEP any] struct {
	// contains filtered or unexported fields
}
func NewSequence ¶
func NewSequence[IN, OUT, DEP any](
	id string,
	version *semver.Version,
	description string,
	handler SequenceHandler[IN, OUT, DEP],
) *Sequence[IN, OUT, DEP]
NewSequence creates a new sequence. A sequence is useful for logically grouping a list of operations and child sequences together.
func (*Sequence[IN, OUT, DEP]) Description ¶
Description returns the sequence description.
type SequenceHandler ¶
SequenceHandler is the function signature of a sequence handler. A sequence handler is a function that calls one or more operations or child sequences.
type SequenceReport ¶
type SequenceReport[IN, OUT any] struct {
	Report[IN, OUT]
	// ExecutionReports is a list of reports for all the operations and sequences that were executed as part of this sequence.
	ExecutionReports []Report[any, any]
}
SequenceReport is a report for a sequence. It contains a report for the sequence itself and a list of reports for all the operations executed as part of the sequence. The latter is useful for returning all the reports of the executed operations in the changeset output.
func ExecuteSequence ¶
func ExecuteSequence[IN, OUT, DEP any](
	b Bundle,
	sequence *Sequence[IN, OUT, DEP],
	deps DEP,
	input IN,
) (SequenceReport[IN, OUT], error)
ExecuteSequence executes a Sequence and returns a SequenceReport. The SequenceReport contains a report for the Sequence and the execution reports, which cover all the operations that were executed as part of this sequence. The latter is useful for returning all the executed reports to the changeset output. If a previous successful run is found in the Reports, execution is skipped and the previous result is returned. If only a previous unsuccessful run is found, the sequence is executed again.
Note: Sequences or Operations that were skipped will not be added to the reporter. The ExecutionReports do not include Sequences or Operations that were skipped.
Input & Output: The input and output must be JSON serializable. If the input is not serializable, ExecuteSequence returns an error. To be serializable, the input and output must either be marshalable by encoding/json or implement json.Marshaler and json.Unmarshaler. IsSerializable can be used to check whether the input or output is serializable.
func (SequenceReport[IN, OUT]) ToGenericSequenceReport ¶
func (r SequenceReport[IN, OUT]) ToGenericSequenceReport() SequenceReport[any, any]
ToGenericSequenceReport converts the SequenceReport to a generic SequenceReport[any, any]. This is useful for returning the report as a generic type in the changeset output.