Documentation
¶
Index ¶
- Constants
- Variables
- func DeterministicTaskID(namespace, runID, stepID, taskName string) string
- func DeterministicVolumeID(namespace, context string) string
- func ExecutePipeline(ctx context.Context, content string, driverDSN string, store storage.Driver, ...) error
- func PipelineID(name, content string) string
- func TranspileAndValidate(source string) (string, error)
- func UniqueID() string
- type Assert
- func (a *Assert) ContainsElement(array []interface{}, element interface{}, message ...string)
- func (a *Assert) ContainsString(str, substr string, message ...string)
- func (a *Assert) Equal(expected, actual interface{}, message ...string)
- func (a *Assert) NotEqual(expected, actual interface{}, message ...string)
- func (a *Assert) Truthy(value bool, message ...string)
- type ExecuteOptions
- type ExecutorOptions
- type JS
- type NativeResourceInfo
- type Notifier
- func (n *Notifier) GetConfig(name string) (NotifyConfig, bool)
- func (n *Notifier) RenderTemplate(templateStr string) (string, error)
- func (n *Notifier) Send(ctx context.Context, name string, message string) error
- func (n *Notifier) SetConfigs(configs map[string]NotifyConfig)
- func (n *Notifier) SetContext(ctx NotifyContext)
- func (n *Notifier) UpdateContext(updates func(*NotifyContext))
- type NotifyConfig
- type NotifyContext
- type NotifyInput
- type NotifyResult
- type NotifyRuntime
- func (nr *NotifyRuntime) Send(input NotifyInput) *goja.Promise
- func (nr *NotifyRuntime) SendMultiple(names []string, message string, async bool) *goja.Promise
- func (nr *NotifyRuntime) SetConfigs(configs map[string]NotifyConfig)
- func (nr *NotifyRuntime) SetContext(ctx NotifyContext)
- func (nr *NotifyRuntime) UpdateJobName(jobName string)
- func (nr *NotifyRuntime) UpdateStatus(status string)
- type OutputCallback
- type PipelineRunner
- type PipelineState
- type ResourceCheckInput
- type ResourceCheckResult
- type ResourceFetchInput
- type ResourceFetchResult
- type ResourcePushInput
- type ResourcePushResult
- type ResourceRunner
- func (r *ResourceRunner) Check(input ResourceCheckInput) (*ResourceCheckResult, error)
- func (r *ResourceRunner) Fetch(input ResourceFetchInput) (*ResourceFetchResult, error)
- func (r *ResourceRunner) IsNative(resourceType string) bool
- func (r *ResourceRunner) ListNativeResources() []string
- func (r *ResourceRunner) Push(input ResourcePushInput) (*ResourcePushResult, error)
- type ResumableRunner
- type ResumeOptions
- type RunInput
- type RunResult
- type RunStatus
- type Runner
- type Runtime
- type StepState
- type StepStatus
- type VolumeInput
- type VolumeResult
- type YAML
Constants ¶
const (
	MaxDepth      = 10
	SpacingMargin = 100
)
MaxDepth defines the maximum depth for spew dumping.
Variables ¶
var (
	ErrPipelineNotFunction        = errors.New("pipeline is not a function")
	ErrPipelineReturnedNonPromise = errors.New("pipeline did not return a promise")
	ErrPromiseRejected            = errors.New("promise rejected")
)
var ErrAssertion = errors.New("assertion failed")
Functions ¶
func DeterministicTaskID ¶
DeterministicTaskID generates a deterministic task ID based on the execution context. Includes namespace and runID to prevent collisions across different pipeline runs. The stepID provides uniqueness within a run, and taskName provides semantic clarity.
Returns an 8-character hexadecimal string.
func DeterministicVolumeID ¶
DeterministicVolumeID generates a deterministic volume ID for unnamed volumes. Uses namespace and a context string to ensure unique but reproducible names.
Returns an 8-character hexadecimal string.
func ExecutePipeline ¶
func ExecutePipeline( ctx context.Context, content string, driverDSN string, store storage.Driver, logger *slog.Logger, opts ExecutorOptions, ) error
ExecutePipeline executes a pipeline with the given content and driver DSN. It handles driver initialization, execution, and cleanup.
func PipelineID ¶
PipelineID generates a deterministic pipeline ID based on pipeline name and content. This enables caching and deduplication: identical pipelines will have identical IDs. Both name and content are included, so pipelines with different names but the same content still get distinct IDs.
Returns a 32-character hexadecimal string.
func TranspileAndValidate ¶
TranspileAndValidate transpiles TypeScript/JavaScript source code to executable JavaScript. It performs esbuild transpilation, wraps the code for module exports, and validates the result can be compiled by goja. Returns the ready-to-execute code or an error.
Types ¶
type Assert ¶
type Assert struct {
// contains filtered or unexported fields
}
func (*Assert) ContainsElement ¶
func (*Assert) ContainsString ¶
type ExecuteOptions ¶
// ExecuteOptions configures pipeline execution.
//
// NOTE(review): the exported fields match ExecutorOptions one-for-one;
// confirm whether both types are still needed.
type ExecuteOptions struct {
// Resume enables resume mode for the pipeline.
Resume bool
// RunID is the unique identifier for this pipeline run.
// If resuming, this should match the previous run's ID.
RunID string
// PipelineID is the unique identifier for this pipeline.
// Used to scope resource versions to a specific pipeline.
PipelineID string
// Namespace is the namespace for this execution.
Namespace string
}
ExecuteOptions configures pipeline execution.
type ExecutorOptions ¶
// ExecutorOptions configures pipeline execution. It is the options type
// accepted by ExecutePipeline.
type ExecutorOptions struct {
// Resume enables resume mode for the pipeline.
Resume bool
// RunID is the unique identifier for this pipeline run.
// If resuming, this should match the previous run's ID.
RunID string
// PipelineID is the unique identifier for this pipeline.
// Used to scope resource versions to a specific pipeline.
PipelineID string
// Namespace is the namespace for this execution (internal use).
Namespace string
}
ExecutorOptions configures pipeline execution.
type JS ¶
type JS struct {
// contains filtered or unexported fields
}
type NativeResourceInfo ¶
// NativeResourceInfo holds information about resource execution for JSON
// serialization. Both fields are json.RawMessage, so the payloads are kept
// as undecoded JSON.
type NativeResourceInfo struct {
// Request is serialized under the "request" key.
Request json.RawMessage `json:"request"`
// Response is serialized under the "response" key.
Response json.RawMessage `json:"response"`
}
NativeResourceInfo holds information about resource execution for JSON serialization.
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
Notifier handles notification sending with configuration management.
func NewNotifier ¶
NewNotifier creates a new Notifier instance.
func (*Notifier) GetConfig ¶
func (n *Notifier) GetConfig(name string) (NotifyConfig, bool)
GetConfig returns a notification config by name.
func (*Notifier) RenderTemplate ¶
RenderTemplate renders a Go template string with the current context using Sprig functions.
func (*Notifier) SetConfigs ¶
func (n *Notifier) SetConfigs(configs map[string]NotifyConfig)
SetConfigs sets the notification configurations.
func (*Notifier) SetContext ¶
func (n *Notifier) SetContext(ctx NotifyContext)
SetContext sets the current pipeline context for template rendering.
func (*Notifier) UpdateContext ¶
func (n *Notifier) UpdateContext(updates func(*NotifyContext))
UpdateContext updates specific fields of the context.
type NotifyConfig ¶
// NotifyConfig represents the configuration for a notification backend.
// Type names the backend; the trailing note on each remaining field says
// which backend that field is for.
type NotifyConfig struct {
Type string `json:"type"` // Backend kind: slack, teams, or http
Token string `json:"token"` // Used for Slack
Webhook string `json:"webhook"` // Used for Teams
URL string `json:"url"` // Used for HTTP
Channels []string `json:"channels"` // Used for Slack
Headers map[string]string `json:"headers"` // Used for HTTP
Method string `json:"method"` // Used for HTTP (defaults to POST)
Recipients []string `json:"recipients"` // Generic recipients; NOTE(review): which backends read this is not visible here
}
NotifyConfig represents the configuration for a notification backend.
type NotifyContext ¶
// NotifyContext provides metadata about the current pipeline execution for
// template rendering.
type NotifyContext struct {
PipelineName string `json:"pipelineName"`
JobName string `json:"jobName"`
BuildID string `json:"buildID"`
Status string `json:"status"` // One of: pending, running, success, failure, error
// StartTime, EndTime and Duration are carried as plain strings.
// NOTE(review): their expected formats are not visible here — confirm
// with the code that populates them.
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Duration string `json:"duration"`
Environment map[string]string `json:"environment"`
// TaskResults maps string keys to arbitrary values.
// NOTE(review): presumably keyed by task name — confirm.
TaskResults map[string]any `json:"taskResults"`
}
NotifyContext provides metadata about the current pipeline execution for template rendering.
type NotifyInput ¶
// NotifyInput is the input for sending a notification from JavaScript.
type NotifyInput struct {
Name string `json:"name"` // Name of the notification config to use (for named configs)
Message string `json:"message"` // Message text, treated as a template
Async bool `json:"async"` // Fire-and-forget mode
}
NotifyInput is the input for sending a notification from JavaScript.
type NotifyResult ¶
NotifyResult is the result of a notification attempt.
type NotifyRuntime ¶
type NotifyRuntime struct {
// contains filtered or unexported fields
}
NotifyRuntime wraps Notifier for use in Goja VM.
func NewNotifyRuntime ¶
func NewNotifyRuntime( ctx context.Context, jsVM *goja.Runtime, notifier *Notifier, promises *sync.WaitGroup, tasks chan func() error, ) *NotifyRuntime
NewNotifyRuntime creates a NotifyRuntime for Goja integration.
func (*NotifyRuntime) Send ¶
func (nr *NotifyRuntime) Send(input NotifyInput) *goja.Promise
Send sends a notification synchronously (returns a Promise).
func (*NotifyRuntime) SendMultiple ¶
SendMultiple sends to multiple notification configs.
func (*NotifyRuntime) SetConfigs ¶
func (nr *NotifyRuntime) SetConfigs(configs map[string]NotifyConfig)
SetConfigs sets notification configurations from JavaScript.
func (*NotifyRuntime) SetContext ¶
func (nr *NotifyRuntime) SetContext(ctx NotifyContext)
SetContext sets the pipeline context from JavaScript.
func (*NotifyRuntime) UpdateJobName ¶
func (nr *NotifyRuntime) UpdateJobName(jobName string)
UpdateJobName updates the job name in the current context.
func (*NotifyRuntime) UpdateStatus ¶
func (nr *NotifyRuntime) UpdateStatus(status string)
UpdateStatus updates the status in the current context.
type OutputCallback ¶
OutputCallback is called with streaming output chunks. The stream argument is either "stdout" or "stderr"; the data argument is the output chunk.
type PipelineRunner ¶
type PipelineRunner struct {
// contains filtered or unexported fields
}
func NewPipelineRunner ¶
func (*PipelineRunner) CleanupVolumes ¶
func (c *PipelineRunner) CleanupVolumes() error
CleanupVolumes cleans up all tracked volumes. This triggers cache persistence for CachingVolume wrappers.
func (*PipelineRunner) CreateVolume ¶
func (c *PipelineRunner) CreateVolume(input VolumeInput) (*VolumeResult, error)
type PipelineState ¶
// PipelineState represents the persisted state of an entire pipeline run.
// It is serialized to JSON using the snake_case keys given in the field tags.
type PipelineState struct {
// RunID is a unique identifier for this pipeline run.
RunID string `json:"run_id"`
// Steps contains the state of each step, keyed by step ID.
Steps map[string]*StepState `json:"steps"`
// StepOrder maintains the order in which steps were created.
StepOrder []string `json:"step_order"`
// StartedAt is when the pipeline run started.
// A nil pointer is omitted from the JSON output.
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt is when the pipeline run finished.
// A nil pointer is omitted from the JSON output.
CompletedAt *time.Time `json:"completed_at,omitempty"`
// ResumeEnabled indicates if resumability was enabled for this run.
ResumeEnabled bool `json:"resume_enabled"`
}
PipelineState represents the persisted state of an entire pipeline run.
func NewPipelineState ¶
func NewPipelineState(runID string, resumeEnabled bool) *PipelineState
NewPipelineState creates a new pipeline state for a run.
func (*PipelineState) GetStep ¶
func (p *PipelineState) GetStep(stepID string) *StepState
GetStep returns the step state for a given step ID, or nil if not found.
func (*PipelineState) InProgressSteps ¶
func (p *PipelineState) InProgressSteps() []*StepState
InProgressSteps returns all steps that are currently running.
func (*PipelineState) LastStep ¶
func (p *PipelineState) LastStep() *StepState
LastStep returns the last step in execution order, or nil if no steps.
func (*PipelineState) SetStep ¶
func (p *PipelineState) SetStep(state *StepState)
SetStep adds or updates a step state.
type ResourceCheckInput ¶
// ResourceCheckInput is the input for a Check operation from JS.
type ResourceCheckInput struct {
// Type is the resource type name.
Type string `json:"type"`
// Source is a free-form map describing the resource.
// NOTE(review): its schema is not visible here — presumably specific to
// each resource type; confirm.
Source map[string]interface{} `json:"source"`
// Version is optional; it is omitted from JSON when empty.
Version map[string]string `json:"version,omitempty"`
}
ResourceCheckInput is the input for a Check operation from JS.
type ResourceCheckResult ¶
ResourceCheckResult is the result of a Check operation.
type ResourceFetchInput ¶
// ResourceFetchInput is the input for a Fetch operation from JS.
type ResourceFetchInput struct {
// Type is the resource type name.
Type string `json:"type"`
// Source is a free-form map describing the resource.
// NOTE(review): its schema is not visible here — confirm.
Source map[string]interface{} `json:"source"`
// Version identifies the version to fetch.
Version map[string]string `json:"version"`
// Params is optional; it is omitted from JSON when empty.
Params map[string]interface{} `json:"params,omitempty"`
// DestDir is serialized as "destDir".
// NOTE(review): presumably the directory the fetched content is written
// to — confirm.
DestDir string `json:"destDir"`
}
ResourceFetchInput is the input for a Fetch operation from JS.
type ResourceFetchResult ¶
// ResourceFetchResult is the result of a Fetch operation.
type ResourceFetchResult struct {
// Version is the version map reported by the operation.
Version map[string]string `json:"version"`
// Metadata is an ordered list of name/value string pairs.
Metadata []struct {
Name string `json:"name"`
Value string `json:"value"`
} `json:"metadata"`
}
ResourceFetchResult is the result of a Fetch operation.
type ResourcePushInput ¶
// ResourcePushInput is the input for a Push operation from JS.
type ResourcePushInput struct {
// Type is the resource type name.
Type string `json:"type"`
// Source is a free-form map describing the resource.
// NOTE(review): its schema is not visible here — confirm.
Source map[string]interface{} `json:"source"`
// Params is optional; it is omitted from JSON when empty.
Params map[string]interface{} `json:"params,omitempty"`
// SrcDir is serialized as "srcDir".
// NOTE(review): presumably the directory whose content is pushed —
// confirm.
SrcDir string `json:"srcDir"`
}
ResourcePushInput is the input for a Push operation from JS.
type ResourcePushResult ¶
// ResourcePushResult is the result of a Push operation.
type ResourcePushResult struct {
// Version is the version map reported by the operation.
Version map[string]string `json:"version"`
// Metadata is an ordered list of name/value string pairs.
Metadata []struct {
Name string `json:"name"`
Value string `json:"value"`
} `json:"metadata"`
}
ResourcePushResult is the result of a Push operation.
type ResourceRunner ¶
type ResourceRunner struct {
// contains filtered or unexported fields
}
ResourceRunner provides methods for executing native resources.
func NewResourceRunner ¶
func NewResourceRunner(ctx context.Context, logger *slog.Logger) *ResourceRunner
NewResourceRunner creates a new ResourceRunner.
func (*ResourceRunner) Check ¶
func (r *ResourceRunner) Check(input ResourceCheckInput) (*ResourceCheckResult, error)
Check discovers new versions of a resource.
func (*ResourceRunner) Fetch ¶
func (r *ResourceRunner) Fetch(input ResourceFetchInput) (*ResourceFetchResult, error)
Fetch retrieves a specific version of a resource (equivalent to 'in' or 'get').
func (*ResourceRunner) IsNative ¶
func (r *ResourceRunner) IsNative(resourceType string) bool
IsNative returns true if the given resource type is a native resource.
func (*ResourceRunner) ListNativeResources ¶
func (r *ResourceRunner) ListNativeResources() []string
ListNativeResources returns a list of all registered native resource types.
func (*ResourceRunner) Push ¶
func (r *ResourceRunner) Push(input ResourcePushInput) (*ResourcePushResult, error)
Push publishes a new version of a resource (equivalent to 'out' or 'put').
type ResumableRunner ¶
type ResumableRunner struct {
// contains filtered or unexported fields
}
ResumableRunner wraps PipelineRunner with state persistence and resume capability.
func NewResumableRunner ¶
func NewResumableRunner( ctx context.Context, client orchestra.Driver, store storagelib.Driver, logger *slog.Logger, namespace string, opts ResumeOptions, ) (*ResumableRunner, error)
NewResumableRunner creates a new resumable runner.
func (*ResumableRunner) CleanupVolumes ¶
func (r *ResumableRunner) CleanupVolumes() error
CleanupVolumes cleans up all tracked volumes (passthrough to underlying runner).
func (*ResumableRunner) CreateVolume ¶
func (r *ResumableRunner) CreateVolume(input VolumeInput) (*VolumeResult, error)
CreateVolume creates a volume (passthrough to underlying runner).
func (*ResumableRunner) Run ¶
func (r *ResumableRunner) Run(input RunInput) (*RunResult, error)
Run executes a task with resume support. If the task was previously completed, returns the cached result. If the task was in progress and the container is still running, reattaches. Otherwise, starts a new container.
func (*ResumableRunner) State ¶
func (r *ResumableRunner) State() *PipelineState
State returns the current pipeline state.
type ResumeOptions ¶
// ResumeOptions configures resume behavior. It is the options type accepted
// by NewResumableRunner.
type ResumeOptions struct {
// RunID is the unique identifier for this pipeline run.
// If resuming, this should match the previous run's ID.
RunID string
// Resume indicates whether to attempt resuming a previous run.
Resume bool
}
ResumeOptions configures resume behavior.
type RunInput ¶
// RunInput describes a single run request. It is the argument accepted by
// Runner.Run.
type RunInput struct {
// Command groups the path, arguments, and user for the run.
Command struct {
Path string `json:"path"`
Args []string `json:"args"`
User string `json:"user"`
} `json:"command"`
// ContainerLimits carries the cpu and memory limit values.
// NOTE(review): units are not visible here — confirm.
ContainerLimits struct {
CPU int64 `json:"cpu"`
Memory int64 `json:"memory"`
} `json:"container_limits"`
Env map[string]string `json:"env"`
Image string `json:"image"`
// Mounts maps string keys to volumes.
// NOTE(review): presumably keyed by mount path — confirm.
Mounts map[string]VolumeResult `json:"mounts"`
Name string `json:"name"`
Privileged bool `json:"privileged"`
Stdin string `json:"stdin"`
// OnOutput is called with streaming output chunks as the container runs.
// If provided, the callback receives (stream, data) where stream is "stdout" or "stderr".
OnOutput OutputCallback `json:"-"` // Excluded from JSON; not populated from JS, set programmatically
// Timeout is kept as a string because goja does not support converting
// a string into a time.Duration.
// NOTE(review): the expected format is not visible here — presumably
// time.ParseDuration syntax; confirm.
Timeout string `json:"timeout"`
}
type Runner ¶
// Runner is the interface for running pipeline steps.
// Both PipelineRunner and ResumableRunner implement this interface.
type Runner interface {
// Run executes the step described by input and returns its result.
Run(input RunInput) (*RunResult, error)
// CreateVolume creates a volume from input.
CreateVolume(input VolumeInput) (*VolumeResult, error)
// CleanupVolumes cleans up all tracked volumes.
CleanupVolumes() error
}
Runner is the interface for running pipeline steps. Both PipelineRunner and ResumableRunner implement this interface.
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
func NewRuntime ¶
func (*Runtime) CreateVolume ¶
func (r *Runtime) CreateVolume(input VolumeInput) *goja.Promise
type StepState ¶
// StepState represents the persisted state of a pipeline step.
// Fields tagged omitempty are left out of the JSON output when they are
// empty or nil.
type StepState struct {
// StepID is a unique identifier for this step within the pipeline run.
StepID string `json:"step_id"`
// Name is the human-readable name of the step.
Name string `json:"name"`
// Status is the current execution status.
Status StepStatus `json:"status"`
// ContainerID is the driver-specific container identifier (for reattachment).
ContainerID string `json:"container_id,omitempty"`
// TaskID is the task identifier used by the orchestrator.
TaskID string `json:"task_id,omitempty"`
// StartedAt is when the step started executing.
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt is when the step finished (successfully or not).
CompletedAt *time.Time `json:"completed_at,omitempty"`
// ExitCode is the container exit code (if completed).
// It is a pointer so that a missing exit code (nil) can be told apart
// from an exit code of 0.
ExitCode *int `json:"exit_code,omitempty"`
// Result stores the serialized step result for completed steps.
Result *RunResult `json:"result,omitempty"`
// Error stores the error message if the step failed.
Error string `json:"error,omitempty"`
}
StepState represents the persisted state of a pipeline step.
func (*StepState) CanSkip ¶
CanSkip returns true if the step was already completed successfully and can be skipped on resume.
func (*StepState) IsResumable ¶
IsResumable returns true if the step can be resumed (running state with a container ID).
func (*StepState) IsTerminal ¶
IsTerminal returns true if the step is in a terminal state (completed, failed, or aborted).
type StepStatus ¶
// StepStatus represents the execution status of a pipeline step.
// The defined values are "pending", "running", "completed", "failed",
// and "aborted".
type StepStatus string
StepStatus represents the execution status of a pipeline step.
const (
	// StepStatusPending indicates the step has not started yet.
	StepStatusPending StepStatus = "pending"
	// StepStatusRunning indicates the step is currently executing.
	StepStatusRunning StepStatus = "running"
	// StepStatusCompleted indicates the step finished successfully.
	StepStatusCompleted StepStatus = "completed"
	// StepStatusFailed indicates the step finished with an error.
	StepStatusFailed StepStatus = "failed"
	// StepStatusAborted indicates the step was interrupted/aborted.
	StepStatusAborted StepStatus = "aborted"
)