trace

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetExitCode added in v0.9.0

func GetExitCode(exec *WorkflowExecutionState) int

GetExitCode returns the exit code for a given workflow execution status.

func GetFoldStatus added in v0.9.0

func GetFoldStatus(c *cli.Context) ([]enums.WorkflowExecutionStatus, error)

Types

type ActivityExecutionState

type ActivityExecutionState struct {
	// ActivityId is the Activity's id, which will usually be the EventId of the Event it was scheduled with.
	ActivityId string
	// Status is the Execution's Status based on the last event that was processed.
	Status ActivityExecutionStatus
	// Type is the name/type of Activity.
	Type *common.ActivityType
	// Attempt contains the current Activity Execution's attempt.
	// Since Activities' events aren't reported until the Activity is closed, this will always be the last attempt.
	Attempt int32
	// Failure contains the last failure that the Execution has reported (if any).
	Failure *failure.Failure
	// RetryState contains the reason provided for whether the Task should or shouldn't be retried.
	RetryState enums.RetryState

	// StartTime is the time the Execution was started (based on the start Event).
	StartTime *time.Time
	// CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *time.Time
}

ActivityExecutionState is a snapshot of the state of an Activity's Execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.

func NewActivityExecutionState

func NewActivityExecutionState() *ActivityExecutionState

func (*ActivityExecutionState) GetAttempt

func (state *ActivityExecutionState) GetAttempt() int32

func (*ActivityExecutionState) GetDuration

func (state *ActivityExecutionState) GetDuration() *time.Duration

func (*ActivityExecutionState) GetFailure

func (state *ActivityExecutionState) GetFailure() *failure.Failure

func (*ActivityExecutionState) GetName

func (state *ActivityExecutionState) GetName() string

func (*ActivityExecutionState) GetRetryState

func (state *ActivityExecutionState) GetRetryState() enums.RetryState

func (*ActivityExecutionState) GetStartTime

func (state *ActivityExecutionState) GetStartTime() *time.Time

func (*ActivityExecutionState) Update

func (state *ActivityExecutionState) Update(event *history.HistoryEvent)

Update updates the ActivityExecutionState with a HistoryEvent.

type ActivityExecutionStatus

type ActivityExecutionStatus int32

ActivityExecutionStatus is the Status of an ActivityExecution, analogous to enums.WorkflowExecutionStatus.

var (
	ACTIVITY_EXECUTION_STATUS_UNSPECIFIED      ActivityExecutionStatus = 0
	ACTIVITY_EXECUTION_STATUS_SCHEDULED        ActivityExecutionStatus = 1
	ACTIVITY_EXECUTION_STATUS_RUNNING          ActivityExecutionStatus = 2
	ACTIVITY_EXECUTION_STATUS_COMPLETED        ActivityExecutionStatus = 3
	ACTIVITY_EXECUTION_STATUS_FAILED           ActivityExecutionStatus = 4
	ACTIVITY_EXECUTION_STATUS_TIMED_OUT        ActivityExecutionStatus = 5
	ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ActivityExecutionStatus = 6
	ACTIVITY_EXECUTION_STATUS_CANCELED         ActivityExecutionStatus = 7
)

type ExecutionState

type ExecutionState interface {
	// Update updates an ExecutionState with a new HistoryEvent.
	Update(*history.HistoryEvent)
	// GetName returns the state's name (usually for displaying to the user).
	GetName() string
	// GetAttempt returns the attempts to execute the current ExecutionState.
	GetAttempt() int32
	// GetFailure returns the execution's failure (if any).
	GetFailure() *failure.Failure
	// GetRetryState returns the execution's RetryState.
	GetRetryState() enums.RetryState

	GetDuration() *time.Duration
	GetStartTime() *time.Time
}

ExecutionState provides a common interface to any execution (Workflows, Activities and Timers in this case) updated through HistoryEvents.

type TimerExecutionState

type TimerExecutionState struct {
	TimerId string
	// Name is the name of the Timer (if any has been given to it)
	Name string
	// StartToFireTimeout is the amount of time to elapse before the timer fires.
	StartToFireTimeout *time.Duration
	// Status is the Execution's Status based on the last event that was processed.
	Status TimerExecutionStatus
	// StartTime is the time the Execution was started (based on the start Event).
	StartTime *time.Time
	// CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *time.Time
}

TimerExecutionState contains information about a Timer as an execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.

func (*TimerExecutionState) GetAttempt

func (t *TimerExecutionState) GetAttempt() int32

func (*TimerExecutionState) GetDuration

func (t *TimerExecutionState) GetDuration() *time.Duration

func (*TimerExecutionState) GetFailure

func (t *TimerExecutionState) GetFailure() *failure.Failure

func (*TimerExecutionState) GetName

func (t *TimerExecutionState) GetName() string

func (*TimerExecutionState) GetRetryState

func (t *TimerExecutionState) GetRetryState() enums.RetryState

GetRetryState will always return RETRY_STATE_UNSPECIFIED since Timers don't retry.

func (*TimerExecutionState) GetStartTime

func (t *TimerExecutionState) GetStartTime() *time.Time

func (*TimerExecutionState) Update

func (t *TimerExecutionState) Update(event *history.HistoryEvent)

Update updates the TimerExecutionState with a HistoryEvent.

type TimerExecutionStatus

type TimerExecutionStatus int32

TimerExecutionStatus is the Status of a TimerExecution, analogous to enums.WorkflowExecutionStatus.

var (
	TIMER_STATUS_WAITING  TimerExecutionStatus = 0
	TIMER_STATUS_FIRED    TimerExecutionStatus = 1
	TIMER_STATUS_CANCELED TimerExecutionStatus = 2
)

type WorkflowExecutionState

type WorkflowExecutionState struct {
	// Execution is the workflow's execution (WorkflowId and RunId).
	Execution *common.WorkflowExecution
	// Type is the name/type of Workflow.
	Type *common.WorkflowType
	// StartTime is the time the Execution was started (based on the first Execution's Event).
	StartTime *time.Time
	// CloseTime is the time the Execution was closed (based on the first Execution's Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *time.Time
	// Status is the Execution's Status based on the last event that was processed.
	Status enums.WorkflowExecutionStatus
	// IsArchived will be true if the workflow has been archived.
	IsArchived bool

	// LastEventId is the EventId of the last processed HistoryEvent.
	LastEventId int64
	// HistoryLength is the number of HistoryEvents available in the server. It will zero for archived workflows and non-zero positive for any other workflow executions.
	HistoryLength int64

	// ChildStates contains all the ExecutionStates contained by this WorkflowExecutionState in order of execution.
	ChildStates []ExecutionState

	// Non-successful closed states
	// Failure contains the last failure that the Execution has reported (if any).
	Failure *failure.Failure
	// Termination contains the last available termination information that the Workflow Execution has reported (if any).
	Termination *history.WorkflowExecutionTerminatedEventAttributes
	// CancelRequest contains the last request that has been made to cancel the Workflow Execution (if any).
	CancelRequest *history.WorkflowExecutionCancelRequestedEventAttributes
	// RetryState contains the reason provided for whether the Task should or shouldn't be retried.
	RetryState enums.RetryState

	// Timeout and retry policies
	// WorkflowExecutionTimeout contains the Workflow Execution's timeout if it has been set.
	WorkflowExecutionTimeout *time.Duration
	// Attempt contains the current Workflow Execution's attempt.
	Attempt int32
	// MaximumAttempts contains the maximum number of times the Workflow Execution is allowed to retry before failing.
	MaximumAttempts int32

	// ParentWorkflowExecution identifies the parent Workflow and the execution run.
	ParentWorkflowExecution *common.WorkflowExecution
	// contains filtered or unexported fields
}

WorkflowExecutionState is a snapshot of the state of a WorkflowExecution. It is updated through HistoryEvents.

func NewWorkflowExecutionState

func NewWorkflowExecutionState(wfId, runId string) *WorkflowExecutionState

func (*WorkflowExecutionState) GetAttempt

func (state *WorkflowExecutionState) GetAttempt() int32

func (*WorkflowExecutionState) GetChildWorkflowByEventId added in v0.9.0

func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)

GetChildWorkflowByEventId returns a child workflow for a given initiated event id

func (*WorkflowExecutionState) GetDuration

func (state *WorkflowExecutionState) GetDuration() *time.Duration

func (*WorkflowExecutionState) GetFailure

func (state *WorkflowExecutionState) GetFailure() *failure.Failure

func (*WorkflowExecutionState) GetName

func (state *WorkflowExecutionState) GetName() string

func (*WorkflowExecutionState) GetNumberOfEvents

func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)

GetNumberOfEvents returns a count of the number of events processed and the total for a workflow execution. This method iteratively sums the LastEventId (the sequential id of the last event processed) and the HistoryLength for all child workflows

func (*WorkflowExecutionState) GetRetryState

func (state *WorkflowExecutionState) GetRetryState() enums.RetryState

func (*WorkflowExecutionState) GetStartTime

func (state *WorkflowExecutionState) GetStartTime() *time.Time

func (*WorkflowExecutionState) IsClosed added in v0.9.0

func (state *WorkflowExecutionState) IsClosed() (bool, error)

IsClosed returns true when the Workflow Execution is closed. A Closed status means that the Workflow Execution cannot make further progress.

func (*WorkflowExecutionState) Update

func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)

Update updates the WorkflowExecutionState and its child states with a HistoryEvent.

type WorkflowExecutionUpdate added in v0.9.0

type WorkflowExecutionUpdate struct {
	State *WorkflowExecutionState
}

func (*WorkflowExecutionUpdate) GetState added in v0.9.0

func (update *WorkflowExecutionUpdate) GetState() *WorkflowExecutionState

type WorkflowExecutionUpdateIterator added in v0.9.0

type WorkflowExecutionUpdateIterator interface {
	HasNext() bool
	Next() (*WorkflowExecutionUpdate, error)
}

WorkflowExecutionUpdateIterator is the interface the provides iterative updates, analogous to the HistoryEventIterator interface.

func GetWorkflowExecutionUpdates added in v0.9.0

func GetWorkflowExecutionUpdates(ctx context.Context, client sdkclient.Client, wfId, runId string, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, concurrency int) (WorkflowExecutionUpdateIterator, error)

GetWorkflowExecutionUpdates gets workflow execution updates for a particular workflow - workflow ID of the workflow - runID can be default (empty string) - depth of child workflows to request updates for (-1 for unlimited depth) - concurrency of requests (non-zero positive integer) Returns iterator (see client.GetWorkflowHistory) that provides updated WorkflowExecutionState snapshots. Example: To print a workflow's state whenever there's updates

iter := GetWorkflowExecutionUpdates(ctx, client, wfId, runId, -1, 5)
var state *WorkflowExecutionState
for iter.HasNext() {
	update = iter.Next()
	PrintWorkflowState(update.State)
}

type WorkflowExecutionUpdateIteratorImpl added in v0.9.0

type WorkflowExecutionUpdateIteratorImpl struct {
	// contains filtered or unexported fields
}

WorkflowExecutionUpdateIteratorImpl implements the iterator interface. Keeps information about the last processed update and receives new updates through the updateChan channel.

func (*WorkflowExecutionUpdateIteratorImpl) HasNext added in v0.9.0

func (iter *WorkflowExecutionUpdateIteratorImpl) HasNext() bool

HasNext checks if there's any more updates in the updateChan channel. Returns false if the channel has been closed.

func (*WorkflowExecutionUpdateIteratorImpl) Next added in v0.9.0

Next return the last processed execution update. HasNext has to be called first (following the HasNext/Next pattern).

type WorkflowStateJob added in v0.9.0

type WorkflowStateJob struct {
	// contains filtered or unexported fields
}

WorkflowStateJob implements a WorkerJob to retrieve updates for a WorkflowExecutionState and its child workflows.

func NewWorkflowStateJob added in v0.9.0

func NewWorkflowStateJob(ctx context.Context, client sdkclient.Client, state *WorkflowExecutionState, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, updateChan chan struct{}) (*WorkflowStateJob, error)

NewWorkflowStateJob returns a new WorkflowStateJob. It requires an updateChan to signal when there's updates.

func (*WorkflowStateJob) GetChildJob added in v0.9.0

func (job *WorkflowStateJob) GetChildJob(event *history.HistoryEvent) (*WorkflowStateJob, error)

GetChildJob gets a new child job and appends it to the list of childJobs. These jobs don't start until the parent is catched up.

func (*WorkflowStateJob) Run added in v0.9.0

func (job *WorkflowStateJob) Run(group *pond.TaskGroupWithContext) func() error

Run starts the WorkflowStateJob, which retrieves the workflow's events and spawns new jobs for the child workflows once it's up-to-date. New jobs are submitted to the pool when the job is up-to-date to reduce the amount of unnecessary history fetches (e.g. when the child workflow is already completed).

func (*WorkflowStateJob) ShouldStart added in v0.9.0

func (job *WorkflowStateJob) ShouldStart() bool

ShouldStart will return true if the state is in a status that requires requesting its event history. This will help reduce the amount of event histories requested when they're not needed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL