executor

package
v1.1.0-rc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package executor defines the Strategy pattern interface for workflow execution backends. Each execution engine (Tekton, K8s Job) implements this interface to handle the lifecycle of its execution resource (PipelineRun, Job).

Authority: BR-WE-014 (Kubernetes Job Execution Backend) Design: Strategy pattern dispatch based on spec.executionEngine

Index

Constants

View Source
const (
	SecretMountBasePath    = "/run/kubernaut/secrets"
	ConfigMapMountBasePath = "/run/kubernaut/configmaps"
)
View Source
const (
	// AnnotationEphemeralCredentials stores comma-separated AWX credential IDs
	// created by injectDependencySecrets for cleanup after execution.
	AnnotationEphemeralCredentials = "kubernaut.ai/awx-ephemeral-credentials"
)
View Source
const DefaultServiceAccountName = "kubernaut-workflow-runner"

DefaultServiceAccountName is the default SA for PipelineRuns

Variables

This section is empty.

Functions

func BuildExtraVars

func BuildExtraVars(params map[string]string) map[string]interface{}

BuildExtraVars converts workflow parameters (map[string]string) to typed JSON values for AWX extra_vars. Attempts type coercion: integers, booleans, floats, JSON arrays/objects are converted to their native types. Plain strings remain strings.

func ExecutionResourceName

func ExecutionResourceName(targetResource string) string

ExecutionResourceName generates a deterministic name from targetResource. DD-WE-003: Lock Persistence via Deterministic Name Format: wfe-<sha256(targetResource)[:16]>

Types

type AWXClient

type AWXClient interface {
	LaunchJobTemplate(ctx context.Context, templateID int, extraVars map[string]interface{}) (int, error)
	GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)
	CancelJob(ctx context.Context, jobID int) error
	FindJobTemplateByName(ctx context.Context, name string) (int, error)

	// Credential lifecycle for dependencies.secrets injection (BR-WE-015).
	// The executor dynamically creates AWX credential types per unique K8s Secret name,
	// then creates ephemeral credentials per WFE execution and cleans them up after completion.
	CreateCredentialType(ctx context.Context, name string, inputs, injectors map[string]interface{}) (int, error)
	FindCredentialTypeByName(ctx context.Context, name string) (int, error)
	CreateCredential(ctx context.Context, name string, credTypeID, orgID int, inputs map[string]string) (int, error)
	DeleteCredential(ctx context.Context, credentialID int) error
	LaunchJobTemplateWithCreds(ctx context.Context, templateID int, extraVars map[string]interface{}, credentialIDs []int) (int, error)
	GetJobTemplateCredentials(ctx context.Context, templateID int) ([]int, error)
}

AWXClient defines the interface for AWX/AAP REST API operations. Mocked in unit tests; real implementation provided by AWXHTTPClient.

type AWXHTTPClient

type AWXHTTPClient struct {
	// contains filtered or unexported fields
}

AWXHTTPClient implements AWXClient using the AWX REST API.

func NewAWXHTTPClient

func NewAWXHTTPClient(baseURL, token string, insecure bool) *AWXHTTPClient

NewAWXHTTPClient creates a new AWX REST API client.

func (*AWXHTTPClient) CancelJob

func (c *AWXHTTPClient) CancelJob(ctx context.Context, jobID int) error

func (*AWXHTTPClient) CreateCredential

func (c *AWXHTTPClient) CreateCredential(ctx context.Context, name string, credTypeID, orgID int, inputs map[string]string) (int, error)

func (*AWXHTTPClient) CreateCredentialType

func (c *AWXHTTPClient) CreateCredentialType(ctx context.Context, name string, inputs, injectors map[string]interface{}) (int, error)

func (*AWXHTTPClient) DeleteCredential

func (c *AWXHTTPClient) DeleteCredential(ctx context.Context, credentialID int) error

func (*AWXHTTPClient) FindCredentialTypeByName

func (c *AWXHTTPClient) FindCredentialTypeByName(ctx context.Context, name string) (int, error)

func (*AWXHTTPClient) FindJobTemplateByName

func (c *AWXHTTPClient) FindJobTemplateByName(ctx context.Context, name string) (int, error)

func (*AWXHTTPClient) GetJobStatus

func (c *AWXHTTPClient) GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)

func (*AWXHTTPClient) GetJobTemplateCredentials added in v1.1.0

func (c *AWXHTTPClient) GetJobTemplateCredentials(ctx context.Context, templateID int) ([]int, error)

func (*AWXHTTPClient) LaunchJobTemplate

func (c *AWXHTTPClient) LaunchJobTemplate(ctx context.Context, templateID int, extraVars map[string]interface{}) (int, error)

func (*AWXHTTPClient) LaunchJobTemplateWithCreds

func (c *AWXHTTPClient) LaunchJobTemplateWithCreds(ctx context.Context, templateID int, extraVars map[string]interface{}, credentialIDs []int) (int, error)

type AWXJobStatus

type AWXJobStatus struct {
	ID           int    `json:"id"`
	Status       string `json:"status"`
	Failed       bool   `json:"failed"`
	ResultStdout string `json:"result_stdout,omitempty"`
}

AWXJobStatus represents the status response from AWX GET /api/v2/jobs/{id}/

type AnsibleExecutor

type AnsibleExecutor struct {
	AWXClient      AWXClient
	K8sClient      client.Client
	OrganizationID int
	Logger         logr.Logger
}

AnsibleExecutor implements the Executor interface for AWX/AAP workflow execution. BR-WE-015: Launches AWX Job Templates and tracks execution status via the AWX REST API.

func NewAnsibleExecutor

func NewAnsibleExecutor(awxClient AWXClient, k8sClient client.Client, orgID int, logger logr.Logger) *AnsibleExecutor

NewAnsibleExecutor creates a new AnsibleExecutor with the given AWX client. k8sClient is used to read K8s Secrets (dependencies.secrets) and write WFE annotations. orgID is the AWX organization ID for ephemeral credential creation.

func (*AnsibleExecutor) Cleanup

Cleanup deletes ephemeral AWX credentials (if any) and cancels the AWX job.

func (*AnsibleExecutor) Create

Create launches an AWX Job Template from the WFE spec. DD-WE-006: When opts.Dependencies contains secrets, the executor reads them from Kubernetes, creates ephemeral AWX credentials, and attaches them to the job launch.

func (*AnsibleExecutor) Engine

func (a *AnsibleExecutor) Engine() string

Engine returns "ansible".

func (*AnsibleExecutor) GetStatus

GetStatus polls AWX for the job status and maps it to an ExecutionResult.

type CreateOptions

type CreateOptions struct {
	Dependencies *models.WorkflowDependencies
}

CreateOptions carries optional configuration for execution resource creation. Using a struct allows adding new fields without breaking the interface. DD-WE-006: Dependencies are passed here, queried from DS by the reconciler.

type ExecutionResult

type ExecutionResult struct {
	// Phase maps to WFE phase constants (Pending, Running, Completed, Failed)
	Phase string

	// Reason is a machine-readable reason for the current phase
	Reason string

	// Message is a human-readable description of the current state
	Message string

	// Summary contains the lightweight execution status for WFE status field
	Summary *workflowexecutionv1alpha1.ExecutionStatusSummary
}

ExecutionResult represents the mapped status of an execution resource. Both Tekton PipelineRun conditions and K8s Job conditions are mapped to this common structure.

func MapAWXStatusToResult

func MapAWXStatusToResult(status *AWXJobStatus) *ExecutionResult

MapAWXStatusToResult maps an AWX job status to an ExecutionResult. AWX states: pending, waiting, running, successful, failed, error, canceled

type Executor

type Executor interface {
	// Create builds and creates the execution resource (PipelineRun or Job)
	// in the specified execution namespace. Returns the name of the created resource.
	//
	// The execution resource name MUST be deterministic based on targetResource
	// to provide atomic resource locking (DD-WE-003).
	//
	// DD-WE-006: opts.Dependencies carries schema-declared infrastructure dependencies
	// to be mounted as volumes (Job) or workspace bindings (Tekton).
	Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, opts CreateOptions) (string, error)

	// GetStatus retrieves the current status of the execution resource.
	// Returns an ExecutionResult that maps the backend-specific status to WFE phases.
	//
	// Returns nil result with nil error if the execution resource is not found
	// (may have been externally deleted - BR-WE-007).
	GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) (*ExecutionResult, error)

	// Cleanup deletes the execution resource during WFE deletion (finalizer).
	// Returns nil if the resource doesn't exist (idempotent).
	Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) error

	// Engine returns the execution engine identifier ("tekton" or "job")
	Engine() string
}

Executor defines the interface for workflow execution backends. Both TektonExecutor and JobExecutor implement this interface.

Lifecycle:

  1. Create - Creates the execution resource (PipelineRun or Job) in the execution namespace
  2. GetStatus - Polls the execution resource for current status
  3. Cleanup - Deletes the execution resource during WFE deletion

type JobExecutor

type JobExecutor struct {
	Client             client.Client
	ServiceAccountName string
}

JobExecutor implements the Executor interface for Kubernetes Jobs. Used for single-step remediations that don't require Tekton pipeline machinery.

Authority: BR-WE-014 (Kubernetes Job Execution Backend)

func NewJobExecutor

func NewJobExecutor(c client.Client, serviceAccountName string) *JobExecutor

NewJobExecutor creates a new JobExecutor.

func (*JobExecutor) Cleanup

Cleanup deletes the Job in the execution namespace. Returns nil if the Job doesn't exist (idempotent).

Issue #383: Before deleting, verify the Job's kubernaut.ai/workflow-execution label matches this WFE's name. Because the Job name is deterministic (derived from TargetResource), a newer WFE for the same target may have already replaced the Job. Deleting without this check would destroy the new WFE's Job.

func (*JobExecutor) Create

Create builds and creates a Kubernetes Job in the execution namespace. Returns the name of the created Job.

The Job runs the container image from the workflow catalog with parameters injected as environment variables. DD-WE-006: opts.Dependencies are mounted as volumes at /run/kubernaut/secrets/<name> and /run/kubernaut/configmaps/<name>.

func (*JobExecutor) Engine

func (j *JobExecutor) Engine() string

Engine returns "job".

func (*JobExecutor) GetStatus

GetStatus retrieves the current status of the Job and maps it to ExecutionResult. Returns nil result with nil error if the Job is not found.

Job condition mapping:

  • conditions[Complete]=True → PhaseCompleted
  • conditions[Failed]=True → PhaseFailed
  • no terminal condition → PhaseRunning

func (*JobExecutor) IsCompleted added in v1.1.0

func (j *JobExecutor) IsCompleted(ctx context.Context, targetResource string, namespace string) (bool, error)

IsCompleted checks whether the existing Job for the given target resource is in a terminal state (Succeeded or Failed). Used by the controller to determine if a stale completed Job can be cleaned up before retrying creation (Issue #374).

Returns (true, nil) if the Job has a terminal condition (JobComplete or JobFailed). Returns (false, nil) if the Job is still running (no terminal condition). Returns (false, err) if the Job cannot be fetched (e.g., NotFound race).

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps execution engine names to Executor implementations. Used by the controller to dispatch to the correct executor.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new executor registry.

func (*Registry) Engines

func (r *Registry) Engines() []string

Engines returns the list of registered engine names.

func (*Registry) Get

func (r *Registry) Get(engine string) (Executor, error)

Get returns the executor for the given engine name. Returns an error if no executor is registered for the engine.

func (*Registry) Register

func (r *Registry) Register(engine string, executor Executor)

Register adds an executor for the given engine name.

type TektonExecutor

type TektonExecutor struct {
	Client             client.Client
	ServiceAccountName string
}

TektonExecutor implements the Executor interface for Tekton PipelineRuns. Extracted from WorkflowExecutionReconciler (BR-WE-014).

func NewTektonExecutor

func NewTektonExecutor(c client.Client, serviceAccountName string) *TektonExecutor

NewTektonExecutor creates a new TektonExecutor.

func (*TektonExecutor) Cleanup

Cleanup deletes the PipelineRun in the execution namespace. Returns nil if the PipelineRun doesn't exist (idempotent).

Issue #383: Before deleting, verify the PipelineRun's kubernaut.ai/workflow-execution label matches this WFE's name to avoid destroying a newer WFE's execution resource that shares the deterministic name.

func (*TektonExecutor) Create

Create builds and creates a Tekton PipelineRun in the execution namespace. Returns the name of the created PipelineRun.

DD-WE-002: PipelineRuns created in dedicated execution namespace DD-WE-003: Deterministic name for atomic locking DD-WE-006: opts.Dependencies are added as workspace bindings.

func (*TektonExecutor) Engine

func (t *TektonExecutor) Engine() string

Engine returns "tekton".

func (*TektonExecutor) GetStatus

GetStatus retrieves the current status of the PipelineRun and maps it to ExecutionResult. Returns nil result with nil error if the PipelineRun is not found.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL