Temporal Workflow Guide
This guide explains how to add a new operation to the Temporal workflow executor in the RLA system.
Overview
The RLA Temporal workflow system provides durable, retryable execution of long-running operations across distributed rack components. It has three layers:
- Manager: Receives generic ExecutionRequests, looks up the right workflow from the registry, and submits it to Temporal
- Workflows: Orchestrate activities in sequence or parallel; each workflow self-registers with its TaskType
- Activities: Execute actual work (API calls, status checks) against component managers
Architecture
ExecutionRequest{OperationType, OperationInfo}
         │
         ▼
┌─────────────────┐
│     Manager     │  Looks up WorkflowDescriptor from registry
│   manager.go    │  Calls client.ExecuteWorkflow(desc.WorkflowName, ...)
└────────┬────────┘
         │ ExecuteWorkflow
         ▼
┌─────────────────────────────────────┐
│              Workflow               │  Defined per operation type
│   workflow/powercontrol.go, etc.    │  Calls executeRuleBasedOperation()
└────────┬──────────────┬─────────────┘
         │ child wf     │ ExecuteActivity (name constant)
         ▼              ▼
┌──────────────────┐  ┌──────────────────┐
│ GenericComponent │  │    Activities    │  Registered with explicit names
│   StepWorkflow   │  │  activity/*.go   │  via RegisterActivityWithOptions
└──────────────────┘  └──────────────────┘
Registry Pattern
Workflow registry (workflow/registry.go): uses init() self-registration. Task-dispatched workflow files call registerTaskWorkflow[T, *T](taskType, name, fn), which derives the timeout and builds the Unmarshal closure automatically. Internal workflows (those without a TaskType) call register(WorkflowDescriptor{...}) directly. Nothing needs to be added to a central list — the registry is populated automatically at startup.
Activity registry (activity/registry.go): uses per-instance dependency injection. Build() creates an *Activities value via activity.New(updater, registry) and calls acts.All() to obtain the name → bound-method map, then registers each entry with the Temporal worker via RegisterActivityWithOptions(fn, {Name: name}). Because activities are methods on *Activities, each manager instance holds its own isolated copy of the dependencies — no shared mutable globals.
Adding a New Operation
Step 0: Define the Task Type and Operation Metadata
Before any activity or workflow code can compile, three prerequisites must exist.
1. Register the task type in internal/task/common/common.go:
const (
	// ... existing constants ...
	TaskTypeHealthCheck TaskType = "health_check"
)

func TaskTypeFromString(s string) TaskType {
	switch s {
	// ... existing cases ...
	case TaskTypeHealthCheck.String():
		return TaskTypeHealthCheck
	// ...
	}
}
2. Add operation options (at minimum a timeout) in internal/task/operations/options.go or equivalent:
func GetOperationOptions(tt taskcommon.TaskType) OperationOptions {
	switch tt {
	// ... existing cases ...
	case taskcommon.TaskTypeHealthCheck:
		return OperationOptions{Timeout: 10 * time.Minute}
	// ...
	}
}
3. Define the task-info struct in the operations package. Include a Validate() method — it is called by the Unmarshal closure that registerTaskWorkflow builds automatically:
// HealthCheckTaskInfo carries the parameters for a health check operation.
type HealthCheckTaskInfo struct {
	// CheckType selects which checks to run (e.g. "full", "connectivity").
	CheckType string `json:"check_type"`
}

func (i *HealthCheckTaskInfo) Validate() error {
	if i.CheckType == "" {
		return fmt.Errorf("check_type is required")
	}
	return nil
}
Step 1: Define Activity Methods
Add methods to *Activities in activity/activity.go. Each method performs one unit of work and must be idempotent (Temporal may retry it).
// HealthCheck checks the health status of a component.
func (a *Activities) HealthCheck(
	ctx context.Context,
	target common.Target,
) (operations.HealthStatus, error) {
	cm, err := a.validAndGetComponentManager(target)
	if err != nil {
		return operations.HealthStatusUnknown, err
	}
	return cm.HealthCheck(ctx, target)
}
Key points:
- Receiver is *Activities; use a.validAndGetComponentManager (not a free function)
- First non-receiver parameter is always context.Context
- Activities are retried automatically per the workflow's retry policy
- Validate inputs; return descriptive errors
Step 2: Assign Names and Expose Activities
In activity/activity.go, add a name constant. In activity/registry.go, add the bound method to All(). The name string itself is written only once, in the constant; All() and every workflow call site refer to the constant.
// activity/activity.go
const (
	// ... existing constants ...
	NameHealthCheck = "HealthCheck"
)

// activity/registry.go (inside All())
func (a *Activities) All() map[string]any {
	return map[string]any{
		// ... existing entries ...
		NameHealthCheck: a.HealthCheck,
	}
}
Build() in manager.go calls acts.All() and registers each entry with the Temporal worker via RegisterActivityWithOptions(fn, {Name: name}) — no manual update to manager.go is needed.
Use NameHealthCheck (not "HealthCheck") in all workflow.ExecuteActivity call sites.
Step 3: Create the Workflow File
Create workflow/healthcheck.go. The file must:
- Call registerTaskWorkflow[T, PT](...) in init() to register the workflow
- Implement the workflow function (prefer unexported)
package workflow

import (
	"go.temporal.io/sdk/workflow"

	taskcommon "github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/task/common"
	"github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/task/executor/temporalworkflow/activity"
	"github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/task/operations"
	"github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/task/task"
)

// init registers the HealthCheck workflow descriptor with the package registry.
func init() {
	registerTaskWorkflow[operations.HealthCheckTaskInfo, *operations.HealthCheckTaskInfo](
		taskcommon.TaskTypeHealthCheck, "HealthCheck", healthCheck,
	)
}

// healthCheck orchestrates health checks across all target components.
func healthCheck(
	ctx workflow.Context,
	reqInfo task.ExecutionInfo,
	info *operations.HealthCheckTaskInfo,
) error {
	// healthCheckActivityOptions is this workflow's package-level
	// workflow.ActivityOptions value (timeouts and retry policy);
	// define it in this file alongside the workflow.
	ctx = workflow.WithActivityOptions(ctx, healthCheckActivityOptions)

	if err := updateRunningTaskStatus(ctx, reqInfo.TaskID); err != nil {
		return err
	}

	typeToTargets := buildTargets(&reqInfo)
	err := executeRuleBasedOperation(
		ctx,
		typeToTargets,
		activity.NameHealthCheck,
		info,
		reqInfo.RuleDefinition,
	)

	// Record the outcome even when err != nil so the task is never left running.
	return updateFinishedTaskStatus(ctx, reqInfo.TaskID, err)
}
registerTaskWorkflow derives the Timeout from operations.GetOperationOptions and builds the Unmarshal closure via unmarshalAndValidate, so neither needs to be written by hand. manager.Execute() looks up the descriptor by OperationType and submits it to Temporal — no changes to manager.go are needed.
Key points:
- registerTaskWorkflow is the standard entry point for task-dispatched workflows; use register() directly only for internal workflows that have no TaskType
- WorkflowName is what Temporal uses internally; keep it stable (it need not match the Go function name)
- WorkflowFunc can be unexported to decouple Go symbol renames from the stable Temporal name
- Use activity.NameXxx constants, not string literals, in workflow.ExecuteActivity calls
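The Step 3 workflow passes healthCheckActivityOptions to workflow.WithActivityOptions without showing its definition. A minimal sketch of that package-level variable follows, assuming the Temporal Go SDK (go.temporal.io/sdk). The timeout and retry values are placeholders, not values taken from this codebase, and in workflow/healthcheck.go these imports would merge with the existing ones.

```go
package workflow

import (
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)

// healthCheckActivityOptions applies to every activity scheduled by the
// healthCheck workflow. All values are illustrative placeholders.
var healthCheckActivityOptions = workflow.ActivityOptions{
	// Upper bound for a single attempt; Temporal requires this or
	// ScheduleToCloseTimeout to be set.
	StartToCloseTimeout: 2 * time.Minute,
	RetryPolicy: &temporal.RetryPolicy{
		InitialInterval:    5 * time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    time.Minute,
		MaximumAttempts:    3, // 0 would mean unlimited attempts
	},
}
```

Keeping the retry policy in this one variable is what the Error handling best practice below refers to: retry behavior is tuned in one place rather than at each ExecuteActivity call.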
Complete Example
A full end-to-end trace for the HealthCheck operation:
1. Operation info type (operations package)
// HealthCheckTaskInfo carries the parameters for a health check operation.
type HealthCheckTaskInfo struct {
	// CheckType selects which checks to run (e.g. "full", "connectivity").
	CheckType string `json:"check_type"`
}

func (i *HealthCheckTaskInfo) Validate() error {
	if i.CheckType == "" {
		return fmt.Errorf("check_type is required")
	}
	return nil
}
2. Activity method, name constant, and registration
// In activity/activity.go: add the name constant and method.
const (
	// ... existing constants ...
	NameHealthCheck = "HealthCheck"
)

func (a *Activities) HealthCheck(ctx context.Context, target common.Target) (operations.HealthStatus, error) {
	cm, err := a.validAndGetComponentManager(target)
	if err != nil {
		return operations.HealthStatusUnknown, err
	}
	return cm.HealthCheck(ctx, target)
}

// In activity/registry.go: add the bound method to All().
func (a *Activities) All() map[string]any {
	return map[string]any{
		// ... existing entries ...
		NameHealthCheck: a.HealthCheck,
	}
}
3. Workflow file (workflow/healthcheck.go)
func init() {
	registerTaskWorkflow[operations.HealthCheckTaskInfo, *operations.HealthCheckTaskInfo](
		taskcommon.TaskTypeHealthCheck, "HealthCheck", healthCheck,
	)
}

func healthCheck(ctx workflow.Context, reqInfo task.ExecutionInfo, info *operations.HealthCheckTaskInfo) error {
	ctx = workflow.WithActivityOptions(ctx, healthCheckActivityOptions)
	// ... orchestration logic ...
	if err := updateRunningTaskStatus(ctx, reqInfo.TaskID); err != nil {
		return err
	}
	err := executeRuleBasedOperation(
		ctx,
		buildTargets(&reqInfo),
		activity.NameHealthCheck,
		info,
		reqInfo.RuleDefinition,
	)
	return updateFinishedTaskStatus(ctx, reqInfo.TaskID, err)
}
4. Dispatching from the caller
The caller constructs an ExecutionRequest and calls executor.Execute(). No operation-specific code is needed in the manager or executor layers.
req := taskdef.ExecutionRequest{
	Info: taskdef.ExecutionInfo{
		TaskID:         task.ID,
		Components:     components,
		RuleDefinition: ruleDef,
		OperationType:  taskcommon.TaskTypeHealthCheck,
		OperationInfo:  task.Operation.Info, // json.RawMessage
	},
	Async: true,
}
resp, err := executor.Execute(ctx, &req)
Best Practices
Activity names
- Define one NameXxx constant per activity in activity/activity.go
- Always use the constant in workflow.ExecuteActivity calls; never write the string inline
- The constant is the single source of truth: RegisterActivityWithOptions and all call sites use it
Workflow registration
- Each workflow file owns its own init(); there is no central list to maintain
- Use registerTaskWorkflow[T, *T](taskType, name, fn) for task-dispatched workflows; it derives Timeout from GetOperationOptions and builds the Unmarshal + Validate closure automatically
- Use register(WorkflowDescriptor{...}) directly only for internal workflows that have no TaskType (e.g. genericComponentStepWorkflow)
- WorkflowName is written once and never needs to match the Go function name
- registerTaskWorkflow panics at startup if TaskType is zero or invalid, and register panics on any other misconfiguration
Workflow determinism
- Workflows must be deterministic: no random values, no direct I/O, no time.Now() (use workflow.Now())
- All non-deterministic work, such as API calls and status checks, must happen inside activities
- When a workflow needs to wait, use workflow.Sleep() (a durable timer), not time.Sleep()
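The reason for these rules is replay: Temporal rebuilds a workflow's state by re-running its code against the recorded event history, and values such as activity results and workflow.Now() come from that history on replay. The toy model below is not Temporal's implementation (replayLog and deadline are hypothetical names). It records a clock reading on the first run and returns the recorded value on replay, so the computed deadline stays identical even though the real clock has moved on; reading time.Now() directly would bypass the log and produce a different deadline.

```go
package main

// replayLog is a toy stand-in for an event history. On the first run it
// records each nondeterministic value; on replay it hands the recorded
// values back in the same order instead of recomputing them.
type replayLog struct {
	recorded []int64
	next     int
	replay   bool
}

// value returns gen() on the first run and the recorded value on replay.
func (l *replayLog) value(gen func() int64) int64 {
	if l.replay {
		v := l.recorded[l.next]
		l.next++
		return v
	}
	v := gen()
	l.recorded = append(l.recorded, v)
	return v
}

// startReplay rewinds the log so the same workflow code can run again.
func (l *replayLog) startReplay() {
	l.replay = true
	l.next = 0
}

// deadline plays the role of workflow code: it reads the clock only through
// the log, much as real workflow code reads it only through workflow.Now.
func deadline(l *replayLog, clock func() int64) int64 {
	return l.value(clock) + 600 // 600 time units after "now"
}
```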
Rule-based execution
For operations that fan out across component types, use executeRuleBasedOperation(). It drives execution through the RuleDefinition attached to the task:
- Stages run sequentially
- Steps within a stage run in parallel via genericComponentStepWorkflow child workflows
- Each step can have pre/post actions and a configurable max_parallel batch size
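One way to honor max_parallel is to split a step's targets into consecutive batches and run one batch at a time. The generic helper below is a hypothetical sketch, not the batching helper in workflow/helpers.go. Because it is a pure function, it is deterministic and therefore safe to call from workflow code; a workflow would presumably start one genericComponentStepWorkflow child per element of a batch and wait for all of them before starting the next batch.

```go
package main

// batches splits items into consecutive groups of at most maxParallel
// elements. maxParallel <= 0 means "no limit" (a single batch). The
// returned batches share the input's backing array.
func batches[T any](items []T, maxParallel int) [][]T {
	if len(items) == 0 {
		return nil
	}
	if maxParallel <= 0 || maxParallel >= len(items) {
		return [][]T{items}
	}
	out := make([][]T, 0, (len(items)+maxParallel-1)/maxParallel)
	for start := 0; start < len(items); start += maxParallel {
		end := start + maxParallel
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}
```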
Error handling
- Wrap errors with context (which component or stage failed)
- Always call updateFinishedTaskStatus(), even on the error path, so the task record is updated
- Retry policies live in the workflow's workflow.ActivityOptions variable or in the per-step RetryPolicy field of the rule definition, not scattered through workflow code
Workflow Patterns
Direct activity call (single component type)
ctx = workflow.WithActivityOptions(ctx, activityOpts)
if err := workflow.ExecuteActivity(ctx, activity.NameHealthCheck, target).Get(ctx, nil); err != nil {
	return fmt.Errorf("health check failed: %w", err)
}
Parallel activities with result collection
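futures := make([]workflow.Future, len(targets))
for i, target := range targets {
	// Schedule every activity before waiting on any, so they run concurrently.
	futures[i] = workflow.ExecuteActivity(ctx, activity.NameHealthCheck, target)
}
results := make([]operations.HealthStatus, len(targets))
for i, f := range futures {
	// Decode each activity's result into the matching slot.
	if err := f.Get(ctx, &results[i]); err != nil {
		return fmt.Errorf("components %v failed: %w", targets[i].ComponentIDs, err)
	}
}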
Polling loop
deadline := workflow.Now(ctx).Add(timeout)
for {
	if workflow.Now(ctx).After(deadline) {
		return fmt.Errorf("timed out after %v", timeout)
	}
	if err := workflow.Sleep(ctx, pollInterval); err != nil {
		return err
	}
	var result activity.SomeStatusResult
	if err := workflow.ExecuteActivity(ctx, activity.NameGetSomeStatus, target).Get(ctx, &result); err != nil {
		continue // transient error, keep polling
	}
	if result.Done {
		return nil
	}
}
Rule-based fan-out (recommended for multi-component operations)
err := executeRuleBasedOperation(
	ctx,
	typeToTargets,           // map[ComponentType]Target
	activity.NameMyActivity, // legacy fallback name for steps without MainOperation
	operationInfo,
	reqInfo.RuleDefinition,
)
This drives the entire operation through the RuleDefinition stages and steps, handling parallelism and batching automatically via genericComponentStepWorkflow.
References
- Temporal Documentation
- Temporal Go SDK
- Key files in this package:
  - activity/activity.go: *Activities methods, name constants
  - activity/registry.go: Activities struct, New, All (per-instance dependency injection)
  - workflow/registry.go: workflow registry (WorkflowDescriptor, registerTaskWorkflow, unmarshalAndValidate, register, Get, GetAllWorkflows)
  - workflow/genericcomponentstep.go: genericComponentStepWorkflow, nameGenericComponentStepWorkflow
  - workflow/helpers.go: executeRuleBasedOperation, buildTargets, batching helpers
  - workflow/actions.go: pre/post action executors (actionExecutorRegistry)
  - manager/manager.go: Build (worker setup), Execute (workflow dispatch)