Documentation
¶
Index ¶
- Constants
- Variables
- func ContextValue[T any](ctx context.Context) T
- func KeyOf[T any]() reflect.Type
- type Context
- type Execution
- func (e *Execution) Clone() *Execution
- func (e *Execution) Complete()
- func (e *Execution) Context(eventType string, task *graph.Task) *event.Context
- func (e *Execution) Fail(err error)
- func (e *Execution) Merge(execution *Execution)
- func (e *Execution) Pause()
- func (e *Execution) Schedule()
- func (e *Execution) Skip()
- func (e *Execution) Start()
- type Option
- type Process
- func (p *Process) AddActiveTaskGroup(groupID string)
- func (p *Process) AllTasks() map[string]*graph.Task
- func (p *Process) CancelCtx()
- func (p *Process) Clone() *Process
- func (p *Process) CopyFrom(src any)
- func (p *Process) DecrementActiveTaskCount() int
- func (p *Process) GetActiveTaskCount() int
- func (p *Process) GetDep(e *Execution, taskID string) (TaskState, bool)
- func (p *Process) GetState() string
- func (p *Process) HasActiveTaskGroup(groupID string) bool
- func (p *Process) IncrementActiveTaskCount() int
- func (p *Process) LookupExecution(taskID string) *Execution
- func (p *Process) LookupTask(taskID string) *graph.Task
- func (p *Process) Peek() *Execution
- func (p *Process) Push(executions ...*Execution)
- func (p *Process) RegisterTask(t *graph.Task)
- func (p *Process) Remove(anExecution *Execution)
- func (p *Process) RemoveActiveTaskGroup(groupID string)
- func (p *Process) SetDep(e *Execution, taskID string, state TaskState)
- func (p *Process) SetState(state string)
- type ProcessOutput
- type Session
- func (s *Session) Append(key string, value interface{})
- func (s *Session) ApplyParameters(params state.Parameters) error
- func (s *Session) Clone() *Session
- func (s *Session) Expand(value interface{}) (interface{}, error)
- func (s *Session) FireWhen(expr string, result bool)
- func (s *Session) Get(key string) (interface{}, bool)
- func (s *Session) GetAll() map[string]interface{}
- func (s *Session) GetBool(key string) (bool, bool)
- func (s *Session) GetInt(key string) (int, bool)
- func (s *Session) GetString(key string) (string, bool)
- func (s *Session) RegisterListeners(fn ...StateListener)
- func (s *Session) RegisterWhenListeners(fn ...WhenListener)
- func (s *Session) Set(key string, value interface{})
- func (s *Session) TaskSession(from map[string]interface{}, options ...Option) *Session
- func (s *Session) TypedValue(aType reflect.Type, value interface{}) (interface{}, error)
- type StateListener
- type TaskState
- type Wait
- type WhenListener
Constants ¶
const ( StatePending = "pending" StateRunning = "running" StatePauseRequested = "pauseRequested" StatePaused = "paused" StateResumeRequested = "resumeRequested" StateCancelRequested = "cancelRequested" StateCancelled = "cancelled" StateCompleted = "completed" StateFailed = "failed" )
Process state constants
Variables ¶
var ContextKey = KeyOf[*Context]()
var EventKey = KeyOf[*event.Service]()
var ExecutionKey = KeyOf[*Execution]()
var ProcessKey = KeyOf[*Process]()
var TaskKey = KeyOf[*graph.Task]()
Functions ¶
func ContextValue ¶
ContextValue returns the value of the provided type from the context
Types ¶
type Context ¶
Context represents the execution context for a process
func NewContext ¶
type Execution ¶
type Execution struct {
ID string `json:"id"`
ProcessID string `json:"processId"`
ParentTaskID string `json:"parentTaskId,omitempty"`
GroupID string `json:"groupId,omitempty"`
TaskID string `json:"taskId"`
State TaskState `json:"state"`
Data map[string]interface{} `json:"data,omitempty"`
Input interface{} `json:"input,omitempty"`
Output interface{} `json:"empty,omitempty"`
Error string `json:"error,omitempty"`
Attempts int `json:"attempts,omitempty"`
ScheduledAt time.Time `json:"scheduledAt"`
StartedAt *time.Time `json:"startedAt,omitempty"`
PausedAt *time.Time `json:"exectedAt,omitempty"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
GoToTask string `json:"gotoTask,omitempty"`
Meta map[string]interface{} `json:"meta,omitempty"`
RunAfter *time.Time `json:"runAfter,omitempty"`
DependsOn []string `json:"dependencies"`
Dependencies map[string]TaskState `json:"completed,omitempty"`
Approved *bool `json:"approved,omitempty"`
ApprovalReason string `json:"approvedDecision,omitempty"` // "yes" or "no"
// contains filtered or unexported fields
}
Execution represents a single task execution
func NewExecution ¶
NewExecution creates a new execution for a task
func (*Execution) Clone ¶
Clone creates a deep copy of the execution so that the caller can mutate it without affecting the original instance. Only mutable collections are deep-copied; pointer fields referencing immutable data (Input / Output / Workflow structures) are left as-is.
func (*Execution) Complete ¶
func (e *Execution) Complete()
Complete marks the execution as completed
type Option ¶
type Option func(session *Session)
func WithConverter ¶
WithState sets the state for the session
func WithImports ¶
WithState sets the state for the session
func WithStateListeners ¶ added in v0.1.2
func WithStateListeners(listeners ...StateListener) Option
WithStateListeners attaches immutable listeners to the created session. The slice is copied; callers can reuse their backing array.
type Process ¶
type Process struct {
ID string `json:"id"`
ParentID string `json:"parentId,omitempty"`
SCN int `json:"scn"`
Name string `json:"name"`
State string `json:"state"`
Workflow *model.Workflow `json:"workflow"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
FinishedAt *time.Time `json:"finishedAt"`
Session *Session `json:"session"`
Stack []*Execution `json:"stack,omitempty"`
Errors map[string]string `json:"errors,omitempty"`
Span *tracing.Span `json:"-"`
// Cancel/Pause handling
CancelReason string `json:"cancelReason,omitempty"`
Ctx context.Context `json:"-"`
PausedAt *time.Time `json:"pausedAt,omitempty"`
ResumedAt *time.Time `json:"resumedAt,omitempty"`
Mode string `json:"mode"` //debug
// For serverless environments
ActiveTaskCount int `json:"activeTaskCount"`
ActiveTaskGroups map[string]bool `json:"activeTaskGroups"`
Policy *policy.Config `json:"policy,omitempty"`
// contains filtered or unexported fields
}
Process represents a workflow execution instance
func NewProcess ¶
func (*Process) AddActiveTaskGroup ¶
AddActiveTaskGroup marks a task group as active
func (*Process) CancelCtx ¶ added in v0.1.2
func (p *Process) CancelCtx()
CancelCtx cancels the internal context so that any in-flight task that is using it can abort early. Safe to call multiple times.
func (*Process) Clone ¶
Clone creates a deep copy of the Process suitable for safe concurrent reads/mutations outside the original store. The Workflow pointer is not cloned because workflows are immutable after initial load.
func (*Process) CopyFrom ¶ added in v0.1.2
CopyFrom updates exported, mutex-independent fields from src. It intentionally skips sync.Mutex as copying it would corrupt internal state.
func (*Process) DecrementActiveTaskCount ¶
DecrementActiveTaskCount decrements the active task counter
func (*Process) GetActiveTaskCount ¶
GetActiveTaskCount returns the current active task count
func (*Process) GetDep ¶ added in v0.1.2
getDep safely reads a dependency value; second return value indicates presence.
func (*Process) HasActiveTaskGroup ¶
HasActiveTaskGroup checks if a task group is active
func (*Process) IncrementActiveTaskCount ¶
IncrementActiveTaskCount increments the active task counter
func (*Process) LookupExecution ¶
func (*Process) RegisterTask ¶
RegisterTask adds a task (and its subtasks) to the process' task lookup map at runtime. It is primarily used for template expansions that create tasks dynamically after the workflow has started executing.
func (*Process) RemoveActiveTaskGroup ¶
RemoveActiveTaskGroup removes a task group from active groups
type ProcessOutput ¶
type Session ¶
type Session struct {
ID string
State map[string]interface{}
// contains filtered or unexported fields
}
Session represents the execution context for a process
func NewSession ¶
NewSession creates a new session
func (*Session) ApplyParameters ¶
func (s *Session) ApplyParameters(params state.Parameters) error
ApplyParameters applies a list of parameters to the session
func (*Session) FireWhen ¶ added in v0.1.2
FireWhen notifies all registered when-listeners. It is exported so that code outside the execution package (e.g. allocator) can emit the event.
func (*Session) RegisterListeners ¶ added in v0.1.2
func (s *Session) RegisterListeners(fn ...StateListener)
RegisterListeners attaches a callback that will be called on every Set. The call is made synchronously while the session mutex is held, therefore listeners MUST return quickly and must not call back into Session to avoid deadlocks.
func (*Session) RegisterWhenListeners ¶ added in v0.1.2
func (s *Session) RegisterWhenListeners(fn ...WhenListener)
RegisterWhenListeners attaches callbacks that are executed after every `when:` condition evaluation.
func (*Session) TaskSession ¶
type StateListener ¶ added in v0.1.2
StateListener is invoked every time Session.Set overwrites an existing key or inserts a new one.
type TaskState ¶
type TaskState string
TaskState represents the current State of a task
const ( TaskStatePending TaskState = "pending" TaskStateScheduled TaskState = "scheduled" TaskStateRunning TaskState = "running" TaskStateWaitForDependencies TaskState = "waitForDependencies" //waiting for dependency TaskStateWaitForSubTasks TaskState = "waitForSubTasks" //waiting for subtask // TaskStateWaitForApproval indicates the task is waiting for explicit // approval before it can be executed. Used by the optional policy/approval // mechanism. TaskStateWaitForApproval TaskState = "waitForApproval" TaskStateCompleted TaskState = "completed" TaskStateFailed TaskState = "failed" TaskStatePaused TaskState = "paused" TaskStateSkipped TaskState = "skipped" // TaskStateCancelled indicates the execution has been cancelled. TaskStateCancelled TaskState = "cancelled" )
func (TaskState) IsWaitForApproval ¶
type WhenListener ¶ added in v0.1.2
WhenListener is invoked every time a `when:` expression is evaluated. The listener receives the session (at evaluation time), the raw expression and the boolean outcome of the evaluation.