Documentation
¶
Overview ¶
Package executor defines the Strategy pattern interface for workflow execution backends. Each execution engine (Tekton, K8s Job) implements this interface to handle the lifecycle of its execution resource (PipelineRun, Job).
Authority: BR-WE-014 (Kubernetes Job Execution Backend) Design: Strategy pattern dispatch based on spec.executionEngine
Index ¶
- Constants
- func BuildExtraVars(params map[string]string) map[string]interface{}
- func ExecutionResourceName(targetResource string) string
- type AWXClient
- type AWXHTTPClient
- func (c *AWXHTTPClient) CancelJob(ctx context.Context, jobID int) error
- func (c *AWXHTTPClient) CreateCredential(ctx context.Context, name string, credTypeID, orgID int, ...) (int, error)
- func (c *AWXHTTPClient) CreateCredentialType(ctx context.Context, name string, inputs, injectors map[string]interface{}) (int, error)
- func (c *AWXHTTPClient) DeleteCredential(ctx context.Context, credentialID int) error
- func (c *AWXHTTPClient) FindCredentialTypeByName(ctx context.Context, name string) (int, error)
- func (c *AWXHTTPClient) FindJobTemplateByName(ctx context.Context, name string) (int, error)
- func (c *AWXHTTPClient) GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)
- func (c *AWXHTTPClient) GetJobTemplateCredentials(ctx context.Context, templateID int) ([]int, error)
- func (c *AWXHTTPClient) LaunchJobTemplate(ctx context.Context, templateID int, extraVars map[string]interface{}) (int, error)
- func (c *AWXHTTPClient) LaunchJobTemplateWithCreds(ctx context.Context, templateID int, extraVars map[string]interface{}, ...) (int, error)
- type AWXJobStatus
- type AnsibleExecutor
- func (a *AnsibleExecutor) Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) error
- func (a *AnsibleExecutor) Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) (string, error)
- func (a *AnsibleExecutor) Engine() string
- func (a *AnsibleExecutor) GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) (*ExecutionResult, error)
- type CreateOptions
- type ExecutionResult
- type Executor
- type JobExecutor
- func (j *JobExecutor) Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) error
- func (j *JobExecutor) Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) (string, error)
- func (j *JobExecutor) Engine() string
- func (j *JobExecutor) GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) (*ExecutionResult, error)
- func (j *JobExecutor) IsCompleted(ctx context.Context, targetResource string, namespace string) (bool, error)
- type Registry
- type TektonExecutor
- func (t *TektonExecutor) Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) error
- func (t *TektonExecutor) Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) (string, error)
- func (t *TektonExecutor) Engine() string
- func (t *TektonExecutor) GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, ...) (*ExecutionResult, error)
Constants ¶
const ( SecretMountBasePath = "/run/kubernaut/secrets" ConfigMapMountBasePath = "/run/kubernaut/configmaps" )
const DefaultServiceAccountName = "kubernaut-workflow-runner"
DefaultServiceAccountName is the default SA for PipelineRuns
Variables ¶
This section is empty.
Functions ¶
func BuildExtraVars ¶
BuildExtraVars converts workflow parameters (map[string]string) to typed JSON values for AWX extra_vars. Attempts type coercion: integers, booleans, floats, JSON arrays/objects are converted to their native types. Plain strings remain strings.
func ExecutionResourceName ¶
ExecutionResourceName generates a deterministic name from targetResource. DD-WE-003: Lock Persistence via Deterministic Name Format: wfe-<sha256(targetResource)[:16]>
Types ¶
type AWXClient ¶
type AWXClient interface {
LaunchJobTemplate(ctx context.Context, templateID int, extraVars map[string]interface{}) (int, error)
GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)
CancelJob(ctx context.Context, jobID int) error
FindJobTemplateByName(ctx context.Context, name string) (int, error)
// Credential lifecycle for dependencies.secrets injection (BR-WE-015).
// The executor dynamically creates AWX credential types per unique K8s Secret name,
// then creates ephemeral credentials per WFE execution and cleans them up after completion.
CreateCredentialType(ctx context.Context, name string, inputs, injectors map[string]interface{}) (int, error)
FindCredentialTypeByName(ctx context.Context, name string) (int, error)
CreateCredential(ctx context.Context, name string, credTypeID, orgID int, inputs map[string]string) (int, error)
DeleteCredential(ctx context.Context, credentialID int) error
LaunchJobTemplateWithCreds(ctx context.Context, templateID int, extraVars map[string]interface{}, credentialIDs []int) (int, error)
GetJobTemplateCredentials(ctx context.Context, templateID int) ([]int, error)
}
AWXClient defines the interface for AWX/AAP REST API operations. Mocked in unit tests; real implementation provided by AWXHTTPClient.
type AWXHTTPClient ¶
type AWXHTTPClient struct {
// contains filtered or unexported fields
}
AWXHTTPClient implements AWXClient using the AWX REST API.
func NewAWXHTTPClient ¶
func NewAWXHTTPClient(baseURL, token string, insecure bool) *AWXHTTPClient
NewAWXHTTPClient creates a new AWX REST API client.
func (*AWXHTTPClient) CancelJob ¶
func (c *AWXHTTPClient) CancelJob(ctx context.Context, jobID int) error
func (*AWXHTTPClient) CreateCredential ¶
func (*AWXHTTPClient) CreateCredentialType ¶
func (*AWXHTTPClient) DeleteCredential ¶
func (c *AWXHTTPClient) DeleteCredential(ctx context.Context, credentialID int) error
func (*AWXHTTPClient) FindCredentialTypeByName ¶
func (*AWXHTTPClient) FindJobTemplateByName ¶
func (*AWXHTTPClient) GetJobStatus ¶
func (c *AWXHTTPClient) GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)
func (*AWXHTTPClient) GetJobTemplateCredentials ¶ added in v1.1.0
func (*AWXHTTPClient) LaunchJobTemplate ¶
func (*AWXHTTPClient) LaunchJobTemplateWithCreds ¶
type AWXJobStatus ¶
type AWXJobStatus struct {
ID int `json:"id"`
Status string `json:"status"`
Failed bool `json:"failed"`
ResultStdout string `json:"result_stdout,omitempty"`
}
AWXJobStatus represents the status response from AWX GET /api/v2/jobs/{id}/
type AnsibleExecutor ¶
type AnsibleExecutor struct {
AWXClient AWXClient
K8sClient client.Client
OrganizationID int
Logger logr.Logger
}
AnsibleExecutor implements the Executor interface for AWX/AAP workflow execution. BR-WE-015: Launches AWX Job Templates and tracks execution status via the AWX REST API.
func NewAnsibleExecutor ¶
func NewAnsibleExecutor(awxClient AWXClient, k8sClient client.Client, orgID int, logger logr.Logger) *AnsibleExecutor
NewAnsibleExecutor creates a new AnsibleExecutor with the given AWX client. k8sClient is used to read K8s Secrets (dependencies.secrets) and update WFE status. orgID is the AWX organization ID for ephemeral credential creation.
func (*AnsibleExecutor) Cleanup ¶
func (a *AnsibleExecutor) Cleanup( ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, ) error
Cleanup deletes ephemeral AWX credentials (if any) and cancels the AWX job.
func (*AnsibleExecutor) Create ¶
func (a *AnsibleExecutor) Create( ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, opts CreateOptions, ) (string, error)
Create launches an AWX Job Template from the WFE spec. DD-WE-006: When opts.Dependencies contains secrets, the executor reads them from Kubernetes, creates ephemeral AWX credentials, and attaches them to the job launch.
func (*AnsibleExecutor) Engine ¶
func (a *AnsibleExecutor) Engine() string
Engine returns "ansible".
func (*AnsibleExecutor) GetStatus ¶
func (a *AnsibleExecutor) GetStatus( ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, ) (*ExecutionResult, error)
GetStatus polls AWX for the job status and maps it to an ExecutionResult.
type CreateOptions ¶
type CreateOptions struct {
Dependencies *models.WorkflowDependencies
}
CreateOptions carries optional configuration for execution resource creation. Using a struct allows adding new fields without breaking the interface. DD-WE-006: Dependencies are passed here, queried from DS by the reconciler.
type ExecutionResult ¶
type ExecutionResult struct {
// Phase maps to WFE phase constants (Pending, Running, Completed, Failed)
Phase string
// Reason is a machine-readable reason for the current phase
Reason string
// Message is a human-readable description of the current state
Message string
// Summary contains the lightweight execution status for WFE status field
Summary *workflowexecutionv1alpha1.ExecutionStatusSummary
}
ExecutionResult represents the mapped status of an execution resource. Both Tekton PipelineRun conditions and K8s Job conditions are mapped to this common structure.
func MapAWXStatusToResult ¶
func MapAWXStatusToResult(status *AWXJobStatus) *ExecutionResult
MapAWXStatusToResult maps an AWX job status to an ExecutionResult. AWX states: pending, waiting, running, successful, failed, error, canceled
type Executor ¶
type Executor interface {
// Create builds and creates the execution resource (PipelineRun or Job)
// in the specified execution namespace. Returns the name of the created resource.
//
// The execution resource name MUST be deterministic based on targetResource
// to provide atomic resource locking (DD-WE-003).
//
// DD-WE-006: opts.Dependencies carries schema-declared infrastructure dependencies
// to be mounted as volumes (Job) or workspace bindings (Tekton).
Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, opts CreateOptions) (string, error)
// GetStatus retrieves the current status of the execution resource.
// Returns an ExecutionResult that maps the backend-specific status to WFE phases.
//
// Returns nil result with nil error if the execution resource is not found
// (may have been externally deleted - BR-WE-007).
GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) (*ExecutionResult, error)
// Cleanup deletes the execution resource during WFE deletion (finalizer).
// Returns nil if the resource doesn't exist (idempotent).
Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) error
// Engine returns the execution engine identifier ("tekton" or "job")
Engine() string
}
Executor defines the interface for workflow execution backends. Both TektonExecutor and JobExecutor implement this interface.
Lifecycle:
- Create - Creates the execution resource (PipelineRun or Job) in the execution namespace
- GetStatus - Polls the execution resource for current status
- Cleanup - Deletes the execution resource during WFE deletion
type JobExecutor ¶
JobExecutor implements the Executor interface for Kubernetes Jobs. Used for single-step remediations that don't require Tekton pipeline machinery.
Authority: BR-WE-014 (Kubernetes Job Execution Backend)
func NewJobExecutor ¶
func NewJobExecutor(c client.Client, serviceAccountName string) *JobExecutor
NewJobExecutor creates a new JobExecutor.
func (*JobExecutor) Cleanup ¶
func (j *JobExecutor) Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) error
Cleanup deletes the Job in the execution namespace. Returns nil if the Job doesn't exist (idempotent).
Issue #383: Before deleting, verify the Job's kubernaut.ai/workflow-execution label matches this WFE's name. Because the Job name is deterministic (derived from TargetResource), a newer WFE for the same target may have already replaced the Job. Deleting without this check would destroy the new WFE's Job.
func (*JobExecutor) Create ¶
func (j *JobExecutor) Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, opts CreateOptions) (string, error)
Create builds and creates a Kubernetes Job in the execution namespace. Returns the name of the created Job.
The Job runs the container image from the workflow catalog with parameters injected as environment variables. DD-WE-006: opts.Dependencies are mounted as volumes at /run/kubernaut/secrets/<name> and /run/kubernaut/configmaps/<name>.
func (*JobExecutor) GetStatus ¶
func (j *JobExecutor) GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) (*ExecutionResult, error)
GetStatus retrieves the current status of the Job and maps it to ExecutionResult. Returns nil result with nil error if the Job is not found.
Job condition mapping:
- conditions[Complete]=True → PhaseCompleted
- conditions[Failed]=True → PhaseFailed
- no terminal condition → PhaseRunning
func (*JobExecutor) IsCompleted ¶ added in v1.1.0
func (j *JobExecutor) IsCompleted(ctx context.Context, targetResource string, namespace string) (bool, error)
IsCompleted checks whether the existing Job for the given target resource is in a terminal state (Succeeded or Failed). Used by the controller to determine if a stale completed Job can be cleaned up before retrying creation (Issue #374).
Returns (true, nil) if the Job has a terminal condition (JobComplete or JobFailed). Returns (false, nil) if the Job is still running (no terminal condition). Returns (false, err) if the Job cannot be fetched (e.g., NotFound race).
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps execution engine names to Executor implementations. Used by the controller to dispatch to the correct executor.
type TektonExecutor ¶
TektonExecutor implements the Executor interface for Tekton PipelineRuns. Extracted from WorkflowExecutionReconciler (BR-WE-014).
func NewTektonExecutor ¶
func NewTektonExecutor(c client.Client, serviceAccountName string) *TektonExecutor
NewTektonExecutor creates a new TektonExecutor.
func (*TektonExecutor) Cleanup ¶
func (t *TektonExecutor) Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) error
Cleanup deletes the PipelineRun in the execution namespace. Returns nil if the PipelineRun doesn't exist (idempotent).
Issue #383: Before deleting, verify the PipelineRun's kubernaut.ai/workflow-execution label matches this WFE's name to avoid destroying a newer WFE's execution resource that shares the deterministic name.
func (*TektonExecutor) Create ¶
func (t *TektonExecutor) Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, opts CreateOptions) (string, error)
Create builds and creates a Tekton PipelineRun in the execution namespace. Returns the name of the created PipelineRun.
DD-WE-002: PipelineRuns created in dedicated execution namespace DD-WE-003: Deterministic name for atomic locking DD-WE-006: opts.Dependencies are added as workspace bindings.
func (*TektonExecutor) GetStatus ¶
func (t *TektonExecutor) GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) (*ExecutionResult, error)
GetStatus retrieves the current status of the PipelineRun and maps it to ExecutionResult. Returns nil result with nil error if the PipelineRun is not found.