argo

package
v2.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2025 License: MIT Imports: 8 Imported by: 0

README

Argo Workflows Client

Go Version

Production-ready Argo Workflows client library with flexible configuration, OpenTelemetry support, and comprehensive error handling.

Features

  • Multiple Connection Modes: Kubernetes API, In-Cluster, or Argo Server HTTP
  • Flexible Configuration: Config structs and functional options
  • OpenTelemetry Integration: Built-in tracing and observability
  • Production-Ready: Proper error handling, no fatal errors
  • Type-Safe: Full Go type safety with generics support
  • Well-Documented: Comprehensive examples and documentation

Installation

go get github.com/jasoet/pkg/v2/argo

Quick Start

Basic Usage (Default Configuration)
package main

import (
    "context"
    "github.com/jasoet/pkg/v2/argo"
)

func main() {
    ctx := context.Background()

    // Create client using default kubeconfig (~/.kube/config)
    ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())
    if err != nil {
        panic(err)
    }

    // Use the client
    wfClient := client.NewWorkflowServiceClient()
    // ... interact with workflows
}
Using Functional Options
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/custom/path/kubeconfig"),
    argo.WithContext("production"),
)
if err != nil {
    return err
}

Connection Modes

1. Kubernetes API Mode (Default)

Uses kubeconfig file to connect to Kubernetes API server.

// Use default kubeconfig location (~/.kube/config)
ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())

// Or specify custom kubeconfig path
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithContext("my-context"),
)
2. In-Cluster Mode

Use when running inside a Kubernetes pod.

ctx, client, err := argo.NewClient(ctx, argo.InClusterConfig())

// Or using functional options
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithInCluster(true),
)
3. Argo Server Mode

Connect via Argo Server HTTP API.

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
)

// For development/testing with HTTP
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("http://argo-server:2746", ""),
    argo.WithArgoServerInsecure(true),
)

Configuration

Config Struct
type Config struct {
    // KubeConfigPath specifies the path to kubeconfig file
    KubeConfigPath string

    // Context specifies the kubeconfig context to use
    Context string

    // InCluster indicates whether to use in-cluster configuration
    InCluster bool

    // ArgoServerOpts configures connection to Argo Server
    ArgoServerOpts ArgoServerOpts

    // OTelConfig enables OpenTelemetry instrumentation
    OTelConfig *otel.Config
}
Pre-configured Factories
// Default configuration (uses ~/.kube/config)
config := argo.DefaultConfig()

// In-cluster configuration
config := argo.InClusterConfig()

// Argo Server configuration
config := argo.ArgoServerConfig("https://argo-server:2746", "Bearer token")

Functional Options

All available functional options:

ctx, client, err := argo.NewClientWithOptions(ctx,
    // Kubernetes API options
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithContext("production"),
    argo.WithInCluster(true),

    // Argo Server options
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
    argo.WithArgoServerInsecure(false),
    argo.WithArgoServerHTTP1(false),

    // Observability
    argo.WithOTelConfig(otelConfig),
)

OpenTelemetry Integration

Enable distributed tracing and monitoring:

import (
    "github.com/jasoet/pkg/v2/argo"
    "github.com/jasoet/pkg/v2/otel"
)

// Create OTel config
otelConfig := otel.NewConfig("my-service").
    WithTracerProvider(tracerProvider).
    WithMeterProvider(meterProvider)

// Create Argo client with OTel
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithOTelConfig(otelConfig),
)

Working with Workflows

List Workflows
import (
    "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

wfClient := client.NewWorkflowServiceClient()

resp, err := wfClient.ListWorkflows(ctx, &workflow.WorkflowListRequest{
    Namespace: "argo",
    ListOptions: &metav1.ListOptions{
        Limit: 10,
    },
})
if err != nil {
    return err
}

for _, wf := range resp.Items {
    fmt.Printf("Workflow: %s, Status: %s\n", wf.Name, wf.Status.Phase)
}
Create a Workflow
import (
    "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

wf := &v1alpha1.Workflow{
    ObjectMeta: metav1.ObjectMeta{
        GenerateName: "hello-world-",
        Namespace:    "argo",
    },
    Spec: v1alpha1.WorkflowSpec{
        Entrypoint: "hello",
        Templates: []v1alpha1.Template{
            {
                Name: "hello",
                Container: &corev1.Container{
                    Image:   "alpine:latest",
                    Command: []string{"echo"},
                    Args:    []string{"Hello, Argo!"},
                },
            },
        },
    },
}

wfClient := client.NewWorkflowServiceClient()
created, err := wfClient.CreateWorkflow(ctx, &workflow.WorkflowCreateRequest{
    Namespace: "argo",
    Workflow:  wf,
})
if err != nil {
    return err
}

fmt.Printf("Created workflow: %s\n", created.Name)
Watch Workflows
import (
    "io"
)

wfClient := client.NewWorkflowServiceClient()

stream, err := wfClient.WatchWorkflows(ctx, &workflow.WatchWorkflowsRequest{
    Namespace: "argo",
    ListOptions: &metav1.ListOptions{
        Watch: true,
    },
})
if err != nil {
    return err
}

for {
    event, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }

    fmt.Printf("Workflow %s: %s\n", event.Object.Name, event.Object.Status.Phase)
}

Error Handling

The library returns errors to the caller instead of terminating the process (it never calls a fatal logger), so every failure can be handled explicitly:

ctx, client, err := argo.NewClient(ctx, config)
if err != nil {
    // Handle error gracefully
    log.Error().Err(err).Msg("Failed to create Argo client")
    return err
}

// Always check errors from operations
wfClient := client.NewWorkflowServiceClient()
resp, err := wfClient.ListWorkflows(ctx, req)
if err != nil {
    log.Error().Err(err).Msg("Failed to list workflows")
    return err
}

Examples

Example 1: Default Configuration
ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())
if err != nil {
    return err
}
Example 2: Custom Kubeconfig and Context
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/etc/kubernetes/admin.conf"),
    argo.WithContext("prod-cluster"),
)
Example 3: In-Cluster Usage
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithInCluster(true),
)
Example 4: Argo Server with Authentication
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server.example.com", "Bearer my-token"),
)
Example 5: With OpenTelemetry
otelConfig := otel.NewConfig("workflow-manager")
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/etc/kubernetes/admin.conf"),
    argo.WithOTelConfig(otelConfig),
)

Running Examples

# Run the comprehensive example
go run -tags=example ./argo/examples

# Or build and run
go build -tags=example -o argo-example ./argo/examples
./argo-example

See examples/README.md for more details.

Comparison with Original Implementation

Before (scp/api)
// util/argo/argo.go - tightly coupled, uses fatal errors
func NewClient(ctx context.Context) (context.Context, apiclient.Client) {
    ctx, argoClient, err := apiclient.NewClientFromOpts(
        apiclient.Opts{
            ArgoServerOpts:       apiclient.ArgoServerOpts{},
            ClientConfigSupplier: kube.GetCmdConfig,
            Context:              ctx,
        })
    if err != nil {
        log.Fatal().Err(err).Msg("unable to create argo client")  // Fatal!
    }
    return ctx, argoClient
}
After (pkg/v2/argo)
// Flexible, reusable, proper error handling
ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/path/to/kubeconfig"),
    argo.WithContext("production"),
    argo.WithOTelConfig(otelConfig),
)
if err != nil {
    return fmt.Errorf("failed to create client: %w", err)  // Graceful!
}
defer client.Close()

Benefits

✅ Reusable - Can be used across multiple projects
✅ Flexible - Config struct + functional options
✅ Library-friendly - Returns errors instead of fatal
✅ Testable - Easy to mock and test
✅ Observable - OpenTelemetry integration ready
✅ Well-documented - Comprehensive docs and examples
✅ Production-ready - Proper error handling and logging

Best Practices

  1. Use context for lifecycle management

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Clean up context when done
    
    ctx, client, err := argo.NewClient(ctx, config)
    if err != nil {
        return err
    }
    
  2. Use functional options for flexibility

    ctx, client, err := argo.NewClientWithOptions(ctx,
        argo.WithKubeConfig(kubeconfigPath),
        argo.WithContext(contextName),
    )
    
  3. Enable OpenTelemetry in production

    ctx, client, err := argo.NewClientWithOptions(ctx,
        argo.WithOTelConfig(otelConfig),
    )
    
  4. Handle errors gracefully

    if err != nil {
        log.Error().Err(err).Msg("Operation failed")
        return fmt.Errorf("operation failed: %w", err)
    }
    

Testing

# Unit tests
go test ./argo

# Integration tests (requires Kubernetes cluster)
go test -tags=integration ./argo

# All tests with coverage
go test -cover ./argo

Contributing

Contributions are welcome! Please see the main CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.


⬆ Back to Top

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewClient

func NewClient(ctx context.Context, config *Config) (context.Context, apiclient.Client, error)

NewClient creates a new Argo Workflows client from the given configuration. It returns the updated context and client, or an error if the connection fails.

The client can connect to Argo Workflows in two modes:

  1. Via Kubernetes API (default) - uses kubeconfig or in-cluster config
  2. Via Argo Server - uses HTTP/HTTPS connection

Example (default kubeconfig):

ctx, client, err := argo.NewClient(ctx, argo.DefaultConfig())
if err != nil {
    return err
}
defer client.Close()

Example (in-cluster):

ctx, client, err := argo.NewClient(ctx, argo.InClusterConfig())

Example (Argo Server):

cfg := argo.ArgoServerConfig("https://argo-server:2746", "Bearer token")
ctx, client, err := argo.NewClient(ctx, cfg)

func NewClientWithOptions

func NewClientWithOptions(ctx context.Context, opts ...Option) (context.Context, apiclient.Client, error)

NewClientWithOptions creates a new Argo Workflows client using functional options. This provides a more flexible way to configure the client.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/custom/path/kubeconfig"),
    argo.WithContext("production"),
    argo.WithOTelConfig(otelConfig),
)

Types

type ArgoServerOpts

type ArgoServerOpts struct {
	// URL is the Argo Server base URL (e.g., "https://argo-server:2746")
	URL string `yaml:"url" mapstructure:"url"`

	// AuthToken is the bearer token for authentication (optional)
	AuthToken string `yaml:"authToken" mapstructure:"authToken"`

	// InsecureSkipVerify disables TLS certificate verification (not recommended for production)
	InsecureSkipVerify bool `yaml:"insecureSkipVerify" mapstructure:"insecureSkipVerify"`

	// HTTP1 forces HTTP/1.1 instead of HTTP/2
	HTTP1 bool `yaml:"http1" mapstructure:"http1"`
}

ArgoServerOpts contains Argo Server connection options. This is used when connecting via Argo Server HTTP API instead of direct Kubernetes API.

type Config

type Config struct {
	// KubeConfigPath specifies the path to kubeconfig file.
	// Empty string means:
	// - Use in-cluster config if InCluster is true
	// - Use default kubeconfig location (~/.kube/config) if InCluster is false
	KubeConfigPath string `yaml:"kubeConfigPath" mapstructure:"kubeConfigPath"`

	// Context specifies the kubeconfig context to use.
	// Empty string means use the current context.
	Context string `yaml:"context" mapstructure:"context"`

	// InCluster indicates whether to use in-cluster Kubernetes configuration.
	// When true, the client will use the service account token mounted in the pod.
	InCluster bool `yaml:"inCluster" mapstructure:"inCluster"`

	// ArgoServerOpts configures connection to Argo Server (alternative to direct k8s API).
	// If URL is set, the client will connect via Argo Server instead of k8s API.
	ArgoServerOpts ArgoServerOpts `yaml:"argoServer" mapstructure:"argoServer"`

	// OTelConfig enables OpenTelemetry instrumentation (optional).
	OTelConfig *otel.Config `yaml:"-"`
}

Config represents the configuration for connecting to Argo Workflows. It supports multiple connection modes:

  • Kubernetes API (in-cluster or with kubeconfig)
  • Argo Server (HTTP/HTTPS with optional authentication)

func ArgoServerConfig

func ArgoServerConfig(serverURL string, authToken string) *Config

ArgoServerConfig returns a Config for connecting via Argo Server. This is an alternative to direct Kubernetes API access.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults. By default, it uses:

  • Out-of-cluster mode (InCluster = false)
  • Default kubeconfig location (~/.kube/config)
  • Current context from kubeconfig

func InClusterConfig

func InClusterConfig() *Config

InClusterConfig returns a Config for in-cluster usage. This is useful when the client runs inside a Kubernetes pod.

type Option

type Option func(*Config) error

Option is a functional option for configuring Argo client.

func WithArgoServer

func WithArgoServer(url, authToken string) Option

WithArgoServer configures the client to connect via Argo Server instead of Kubernetes API. This is an alternative connection mode that uses HTTP/HTTPS.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
)

func WithArgoServerHTTP1

func WithArgoServerHTTP1(http1 bool) Option

WithArgoServerHTTP1 forces HTTP/1.1 instead of HTTP/2 for Argo Server connection. This can be useful for debugging or compatibility reasons.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("https://argo-server:2746", "Bearer token"),
    argo.WithArgoServerHTTP1(true),
)

func WithArgoServerInsecure

func WithArgoServerInsecure(insecure bool) Option

WithArgoServerInsecure enables insecure mode for Argo Server connection. This disables TLS certificate verification. WARNING: This should only be used in development/testing environments.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServer("http://argo-server:2746", ""),
    argo.WithArgoServerInsecure(true),
)

func WithArgoServerOpts

func WithArgoServerOpts(opts ArgoServerOpts) Option

WithArgoServerOpts sets the complete ArgoServerOpts configuration. This is useful when you want to configure all Argo Server options at once.

Example:

serverOpts := argo.ArgoServerOpts{
    URL:                "https://argo-server:2746",
    AuthToken:          "Bearer token",
    InsecureSkipVerify: false,
    HTTP1:              false,
}

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithArgoServerOpts(serverOpts),
)

func WithConfig

func WithConfig(config *Config) Option

WithConfig applies a complete Config to the client. This is useful when you have a pre-built configuration from a config file.

Example:

config := &argo.Config{
    KubeConfigPath: "/custom/kubeconfig",
    Context:        "production",
}

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithConfig(config),
)

func WithContext

func WithContext(context string) Option

WithContext sets the kubeconfig context to use. If not set, the current context from kubeconfig will be used.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithContext("production"),
)

func WithInCluster

func WithInCluster(inCluster bool) Option

WithInCluster enables in-cluster configuration mode. When true, the client will use the service account token mounted in the pod. This is useful when running inside a Kubernetes cluster.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithInCluster(true),
)

func WithKubeConfig

func WithKubeConfig(path string) Option

WithKubeConfig sets the path to the kubeconfig file. If not set, the default location (~/.kube/config) will be used.

Example:

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithKubeConfig("/custom/path/to/kubeconfig"),
)

func WithOTelConfig

func WithOTelConfig(otelConfig *otel.Config) Option

WithOTelConfig enables OpenTelemetry instrumentation for the Argo client. This allows tracing and monitoring of workflow operations.

Example:

otelConfig := otel.NewConfig("my-service").
    WithTracerProvider(tracerProvider).
    WithMeterProvider(meterProvider)

ctx, client, err := argo.NewClientWithOptions(ctx,
    argo.WithOTelConfig(otelConfig),
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL