execution

package
v0.4.2
Warning

This package is not in the latest version of its module.

Published: Oct 3, 2025 License: Apache-2.0 Imports: 15 Imported by: 1

Documentation

Overview

Package execution contains the core entities representing workflow execution – processes, sessions and individual task executions.

Constants

const (
	StatePending         = "pending"
	StateRunning         = "running"
	StatePauseRequested  = "pauseRequested"
	StatePaused          = "paused"
	StateResumeRequested = "resumeRequested"
	StateCancelRequested = "cancelRequested"
	StateCancelled       = "cancelled"
	StateCompleted       = "completed"
	StateFailed          = "failed"
)

Process state constants
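
The state values above are plain strings, so callers typically compare them directly. The following is a minimal sketch, not code from the package: it redeclares a few of the values locally so that it compiles on its own, and the helper name isTerminal, as well as the choice of which states count as terminal, are assumptions made for illustration.

```go
package main

import "fmt"

// Local copies of some documented state values so the sketch is self-contained.
const (
	stateRunning   = "running"
	statePaused    = "paused"
	stateCancelled = "cancelled"
	stateCompleted = "completed"
	stateFailed    = "failed"
)

// isTerminal is a hypothetical helper. Treating exactly cancelled, completed
// and failed as terminal is an assumption, not documented behaviour.
func isTerminal(state string) bool {
	switch state {
	case stateCancelled, stateCompleted, stateFailed:
		return true
	}
	return false
}

func main() {
	for _, s := range []string{stateRunning, statePaused, stateFailed} {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```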

Variables

var ContextKey = KeyOf[*Context]()
var EventKey = KeyOf[*event.Service]()
var ExecutionKey = KeyOf[*Execution]()
var ProcessKey = KeyOf[*Process]()
var TaskKey = KeyOf[*graph.Task]()

Functions

func ContextValue

func ContextValue[T any](ctx context.Context) T

ContextValue returns the value of the provided type from the context

func KeyOf

func KeyOf[T any]() reflect.Type

KeyOf returns the reflect.Type of the provided type

Types

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context represents the execution context for a process

func NewContext

func NewContext(ctx context.Context, actions *extension.Actions, service *event.Service) *Context

func (*Context) ExecutionContext

func (c *Context) ExecutionContext(process *Process, execution *Execution, task *graph.Task) *Context

ExecutionContext returns a context carrying the provided process, execution and task.

func (*Context) Value

func (c *Context) Value(key any) any

type Execution

type Execution struct {
	ID           string                 `json:"id"`
	ProcessID    string                 `json:"processId"`
	ParentTaskID string                 `json:"parentTaskId,omitempty"`
	GroupID      string                 `json:"groupId,omitempty"`
	TaskID       string                 `json:"taskId"`
	State        TaskState              `json:"state"`
	Data         map[string]interface{} `json:"data,omitempty"`
	Input        interface{}            `json:"input,omitempty"`
	Output       interface{}            `json:"empty,omitempty"`
	Error        string                 `json:"error,omitempty"`
	Attempts     int                    `json:"attempts,omitempty"`
	ScheduledAt  time.Time              `json:"scheduledAt"`
	StartedAt    *time.Time             `json:"startedAt,omitempty"`
	PausedAt     *time.Time             `json:"exectedAt,omitempty"`
	CompletedAt  *time.Time             `json:"completedAt,omitempty"`
	GoToTask     string                 `json:"gotoTask,omitempty"`
	Meta         map[string]interface{} `json:"meta,omitempty"`
	RunAfter     *time.Time             `json:"runAfter,omitempty"`
	DependsOn    []string               `json:"dependencies"`
	Dependencies map[string]TaskState   `json:"completed,omitempty"`
	Service      string                 `json:"service,omitempty"`
	Method       string                 `json:"method,omitempty"`
	AtHoc        bool                   `json:"atHoc,omitempty"`
	// CorrelationID is set when this execution is part of an asynchronous
	// fan-out group emitted by a parent task.  The value is shared across all
	// children of that group and by the parent waiting for their completion.
	CorrelationID string `json:"cid,omitempty"`

	Approved       *bool  `json:"approved,omitempty"`
	ApprovalReason string `json:"approvedDecision,omitempty"` // "yes" or "no"
	// contains filtered or unexported fields
}

Execution represents a single task execution

func NewAtHocExecution added in v0.1.7

func NewAtHocExecution(service, method string, input interface{}) (*Execution, error)

func NewExecution

func NewExecution(processID string, parent, task *graph.Task) *Execution

NewExecution creates a new execution for a task

func (*Execution) AtHocTask added in v0.1.7

func (e *Execution) AtHocTask() *graph.Task

func (*Execution) Clone

func (e *Execution) Clone() *Execution

Clone creates a deep copy of the execution so that the caller can mutate it without affecting the original instance. Only mutable collections are deep-copied; pointer fields referencing immutable data (Input / Output / Workflow structures) are left as-is.
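
The copy semantics described for Clone can be sketched as follows. The execution struct here is a hypothetical, trimmed-down stand-in for the real type, and the method body is one plausible way to get the described behaviour, not the package's actual implementation.

```go
package main

import "fmt"

// execution is a hypothetical subset of the documented Execution struct.
type execution struct {
	ID        string
	Data      map[string]interface{}
	DependsOn []string
	Input     interface{} // treated as immutable, so it is shared between copies
}

// clone copies the mutable collections and shares everything else.
func (e *execution) clone() *execution {
	if e == nil {
		return nil
	}
	c := *e // copies scalar fields; Input still points at the same data
	if e.Data != nil {
		c.Data = make(map[string]interface{}, len(e.Data))
		for k, v := range e.Data {
			c.Data[k] = v
		}
	}
	if e.DependsOn != nil {
		c.DependsOn = append([]string(nil), e.DependsOn...)
	}
	return &c
}

func main() {
	orig := &execution{ID: "e1", Data: map[string]interface{}{"k": 1}, DependsOn: []string{"a"}}
	cp := orig.clone()
	cp.Data["k"] = 2
	cp.DependsOn[0] = "b"
	fmt.Println(orig.Data["k"], orig.DependsOn[0]) // the original is unaffected
	fmt.Println(cp.Data["k"], cp.DependsOn[0])
}
```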

func (*Execution) Complete

func (e *Execution) Complete()

Complete marks the execution as completed

func (*Execution) Context

func (e *Execution) Context(eventType string, task *graph.Task) *event.Context

func (*Execution) Fail

func (e *Execution) Fail(err error)

Fail marks the execution as failed

func (*Execution) IsWaitingAsync added in v0.1.24

func (e *Execution) IsWaitingAsync() bool

IsWaitingAsync reports whether the execution is currently waiting for asynchronous child tasks to complete.

func (*Execution) Merge

func (e *Execution) Merge(execution *Execution)

func (*Execution) Pause

func (e *Execution) Pause()

func (*Execution) Schedule

func (e *Execution) Schedule()

func (*Execution) Skip

func (e *Execution) Skip()

func (*Execution) Start

func (e *Execution) Start()

Start marks the execution as started

type Option

type Option func(session *Session)

func WithConverter

func WithConverter(converter *conv.Converter) Option

WithConverter sets the converter for the session

func WithExecutionContext added in v0.1.21

func WithExecutionContext(context map[string]any) Option

func WithImports

func WithImports(imports ...*model.Import) Option

WithImports sets the imports for the session

func WithState

func WithState(state map[string]interface{}) Option

WithState sets the state for the session

func WithStateListeners added in v0.1.2

func WithStateListeners(listeners ...StateListener) Option

WithStateListeners attaches immutable listeners to the created session. The slice is copied; callers can reuse their backing array.
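
Option follows the functional-options pattern: each With... function returns a closure that mutates the session under construction. The sketch below illustrates the pattern and the slice copy mentioned above. All lowercase names are hypothetical stand-ins, and the real session has unexported fields that are not shown in this documentation.

```go
package main

import "fmt"

// stateListener is a simplified stand-in for the documented StateListener.
type stateListener func(key string, oldVal, newVal interface{})

// session is a hypothetical, reduced session type.
type session struct {
	id        string
	state     map[string]interface{}
	listeners []stateListener
}

type option func(s *session)

func withState(state map[string]interface{}) option {
	return func(s *session) { s.state = state }
}

// withStateListeners copies the slice up front, so later changes to the
// caller's backing array do not affect the session.
func withStateListeners(listeners ...stateListener) option {
	copied := append([]stateListener(nil), listeners...)
	return func(s *session) { s.listeners = append(s.listeners, copied...) }
}

func newSession(id string, opts ...option) *session {
	s := &session{id: id, state: map[string]interface{}{}}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	ls := []stateListener{func(key string, oldVal, newVal interface{}) {}}
	s := newSession("s1", withState(map[string]interface{}{"a": 1}), withStateListeners(ls...))
	ls[0] = nil // the caller reuses its slice
	fmt.Println(len(s.listeners), s.listeners[0] != nil)
}
```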

func WithTypes

func WithTypes(types *extension.Types) Option

WithTypes sets the types for the session

type Process

type Process struct {
	ID         string            `json:"id"`
	ParentID   string            `json:"parentId,omitempty"`
	SCN        int               `json:"scn"`
	Name       string            `json:"name"`
	State      string            `json:"state"`
	Workflow   *model.Workflow   `json:"workflow"`
	CreatedAt  time.Time         `json:"createdAt"`
	UpdatedAt  time.Time         `json:"updatedAt"`
	FinishedAt *time.Time        `json:"finishedAt"`
	Session    *Session          `json:"session"`
	Stack      []*Execution      `json:"stack,omitempty"`
	Errors     map[string]string `json:"errors,omitempty"`
	Span       *tracing.Span     `json:"-"`
	// Cancel/Pause handling
	CancelReason string          `json:"cancelReason,omitempty"`
	Ctx          context.Context `json:"-"`

	PausedAt  *time.Time `json:"pausedAt,omitempty"`
	ResumedAt *time.Time `json:"resumedAt,omitempty"`
	Mode      string     `json:"mode"` //debug
	// For serverless environments
	ActiveTaskCount  int             `json:"activeTaskCount"`
	ActiveTaskGroups map[string]bool `json:"activeTaskGroups"`
	Policy           *policy.Config  `json:"policy,omitempty"`
	// contains filtered or unexported fields
}

Process represents a workflow execution instance

func NewProcess

func NewProcess(id string, name string, workflow *model.Workflow, initialState map[string]interface{}, execContext map[string]interface{}) *Process

func (*Process) AddActiveTaskGroup

func (p *Process) AddActiveTaskGroup(groupID string)

AddActiveTaskGroup marks a task group as active

func (*Process) AllTasks

func (p *Process) AllTasks() map[string]*graph.Task

func (*Process) CancelCtx added in v0.1.2

func (p *Process) CancelCtx()

CancelCtx cancels the internal context so that any in-flight task that is using it can abort early. Safe to call multiple times.

func (*Process) Clone

func (p *Process) Clone() *Process

Clone creates a deep copy of the Process suitable for safe concurrent reads/mutations outside the original store. The Workflow pointer is not cloned because workflows are immutable after initial load.

func (*Process) CopyFrom added in v0.1.2

func (p *Process) CopyFrom(src any)

CopyFrom updates exported, mutex-independent fields from src. It intentionally skips sync.Mutex as copying it would corrupt internal state.

func (*Process) DecrementActiveTaskCount

func (p *Process) DecrementActiveTaskCount() int

DecrementActiveTaskCount decrements the active task counter

func (*Process) DependencyState added in v0.1.3

func (p *Process) DependencyState(e *Execution, taskID string) (TaskState, bool)

DependencyState safely reads a dependency value; the second return value indicates presence.

func (*Process) GetActiveTaskCount

func (p *Process) GetActiveTaskCount() int

GetActiveTaskCount returns the current active task count

func (*Process) GetState

func (p *Process) GetState() string

GetState returns the process state

func (*Process) HasActiveTaskGroup

func (p *Process) HasActiveTaskGroup(groupID string) bool

HasActiveTaskGroup checks if a task group is active

func (*Process) IncrementActiveTaskCount

func (p *Process) IncrementActiveTaskCount() int

IncrementActiveTaskCount increments the active task counter

func (*Process) LookupExecution

func (p *Process) LookupExecution(taskID string) *Execution

func (*Process) LookupTask

func (p *Process) LookupTask(taskID string) *graph.Task

func (*Process) Peek

func (p *Process) Peek() *Execution

func (*Process) Push

func (p *Process) Push(executions ...*Execution)

func (*Process) RegisterTask

func (p *Process) RegisterTask(t *graph.Task)

RegisterTask adds a task (and its subtasks) to the process' task lookup map at runtime. It is primarily used for template expansions that create tasks dynamically after the workflow has started executing.

func (*Process) Remove

func (p *Process) Remove(anExecution *Execution)

func (*Process) RemoveActiveTaskGroup

func (p *Process) RemoveActiveTaskGroup(groupID string)

RemoveActiveTaskGroup removes a task group from active groups

func (*Process) SetDependencyState added in v0.1.3

func (p *Process) SetDependencyState(e *Execution, taskID string, state TaskState)

SetDependencyState safely records the dependency state for taskID inside e.Dependencies.
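
The pair DependencyState / SetDependencyState reads like a lock-guarded map accessor with comma-ok semantics. The following sketch shows that idea under stated assumptions: the types are hypothetical reductions of the real ones, and guarding the map with a mutex on the process (and allocating the map lazily) is a guess about what "safely" means here.

```go
package main

import (
	"fmt"
	"sync"
)

type taskState string

// execution is a hypothetical subset of the documented Execution struct.
type execution struct {
	Dependencies map[string]taskState
}

// process is a hypothetical stand-in that only carries the lock.
type process struct {
	mu sync.RWMutex
}

// setDependencyState records the state, allocating the map on first use.
func (p *process) setDependencyState(e *execution, taskID string, state taskState) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if e.Dependencies == nil {
		e.Dependencies = make(map[string]taskState)
	}
	e.Dependencies[taskID] = state
}

// dependencyState returns the recorded state and whether it was present.
func (p *process) dependencyState(e *execution, taskID string) (taskState, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	s, ok := e.Dependencies[taskID]
	return s, ok
}

func main() {
	p, e := &process{}, &execution{}
	p.setDependencyState(e, "fetch", "completed")
	fmt.Println(p.dependencyState(e, "fetch"))
	fmt.Println(p.dependencyState(e, "missing"))
}
```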

func (*Process) SetState

func (p *Process) SetState(state string)

SetState updates the process state

type ProcessOutput

type ProcessOutput struct {
	ProcessID string
	State     string
	Output    map[string]interface{}
	Errors    map[string]string
	TimeTaken time.Duration
	Timeout   bool
}

type Session

type Session struct {
	ID      string
	State   map[string]interface{}
	Context map[string]interface{}
	// contains filtered or unexported fields
}

Session represents the execution context for a process

func NewSession

func NewSession(id string, opt ...Option) *Session

NewSession creates a new session

func (*Session) Append

func (s *Session) Append(key string, value interface{})

func (*Session) ApplyParameters

func (s *Session) ApplyParameters(params state.Parameters) error

ApplyParameters applies a list of parameters to the session

func (*Session) Clone

func (s *Session) Clone() *Session

Clone creates a copy of the session

func (*Session) Expand

func (s *Session) Expand(value interface{}) (interface{}, error)

Expand expands a value using the session state

func (*Session) FireWhen added in v0.1.2

func (s *Session) FireWhen(expr string, result bool)

FireWhen notifies all registered when-listeners. It is exported so that code outside the execution package (e.g. allocator) can emit the event.

func (*Session) Get

func (s *Session) Get(key string) (interface{}, bool)

Get retrieves a parameter from the session

func (*Session) GetAll

func (s *Session) GetAll() map[string]interface{}

GetAll returns all parameters in the session

func (*Session) GetBool

func (s *Session) GetBool(key string) (bool, bool)

GetBool retrieves a parameter as a boolean

func (*Session) GetInt

func (s *Session) GetInt(key string) (int, bool)

GetInt retrieves a parameter as an integer

func (*Session) GetString

func (s *Session) GetString(key string) (string, bool)

GetString retrieves a parameter as a string
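
The typed getters all return a value plus a presence flag. A minimal sketch of that shape follows, using a hypothetical session type without locking. Whether the real GetInt or GetBool also coerce other representations (for example a float64 decoded from JSON) is not stated in this documentation, so the sketch only accepts values already stored with the requested type.

```go
package main

import "fmt"

// session is a hypothetical, reduced stand-in without a mutex.
type session struct {
	state map[string]interface{}
}

func (s *session) get(key string) (interface{}, bool) {
	v, ok := s.state[key]
	return v, ok
}

// getInt reports false when the key is absent or the value is not an int.
func (s *session) getInt(key string) (int, bool) {
	v, ok := s.get(key)
	if !ok {
		return 0, false
	}
	i, ok := v.(int)
	return i, ok
}

// getString reports false when the key is absent or the value is not a string.
func (s *session) getString(key string) (string, bool) {
	v, ok := s.get(key)
	if !ok {
		return "", false
	}
	str, ok := v.(string)
	return str, ok
}

func main() {
	s := &session{state: map[string]interface{}{"retries": 3, "name": "build"}}
	if n, ok := s.getInt("retries"); ok {
		fmt.Println("retries:", n)
	}
	if _, ok := s.getInt("name"); !ok {
		fmt.Println("name is not an int")
	}
}
```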

func (*Session) RegisterListeners added in v0.1.2

func (s *Session) RegisterListeners(fn ...StateListener)

RegisterListeners attaches callbacks that will be called on every Set. Each call is made synchronously while the session mutex is held; listeners therefore MUST return quickly and must not call back into Session, to avoid deadlocks.

func (*Session) RegisterWhenListeners added in v0.1.2

func (s *Session) RegisterWhenListeners(fn ...WhenListener)

RegisterWhenListeners attaches callbacks that are executed after every `when:` condition evaluation.

func (*Session) Set

func (s *Session) Set(key string, value interface{})

Set adds or updates a parameter in the session

func (*Session) TaskSession

func (s *Session) TaskSession(from map[string]interface{}, options ...Option) *Session

func (*Session) TypedValue

func (s *Session) TypedValue(aType reflect.Type, value interface{}) (interface{}, error)

TypedValue converts a value to the specified type

type StateListener added in v0.1.2

type StateListener func(s *Session, key string, oldVal, newVal interface{})

StateListener is invoked every time Session.Set overwrites an existing key or inserts a new one.

type TaskState

type TaskState string

TaskState represents the current State of a task

const (
	TaskStatePending             TaskState = "pending"
	TaskStateScheduled           TaskState = "scheduled"
	TaskStateRunning             TaskState = "running"
	TaskStateWaitForDependencies TaskState = "waitForDependencies" //waiting for dependency
	TaskStateWaitForSubTasks     TaskState = "waitForSubTasks"     //waiting for subtask
	// TaskStateWaitAsync indicates the task emitted asynchronous child
	// executions and is waiting until the rendez-vous condition is satisfied.
	TaskStateWaitAsync TaskState = "waitAsync"
	// TaskStateWaitForApproval indicates the task is waiting for explicit
	// approval before it can be executed.  Used by the optional policy/approval
	// mechanism.
	TaskStateWaitForApproval TaskState = "waitForApproval"
	TaskStateCompleted       TaskState = "completed"
	TaskStateFailed          TaskState = "failed"
	TaskStatePaused          TaskState = "paused"

	TaskStateSkipped TaskState = "skipped"
	// TaskStateCancelled indicates the execution has been cancelled.
	TaskStateCancelled TaskState = "cancelled"
)

func (TaskState) IsWaitForApproval

func (t TaskState) IsWaitForApproval() bool
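
Because TaskState is a named string type, predicate methods such as IsWaitForApproval can be attached to it directly. The sketch below shows the likely shape of such a method using locally declared, hypothetical copies of two of the constants. The comparison in the method body is an assumption based on the method name.

```go
package main

import "fmt"

type taskState string

// Local copies of two documented values so the sketch is self-contained.
const (
	taskStateRunning         taskState = "running"
	taskStateWaitForApproval taskState = "waitForApproval"
)

// isWaitForApproval reports whether the task is parked awaiting approval.
func (t taskState) isWaitForApproval() bool {
	return t == taskStateWaitForApproval
}

func main() {
	for _, s := range []taskState{taskStateRunning, taskStateWaitForApproval} {
		fmt.Printf("%s awaiting approval: %v\n", s, s.isWaitForApproval())
	}
}
```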

type Wait

type Wait func(ctx context.Context, timeout time.Duration) (*ProcessOutput, error)

type WhenListener added in v0.1.2

type WhenListener func(s *Session, expr string, result bool)

WhenListener is invoked every time a `when:` expression is evaluated. The listener receives the session (at evaluation time), the raw expression and the boolean outcome of the evaluation.
