workflow

package
v1.16.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// HistoryCmd is the "history" subcommand. It requires exactly one argument,
// the workflow instance ID, loads that instance's history through the workflow
// package and renders it in the format selected by historyOutputFormat.
var HistoryCmd = &cobra.Command{
	Use:   "history",
	Short: "Get the history of a workflow instance.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.HistoryOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			InstanceID:     args[0],
		}

		format := *historyOutputFormat

		// Only the short format uses the condensed records; every other
		// format (including YAML and JSON) is fed from the wide records.
		var records any
		switch format {
		case outputFormatShort:
			records, err = workflow.HistoryShort(ctx, opts)
		default:
			records, err = workflow.HistoryWide(ctx, opts)
		}
		if err != nil {
			return err
		}

		// Structured formats are written straight to stdout.
		switch format {
		case outputFormatYAML:
			return utils.PrintDetail(os.Stdout, "yaml", records)
		case outputFormatJSON:
			return utils.PrintDetail(os.Stdout, "json", records)
		}

		// Anything else is marshalled to CSV text and handed to the table
		// printer.
		rendered, err := gocsv.MarshalString(records)
		if err != nil {
			return err
		}
		utils.PrintTable(rendered)

		return nil
	},
}
View Source
// ListCmd is the "list" subcommand (alias "ls"). It takes no positional
// arguments, lists the workflows for the resolved app ID and renders them in
// the format selected by listOutputFormat. When nothing is found it reports
// that on stderr and still returns nil.
var ListCmd = &cobra.Command{
	Use:     "list",
	Aliases: []string{"ls"},
	Short:   "List workflows for the given app ID.",
	Args:    cobra.NoArgs,
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.ListOptions{
			KubernetesMode:   flagKubernetesMode,
			Namespace:        flagDaprNamespace,
			AppID:            appID,
			ConnectionString: listConn.connectionString,
			TableName:        listConn.tableName,
			Filter:           *listFilter,
		}

		format := *listOutputFormat

		var (
			items any
			count int
		)

		// The result is stored as `any`, so the element count is captured
		// while the concrete slice type is still known.
		if format == outputFormatShort {
			var short []*workflow.ListOutputShort
			if short, err = workflow.ListShort(ctx, opts); err != nil {
				return err
			}
			count, items = len(short), short
		} else {
			var wide []*workflow.ListOutputWide
			if wide, err = workflow.ListWide(ctx, opts); err != nil {
				return err
			}
			count, items = len(wide), wide
		}

		if count == 0 {
			print.FailureStatusEvent(os.Stderr, "No workflow found in namespace %q for app ID %q", flagDaprNamespace, appID)
			return nil
		}

		// Structured formats are written straight to stdout.
		switch format {
		case outputFormatYAML:
			return utils.PrintDetail(os.Stdout, "yaml", items)
		case outputFormatJSON:
			return utils.PrintDetail(os.Stdout, "json", items)
		}

		// Anything else is marshalled to CSV text and handed to the table
		// printer.
		rendered, err := gocsv.MarshalString(items)
		if err != nil {
			return err
		}
		utils.PrintTable(rendered)

		return nil
	},
}
View Source
// PurgeCmd is the "purge" subcommand. It runs in one of two mutually exclusive
// modes, enforced by Args: either one or more instance IDs are given as
// positional arguments, or one of the --all / --all-older-than flags is set
// and no positional arguments are allowed.
var PurgeCmd = &cobra.Command{
	Use:   "purge",
	Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.",
	Args: func(cmd *cobra.Command, args []string) error {
		flags := cmd.Flags()
		bulk := flags.Changed("all-older-than") || flags.Changed("all")

		if bulk && len(args) > 0 {
			return errors.New("no arguments are accepted when using purge all flags")
		}
		if !bulk && len(args) == 0 {
			return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
		}

		return nil
	},
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.PurgeOptions{
			KubernetesMode:     flagKubernetesMode,
			Namespace:          flagDaprNamespace,
			SchedulerNamespace: schedulerNamespace,
			AppID:              appID,
			InstanceIDs:        args,
			All:                flagPurgeAll,
			ConnectionString:   flagPurgeConn.connectionString,
			TableName:          flagPurgeConn.tableName,
		}

		// The cutoff is only parsed when the flag was explicitly provided;
		// otherwise AllOlderThan keeps its zero value.
		if cmd.Flags().Changed("all-older-than") {
			cutoff, err := parseWorkflowDurationTimestamp(flagPurgeOlderThan, true)
			if err != nil {
				return err
			}
			opts.AllOlderThan = cutoff
		}

		return workflow.Purge(ctx, opts)
	},
}
View Source
// RaiseEventCmd is the "raise-event" subcommand. It requires exactly one
// argument of the form '<instance-id>/<event-name>', splits it on "/" and
// raises the named event on that workflow instance, passing along the input
// captured in flagRaiseEventInput.
//
// A malformed argument is returned as an error. A failure from
// workflow.RaiseEvent is printed to stdout and the process exits with status 1
// directly from RunE instead of returning the error.
var RaiseEventCmd = &cobra.Command{
	Use:   "raise-event",
	Short: "Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		// Exactly one "/" is accepted, and both sides must be non-empty:
		// "/evt" or "id/" would otherwise pass the length check and send an
		// empty instance ID or event name downstream.
		split := strings.Split(args[0], "/")
		if len(split) != 2 || split[0] == "" || split[1] == "" {
			return errors.New("the argument must be in the format '<instance-id>/<event-name>'")
		}
		instanceID := split[0]
		eventName := split[1]

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.RaiseEventOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			InstanceID:     instanceID,
			Name:           eventName,
			Input:          flagRaiseEventInput.input,
		}

		if err = workflow.RaiseEvent(ctx, opts); err != nil {
			// The error text is passed as an argument, never as the format
			// string, so a '%' in the message is printed verbatim.
			print.FailureStatusEvent(os.Stdout, "%s", err.Error())
			os.Exit(1)
		}

		print.InfoStatusEvent(os.Stdout, "Workflow '%s' raised event '%s' successfully", instanceID, eventName)

		return nil
	},
}
View Source
// ReRunCmd is the "rerun" subcommand. It requires exactly one argument, the
// ID of the workflow instance to re-run, and forwards the input and event ID
// flags to workflow.ReRun. On success it prints the ID returned by
// workflow.ReRun.
//
// A failure from workflow.ReRun is printed to stdout and the process exits
// with status 1 directly from RunE instead of returning the error.
var ReRunCmd = &cobra.Command{
	Use:   "rerun [instance ID]",
	Short: "ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.ReRunOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			InstanceID:     args[0],
			Input:          flagReRunInput.input,
			EventID:        flagReRunEventID,
		}

		// NewInstanceID is only populated when the flag was explicitly set;
		// otherwise it keeps its zero value.
		if cmd.Flags().Changed("new-instance-id") {
			opts.NewInstanceID = ptr.Of(flagReRunNewInstanceID)
		}

		id, err := workflow.ReRun(ctx, opts)
		if err != nil {
			// The error text is passed as an argument, never as the format
			// string, so a '%' in the message is printed verbatim.
			print.FailureStatusEvent(os.Stdout, "%s", err.Error())
			os.Exit(1)
		}

		print.InfoStatusEvent(os.Stdout, "Rerunning workflow instance: %s", id)

		return nil
	},
}
View Source
// ResumeCmd is the "resume" subcommand. It requires exactly one argument, the
// workflow instance ID, and calls workflow.Resume for it with the reason taken
// from flagResumeReason.
//
// A failure from workflow.Resume is printed to stdout and the process exits
// with status 1 directly from RunE instead of returning the error.
var ResumeCmd = &cobra.Command{
	Use:   "resume",
	Short: "Resume a workflow that is suspended.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.ResumeOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			InstanceID:     args[0],
			Reason:         flagResumeReason,
		}

		if err = workflow.Resume(ctx, opts); err != nil {
			// The error text is passed as an argument, never as the format
			// string, so a '%' in the message is printed verbatim.
			print.FailureStatusEvent(os.Stdout, "%s", err.Error())
			os.Exit(1)
		}

		print.InfoStatusEvent(os.Stdout, "Workflow '%s' resumed successfully", args[0])

		return nil
	},
}
View Source
// RunCmd is the "run" subcommand. It requires exactly one argument, the
// workflow name, and starts an instance via workflow.Run using the instance
// ID and input flags. On success it prints the ID returned by workflow.Run.
//
// An unparsable --start-time value is returned as an error. A failure from
// workflow.Run is printed to stdout and the process exits with status 1
// directly from RunE instead of returning the error.
var RunCmd = &cobra.Command{
	Use:   "run",
	Short: "Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.RunOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			Name:           args[0],
			InstanceID:     flagRunInstanceID.instanceID,
			Input:          flagRunInput.input,
		}

		// StartTime is only parsed when the flag was explicitly provided;
		// otherwise it keeps its zero value.
		if cmd.Flags().Changed("start-time") {
			opts.StartTime, err = parseWorkflowDurationTimestamp(flagRunStartTime, false)
			if err != nil {
				return err
			}
		}

		id, err := workflow.Run(ctx, opts)
		if err != nil {
			// The error text is passed as an argument, never as the format
			// string, so a '%' in the message is printed verbatim.
			print.FailureStatusEvent(os.Stdout, "%s", err.Error())
			os.Exit(1)
		}

		print.InfoStatusEvent(os.Stdout, "Workflow instance started successfully: %s", id)

		return nil
	},
}
View Source
// SuspendCmd is the "suspend" subcommand. It requires exactly one argument,
// the workflow instance ID, and calls workflow.Suspend for it with the reason
// taken from flagSuspendReason.
//
// A failure from workflow.Suspend is printed to stdout and the process exits
// with status 1 directly from RunE instead of returning the error.
var SuspendCmd = &cobra.Command{
	Use:   "suspend",
	Short: "Suspend a workflow in progress.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		opts := workflow.SuspendOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			InstanceID:     args[0],
			Reason:         flagSuspendReason,
		}

		if err = workflow.Suspend(ctx, opts); err != nil {
			// The error text is passed as an argument, never as the format
			// string, so a '%' in the message is printed verbatim.
			print.FailureStatusEvent(os.Stdout, "%s", err.Error())
			os.Exit(1)
		}

		print.InfoStatusEvent(os.Stdout, "Workflow '%s' suspended successfully", args[0])

		return nil
	},
}
View Source
// TerminateCmd is the "terminate" subcommand. It requires exactly one
// argument, the workflow instance ID, and calls workflow.Terminate for it.
//
// A failure from workflow.Terminate is printed to stdout and the process exits
// with status 1 directly from RunE instead of returning the error.
var TerminateCmd = &cobra.Command{
	Use:   "terminate",
	Short: "Terminate a workflow in progress.",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := signals.Context()

		appID, err := getWorkflowAppID(cmd)
		if err != nil {
			return err
		}

		// Output stays nil unless the "output" flag was explicitly set, so
		// the callee can tell "not provided" apart from an empty string.
		var output *string
		if cmd.Flags().Changed("output") {
			output = &flagTerminateOutput
		}

		opts := workflow.TerminateOptions{
			KubernetesMode: flagKubernetesMode,
			Namespace:      flagDaprNamespace,
			AppID:          appID,
			InstanceID:     args[0],
			Output:         output,
		}

		if err = workflow.Terminate(ctx, opts); err != nil {
			// The error text is passed as an argument, never as the format
			// string, so a '%' in the message is printed verbatim.
			print.FailureStatusEvent(os.Stdout, "%s", err.Error())
			os.Exit(1)
		}

		print.InfoStatusEvent(os.Stdout, "Workflow '%s' terminated successfully", args[0])

		return nil
	},
}
View Source
// WorkflowCmd is the "workflow" command (alias "wf"). It declares no run
// function of its own here.
// NOTE(review): presumably the parent under which the other commands in this
// package are registered; that registration is not visible here.
var WorkflowCmd = &cobra.Command{
	Aliases: []string{"wf"},
	Use:     "workflow",
	Short:   "Workflow management commands. Use -k to target a Kubernetes Dapr cluster.",
}

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL