execution

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2025 License: Apache-2.0 Imports: 15 Imported by: 1

Documentation

Index

Constants

View Source
const (
	StatePending   = "pending"
	StateRunning   = "running"
	StatePaused    = "paused"
	StateCompleted = "completed"
	StateFailed    = "failed"
)

Process state constants

Variables

View Source
var ContextKey = KeyOf[*Context]()
View Source
var EventKey = KeyOf[*event.Service]()
View Source
var ExecutionKey = KeyOf[*Execution]()
View Source
var ProcessKey = KeyOf[*Process]()
View Source
var TaskKey = KeyOf[*graph.Task]()

Functions

func ContextValue

func ContextValue[T any](ctx context.Context) T

ContextValue returns the value of the provided type from the context

func KeyOf

func KeyOf[T any]() reflect.Type

KeyOf returns the reflect.Type of the provided type

Types

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context represents the execution context for a process

func NewContext

func NewContext(ctx context.Context, actions *extension.Actions, service *event.Service) *Context

func (*Context) ExecutionContext

func (c *Context) ExecutionContext(process *Process, execution *Execution, task *graph.Task) *Context

ExecutionContext returns context with provided process and execution

func (*Context) Value

func (c *Context) Value(key any) any

type Execution

type Execution struct {
	ID             string                 `json:"id"`
	ProcessID      string                 `json:"processId"`
	ParentTaskID   string                 `json:"parentTaskId,omitempty"`
	GroupID        string                 `json:"groupId,omitempty"`
	TaskID         string                 `json:"taskId"`
	State          TaskState              `json:"state"`
	Data           map[string]interface{} `json:"data,omitempty"`
	Input          interface{}            `json:"input,omitempty"`
	Output         interface{}            `json:"empty,omitempty"`
	Error          string                 `json:"error,omitempty"`
	Attempts       int                    `json:"attempts,omitempty"`
	ScheduledAt    time.Time              `json:"scheduledAt"`
	StartedAt      *time.Time             `json:"startedAt,omitempty"`
	PausedAt       *time.Time             `json:"exectedAt,omitempty"`
	CompletedAt    *time.Time             `json:"completedAt,omitempty"`
	GoToTask       string                 `json:"gotoTask,omitempty"`
	Meta           map[string]interface{} `json:"meta,omitempty"`
	RunAfter       *time.Time             `json:"runAfter,omitempty"`
	DependsOn      []string               `json:"dependencies"`
	Dependencies   map[string]TaskState   `json:"completed,omitempty"`
	Approved       *bool                  `json:"approved,omitempty"`
	ApprovalReason string                 `json:"approvedDecision,omitempty"` // "yes" or "no"
}

Execution represents a single task execution

func NewExecution

func NewExecution(processID string, parent, task *graph.Task) *Execution

NewExecution creates a new execution for a task

func (*Execution) Clone

func (e *Execution) Clone() *Execution

Clone creates a deep copy of the execution so that the caller can mutate it without affecting the original instance. Only mutable collections are deep-copied; pointer fields referencing immutable data (Input / Output / Workflow structures) are left as-is.

func (*Execution) Complete

func (e *Execution) Complete()

Complete marks the execution as completed

func (*Execution) Context

func (e *Execution) Context(eventType string, task *graph.Task) *event.Context

func (*Execution) Fail

func (e *Execution) Fail(err error)

Fail marks the execution as failed

func (*Execution) Merge

func (e *Execution) Merge(execution *Execution)

func (*Execution) Pause

func (e *Execution) Pause()

func (*Execution) Schedule

func (e *Execution) Schedule()

func (*Execution) Skip

func (e *Execution) Skip()

func (*Execution) Start

func (e *Execution) Start()

Start marks the execution as started

type Option

type Option func(session *Session)

func WithConverter

func WithConverter(converter *conv.Converter) Option

WithState sets the state for the session

func WithImports

func WithImports(imports ...*model.Import) Option

WithState sets the state for the session

func WithState

func WithState(state map[string]interface{}) Option

func WithTypes

func WithTypes(types *extension.Types) Option

WithState sets the state for the session

type Process

type Process struct {
	ID         string            `json:"id"`
	ParentID   string            `json:"parentId,omitempty"`
	SCN        int               `json:"scn"`
	Name       string            `json:"name"`
	State      string            `json:"state"`
	Workflow   *model.Workflow   `json:"workflow"`
	CreatedAt  time.Time         `json:"createdAt"`
	UpdatedAt  time.Time         `json:"updatedAt"`
	FinishedAt *time.Time        `json:"finishedAt"`
	Session    *Session          `json:"session"`
	Stack      []*Execution      `json:"stack,omitempty"`
	Errors     map[string]string `json:"errors,omitempty"`
	Span       *tracing.Span     `json:"-"`
	Mode       string            `json:"mode"` //debug
	// For serverless environments
	ActiveTaskCount  int             `json:"activeTaskCount"`
	ActiveTaskGroups map[string]bool `json:"activeTaskGroups"`
	Policy           *policy.Config  `json:"policy,omitempty"`
	// contains filtered or unexported fields
}

Process represents a workflow execution instance

func NewProcess

func NewProcess(id string, name string, workflow *model.Workflow, initialState map[string]interface{}) *Process

NewProcess creates a new process

func (*Process) AddActiveTaskGroup

func (p *Process) AddActiveTaskGroup(groupID string)

AddActiveTaskGroup marks a task group as active

func (*Process) AllTasks

func (p *Process) AllTasks() map[string]*graph.Task

func (*Process) Clone

func (p *Process) Clone() *Process

Clone creates a deep copy of the Process suitable for safe concurrent reads/mutations outside the original store. The Workflow pointer is not cloned because workflows are immutable after initial load.

func (*Process) DecrementActiveTaskCount

func (p *Process) DecrementActiveTaskCount() int

DecrementActiveTaskCount decrements the active task counter

func (*Process) GetActiveTaskCount

func (p *Process) GetActiveTaskCount() int

GetActiveTaskCount returns the current active task count

func (*Process) GetState

func (p *Process) GetState() string

GetState returns the process state

func (*Process) HasActiveTaskGroup

func (p *Process) HasActiveTaskGroup(groupID string) bool

HasActiveTaskGroup checks if a task group is active

func (*Process) IncrementActiveTaskCount

func (p *Process) IncrementActiveTaskCount() int

IncrementActiveTaskCount increments the active task counter

func (*Process) LookupExecution

func (p *Process) LookupExecution(taskID string) *Execution

func (*Process) LookupTask

func (p *Process) LookupTask(taskID string) *graph.Task

func (*Process) Peek

func (p *Process) Peek() *Execution

func (*Process) Push

func (p *Process) Push(executions ...*Execution)

func (*Process) RegisterTask

func (p *Process) RegisterTask(t *graph.Task)

RegisterTask adds a task (and its subtasks) to the process' task lookup map at runtime. It is primarily used for template expansions that create tasks dynamically after the workflow has started executing.

func (*Process) Remove

func (p *Process) Remove(anExecution *Execution)

func (*Process) RemoveActiveTaskGroup

func (p *Process) RemoveActiveTaskGroup(groupID string)

RemoveActiveTaskGroup removes a task group from active groups

func (*Process) SetState

func (p *Process) SetState(state string)

SetState updates the process state

type ProcessOutput

type ProcessOutput struct {
	ProcessID string
	State     string
	Output    map[string]interface{}
	Errors    map[string]string
	TimeTaken time.Duration
	Timeout   bool
}

type Session

type Session struct {
	ID    string
	State map[string]interface{}
	// contains filtered or unexported fields
}

Session represents the execution context for a process

func NewSession

func NewSession(id string, opt ...Option) *Session

NewSession creates a new session

func (*Session) Append

func (s *Session) Append(key string, value interface{})

func (*Session) ApplyParameters

func (s *Session) ApplyParameters(params state.Parameters) error

ApplyParameters applies a list of parameters to the session

func (*Session) Clone

func (s *Session) Clone() *Session

Clone creates a copy of the session

func (*Session) Expand

func (s *Session) Expand(value interface{}) (interface{}, error)

Expand expands a value using the session state

func (*Session) Get

func (s *Session) Get(key string) (interface{}, bool)

Get retrieves a parameter from the session

func (*Session) GetAll

func (s *Session) GetAll() map[string]interface{}

GetAll returns all parameters in the session

func (*Session) GetBool

func (s *Session) GetBool(key string) (bool, bool)

GetBool retrieves a parameter as a boolean

func (*Session) GetInt

func (s *Session) GetInt(key string) (int, bool)

GetInt retrieves a parameter as an integer

func (*Session) GetString

func (s *Session) GetString(key string) (string, bool)

GetString retrieves a parameter as a string

func (*Session) Set

func (s *Session) Set(key string, value interface{})

Set adds or updates a parameter in the session

func (*Session) TaskSession

func (s *Session) TaskSession(from map[string]interface{}, options ...Option) *Session

func (*Session) TypedValue

func (s *Session) TypedValue(aType reflect.Type, value interface{}) (interface{}, error)

TypedValue converts a value to the specified type

type TaskState

type TaskState string

TaskState represents the current State of a task

const (
	TaskStatePending             TaskState = "pending"
	TaskStateScheduled           TaskState = "scheduled"
	TaskStateRunning             TaskState = "running"
	TaskStateWaitForDependencies TaskState = "waitForDependencies" //waiting for dependency
	TaskStateWaitForSubTasks     TaskState = "waitForSubTasks"     //waiting for subtask
	// TaskStateWaitForApproval indicates the task is waiting for explicit
	// approval before it can be executed.  Used by the optional policy/approval
	// mechanism.
	TaskStateWaitForApproval TaskState = "waitForApproval"
	TaskStateCompleted       TaskState = "completed"
	TaskStateFailed          TaskState = "failed"
	TaskStatePaused          TaskState = "paused"

	TaskStateSkipped TaskState = "skipped"
)

func (TaskState) IsWaitForApproval

func (t TaskState) IsWaitForApproval() bool

type Wait

type Wait func(ctx context.Context, timeout time.Duration) (*ProcessOutput, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL