runtime

package
v0.0.0-...-31e5321 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxDepth      = 10
	SpacingMargin = 100
)

MaxDepth defines the maximum depth for spew dumping.

Variables

View Source
var (
	ErrPipelineNotFunction        = errors.New("pipeline is not a function")
	ErrPipelineReturnedNonPromise = errors.New("pipeline did not return a promise")
	ErrPromiseRejected            = errors.New("promise rejected")
)
View Source
var ErrAssertion = errors.New("assertion failed")

Functions

func DeterministicTaskID

func DeterministicTaskID(namespace, runID, stepID, taskName string) string

DeterministicTaskID generates a deterministic task ID based on the execution context. Includes namespace and runID to prevent collisions across different pipeline runs. The stepID provides uniqueness within a run, and taskName provides semantic clarity.

Returns an 8-character hexadecimal string.

func DeterministicVolumeID

func DeterministicVolumeID(namespace, context string) string

DeterministicVolumeID generates a deterministic volume ID for unnamed volumes. Uses namespace and a context string to ensure unique but reproducible names.

Returns an 8-character hexadecimal string.

func ExecutePipeline

func ExecutePipeline(
	ctx context.Context,
	content string,
	driverDSN string,
	store storage.Driver,
	logger *slog.Logger,
	opts ExecutorOptions,
) error

ExecutePipeline executes a pipeline with the given content and driver DSN. It handles driver initialization, execution, and cleanup.

func PipelineID

func PipelineID(name, content string) string

PipelineID generates a deterministic pipeline ID based on pipeline name and content. This enables caching and deduplication - identical pipelines will have identical IDs. Both name and content are included so different pipelines with the same content get unique IDs.

Returns a 32-character hexadecimal string.

func TranspileAndValidate

func TranspileAndValidate(source string) (string, error)

TranspileAndValidate transpiles TypeScript/JavaScript source code to executable JavaScript. It performs esbuild transpilation, wraps the code for module exports, and validates the result can be compiled by goja. Returns the ready-to-execute code or an error.

func UniqueID

func UniqueID() string

UniqueID generates a random unique identifier for things that should not be deterministic (e.g., run IDs, namespaces for fresh runs). This is a wrapper around gonanoid to consolidate random ID generation.

Types

type Assert

type Assert struct {
	// contains filtered or unexported fields
}

func NewAssert

func NewAssert(vm *goja.Runtime, logger *slog.Logger) *Assert

func (*Assert) ContainsElement

func (a *Assert) ContainsElement(array []interface{}, element interface{}, message ...string)

func (*Assert) ContainsString

func (a *Assert) ContainsString(str, substr string, message ...string)

func (*Assert) Equal

func (a *Assert) Equal(expected, actual interface{}, message ...string)

func (*Assert) NotEqual

func (a *Assert) NotEqual(expected, actual interface{}, message ...string)

func (*Assert) Truthy

func (a *Assert) Truthy(value bool, message ...string)

type ExecuteOptions

type ExecuteOptions struct {
	// Resume enables resume mode for the pipeline.
	Resume bool
	// RunID is the unique identifier for this pipeline run.
	// If resuming, this should match the previous run's ID.
	RunID string
	// PipelineID is the unique identifier for this pipeline.
	// Used to scope resource versions to a specific pipeline.
	PipelineID string
	// Namespace is the namespace for this execution.
	Namespace string
}

ExecuteOptions configures pipeline execution.

type ExecutorOptions

type ExecutorOptions struct {
	// Resume enables resume mode for the pipeline.
	Resume bool
	// RunID is the unique identifier for this pipeline run.
	// If resuming, this should match the previous run's ID.
	RunID string
	// PipelineID is the unique identifier for this pipeline.
	// Used to scope resource versions to a specific pipeline.
	PipelineID string
	// Namespace is the namespace for this execution (internal use).
	Namespace string
}

ExecutorOptions configures pipeline execution.

type JS

type JS struct {
	// contains filtered or unexported fields
}

func NewJS

func NewJS(logger *slog.Logger) *JS

func (*JS) Execute

func (j *JS) Execute(ctx context.Context, source string, driver orchestra.Driver, storage storage.Driver) error

Execute runs a pipeline with default options (no resume).

func (*JS) ExecuteWithOptions

func (j *JS) ExecuteWithOptions(ctx context.Context, source string, driver orchestra.Driver, storage storage.Driver, opts ExecuteOptions) error

ExecuteWithOptions runs a pipeline with the given options.

type NativeResourceInfo

type NativeResourceInfo struct {
	Request  json.RawMessage `json:"request"`
	Response json.RawMessage `json:"response"`
}

NativeResourceInfo holds information about resource execution for JSON serialization.

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier handles notification sending with configuration management.

func NewNotifier

func NewNotifier(logger *slog.Logger) *Notifier

NewNotifier creates a new Notifier instance.

func (*Notifier) GetConfig

func (n *Notifier) GetConfig(name string) (NotifyConfig, bool)

GetConfig returns a notification config by name.

func (*Notifier) RenderTemplate

func (n *Notifier) RenderTemplate(templateStr string) (string, error)

RenderTemplate renders a Go template string with the current context using Sprig functions.

func (*Notifier) Send

func (n *Notifier) Send(ctx context.Context, name string, message string) error

Send sends a notification using the named configuration.

func (*Notifier) SetConfigs

func (n *Notifier) SetConfigs(configs map[string]NotifyConfig)

SetConfigs sets the notification configurations.

func (*Notifier) SetContext

func (n *Notifier) SetContext(ctx NotifyContext)

SetContext sets the current pipeline context for template rendering.

func (*Notifier) UpdateContext

func (n *Notifier) UpdateContext(updates func(*NotifyContext))

UpdateContext updates specific fields of the context.

type NotifyConfig

type NotifyConfig struct {
	Type       string            `json:"type"`       // slack, teams, http
	Token      string            `json:"token"`      // For Slack
	Webhook    string            `json:"webhook"`    // For Teams
	URL        string            `json:"url"`        // For HTTP
	Channels   []string          `json:"channels"`   // For Slack
	Headers    map[string]string `json:"headers"`    // For HTTP
	Method     string            `json:"method"`     // For HTTP (defaults to POST)
	Recipients []string          `json:"recipients"` // Generic recipients
}

NotifyConfig represents the configuration for a notification backend.

type NotifyContext

type NotifyContext struct {
	PipelineName string            `json:"pipelineName"`
	JobName      string            `json:"jobName"`
	BuildID      string            `json:"buildID"`
	Status       string            `json:"status"` // pending, running, success, failure, error
	StartTime    string            `json:"startTime"`
	EndTime      string            `json:"endTime"`
	Duration     string            `json:"duration"`
	Environment  map[string]string `json:"environment"`
	TaskResults  map[string]any    `json:"taskResults"`
}

NotifyContext provides metadata about the current pipeline execution for template rendering.

type NotifyInput

type NotifyInput struct {
	Name    string `json:"name"`    // Config name (for named configs)
	Message string `json:"message"` // Template message
	Async   bool   `json:"async"`   // Fire-and-forget mode
}

NotifyInput is the input for sending a notification from JavaScript.

type NotifyResult

type NotifyResult struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

NotifyResult is the result of a notification attempt.

type NotifyRuntime

type NotifyRuntime struct {
	// contains filtered or unexported fields
}

NotifyRuntime wraps Notifier for use in Goja VM.

func NewNotifyRuntime

func NewNotifyRuntime(
	ctx context.Context,
	jsVM *goja.Runtime,
	notifier *Notifier,
	promises *sync.WaitGroup,
	tasks chan func() error,
) *NotifyRuntime

NewNotifyRuntime creates a NotifyRuntime for Goja integration.

func (*NotifyRuntime) Send

func (nr *NotifyRuntime) Send(input NotifyInput) *goja.Promise

Send sends a notification synchronously (returns a Promise).

func (*NotifyRuntime) SendMultiple

func (nr *NotifyRuntime) SendMultiple(names []string, message string, async bool) *goja.Promise

SendMultiple sends to multiple notification configs.

func (*NotifyRuntime) SetConfigs

func (nr *NotifyRuntime) SetConfigs(configs map[string]NotifyConfig)

SetConfigs sets notification configurations from JavaScript.

func (*NotifyRuntime) SetContext

func (nr *NotifyRuntime) SetContext(ctx NotifyContext)

SetContext sets the pipeline context from JavaScript.

func (*NotifyRuntime) UpdateJobName

func (nr *NotifyRuntime) UpdateJobName(jobName string)

UpdateJobName updates the job name in the current context.

func (*NotifyRuntime) UpdateStatus

func (nr *NotifyRuntime) UpdateStatus(status string)

UpdateStatus updates the status in the current context.

type OutputCallback

type OutputCallback func(stream string, data string)

OutputCallback is called with streaming output chunks. stream is either "stdout" or "stderr", data is the output chunk.

type PipelineRunner

type PipelineRunner struct {
	// contains filtered or unexported fields
}

func NewPipelineRunner

func NewPipelineRunner(
	ctx context.Context,
	client orchestra.Driver,
	storageClient storage.Driver,
	logger *slog.Logger,
	namespace string,
	runID string,
) *PipelineRunner

func (*PipelineRunner) CleanupVolumes

func (c *PipelineRunner) CleanupVolumes() error

CleanupVolumes cleans up all tracked volumes. This triggers cache persistence for CachingVolume wrappers.

func (*PipelineRunner) CreateVolume

func (c *PipelineRunner) CreateVolume(input VolumeInput) (*VolumeResult, error)

func (*PipelineRunner) Run

func (c *PipelineRunner) Run(input RunInput) (*RunResult, error)

type PipelineState

type PipelineState struct {
	// RunID is a unique identifier for this pipeline run.
	RunID string `json:"run_id"`
	// Steps contains the state of each step, keyed by step ID.
	Steps map[string]*StepState `json:"steps"`
	// StepOrder maintains the order in which steps were created.
	StepOrder []string `json:"step_order"`
	// StartedAt is when the pipeline run started.
	StartedAt *time.Time `json:"started_at,omitempty"`
	// CompletedAt is when the pipeline run finished.
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	// ResumeEnabled indicates if resumability was enabled for this run.
	ResumeEnabled bool `json:"resume_enabled"`
}

PipelineState represents the persisted state of an entire pipeline run.

func NewPipelineState

func NewPipelineState(runID string, resumeEnabled bool) *PipelineState

NewPipelineState creates a new pipeline state for a run.

func (*PipelineState) GetStep

func (p *PipelineState) GetStep(stepID string) *StepState

GetStep returns the step state for a given step ID, or nil if not found.

func (*PipelineState) InProgressSteps

func (p *PipelineState) InProgressSteps() []*StepState

InProgressSteps returns all steps that are currently running.

func (*PipelineState) LastStep

func (p *PipelineState) LastStep() *StepState

LastStep returns the last step in execution order, or nil if no steps.

func (*PipelineState) SetStep

func (p *PipelineState) SetStep(state *StepState)

SetStep adds or updates a step state.

type ResourceCheckInput

type ResourceCheckInput struct {
	Type    string                 `json:"type"`
	Source  map[string]interface{} `json:"source"`
	Version map[string]string      `json:"version,omitempty"`
}

ResourceCheckInput is the input for a Check operation from JS.

type ResourceCheckResult

type ResourceCheckResult struct {
	Versions []map[string]string `json:"versions"`
}

ResourceCheckResult is the result of a Check operation.

type ResourceFetchInput

type ResourceFetchInput struct {
	Type    string                 `json:"type"`
	Source  map[string]interface{} `json:"source"`
	Version map[string]string      `json:"version"`
	Params  map[string]interface{} `json:"params,omitempty"`
	DestDir string                 `json:"destDir"`
}

ResourceFetchInput is the input for a Fetch operation from JS.

type ResourceFetchResult

type ResourceFetchResult struct {
	Version  map[string]string `json:"version"`
	Metadata []struct {
		Name  string `json:"name"`
		Value string `json:"value"`
	} `json:"metadata"`
}

ResourceFetchResult is the result of a Fetch operation.

type ResourcePushInput

type ResourcePushInput struct {
	Type   string                 `json:"type"`
	Source map[string]interface{} `json:"source"`
	Params map[string]interface{} `json:"params,omitempty"`
	SrcDir string                 `json:"srcDir"`
}

ResourcePushInput is the input for a Push operation from JS.

type ResourcePushResult

type ResourcePushResult struct {
	Version  map[string]string `json:"version"`
	Metadata []struct {
		Name  string `json:"name"`
		Value string `json:"value"`
	} `json:"metadata"`
}

ResourcePushResult is the result of a Push operation.

type ResourceRunner

type ResourceRunner struct {
	// contains filtered or unexported fields
}

ResourceRunner provides methods for executing native resources.

func NewResourceRunner

func NewResourceRunner(ctx context.Context, logger *slog.Logger) *ResourceRunner

NewResourceRunner creates a new ResourceRunner.

func (*ResourceRunner) Check

Check discovers new versions of a resource.

func (*ResourceRunner) Fetch

Fetch retrieves a specific version of a resource (equivalent to 'in' or 'get').

func (*ResourceRunner) IsNative

func (r *ResourceRunner) IsNative(resourceType string) bool

IsNative returns true if the given resource type is a native resource.

func (*ResourceRunner) ListNativeResources

func (r *ResourceRunner) ListNativeResources() []string

ListNativeResources returns a list of all registered native resource types.

func (*ResourceRunner) Push

Push publishes a new version of a resource (equivalent to 'out' or 'put').

type ResumableRunner

type ResumableRunner struct {
	// contains filtered or unexported fields
}

ResumableRunner wraps PipelineRunner with state persistence and resume capability.

func NewResumableRunner

func NewResumableRunner(
	ctx context.Context,
	client orchestra.Driver,
	store storagelib.Driver,
	logger *slog.Logger,
	namespace string,
	opts ResumeOptions,
) (*ResumableRunner, error)

NewResumableRunner creates a new resumable runner.

func (*ResumableRunner) CleanupVolumes

func (r *ResumableRunner) CleanupVolumes() error

CleanupVolumes cleans up all tracked volumes (passthrough to underlying runner).

func (*ResumableRunner) CreateVolume

func (r *ResumableRunner) CreateVolume(input VolumeInput) (*VolumeResult, error)

CreateVolume creates a volume (passthrough to underlying runner).

func (*ResumableRunner) Run

func (r *ResumableRunner) Run(input RunInput) (*RunResult, error)

Run executes a task with resume support. If the task was previously completed, returns the cached result. If the task was in progress and the container is still running, reattaches. Otherwise, starts a new container.

func (*ResumableRunner) State

func (r *ResumableRunner) State() *PipelineState

State returns the current pipeline state.

type ResumeOptions

type ResumeOptions struct {
	// RunID is the unique identifier for this pipeline run.
	// If resuming, this should match the previous run's ID.
	RunID string
	// Resume indicates whether to attempt resuming a previous run.
	Resume bool
}

ResumeOptions configures resume behavior.

type RunInput

type RunInput struct {
	Command struct {
		Path string   `json:"path"`
		Args []string `json:"args"`
		User string   `json:"user"`
	} `json:"command"`
	ContainerLimits struct {
		CPU    int64 `json:"cpu"`
		Memory int64 `json:"memory"`
	} `json:"container_limits"`
	Env        map[string]string       `json:"env"`
	Image      string                  `json:"image"`
	Mounts     map[string]VolumeResult `json:"mounts"`
	Name       string                  `json:"name"`
	Privileged bool                    `json:"privileged"`
	Stdin      string                  `json:"stdin"`
	// OnOutput is called with streaming output chunks as the container runs.
	// If provided, the callback receives (stream, data) where stream is "stdout" or "stderr".
	OnOutput OutputCallback `json:"-"` // Not serialized from JS, set programmatically
	// has to be string because goja doesn't support string -> time.Duration
	Timeout string `json:"timeout"`
}

type RunResult

type RunResult struct {
	Code   int    `json:"code"`
	Stderr string `json:"stderr"`
	Stdout string `json:"stdout"`

	Status RunStatus `json:"status"`
}

type RunStatus

type RunStatus string
const (
	RunAbort    RunStatus = "abort"
	RunComplete RunStatus = "complete"
)

type Runner

type Runner interface {
	Run(input RunInput) (*RunResult, error)
	CreateVolume(input VolumeInput) (*VolumeResult, error)
	CleanupVolumes() error
}

Runner is the interface for running pipeline steps. Both PipelineRunner and ResumableRunner implement this interface.

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

func NewRuntime

func NewRuntime(
	jsVM *goja.Runtime,
	runner Runner,
	namespace string,
	runID string,
) *Runtime

func (*Runtime) CreateVolume

func (r *Runtime) CreateVolume(input VolumeInput) *goja.Promise

func (*Runtime) Run

func (r *Runtime) Run(call goja.FunctionCall) goja.Value

Run executes a container task. Accepts an object with optional onOutput callback.

func (*Runtime) Wait

func (r *Runtime) Wait() error

type StepState

type StepState struct {
	// StepID is a unique identifier for this step within the pipeline run.
	StepID string `json:"step_id"`
	// Name is the human-readable name of the step.
	Name string `json:"name"`
	// Status is the current execution status.
	Status StepStatus `json:"status"`
	// ContainerID is the driver-specific container identifier (for reattachment).
	ContainerID string `json:"container_id,omitempty"`
	// TaskID is the task identifier used by the orchestrator.
	TaskID string `json:"task_id,omitempty"`
	// StartedAt is when the step started executing.
	StartedAt *time.Time `json:"started_at,omitempty"`
	// CompletedAt is when the step finished (successfully or not).
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	// ExitCode is the container exit code (if completed).
	ExitCode *int `json:"exit_code,omitempty"`
	// Result stores the serialized step result for completed steps.
	Result *RunResult `json:"result,omitempty"`
	// Error stores the error message if the step failed.
	Error string `json:"error,omitempty"`
}

StepState represents the persisted state of a pipeline step.

func (*StepState) CanSkip

func (s *StepState) CanSkip() bool

CanSkip returns true if the step was already completed successfully and can be skipped on resume.

func (*StepState) IsResumable

func (s *StepState) IsResumable() bool

IsResumable returns true if the step can be resumed (running state with a container ID).

func (*StepState) IsTerminal

func (s *StepState) IsTerminal() bool

IsTerminal returns true if the step is in a terminal state (completed, failed, or aborted).

type StepStatus

type StepStatus string

StepStatus represents the execution status of a pipeline step.

const (
	// StepStatusPending indicates the step has not started yet.
	StepStatusPending StepStatus = "pending"
	// StepStatusRunning indicates the step is currently executing.
	StepStatusRunning StepStatus = "running"
	// StepStatusCompleted indicates the step finished successfully.
	StepStatusCompleted StepStatus = "completed"
	// StepStatusFailed indicates the step finished with an error.
	StepStatusFailed StepStatus = "failed"
	// StepStatusAborted indicates the step was interrupted/aborted.
	StepStatusAborted StepStatus = "aborted"
)

type VolumeInput

type VolumeInput struct {
	Name string `json:"name"`
	Size int    `json:"size"`
}

type VolumeResult

type VolumeResult struct {
	Name string `json:"name"`
	Path string `json:"path"`
	// contains filtered or unexported fields
}

type YAML

type YAML struct {
	// contains filtered or unexported fields
}

func NewYAML

func NewYAML(vm *goja.Runtime, logger *slog.Logger) *YAML

func (*YAML) Parse

func (y *YAML) Parse(source string) interface{}

func (*YAML) Stringify

func (y *YAML) Stringify(payload interface{}) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL