executor

package
v1.2.0-rc0
Warning

This package is not in the latest version of its module.

Published: Mar 28, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package executor defines the Strategy pattern interface for workflow execution backends. Each execution engine (Tekton, Kubernetes Job, or Ansible/AWX) implements this interface to handle the lifecycle of its execution resource (PipelineRun, Job, or AWX job).

Authority: BR-WE-014 (Kubernetes Job Execution Backend)

Design: Strategy pattern dispatch based on spec.executionEngine


Constants

const (
	SecretMountBasePath    = "/run/kubernaut/secrets"
	ConfigMapMountBasePath = "/run/kubernaut/configmaps"
)

Variables

This section is empty.

Functions

func BuildExtraVars

func BuildExtraVars(params map[string]string) map[string]interface{}

BuildExtraVars converts workflow parameters (map[string]string) to typed JSON values for AWX extra_vars. Attempts type coercion: integers, booleans, floats, JSON arrays/objects are converted to their native types. Plain strings remain strings.
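The coercion described above can be sketched as follows. This is an illustration under stated assumptions, not the package's actual code: `coerceValue` and `buildExtraVarsSketch` are hypothetical names, and both the order of the checks and the rule that only the literals "true" and "false" count as booleans are guesses.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
	"strings"
)

// coerceValue is a hypothetical helper (not part of the package API).
// It tries the narrowest type first and falls back to the original string.
func coerceValue(s string) interface{} {
	if i, err := strconv.ParseInt(s, 10, 64); err == nil {
		return i
	}
	// Assumption: only the literals "true" and "false" count as booleans.
	if s == "true" || s == "false" {
		return s == "true"
	}
	// ParseFloat also accepts "NaN" and "Inf", which JSON cannot encode,
	// so those values stay strings.
	if f, err := strconv.ParseFloat(s, 64); err == nil && !math.IsNaN(f) && !math.IsInf(f, 0) {
		return f
	}
	// Only attempt JSON decoding for values that look like arrays or objects.
	t := strings.TrimSpace(s)
	if strings.HasPrefix(t, "[") || strings.HasPrefix(t, "{") {
		var v interface{}
		if err := json.Unmarshal([]byte(t), &v); err == nil {
			return v
		}
	}
	return s
}

// buildExtraVarsSketch applies coerceValue to every parameter.
func buildExtraVarsSketch(params map[string]string) map[string]interface{} {
	out := make(map[string]interface{}, len(params))
	for k, v := range params {
		out[k] = coerceValue(v)
	}
	return out
}

func main() {
	vars := buildExtraVarsSketch(map[string]string{
		"replicas": "3",
		"dryRun":   "false",
		"ratio":    "0.5",
		"hosts":    `["a","b"]`,
		"name":     "web",
	})
	b, err := json.Marshal(vars)
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(b))
}
```

A caller that needs a value to stay a string (for example a version such as "1.10") would have to know about this coercion, since such a value parses as a float.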

func ExecutionResourceName

func ExecutionResourceName(targetResource string) string

ExecutionResourceName generates a deterministic name from targetResource.

DD-WE-003: Lock Persistence via Deterministic Name

Format: wfe-<sha256(targetResource)[:16]>
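A minimal sketch of the name format, assuming the digest is hex-encoded and truncated to its first 16 hex characters; the documentation does not state the encoding, so that is an assumption. `executionResourceNameSketch` and the example target string are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// executionResourceNameSketch derives "wfe-" plus the first 16 hex characters
// of the SHA-256 digest of targetResource. Hex encoding is an assumption.
func executionResourceNameSketch(targetResource string) string {
	sum := sha256.Sum256([]byte(targetResource))
	return "wfe-" + hex.EncodeToString(sum[:])[:16]
}

func main() {
	// The same target always yields the same name, so a second create
	// attempt for that target collides with the existing resource.
	fmt.Println(executionResourceNameSketch("default/deployment/web"))
	fmt.Println(executionResourceNameSketch("default/deployment/web"))
}
```

Because the name depends only on the target, the Kubernetes API server's name-uniqueness check can serve as the lock: creating a second resource with the same name fails with AlreadyExists.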

Types

type AWXClient

type AWXClient interface {
	LaunchJobTemplate(ctx context.Context, templateID int, extraVars map[string]interface{}) (int, error)
	GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)
	CancelJob(ctx context.Context, jobID int) error
	FindJobTemplateByName(ctx context.Context, name string) (int, error)

	// Credential lifecycle for dependencies.secrets injection (BR-WE-015).
	// The executor dynamically creates AWX credential types per unique K8s Secret name,
	// then creates ephemeral credentials per WFE execution and cleans them up after completion.
	CreateCredentialType(ctx context.Context, name string, inputs CredentialTypeInputs, injectors CredentialTypeInjectors) (int, error)
	FindCredentialTypeByName(ctx context.Context, name string) (int, error)
	FindCredentialTypeByKind(ctx context.Context, kind string, managed bool) (int, error)
	CreateCredential(ctx context.Context, name string, credTypeID, orgID int, inputs map[string]string) (int, error)
	DeleteCredential(ctx context.Context, credentialID int) error
	LaunchJobTemplateWithCreds(ctx context.Context, templateID int, extraVars map[string]interface{}, credentialIDs []int) (int, error)
	GetJobTemplateCredentials(ctx context.Context, templateID int) ([]int, error)
}

AWXClient defines the interface for AWX/AAP REST API operations. Mocked in unit tests; real implementation provided by AWXHTTPClient.

type AWXHTTPClient

type AWXHTTPClient struct {
	// contains filtered or unexported fields
}

AWXHTTPClient implements AWXClient using the AWX REST API.

func NewAWXHTTPClient

func NewAWXHTTPClient(baseURL, token string, insecure bool) *AWXHTTPClient

NewAWXHTTPClient creates a new AWX REST API client.

func (*AWXHTTPClient) CancelJob

func (c *AWXHTTPClient) CancelJob(ctx context.Context, jobID int) error

func (*AWXHTTPClient) CreateCredential

func (c *AWXHTTPClient) CreateCredential(ctx context.Context, name string, credTypeID, orgID int, inputs map[string]string) (int, error)

func (*AWXHTTPClient) CreateCredentialType

func (c *AWXHTTPClient) CreateCredentialType(ctx context.Context, name string, inputs CredentialTypeInputs, injectors CredentialTypeInjectors) (int, error)

func (*AWXHTTPClient) DeleteCredential

func (c *AWXHTTPClient) DeleteCredential(ctx context.Context, credentialID int) error

func (*AWXHTTPClient) FindCredentialTypeByKind added in v1.2.0

func (c *AWXHTTPClient) FindCredentialTypeByKind(ctx context.Context, kind string, managed bool) (int, error)

func (*AWXHTTPClient) FindCredentialTypeByName

func (c *AWXHTTPClient) FindCredentialTypeByName(ctx context.Context, name string) (int, error)

func (*AWXHTTPClient) FindJobTemplateByName

func (c *AWXHTTPClient) FindJobTemplateByName(ctx context.Context, name string) (int, error)

func (*AWXHTTPClient) GetJobStatus

func (c *AWXHTTPClient) GetJobStatus(ctx context.Context, jobID int) (*AWXJobStatus, error)

func (*AWXHTTPClient) GetJobTemplateCredentials added in v1.1.0

func (c *AWXHTTPClient) GetJobTemplateCredentials(ctx context.Context, templateID int) ([]int, error)

func (*AWXHTTPClient) LaunchJobTemplate

func (c *AWXHTTPClient) LaunchJobTemplate(ctx context.Context, templateID int, extraVars map[string]interface{}) (int, error)

func (*AWXHTTPClient) LaunchJobTemplateWithCreds

func (c *AWXHTTPClient) LaunchJobTemplateWithCreds(ctx context.Context, templateID int, extraVars map[string]interface{}, credentialIDs []int) (int, error)

type AWXJobStatus

type AWXJobStatus struct {
	ID           int    `json:"id"`
	Status       string `json:"status"`
	Failed       bool   `json:"failed"`
	ResultStdout string `json:"result_stdout,omitempty"`
}

AWXJobStatus represents the status response from AWX GET /api/v2/jobs/{id}/

type AnsibleExecutor

type AnsibleExecutor struct {
	AWXClient      AWXClient
	K8sClient      client.Client
	Clientset      kubernetes.Interface
	OrganizationID int
	Logger         logr.Logger

	// InClusterCredentialsFn reads K8s API credentials from the controller's
	// service account mount. Replaceable for unit testing.
	InClusterCredentialsFn func() (*InClusterCredentials, error)
}

AnsibleExecutor implements the Executor interface for AWX/AAP workflow execution. BR-WE-015: Launches AWX Job Templates and tracks execution status via the AWX REST API.

func NewAnsibleExecutor

func NewAnsibleExecutor(awxClient AWXClient, k8sClient client.Client, clientset kubernetes.Interface, orgID int, logger logr.Logger) *AnsibleExecutor

NewAnsibleExecutor creates a new AnsibleExecutor with the given AWX client. k8sClient is used to read K8s Secrets (dependencies.secrets) and update WFE status. clientset is used for TokenRequest API calls (Issue #501). orgID is the AWX organization ID for ephemeral credential creation.

func (*AnsibleExecutor) Cleanup

Cleanup deletes ephemeral AWX credentials (if any) and cancels the AWX job.

func (*AnsibleExecutor) Create

Create launches an AWX Job Template from the WFE spec. DD-WE-006: When opts.Dependencies contains secrets, the executor reads them from Kubernetes, creates ephemeral AWX credentials, and attaches them to the job launch.

func (*AnsibleExecutor) Engine

func (a *AnsibleExecutor) Engine() string

Engine returns "ansible".

func (*AnsibleExecutor) GetStatus

GetStatus polls AWX for the job status and maps it to an ExecutionResult.

type CreateOptions

type CreateOptions struct {
	Dependencies *models.WorkflowDependencies
}

CreateOptions carries optional configuration for execution resource creation. Using a struct allows adding new fields without breaking the interface. DD-WE-006: Dependencies are passed here, queried from DS by the reconciler.

type CreateResult added in v1.2.0

type CreateResult struct {
	ResourceName string
	Warnings     []Warning
}

CreateResult is the return value of Executor.Create. Issue #501: Extended from a bare resource name string to carry optional warnings (e.g. token TTL shortened by the API server).

type CredentialTypeField added in v1.2.0

type CredentialTypeField struct {
	ID        string `json:"id"`
	Label     string `json:"label"`
	Type      string `json:"type"`
	Secret    bool   `json:"secret,omitempty"`
	Multiline bool   `json:"multiline,omitempty"`
}

CredentialTypeField represents a single input field definition for an AWX credential type.

type CredentialTypeInjectors added in v1.2.0

type CredentialTypeInjectors struct {
	File map[string]string `json:"file,omitempty"`
	Env  map[string]string `json:"env,omitempty"`
}

CredentialTypeInjectors defines how AWX injects credential values into jobs. File templates are rendered by AWX's Jinja2 engine and written to temp files. Env vars reference either Jinja2 placeholders or tower.filename.* paths.
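As an illustration of how these types fit together, the sketch below builds a credential type with one secret field, a file injector, and an env var that points at the rendered file. The struct definitions are copied from this page; the field ID `token`, the env var name `K8S_TOKEN_FILE`, and the helper names are hypothetical. The `template.<name>` key and `tower.filename.<name>` reference follow AWX's custom credential type convention.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type CredentialTypeField struct {
	ID        string `json:"id"`
	Label     string `json:"label"`
	Type      string `json:"type"`
	Secret    bool   `json:"secret,omitempty"`
	Multiline bool   `json:"multiline,omitempty"`
}

type CredentialTypeInputs struct {
	Fields []CredentialTypeField `json:"fields"`
}

type CredentialTypeInjectors struct {
	File map[string]string `json:"file,omitempty"`
	Env  map[string]string `json:"env,omitempty"`
}

// exampleCredentialType returns a hypothetical schema: one secret input,
// written to a temp file by AWX and exposed through an env var.
func exampleCredentialType() (CredentialTypeInputs, CredentialTypeInjectors) {
	inputs := CredentialTypeInputs{
		Fields: []CredentialTypeField{
			{ID: "token", Label: "Token", Type: "string", Secret: true},
		},
	}
	injectors := CredentialTypeInjectors{
		// AWX renders this Jinja2 template into a temporary file.
		File: map[string]string{"template.token": "{{ token }}"},
		// The env var receives the path of that rendered file.
		Env: map[string]string{"K8S_TOKEN_FILE": "{{ tower.filename.token }}"},
	}
	return inputs, injectors
}

// encode marshals v to JSON and returns an empty string on error.
func encode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	inputs, injectors := exampleCredentialType()
	fmt.Println(encode(inputs))
	fmt.Println(encode(injectors))
}
```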

type CredentialTypeInputs added in v1.2.0

type CredentialTypeInputs struct {
	Fields []CredentialTypeField `json:"fields"`
}

CredentialTypeInputs defines the input schema for an AWX credential type.

type ExecutionResult

type ExecutionResult struct {
	// Phase maps to WFE phase constants (Pending, Running, Completed, Failed)
	Phase string

	// Reason is a machine-readable reason for the current phase
	Reason string

	// Message is a human-readable description of the current state
	Message string

	// Summary contains the lightweight execution status for WFE status field
	Summary *workflowexecutionv1alpha1.ExecutionStatusSummary
}

ExecutionResult represents the mapped status of an execution resource. Tekton PipelineRun conditions, K8s Job conditions, and AWX job statuses are all mapped to this common structure.

func MapAWXStatusToResult

func MapAWXStatusToResult(status *AWXJobStatus) *ExecutionResult

MapAWXStatusToResult maps an AWX job status to an ExecutionResult. AWX states: pending, waiting, running, successful, failed, error, canceled
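One plausible mapping from the listed AWX states to the WFE phases is sketched below. The grouping of states, the Reason strings, and the handling of unknown states are all assumptions; the types are simplified local stand-ins for AWXJobStatus and ExecutionResult.

```go
package main

import "fmt"

// awxJobStatus and executionResult are simplified stand-ins for the
// package's AWXJobStatus and ExecutionResult types.
type awxJobStatus struct {
	ID     int
	Status string
}

type executionResult struct {
	Phase   string
	Reason  string
	Message string
}

// mapAWXStatusSketch groups AWX states into WFE phases. The grouping and the
// Reason values are hypothetical.
func mapAWXStatusSketch(s awxJobStatus) executionResult {
	msg := fmt.Sprintf("AWX job %d is %s", s.ID, s.Status)
	switch s.Status {
	case "pending", "waiting":
		return executionResult{Phase: "Pending", Reason: "AWXJobQueued", Message: msg}
	case "running":
		return executionResult{Phase: "Running", Reason: "AWXJobRunning", Message: msg}
	case "successful":
		return executionResult{Phase: "Completed", Reason: "AWXJobSucceeded", Message: msg}
	case "failed", "error", "canceled":
		return executionResult{Phase: "Failed", Reason: "AWXJobFailed", Message: msg}
	default:
		// Assumption: an unrecognized state is reported as pending so the
		// caller keeps polling instead of failing the workflow.
		return executionResult{Phase: "Pending", Reason: "AWXJobStateUnknown", Message: msg}
	}
}

func main() {
	for _, st := range []string{"waiting", "running", "successful", "canceled"} {
		r := mapAWXStatusSketch(awxJobStatus{ID: 42, Status: st})
		fmt.Printf("%-10s -> %s\n", st, r.Phase)
	}
}
```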

type Executor

type Executor interface {
	// Create builds and creates the execution resource (PipelineRun, Job, or
	// AWX job) in the specified execution namespace. Returns a CreateResult
	// containing the resource name and any non-fatal warnings.
	//
	// The execution resource name MUST be deterministic based on targetResource
	// to provide atomic resource locking (DD-WE-003).
	//
	// DD-WE-006: opts.Dependencies carries schema-declared infrastructure dependencies
	// to be mounted as volumes (Job) or workspace bindings (Tekton).
	Create(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string, opts CreateOptions) (*CreateResult, error)

	// GetStatus retrieves the current status of the execution resource.
	// Returns an ExecutionResult that maps the backend-specific status to WFE phases.
	//
	// Returns nil result with nil error if the execution resource is not found
	// (may have been externally deleted - BR-WE-007).
	GetStatus(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) (*ExecutionResult, error)

	// Cleanup deletes the execution resource during WFE deletion (finalizer).
	// Returns nil if the resource doesn't exist (idempotent).
	Cleanup(ctx context.Context, wfe *workflowexecutionv1alpha1.WorkflowExecution, namespace string) error

	// Engine returns the execution engine identifier ("tekton", "job", or "ansible")
	Engine() string
}

Executor defines the interface for workflow execution backends. Tekton, Job, and Ansible executors implement this interface.

Lifecycle:

  1. Create - Creates the execution resource in the execution namespace
  2. GetStatus - Polls the execution resource for current status
  3. Cleanup - Deletes the execution resource during WFE deletion
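The three lifecycle steps can be illustrated with a trimmed-down, hypothetical interface and an in-memory fake. `miniExecutor`, `fakeExecutor`, and `runToCompletion` are invented for this sketch; a real controller would reconcile on events rather than loop, and would run Cleanup from a finalizer during WFE deletion rather than right after completion.

```go
package main

import (
	"errors"
	"fmt"
)

type result struct{ Phase string }

// miniExecutor is a simplified stand-in for Executor (no context, WFE object,
// or namespace), used only to show the call order.
type miniExecutor interface {
	Create(target string) (string, error)
	GetStatus(name string) (*result, error)
	Cleanup(name string) error
}

// fakeExecutor reports Running for runningPolls polls, then Completed.
type fakeExecutor struct {
	runningPolls int
	resources    map[string]int // resource name -> polls seen so far
}

func (f *fakeExecutor) Create(target string) (string, error) {
	name := "wfe-" + target
	if _, exists := f.resources[name]; exists {
		return "", errors.New("already exists")
	}
	f.resources[name] = 0
	return name, nil
}

func (f *fakeExecutor) GetStatus(name string) (*result, error) {
	polls, ok := f.resources[name]
	if !ok {
		return nil, nil // not found: nil result with nil error
	}
	f.resources[name] = polls + 1
	if polls < f.runningPolls {
		return &result{Phase: "Running"}, nil
	}
	return &result{Phase: "Completed"}, nil
}

func (f *fakeExecutor) Cleanup(name string) error {
	delete(f.resources, name) // deleting a missing key is a no-op (idempotent)
	return nil
}

// runToCompletion walks Create -> GetStatus (polling) -> Cleanup.
func runToCompletion(e miniExecutor, target string, maxPolls int) (string, error) {
	name, err := e.Create(target)
	if err != nil {
		return "", err
	}
	defer e.Cleanup(name)
	for i := 0; i < maxPolls; i++ {
		res, err := e.GetStatus(name)
		if err != nil {
			return "", err
		}
		if res == nil {
			return "", errors.New("execution resource was deleted externally")
		}
		if res.Phase == "Completed" || res.Phase == "Failed" {
			return res.Phase, nil
		}
	}
	return "", errors.New("still running after max polls")
}

func main() {
	e := &fakeExecutor{runningPolls: 2, resources: map[string]int{}}
	phase, err := runToCompletion(e, "web", 10)
	fmt.Println(phase, err)
}
```

Note that the caller has to check for a nil result before reading its fields, since GetStatus signals a missing resource with a nil result rather than an error.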

type InClusterCredentials added in v1.2.0

type InClusterCredentials struct {
	Host   string
	Token  string
	CACert string
}

InClusterCredentials holds Kubernetes API credentials read from the controller's in-cluster service account mount.

func ReadInClusterCredentials added in v1.2.0

func ReadInClusterCredentials() (*InClusterCredentials, error)

ReadInClusterCredentials reads the controller's own K8s API credentials from the standard in-cluster mount paths. The token is read fresh on each call because projected tokens are rotated by the kubelet.
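A sketch of what such a read might look like, assuming the standard service account mount at /var/run/secrets/kubernetes.io/serviceaccount and the KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT environment variables that Kubernetes sets in every pod. The function name, the injected `readFile` and `getenv` parameters, and the "https://host:port" form of Host are assumptions made so the sketch can be exercised outside a cluster.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"path"
	"strings"
)

// serviceAccountDir is the standard in-cluster service account mount path.
const serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount"

// inClusterCredentials mirrors the fields of InClusterCredentials.
type inClusterCredentials struct {
	Host   string
	Token  string
	CACert string
}

// readCredentialsSketch reads the token and CA bundle on every call, so a
// token rotated by the kubelet is picked up without restarting the process.
// readFile and getenv are injected to keep the sketch testable.
func readCredentialsSketch(
	readFile func(string) ([]byte, error),
	getenv func(string) string,
) (*inClusterCredentials, error) {
	host, port := getenv("KUBERNETES_SERVICE_HOST"), getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		return nil, fmt.Errorf("KUBERNETES_SERVICE_HOST or KUBERNETES_SERVICE_PORT is not set")
	}
	token, err := readFile(path.Join(serviceAccountDir, "token"))
	if err != nil {
		return nil, fmt.Errorf("read service account token: %w", err)
	}
	ca, err := readFile(path.Join(serviceAccountDir, "ca.crt"))
	if err != nil {
		return nil, fmt.Errorf("read CA certificate: %w", err)
	}
	return &inClusterCredentials{
		Host:   "https://" + net.JoinHostPort(host, port),
		Token:  strings.TrimSpace(string(token)),
		CACert: string(ca),
	}, nil
}

func main() {
	creds, err := readCredentialsSketch(os.ReadFile, os.Getenv)
	if err != nil {
		fmt.Println("not running in a cluster:", err)
		return
	}
	fmt.Println("API server:", creds.Host)
}
```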

type JobExecutor

type JobExecutor struct {
	Client client.Client
}

JobExecutor implements the Executor interface for Kubernetes Jobs. DD-WE-005 v2.0: SA is read from WFE spec at execution time, not from executor config.

Authority: BR-WE-014 (Kubernetes Job Execution Backend)

func NewJobExecutor

func NewJobExecutor(c client.Client) *JobExecutor

NewJobExecutor creates a new JobExecutor.

func (*JobExecutor) Cleanup

Cleanup deletes the Job in the execution namespace. Returns nil if the Job doesn't exist (idempotent).

Issue #383: Before deleting, verify the Job's kubernaut.ai/workflow-execution label matches this WFE's name. Because the Job name is deterministic (derived from TargetResource), a newer WFE for the same target may have already replaced the Job. Deleting without this check would destroy the new WFE's Job.
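The ownership check from Issue #383 can be sketched as a small guard over the resource's labels. The label key comes from the text above; the helper name `ownedBy` and the WFE names are hypothetical, and a real implementation would read the labels from the fetched Job object.

```go
package main

import "fmt"

// ownerLabel is the label key named in the documentation above.
const ownerLabel = "kubernaut.ai/workflow-execution"

// ownedBy reports whether the labels mark the resource as belonging to the
// given WFE. A missing label counts as "not owned", so nothing is deleted.
func ownedBy(labels map[string]string, wfeName string) bool {
	owner, ok := labels[ownerLabel]
	return ok && owner == wfeName
}

func main() {
	// A newer WFE has already replaced the Job that shares the deterministic name.
	jobLabels := map[string]string{ownerLabel: "wfe-new"}

	fmt.Println(ownedBy(jobLabels, "wfe-old")) // false: skip deletion
	fmt.Println(ownedBy(jobLabels, "wfe-new")) // true: safe to delete
}
```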

func (*JobExecutor) Create

Create builds and creates a Kubernetes Job in the execution namespace. Returns a CreateResult carrying the name of the created Job.

The Job runs the container image from the workflow catalog with parameters injected as environment variables. DD-WE-006: opts.Dependencies are mounted as volumes at /run/kubernaut/secrets/<name> and /run/kubernaut/configmaps/<name>.
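From the workflow container's point of view, the mount layout might be consumed as sketched below. The two constants are the package's documented values; the helper names and the example Secret, ConfigMap, and key names are hypothetical. The sketch also assumes the default Kubernetes volume behavior in which each key of a Secret or ConfigMap appears as a file inside the mount directory.

```go
package main

import (
	"fmt"
	"path"
)

const (
	SecretMountBasePath    = "/run/kubernaut/secrets"
	ConfigMapMountBasePath = "/run/kubernaut/configmaps"
)

// secretKeyPath returns the file path for one key of a mounted Secret.
func secretKeyPath(secretName, key string) string {
	return path.Join(SecretMountBasePath, secretName, key)
}

// configMapKeyPath returns the file path for one key of a mounted ConfigMap.
func configMapKeyPath(configMapName, key string) string {
	return path.Join(ConfigMapMountBasePath, configMapName, key)
}

func main() {
	fmt.Println(secretKeyPath("db-credentials", "password"))
	fmt.Println(configMapKeyPath("app-settings", "config.yaml"))
}
```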

func (*JobExecutor) Engine

func (j *JobExecutor) Engine() string

Engine returns "job".

func (*JobExecutor) GetStatus

GetStatus retrieves the current status of the Job and maps it to ExecutionResult. Returns nil result with nil error if the Job is not found.

Job condition mapping:

  • conditions[Complete]=True → PhaseCompleted
  • conditions[Failed]=True → PhaseFailed
  • no terminal condition → PhaseRunning
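The mapping above can be sketched with a local condition type. `jobCondition` mirrors only the Type and Status fields of the Kubernetes Job condition, and the phase strings are assumed values for the PhaseCompleted, PhaseFailed, and PhaseRunning constants.

```go
package main

import "fmt"

// jobCondition is a minimal stand-in for the Kubernetes Job condition type.
type jobCondition struct {
	Type   string // "Complete" or "Failed" for terminal conditions
	Status string // "True", "False", or "Unknown"
}

// phaseFromConditions returns the phase for the first terminal condition whose
// status is "True", or "Running" when there is none.
func phaseFromConditions(conds []jobCondition) string {
	for _, c := range conds {
		if c.Status != "True" {
			continue
		}
		switch c.Type {
		case "Complete":
			return "Completed"
		case "Failed":
			return "Failed"
		}
	}
	return "Running"
}

func main() {
	fmt.Println(phaseFromConditions(nil))
	fmt.Println(phaseFromConditions([]jobCondition{{Type: "Complete", Status: "True"}}))
	fmt.Println(phaseFromConditions([]jobCondition{{Type: "Failed", Status: "True"}}))
}
```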

func (*JobExecutor) IsCompleted added in v1.1.0

func (j *JobExecutor) IsCompleted(ctx context.Context, targetResource string, namespace string) (bool, error)

IsCompleted checks whether the existing Job for the given target resource is in a terminal state (Succeeded or Failed). Used by the controller to determine if a stale completed Job can be cleaned up before retrying creation (Issue #374).

Return values:

  • (true, nil) if the Job has a terminal condition (JobComplete or JobFailed).
  • (false, nil) if the Job is still running (no terminal condition).
  • (false, err) if the Job cannot be fetched (e.g., NotFound race).

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps execution engine names to Executor implementations. Used by the controller to dispatch to the correct executor.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new executor registry.

func (*Registry) Engines

func (r *Registry) Engines() []string

Engines returns the list of registered engine names.

func (*Registry) Get

func (r *Registry) Get(engine string) (Executor, error)

Get returns the executor for the given engine name. Returns an error if no executor is registered for the engine.

func (*Registry) Register

func (r *Registry) Register(engine string, executor Executor)

Register adds an executor for the given engine name.
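Since the Registry's fields are unexported, the sketch below shows only one plausible shape: a map keyed by engine name. The type and helper names are hypothetical, the interface is reduced to Engine(), and the sorted output of Engines and the absence of locking are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// engineExecutor is reduced to the one method this sketch needs.
type engineExecutor interface {
	Engine() string
}

// registrySketch maps engine names to executors. It is not safe for
// concurrent use; whether the real Registry locks is unknown.
type registrySketch struct {
	executors map[string]engineExecutor
}

func newRegistrySketch() *registrySketch {
	return &registrySketch{executors: make(map[string]engineExecutor)}
}

// Register adds or replaces the executor for an engine name.
func (r *registrySketch) Register(engine string, e engineExecutor) {
	r.executors[engine] = e
}

// Get returns the executor for the engine, or an error if none is registered.
func (r *registrySketch) Get(engine string) (engineExecutor, error) {
	e, ok := r.executors[engine]
	if !ok {
		return nil, fmt.Errorf("no executor registered for engine %q", engine)
	}
	return e, nil
}

// Engines returns the registered names, sorted for stable output (assumption).
func (r *registrySketch) Engines() []string {
	names := make([]string, 0, len(r.executors))
	for name := range r.executors {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// fakeExecutor is a placeholder implementation for the example.
type fakeExecutor struct{ name string }

func (f fakeExecutor) Engine() string { return f.name }

func main() {
	r := newRegistrySketch()
	r.Register("tekton", fakeExecutor{name: "tekton"})
	r.Register("job", fakeExecutor{name: "job"})

	// Dispatch on the engine named in the workflow spec.
	if e, err := r.Get("job"); err == nil {
		fmt.Println("dispatching to", e.Engine())
	}
	if _, err := r.Get("ansible"); err != nil {
		fmt.Println(err)
	}
}
```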

type TektonExecutor

type TektonExecutor struct {
	Client client.Client
}

TektonExecutor implements the Executor interface for Tekton PipelineRuns. DD-WE-005 v2.0: SA is read from WFE spec at execution time, not from executor config.

func NewTektonExecutor

func NewTektonExecutor(c client.Client) *TektonExecutor

NewTektonExecutor creates a new TektonExecutor.

func (*TektonExecutor) Cleanup

Cleanup deletes the PipelineRun in the execution namespace. Returns nil if the PipelineRun doesn't exist (idempotent).

Issue #383: Before deleting, verify the PipelineRun's kubernaut.ai/workflow-execution label matches this WFE's name to avoid destroying a newer WFE's execution resource that shares the deterministic name.

func (*TektonExecutor) Create

Create builds and creates a Tekton PipelineRun in the execution namespace. Returns a CreateResult carrying the name of the created PipelineRun.

  • DD-WE-002: PipelineRuns created in dedicated execution namespace
  • DD-WE-003: Deterministic name for atomic locking
  • DD-WE-006: opts.Dependencies are added as workspace bindings

func (*TektonExecutor) Engine

func (t *TektonExecutor) Engine() string

Engine returns "tekton".

func (*TektonExecutor) GetStatus

GetStatus retrieves the current status of the PipelineRun and maps it to ExecutionResult. Returns nil result with nil error if the PipelineRun is not found.

type Warning added in v1.2.0

type Warning struct {
	Type    string // Condition type, e.g. "TokenTTLInsufficient"
	Reason  string // Machine-readable reason, e.g. "TokenTTLShortened"
	Message string // Human-readable detail
}

Warning represents a non-fatal issue detected during execution resource creation. The reconciler translates warnings into WFE conditions and K8s events so operators can observe them via kubectl.
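How a caller might turn warnings into condition-like records is sketched below. The Warning struct is copied from this page; the `condition` type, the "True" status, and the helper name are assumptions, and the example message text is invented.

```go
package main

import "fmt"

type Warning struct {
	Type    string // Condition type, e.g. "TokenTTLInsufficient"
	Reason  string // Machine-readable reason, e.g. "TokenTTLShortened"
	Message string // Human-readable detail
}

// condition is a simplified, hypothetical stand-in for a status condition.
type condition struct {
	Type    string
	Status  string
	Reason  string
	Message string
}

// conditionsFromWarnings copies each warning into a condition with status
// "True" (assumption: a present warning means the condition holds).
func conditionsFromWarnings(warnings []Warning) []condition {
	out := make([]condition, 0, len(warnings))
	for _, w := range warnings {
		out = append(out, condition{
			Type:    w.Type,
			Status:  "True",
			Reason:  w.Reason,
			Message: w.Message,
		})
	}
	return out
}

func main() {
	warnings := []Warning{{
		Type:    "TokenTTLInsufficient",
		Reason:  "TokenTTLShortened",
		Message: "example: the API server issued a shorter token lifetime than requested",
	}}
	for _, c := range conditionsFromWarnings(warnings) {
		fmt.Printf("%s=%s (%s): %s\n", c.Type, c.Status, c.Reason, c.Message)
	}
}
```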
