Documentation
¶
Index ¶
- Constants
- Variables
- func ContextValue[T any](ctx context.Context) T
- func KeyOf[T any]() reflect.Type
- type Context
- type Execution
- func (e *Execution) Clone() *Execution
- func (e *Execution) Complete()
- func (e *Execution) Context(eventType string, task *graph.Task) *event.Context
- func (e *Execution) Fail(err error)
- func (e *Execution) Merge(execution *Execution)
- func (e *Execution) Pause()
- func (e *Execution) Schedule()
- func (e *Execution) Skip()
- func (e *Execution) Start()
- type Option
- type Process
- func (p *Process) AddActiveTaskGroup(groupID string)
- func (p *Process) AllTasks() map[string]*graph.Task
- func (p *Process) Clone() *Process
- func (p *Process) DecrementActiveTaskCount() int
- func (p *Process) GetActiveTaskCount() int
- func (p *Process) GetState() string
- func (p *Process) HasActiveTaskGroup(groupID string) bool
- func (p *Process) IncrementActiveTaskCount() int
- func (p *Process) LookupExecution(taskID string) *Execution
- func (p *Process) LookupTask(taskID string) *graph.Task
- func (p *Process) Peek() *Execution
- func (p *Process) Push(executions ...*Execution)
- func (p *Process) RegisterTask(t *graph.Task)
- func (p *Process) Remove(anExecution *Execution)
- func (p *Process) RemoveActiveTaskGroup(groupID string)
- func (p *Process) SetState(state string)
- type ProcessOutput
- type Session
- func (s *Session) Append(key string, value interface{})
- func (s *Session) ApplyParameters(params state.Parameters) error
- func (s *Session) Clone() *Session
- func (s *Session) Expand(value interface{}) (interface{}, error)
- func (s *Session) Get(key string) (interface{}, bool)
- func (s *Session) GetAll() map[string]interface{}
- func (s *Session) GetBool(key string) (bool, bool)
- func (s *Session) GetInt(key string) (int, bool)
- func (s *Session) GetString(key string) (string, bool)
- func (s *Session) Set(key string, value interface{})
- func (s *Session) TaskSession(from map[string]interface{}, options ...Option) *Session
- func (s *Session) TypedValue(aType reflect.Type, value interface{}) (interface{}, error)
- type TaskState
- type Wait
Constants ¶
const ( StatePending = "pending" StateRunning = "running" StatePaused = "paused" StateCompleted = "completed" StateFailed = "failed" )
Process state constants
Variables ¶
var ContextKey = KeyOf[*Context]()
var EventKey = KeyOf[*event.Service]()
var ExecutionKey = KeyOf[*Execution]()
var ProcessKey = KeyOf[*Process]()
var TaskKey = KeyOf[*graph.Task]()
Functions ¶
func ContextValue ¶
ContextValue returns the value of the provided type from the context
Types ¶
type Context ¶
Context represents the execution context for a process
func NewContext ¶
type Execution ¶
type Execution struct {
ID string `json:"id"`
ProcessID string `json:"processId"`
ParentTaskID string `json:"parentTaskId,omitempty"`
GroupID string `json:"groupId,omitempty"`
TaskID string `json:"taskId"`
State TaskState `json:"state"`
Data map[string]interface{} `json:"data,omitempty"`
Input interface{} `json:"input,omitempty"`
Output interface{} `json:"empty,omitempty"`
Error string `json:"error,omitempty"`
Attempts int `json:"attempts,omitempty"`
ScheduledAt time.Time `json:"scheduledAt"`
StartedAt *time.Time `json:"startedAt,omitempty"`
PausedAt *time.Time `json:"exectedAt,omitempty"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
GoToTask string `json:"gotoTask,omitempty"`
Meta map[string]interface{} `json:"meta,omitempty"`
RunAfter *time.Time `json:"runAfter,omitempty"`
DependsOn []string `json:"dependencies"`
Dependencies map[string]TaskState `json:"completed,omitempty"`
Approved *bool `json:"approved,omitempty"`
ApprovalReason string `json:"approvedDecision,omitempty"` // "yes" or "no"
}
Execution represents a single task execution
func NewExecution ¶
NewExecution creates a new execution for a task
func (*Execution) Clone ¶
Clone creates a deep copy of the execution so that the caller can mutate it without affecting the original instance. Only mutable collections are deep-copied; pointer fields referencing immutable data (Input / Output / Workflow structures) are left as-is.
func (*Execution) Complete ¶
func (e *Execution) Complete()
Complete marks the execution as completed
type Option ¶
type Option func(session *Session)
func WithConverter ¶
WithState sets the state for the session
func WithImports ¶
WithState sets the state for the session
type Process ¶
type Process struct {
ID string `json:"id"`
ParentID string `json:"parentId,omitempty"`
SCN int `json:"scn"`
Name string `json:"name"`
State string `json:"state"`
Workflow *model.Workflow `json:"workflow"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
FinishedAt *time.Time `json:"finishedAt"`
Session *Session `json:"session"`
Stack []*Execution `json:"stack,omitempty"`
Errors map[string]string `json:"errors,omitempty"`
Span *tracing.Span `json:"-"`
Mode string `json:"mode"` //debug
// For serverless environments
ActiveTaskCount int `json:"activeTaskCount"`
ActiveTaskGroups map[string]bool `json:"activeTaskGroups"`
Policy *policy.Config `json:"policy,omitempty"`
// contains filtered or unexported fields
}
Process represents a workflow execution instance
func NewProcess ¶
func NewProcess(id string, name string, workflow *model.Workflow, initialState map[string]interface{}) *Process
NewProcess creates a new process
func (*Process) AddActiveTaskGroup ¶
AddActiveTaskGroup marks a task group as active
func (*Process) Clone ¶
Clone creates a deep copy of the Process suitable for safe concurrent reads/mutations outside the original store. The Workflow pointer is not cloned because workflows are immutable after initial load.
func (*Process) DecrementActiveTaskCount ¶
DecrementActiveTaskCount decrements the active task counter
func (*Process) GetActiveTaskCount ¶
GetActiveTaskCount returns the current active task count
func (*Process) HasActiveTaskGroup ¶
HasActiveTaskGroup checks if a task group is active
func (*Process) IncrementActiveTaskCount ¶
IncrementActiveTaskCount increments the active task counter
func (*Process) LookupExecution ¶
func (*Process) RegisterTask ¶
RegisterTask adds a task (and its subtasks) to the process' task lookup map at runtime. It is primarily used for template expansions that create tasks dynamically after the workflow has started executing.
func (*Process) RemoveActiveTaskGroup ¶
RemoveActiveTaskGroup removes a task group from active groups
type ProcessOutput ¶
type Session ¶
type Session struct {
ID string
State map[string]interface{}
// contains filtered or unexported fields
}
Session represents the execution context for a process
func NewSession ¶
NewSession creates a new session
func (*Session) ApplyParameters ¶
func (s *Session) ApplyParameters(params state.Parameters) error
ApplyParameters applies a list of parameters to the session
func (*Session) TaskSession ¶
type TaskState ¶
type TaskState string
TaskState represents the current State of a task
const ( TaskStatePending TaskState = "pending" TaskStateScheduled TaskState = "scheduled" TaskStateRunning TaskState = "running" TaskStateWaitForDependencies TaskState = "waitForDependencies" //waiting for dependency TaskStateWaitForSubTasks TaskState = "waitForSubTasks" //waiting for subtask // TaskStateWaitForApproval indicates the task is waiting for explicit // approval before it can be executed. Used by the optional policy/approval // mechanism. TaskStateWaitForApproval TaskState = "waitForApproval" TaskStateCompleted TaskState = "completed" TaskStateFailed TaskState = "failed" TaskStatePaused TaskState = "paused" TaskStateSkipped TaskState = "skipped" )