argo

package
v2.7.2
Published: Nov 24, 2025 License: MIT Imports: 14 Imported by: 0

README

Argo Workflows Client

Production-ready Argo Workflows client library with flexible configuration, OpenTelemetry support, and comprehensive error handling.

Features

  • Multiple Connection Modes: Kubernetes API, In-Cluster, or Argo Server HTTP
  • Flexible Configuration: Config structs and functional options
  • OpenTelemetry Integration: Built-in tracing and observability
  • Production-Ready: Proper error handling, no fatal errors
  • Type-Safe: Full Go type safety with generics support
  • Well-Documented: Comprehensive examples and documentation

Installation

go get github.com/jasoet/pkg/v2/argo

Quick Start

Basic Usage (Default Configuration)
package main

import (
    "context"
    "github.com/jasoet/pkg/v2/argo"
)

func main() {
    ctx := context.Background()

    // Create client using default kubeconfig (~/.kube/config)
    ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())
    if err != nil {
        panic(err)
    }

    // Use the client
    wfClient := client.NewWorkflowServiceClient()
    _ = wfClient // placeholder so the example compiles; interact with workflows here
}
Using Functional Options
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/custom/path/kubeconfig"),
    argo.WithContext("production"),
)
if err != nil {
    return err
}

Connection Modes

1. Kubernetes API Mode (Default)

Uses kubeconfig file to connect to Kubernetes API server.

// Use default kubeconfig location (~/.kube/config)
ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())

// Or specify custom kubeconfig path
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithContext("my-context"),
)
2. In-Cluster Mode

Use when running inside a Kubernetes pod.

ctx, client, err := argo.NewClient(ctx, argo.InClusterConfig())

// Or using functional options
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithInCluster(true),
)
3. Argo Server Mode

Connect via Argo Server HTTP API.

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
)

// For development/testing with HTTP
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("http://argo-server:2746", ""),
    argo.WithArgoServerInsecure(true),
)

Configuration

Config Struct
type Config struct {
    // KubeConfigPath specifies the path to kubeconfig file
    KubeConfigPath string

    // Context specifies the kubeconfig context to use
    Context string

    // InCluster indicates whether to use in-cluster configuration
    InCluster bool

    // ArgoServerOpts configures connection to Argo Server
    ArgoServerOpts ServerOpts

    // OTelConfig enables OpenTelemetry instrumentation
    OTelConfig *otel.Config
}
Pre-configured Factories
// Default configuration (uses ~/.kube/config)
config := argo.DefaultConfig()

// In-cluster configuration
config := argo.InClusterConfig()

// Argo Server configuration
config := argo.ServerConfig("https://argo-server:2746", "Bearer token")

Functional Options

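The main functional options are shown together below for reference; WithArgoServerOpts and WithConfig are also available. In practice, pass only the options for a single connection mode: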

ctx, client, err := argo.NewClientWithOptions(ctx,
    // Kubernetes API options
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithContext("production"),
    argo.WithInCluster(true),

    // Argo Server options
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
    argo.WithArgoServerInsecure(false),
    argo.WithArgoServerHTTP1(false),

    // Observability
    argo.WithOTelConfig(otelConfig),
)

OpenTelemetry Integration

Enable distributed tracing and monitoring:

import (
    "github.com/jasoet/pkg/v2/argo"
    "github.com/jasoet/pkg/v2/otel"
)

// Create OTel config
otelConfig := otel.NewConfig("my-service").
    WithTracerProvider(tracerProvider).
    WithMeterProvider(meterProvider)

// Create Argo client with OTel
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithOTelConfig(otelConfig),
)

Working with Workflows

List Workflows
import (
    "fmt"

    "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

wfClient := client.NewWorkflowServiceClient()

resp, err := wfClient.ListWorkflows(ctx, &workflow.WorkflowListRequest{
    Namespace: "argo",
    ListOptions: &metav1.ListOptions{
        Limit: 10,
    },
})
if err != nil {
    return err
}

for _, wf := range resp.Items {
    fmt.Printf("Workflow: %s, Status: %s\n", wf.Name, wf.Status.Phase)
}
Create a Workflow
import (
    "fmt"

    "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
    "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

wf := &v1alpha1.Workflow{
    ObjectMeta: metav1.ObjectMeta{
        GenerateName: "hello-world-",
        Namespace:    "argo",
    },
    Spec: v1alpha1.WorkflowSpec{
        Entrypoint: "hello",
        Templates: []v1alpha1.Template{
            {
                Name: "hello",
                Container: &corev1.Container{
                    Image:   "alpine:latest",
                    Command: []string{"echo"},
                    Args:    []string{"Hello, Argo!"},
                },
            },
        },
    },
}

wfClient := client.NewWorkflowServiceClient()
created, err := wfClient.CreateWorkflow(ctx, &workflow.WorkflowCreateRequest{
    Namespace: "argo",
    Workflow:  wf,
})
if err != nil {
    return err
}

fmt.Printf("Created workflow: %s\n", created.Name)
Watch Workflows
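import (
    "errors"
    "fmt"
    "io"

    "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

wfClient := client.NewWorkflowServiceClient()

stream, err := wfClient.WatchWorkflows(ctx, &workflow.WatchWorkflowsRequest{
    Namespace: "argo",
    ListOptions: &metav1.ListOptions{
        Watch: true,
    },
})
if err != nil {
    return err
}

for {
    event, err := stream.Recv()
    if errors.Is(err, io.EOF) {
        break // the server closed the stream
    }
    if err != nil {
        return err
    }
    if event.Object == nil {
        continue // skip events that carry no workflow
    }

    fmt.Printf("Workflow %s: %s\n", event.Object.Name, event.Object.Status.Phase)
}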

Workflow Builder API

The workflow builder API provides a high-level, fluent interface for constructing Argo Workflows without needing to understand the low-level protobuf-generated structs. It includes template sources, pre-built patterns, and full OpenTelemetry instrumentation.
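To illustrate what a fluent interface looks like in Go, here is a minimal sketch built from local stand-in types. The names Step, Builder, and NewBuilder are hypothetical and are not the package's builder types, and deferring validation errors to Build is an assumption about one common design, not a description of this library's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Step is a hypothetical stand-in for a workflow step.
type Step struct{ Name string }

// Builder accumulates steps and remembers the first error,
// so that calls can be chained without checking an error each time.
type Builder struct {
	name  string
	steps []Step
	err   error
}

// NewBuilder returns an empty builder for the named workflow.
func NewBuilder(name string) *Builder { return &Builder{name: name} }

// Add appends a step and returns the builder for chaining.
// An empty step name is recorded as an error and reported by Build.
func (b *Builder) Add(s Step) *Builder {
	if b.err == nil && s.Name == "" {
		b.err = errors.New("step name must not be empty")
	}
	b.steps = append(b.steps, s)
	return b
}

// Build returns the step names in insertion order, or the first recorded error.
func (b *Builder) Build() ([]string, error) {
	if b.err != nil {
		return nil, b.err
	}
	if len(b.steps) == 0 {
		return nil, errors.New("workflow has no steps")
	}
	names := make([]string, len(b.steps))
	for i, s := range b.steps {
		names[i] = s.Name
	}
	return names, nil
}

func main() {
	names, err := NewBuilder("deployment").
		Add(Step{Name: "deploy"}).
		Add(Step{Name: "health-check"}).
		Build()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names)
}
```

Recording the first error inside the builder keeps the call chain readable, at the cost of reporting problems only when Build is called.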

Quick Start with Builder
import (
    "github.com/jasoet/pkg/v2/argo/builder"
    "github.com/jasoet/pkg/v2/argo/builder/template"
)

// Create workflow steps
deploy := template.NewContainer("deploy", "myapp:v1",
    template.WithCommand("deploy.sh"),
    template.WithEnv("ENV", "production"))

healthCheck := template.NewHTTP("health-check",
    template.WithHTTPURL("https://myapp.com/health"),
    template.WithHTTPMethod("GET"))

// Build workflow
wf, err := builder.NewWorkflowBuilder("deployment", "argo",
    builder.WithServiceAccount("argo-workflow"),
    builder.WithLabels(map[string]string{"app": "myapp"})).
    Add(deploy).
    Add(healthCheck).
    Build()

if err != nil {
    return err
}

// Submit workflow
created, err := argo.SubmitWorkflow(ctx, client, wf, otelConfig)
if err != nil {
    return err
}
Template Sources

Template sources are composable workflow components that implement the WorkflowSource interface.

Container Template

Execute commands in containers:

container := template.NewContainer("build", "golang:1.25",
    template.WithCommand("go", "build", "-o", "app"),
    template.WithWorkingDir("/workspace"),
    template.WithEnv("CGO_ENABLED", "0"),
    template.WithCPU("1000m"),
    template.WithMemory("2Gi"))
Script Template

Run inline scripts in various languages:

// Bash script
bashScript := template.NewScript("backup", "bash",
    template.WithScriptContent(`
        echo "Creating backup..."
        tar -czf backup.tar.gz /data
        echo "Backup complete"
    `),
    template.WithScriptWorkingDir("/backup"))

// Python script
pythonScript := template.NewScript("process", "python",
    template.WithScriptContent(`
        import json
        print("Processing data...")
        # Your Python code here
    `))

// Custom image with specific command
customScript := template.NewScript("custom", "bash",
    template.WithScriptImage("myregistry/custom:v1"),
    template.WithScriptCommand("bash", "-x"),
    template.WithScriptContent("echo 'Custom script'"))
HTTP Template

Make HTTP requests for health checks, webhooks, or API calls:

healthCheck := template.NewHTTP("api-check",
    template.WithHTTPURL("https://api.example.com/health"),
    template.WithHTTPMethod("GET"),
    template.WithHTTPSuccessCond("response.statusCode == 200"),
    template.WithHTTPTimeout(30))

webhook := template.NewHTTP("notify",
    template.WithHTTPURL("https://hooks.slack.com/services/..."),
    template.WithHTTPMethod("POST"),
    template.WithHTTPHeader("Content-Type", "application/json"),
    template.WithHTTPBody(`{"text": "Deployment complete"}`))
Workflow Builder Options

Configure workflows with functional options:

wf, err := builder.NewWorkflowBuilder("myworkflow", "argo",
    // Service Account
    builder.WithServiceAccount("argo-workflow"),

    // Labels and Annotations
    builder.WithLabels(map[string]string{
        "app": "myapp",
        "env": "production",
    }),
    builder.WithAnnotations(map[string]string{
        "description": "Production deployment",
    }),

    // Resource Management
    builder.WithArchiveLogs(true),
    builder.WithActiveDeadline(3600), // 1 hour timeout

    // Retry Strategy (Limit is a *intstr.IntOrString, so pass a pointer)
    builder.WithRetryStrategy(&v1alpha1.RetryStrategy{
        Limit:       &intstr.IntOrString{Type: intstr.Int, IntVal: 3},
        RetryPolicy: "Always",
    }),

    // Volumes
    builder.WithVolume(corev1.Volume{
        Name: "data",
        VolumeSource: corev1.VolumeSource{
            EmptyDir: &corev1.EmptyDirVolumeSource{},
        },
    }),

    // OpenTelemetry
    builder.WithOTelConfig(otelConfig),
).Build()
Exit Handlers

Add cleanup steps that always run, regardless of workflow success or failure:

// Main workflow steps
deploy := template.NewContainer("deploy", "myapp:v1",
    template.WithCommand("deploy.sh"))

// Cleanup step (always runs)
cleanup := template.NewScript("cleanup", "bash",
    template.WithScriptContent("echo 'Cleaning up resources...'"))

// Notification (always runs)
notify := template.NewScript("notify", "bash",
    template.WithScriptContent(`
        echo "Workflow Status: {{workflow.status}}"
        echo "Duration: {{workflow.duration}}"
    `))

wf, err := builder.NewWorkflowBuilder("deployment", "argo").
    Add(deploy).
    AddExitHandler(cleanup).
    AddExitHandler(notify).
    Build()
Pre-Built Workflow Patterns
CI/CD Patterns
Build-Test-Deploy
import "github.com/jasoet/pkg/v2/argo/patterns"

wf, err := patterns.BuildTestDeploy(
    "myapp", "argo",
    "golang:1.25",      // build image
    "golang:1.25",      // test image
    "deployer:v1",      // deploy image
    builder.WithServiceAccount("argo-workflow"),
)
Build-Test-Deploy with Cleanup
wf, err := patterns.BuildTestDeployWithCleanup(
    "myapp", "argo",
    "golang:1.25",
    "busybox:latest",
    builder.WithArchiveLogs(true),
)
Conditional Deployment

Deploy only if tests pass, with automatic rollback on failure:

wf, err := patterns.ConditionalDeploy(
    "safe-deploy", "argo",
    "golang:1.25",
)
Multi-Environment Deployment

Deploy sequentially to multiple environments:

wf, err := patterns.MultiEnvironmentDeploy(
    "multi-env", "argo",
    "deployer:v1",
    []string{"staging", "production"},
)
Parallel Execution Patterns
Fan-Out/Fan-In

Execute multiple tasks in parallel, then aggregate results:

wf, err := patterns.FanOutFanIn(
    "parallel-tasks", "argo",
    "busybox:latest",
    []string{"task-1", "task-2", "task-3"},
)
Parallel Data Processing

Process multiple data items independently:

wf, err := patterns.ParallelDataProcessing(
    "batch-process", "argo",
    "processor:v1",
    []string{"data-1.csv", "data-2.csv", "data-3.csv"},
    "process.sh",
)
Map-Reduce

Classic map-reduce pattern with parallel mapping and sequential reduction:

wf, err := patterns.MapReduce(
    "word-count", "argo",
    "alpine:latest",
    []string{"file1.txt", "file2.txt", "file3.txt"},
    "wc -w",                          // map command
    "awk '{sum+=$1} END {print sum}'", // reduce command
)
Parallel Test Suites

Run multiple test suites in parallel to speed up CI/CD:

wf, err := patterns.ParallelTestSuite(
    "tests", "argo",
    "golang:1.25",
    map[string]string{
        "unit":        "go test ./internal/...",
        "integration": "go test ./tests/integration/...",
        "e2e":         "go test ./tests/e2e/...",
    },
)
Parallel Deployment

Deploy to multiple regions/environments simultaneously:

wf, err := patterns.ParallelDeployment(
    "multi-region", "argo",
    "deployer:v1",
    []string{"us-west", "us-east", "eu-central"},
)
Enhanced Client Operations

Higher-level operations with full OpenTelemetry instrumentation:

Submit Workflow
import "github.com/jasoet/pkg/v2/argo"

wf, err := builder.NewWorkflowBuilder("deploy", "argo").
    Add(deployStep).
    Build()
if err != nil {
    return err
}

created, err := argo.SubmitWorkflow(ctx, client, wf, otelConfig)
if err != nil {
    return err
}

fmt.Printf("Workflow %s submitted\n", created.Name)
Submit and Wait

Submit a workflow and wait for completion with automatic polling:

completed, err := argo.SubmitAndWait(ctx, client, wf, otelConfig, 10*time.Minute)
if err != nil {
    return err
}

if completed.Status.Phase == v1alpha1.WorkflowSucceeded {
    fmt.Println("Workflow completed successfully")
}
Get Workflow Status
status, err := argo.GetWorkflowStatus(ctx, client, "argo", "my-workflow-abc123", otelConfig)
if err != nil {
    return err
}

fmt.Printf("Phase: %s\n", status.Phase)
fmt.Printf("Progress: %s\n", status.Progress)
List Workflows
// List all workflows
workflows, err := argo.ListWorkflows(ctx, client, "argo", "", otelConfig)

// List with label selector
workflows, err := argo.ListWorkflows(ctx, client, "argo", "app=myapp", otelConfig)
Delete Workflow
err := argo.DeleteWorkflow(ctx, client, "argo", "my-workflow-abc123", otelConfig)
if err != nil {
    return err
}
Advanced: Custom Templates

For advanced use cases, manually construct templates:

import "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

// Create custom template
customTemplate := v1alpha1.Template{
    Name: "custom-step",
    Steps: [][]v1alpha1.WorkflowStep{
        {
            {Name: "step-1", Template: "step-1-template"},
            {Name: "step-2", Template: "step-2-template"},
        },
    },
}

// Add to builder
wf, err := builder.NewWorkflowBuilder("advanced", "argo").
    AddTemplate(customTemplate).
    BuildWithEntrypoint("custom-step")
Complete Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/jasoet/pkg/v2/argo"
    "github.com/jasoet/pkg/v2/argo/builder"
    "github.com/jasoet/pkg/v2/argo/builder/template"
    "github.com/jasoet/pkg/v2/otel"
)

func main() {
    ctx := context.Background()

    // Create OTel config
    otelConfig := otel.NewConfig("workflow-manager")

    // Create Argo client
    ctx, client, err := argo.NewClientWithOptions(ctx,
        argo.WithOTelConfig(otelConfig))
    if err != nil {
        panic(err)
    }

    // Build workflow
    preCheck := template.NewContainer("pre-check", "alpine:latest",
        template.WithCommand("sh", "-c", "echo 'Pre-flight checks...'"))

    deploy := template.NewContainer("deploy", "myapp:v1",
        template.WithCommand("deploy.sh"),
        template.WithEnv("ENV", "production"),
        template.WithCPU("500m"),
        template.WithMemory("256Mi"))

    healthCheck := template.NewHTTP("health-check",
        template.WithHTTPURL("https://myapp.com/health"),
        template.WithHTTPMethod("GET"),
        template.WithHTTPSuccessCond("response.statusCode == 200"))

    notify := template.NewScript("notify", "bash",
        template.WithScriptContent(`
            echo "Deployment Status: {{workflow.status}}"
        `))

    wf, err := builder.NewWorkflowBuilder("deployment", "argo",
        builder.WithOTelConfig(otelConfig),
        builder.WithServiceAccount("argo-workflow"),
        builder.WithLabels(map[string]string{"app": "myapp"}),
        builder.WithArchiveLogs(true)).
        Add(preCheck).
        Add(deploy).
        Add(healthCheck).
        AddExitHandler(notify).
        Build()

    if err != nil {
        panic(err)
    }

    // Submit and wait
    completed, err := argo.SubmitAndWait(ctx, client, wf, otelConfig, 10*time.Minute)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Workflow completed: %s\n", completed.Status.Phase)
}

Error Handling

The library uses proper error handling without fatal errors:

ctx, client, err := argo.NewClient(ctx, config)
if err != nil {
    // Handle error gracefully
    log.Error().Err(err).Msg("Failed to create Argo client")
    return err
}

// Always check errors from operations
wfClient := client.NewWorkflowServiceClient()
resp, err := wfClient.ListWorkflows(ctx, req)
if err != nil {
    log.Error().Err(err).Msg("Failed to list workflows")
    return err
}

Examples

Example 1: Default Configuration
ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())
if err != nil {
    return err
}
Example 2: Custom Kubeconfig and Context
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/etc/kubernetes/admin.conf"),
    argo.WithContext("prod-cluster"),
)
Example 3: In-Cluster Usage
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithInCluster(true),
)
Example 4: Argo Server with Authentication
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server.example.com", "Bearer my-token"),
)
Example 5: With OpenTelemetry
otelConfig := otel.NewConfig("workflow-manager")
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/etc/kubernetes/admin.conf"),
    argo.WithOTelConfig(otelConfig),
)

Running Examples

# Run the comprehensive example
go run -tags=example ./examples/argo

# Or build and run
go build -tags=example -o argo-example ./examples/argo
./argo-example

See examples/argo/README.md for more details.

Comparison with Original Implementation

Before (scp/api)
// util/argo/argo.go - tightly coupled, uses fatal errors
func NewClient(ctx context.Context) (context.Context, apiclient.Client) {
    ctx, argoClient, err := apiclient.NewClientFromOpts(
        apiclient.Opts{
            ArgoServerOpts:       apiclient.ArgoServerOpts{},
            ClientConfigSupplier: kube.GetCmdConfig,
            Context:              ctx,
        })
    if err != nil {
        log.Fatal().Err(err).Msg("unable to create argo client")  // Fatal!
    }
    return ctx, argoClient
}
After (pkg/v2/argo)
// Flexible, reusable, proper error handling
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithContext("production"),
    argo.WithOTelConfig(otelConfig),
)
if err != nil {
    return fmt.Errorf("failed to create client: %w", err)  // Graceful!
}
defer client.Close()

Benefits

  • Reusable: Can be used across multiple projects
  • Flexible: Config struct + functional options
  • Library-friendly: Returns errors instead of fatal
  • Testable: Easy to mock and test
  • Observable: OpenTelemetry integration ready
  • Well-documented: Comprehensive docs and examples
  • Production-ready: Proper error handling and logging

Best Practices

  1. Use context for lifecycle management

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Clean up context when done
    
    ctx, client, err := argo.NewClient(ctx, config)
    if err != nil {
        return err
    }
    
  2. Use functional options for flexibility

    ctx, client, err := argo.NewClientWithOptions(ctx,
        argo.WithKubeConfig(kubeconfigPath),
        argo.WithContext(contextName),
    )
    
  3. Enable OpenTelemetry in production

    ctx, client, err := argo.NewClientWithOptions(ctx,
        argo.WithOTelConfig(otelConfig),
    )
    
  4. Handle errors gracefully

    if err != nil {
        log.Error().Err(err).Msg("Operation failed")
        return fmt.Errorf("operation failed: %w", err)
    }
    

Testing

# Unit tests
go test ./argo

# Integration tests (requires Kubernetes cluster)
go test -tags=integration ./argo

# All tests with coverage
go test -cover ./argo

Contributing

Contributions are welcome! Please see the main CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.



Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeleteWorkflow added in v2.2.4

func DeleteWorkflow(ctx context.Context, client apiclient.Client, namespace, name string, cfg *otel.Config) error

DeleteWorkflow deletes a workflow by name.

Example:

err := argo.DeleteWorkflow(ctx, client, "argo", "my-workflow-abc123", otelConfig)
if err != nil {
    return err
}

func GetWorkflowStatus added in v2.2.4

func GetWorkflowStatus(ctx context.Context, client apiclient.Client, namespace, name string, cfg *otel.Config) (*v1alpha1.WorkflowStatus, error)

GetWorkflowStatus retrieves the current status of a workflow.

Example:

status, err := argo.GetWorkflowStatus(ctx, client, "argo", "my-workflow-abc123", otelConfig)
if err != nil {
    return err
}
fmt.Printf("Workflow phase: %s\n", status.Phase)

func ListWorkflows added in v2.2.4

func ListWorkflows(ctx context.Context, client apiclient.Client, namespace, labelSelector string, cfg *otel.Config) ([]v1alpha1.Workflow, error)

ListWorkflows lists workflows in a namespace with optional label selector.
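The labelSelector argument is a plain string. When labels are held in a map, an equality-based selector of the form "k1=v1,k2=v2" can be assembled as in the sketch below. selectorFromLabels is a hypothetical helper written for this example, not a function of the package.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// selectorFromLabels renders labels as an equality-based selector
// ("k1=v1,k2=v2"). Keys are sorted so the output is deterministic.
// An empty map yields an empty string, which selects everything.
func selectorFromLabels(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+labels[k])
	}
	return strings.Join(parts, ",")
}

func main() {
	selector := selectorFromLabels(map[string]string{
		"env": "production",
		"app": "myapp",
	})
	fmt.Println(selector) // prints "app=myapp,env=production"
}
```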

Example:

// List all workflows
workflows, err := argo.ListWorkflows(ctx, client, "argo", "", otelConfig)

// List workflows with label
workflows, err := argo.ListWorkflows(ctx, client, "argo", "app=myapp", otelConfig)

func NewClient

func NewClient(ctx context.Context, config *Config) (context.Context, apiclient.Client, error)

NewClient creates a new Argo Workflows client from the given configuration. It returns the updated context and client, or an error if the connection fails.

The client can connect to Argo Workflows in two modes:

  1. Via Kubernetes API (default): uses kubeconfig or in-cluster config
  2. Via Argo Server: uses HTTP/HTTPS connection

Example (default kubeconfig):

ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())
if err != nil {
    return err
}
defer client.Close()

Example (in-cluster):

ctx, client, err := argo.NewClient(ctx, argo.InClusterConfig())

Example (Argo Server):

cfg := argo.ServerConfig("https://argo-server:2746", "Bearer token")
ctx, client, err := argo.NewClient(ctx, cfg)

func NewClientWithOptions

func NewClientWithOptions(ctx context.Context, opts ...Option) (context.Context, apiclient.Client, error)

NewClientWithOptions creates a new Argo Workflows client using functional options. This provides a more flexible way to configure the client.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/custom/path/kubeconfig"),
    argo.WithContext("production"),
    argo.WithOTelConfig(otelConfig),
)

func SubmitAndWait added in v2.2.4

func SubmitAndWait(ctx context.Context, client apiclient.Client, wf *v1alpha1.Workflow, cfg *otel.Config, timeout time.Duration) (*v1alpha1.Workflow, error)

SubmitAndWait submits a workflow and waits for it to complete. It polls the workflow status at regular intervals and returns when the workflow reaches a terminal state (Succeeded, Failed, or Error).

Example:

wf, err := builder.NewWorkflowBuilder("backup", "argo").
    Add(backupStep).
    Build()
if err != nil {
    return err
}

completed, err := argo.SubmitAndWait(ctx, client, wf, otelConfig, 10*time.Minute)
if err != nil {
    return err
}
if completed.Status.Phase == v1alpha1.WorkflowSucceeded {
    fmt.Println("Workflow completed successfully")
}

func SubmitWorkflow added in v2.2.4

func SubmitWorkflow(ctx context.Context, client apiclient.Client, wf *v1alpha1.Workflow, cfg *otel.Config) (*v1alpha1.Workflow, error)

SubmitWorkflow submits a workflow to Argo with OpenTelemetry tracing. This is a convenience wrapper around the Argo API client with better error handling and automatic observability.

Example:

wf, err := builder.NewWorkflowBuilder("deploy", "argo").
    Add(deployStep).
    Build()
if err != nil {
    return err
}

created, err := argo.SubmitWorkflow(ctx, client, wf, otelConfig)
if err != nil {
    return err
}
fmt.Printf("Workflow %s submitted\n", created.Name)

Types

type Config

type Config struct {
	// KubeConfigPath specifies the path to kubeconfig file.
	// Empty string means:
	// - Use in-cluster config if InCluster is true
	// - Use default kubeconfig location (~/.kube/config) if InCluster is false
	KubeConfigPath string `yaml:"kubeConfigPath" mapstructure:"kubeConfigPath"`

	// Context specifies the kubeconfig context to use.
	// Empty string means use the current context.
	Context string `yaml:"context" mapstructure:"context"`

	// InCluster indicates whether to use in-cluster Kubernetes configuration.
	// When true, the client will use the service account token mounted in the pod.
	InCluster bool `yaml:"inCluster" mapstructure:"inCluster"`

	// ArgoServerOpts configures connection to Argo Server (alternative to direct k8s API).
	// If URL is set, the client will connect via Argo Server instead of k8s API.
	ArgoServerOpts ServerOpts `yaml:"argoServer" mapstructure:"argoServer"`

	// OTelConfig enables OpenTelemetry instrumentation (optional).
	OTelConfig *otel.Config `yaml:"-"`
}

Config represents the configuration for connecting to Argo Workflows. It supports multiple connection modes:

  • Kubernetes API (in-cluster or with kubeconfig)
  • Argo Server (HTTP/HTTPS with optional authentication)
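Read together, the field comments suggest a selection order between the modes. The sketch below spells that order out with local stand-in types. It is an interpretation of the comments, not the package's code: connectionMode is a hypothetical helper, and giving InCluster precedence over a non-empty KubeConfigPath is an assumption, since the comments do not cover that combination.

```go
package main

import "fmt"

// ServerOpts and Config are local stand-ins that carry only the
// fields needed to illustrate mode selection.
type ServerOpts struct{ URL string }

type Config struct {
	KubeConfigPath string
	InCluster      bool
	ArgoServerOpts ServerOpts
}

// connectionMode returns a label for the mode a Config would select:
// Argo Server if a URL is set, otherwise in-cluster if requested,
// otherwise the given kubeconfig path or the default location.
func connectionMode(c Config) string {
	switch {
	case c.ArgoServerOpts.URL != "":
		return "argo-server"
	case c.InCluster: // assumption: wins over KubeConfigPath
		return "in-cluster"
	case c.KubeConfigPath != "":
		return "kubeconfig:" + c.KubeConfigPath
	default:
		return "kubeconfig:~/.kube/config"
	}
}

func main() {
	fmt.Println(connectionMode(Config{}))
	fmt.Println(connectionMode(Config{InCluster: true}))
	fmt.Println(connectionMode(Config{ArgoServerOpts: ServerOpts{URL: "https://argo-server:2746"}}))
}
```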

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults. By default, it uses:

  • Out-of-cluster mode (InCluster = false)
  • Default kubeconfig location (~/.kube/config)
  • Current context from kubeconfig

func InClusterConfig

func InClusterConfig() *Config

InClusterConfig returns a Config for in-cluster usage. This is useful when the client runs inside a Kubernetes pod.

func ServerConfig added in v2.2.3

func ServerConfig(serverURL string, authToken string) *Config

ServerConfig returns a Config for connecting via Argo Server. This is an alternative to direct Kubernetes API access.

type Option

type Option func(*Config) error

Option is a functional option for configuring Argo client.
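For readers new to the pattern, here is a minimal sketch of how options of the shape func(*Config) error are typically applied. Config and the two options are local stand-ins that mirror names documented on this page; applyOptions is a hypothetical helper, and rejecting an empty path is an assumption added to show the error return in use.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a local stand-in with two of the documented fields.
type Config struct {
	KubeConfigPath string
	Context        string
}

// Option mirrors the documented shape: func(*Config) error.
type Option func(*Config) error

// WithKubeConfig sets the kubeconfig path. Rejecting an empty path
// is an assumption made for this sketch.
func WithKubeConfig(path string) Option {
	return func(c *Config) error {
		if path == "" {
			return errors.New("kubeconfig path must not be empty")
		}
		c.KubeConfigPath = path
		return nil
	}
}

// WithContext sets the kubeconfig context name.
func WithContext(name string) Option {
	return func(c *Config) error {
		c.Context = name
		return nil
	}
}

// applyOptions starts from an empty Config and applies each option
// in order, stopping at the first error.
func applyOptions(opts ...Option) (*Config, error) {
	cfg := &Config{}
	for _, opt := range opts {
		if err := opt(cfg); err != nil {
			return nil, fmt.Errorf("apply option: %w", err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := applyOptions(
		WithKubeConfig("/custom/kubeconfig"),
		WithContext("production"),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s\n", cfg.KubeConfigPath, cfg.Context)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.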

func WithArgoServer

func WithArgoServer(url, authToken string) Option

WithArgoServer configures the client to connect via Argo Server instead of Kubernetes API. This is an alternative connection mode that uses HTTP/HTTPS.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
)

func WithArgoServerHTTP1

func WithArgoServerHTTP1(http1 bool) Option

WithArgoServerHTTP1 forces HTTP/1.1 instead of HTTP/2 for Argo Server connection. This can be useful for debugging or compatibility reasons.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
    argo.WithArgoServerHTTP1(true),
)

func WithArgoServerInsecure

func WithArgoServerInsecure(insecure bool) Option

WithArgoServerInsecure enables insecure mode for Argo Server connection. This disables TLS certificate verification. WARNING: This should only be used in development/testing environments.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("http://argo-server:2746", ""),
    argo.WithArgoServerInsecure(true),
)

func WithArgoServerOpts

func WithArgoServerOpts(opts ServerOpts) Option

WithArgoServerOpts sets the complete ServerOpts configuration. This is useful when you want to configure all Argo Server options at once.

Example:

serverOpts := argo.ServerOpts{
    URL:                "https://argo-server:2746",
    AuthToken:          "Bearer token",
    InsecureSkipVerify: false,
    HTTP1:              false,
}

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServerOpts(serverOpts),
)

func WithConfig

func WithConfig(config *Config) Option

WithConfig applies a complete Config to the client. This is useful when you have a pre-built configuration from a config file.

Example:

config := &argo.Config{
    KubeConfigPath: "/custom/kubeconfig",
    Context:        "production",
}

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithConfig(config),
)

func WithContext

func WithContext(context string) Option

WithContext sets the kubeconfig context to use. If not set, the current context from kubeconfig will be used.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithContext("production"),
)

func WithInCluster

func WithInCluster(inCluster bool) Option

WithInCluster enables in-cluster configuration mode. When true, the client will use the service account token mounted in the pod. This is useful when running inside a Kubernetes cluster.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithInCluster(true),
)

func WithKubeConfig

func WithKubeConfig(path string) Option

WithKubeConfig sets the path to the kubeconfig file. If not set, the default location (~/.kube/config) will be used.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/custom/path/to/kubeconfig"),
)

func WithOTelConfig

func WithOTelConfig(otelConfig *otel.Config) Option

WithOTelConfig enables OpenTelemetry instrumentation for the Argo client. This allows tracing and monitoring of workflow operations.

Example:

otelConfig := otel.NewConfig("my-service").
    WithTracerProvider(tracerProvider).
    WithMeterProvider(meterProvider)

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithOTelConfig(otelConfig),
)

type ServerOpts added in v2.2.3

type ServerOpts struct {
	// URL is the Argo Server base URL (e.g., "https://argo-server:2746")
	URL string `yaml:"url" mapstructure:"url"`

	// AuthToken is the bearer token for authentication (optional)
	AuthToken string `yaml:"authToken" mapstructure:"authToken"`

	// InsecureSkipVerify disables TLS certificate verification (not recommended for production)
	InsecureSkipVerify bool `yaml:"insecureSkipVerify" mapstructure:"insecureSkipVerify"`

	// HTTP1 forces HTTP/1.1 instead of HTTP/2
	HTTP1 bool `yaml:"http1" mapstructure:"http1"`
}

ServerOpts contains Argo Server connection options. This is used when connecting via Argo Server HTTP API instead of direct Kubernetes API.
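To make the meaning of these fields concrete, the sketch below maps them onto Go's standard net/http client. This is only an analogy using the standard library; how the package itself wires these flags into its Argo Server client is not shown on this page. ServerOpts here is a local copy of the documented fields, newTransport and newRequest are hypothetical helpers, and the request path is illustrative.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
)

// ServerOpts is a local copy of the documented fields.
type ServerOpts struct {
	URL                string
	AuthToken          string
	InsecureSkipVerify bool
	HTTP1              bool
}

// newTransport builds a net/http transport that reflects the two flags.
func newTransport(o ServerOpts) *http.Transport {
	t := &http.Transport{
		// Skipping verification is for development and testing only.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: o.InsecureSkipVerify},
		// With a custom TLS config, HTTP/2 must be requested explicitly.
		ForceAttemptHTTP2: !o.HTTP1,
	}
	if o.HTTP1 {
		// A non-nil, empty TLSNextProto map disables HTTP/2 in net/http.
		t.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{}
	}
	return t
}

// newRequest builds a GET request and attaches the token, if any,
// as the Authorization header.
func newRequest(o ServerOpts, path string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, o.URL+path, nil)
	if err != nil {
		return nil, err
	}
	if o.AuthToken != "" {
		req.Header.Set("Authorization", o.AuthToken)
	}
	return req, nil
}

func main() {
	opts := ServerOpts{URL: "https://argo-server:2746", AuthToken: "Bearer token", HTTP1: true}
	tr := newTransport(opts)

	// The path is illustrative; no network call is made here.
	req, err := newRequest(opts, "/api/v1/workflows/argo")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.URL.String(), req.Header.Get("Authorization"), tr.ForceAttemptHTTP2)
}
```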
