Documentation
¶
Index ¶
- Constants
- func GetDefaultHistoryVersion() int
- func GetMaxSupportedHistoryVersion() int
- func GetVisibilityTSFrom(task Task) time.Time
- func NewHistoryVersionCompatibilityError(required int, supported int) error
- func NewUnknownEncodingTypeError(encodingType common.EncodingType) error
- func SetDefaultHistoryVersion(version int)
- func SetMaxSupportedHistoryVersion(version int)
- func SetVisibilityTSFrom(task Task, t time.Time)
- type ActivityInfo
- type ActivityTask
- type ActivityTimeoutTask
- type AppendHistoryEventsRequest
- type CancelExecutionTask
- type CassandraTestCluster
- type ChildExecutionInfo
- type Closeable
- type CompleteTaskRequest
- type CompleteTimerTaskRequest
- type CompleteTransferTaskRequest
- type ConditionFailedError
- type CreateDomainRequest
- type CreateDomainResponse
- type CreateShardRequest
- type CreateTaskInfo
- type CreateTasksRequest
- type CreateTasksResponse
- type CreateWorkflowExecutionRequest
- type CreateWorkflowExecutionResponse
- type DecisionTask
- type DecisionTimeoutTask
- type DeleteDomainByNameRequest
- type DeleteDomainRequest
- type DeleteExecutionTask
- type DeleteHistoryEventTask
- type DeleteWorkflowExecutionHistoryRequest
- type DeleteWorkflowExecutionRequest
- type DomainConfig
- type DomainInfo
- type ExecutionManager
- type ExecutionManagerFactory
- type GetClosedWorkflowExecutionRequest
- type GetClosedWorkflowExecutionResponse
- type GetCurrentExecutionRequest
- type GetCurrentExecutionResponse
- type GetDomainRequest
- type GetDomainResponse
- type GetShardRequest
- type GetShardResponse
- type GetTasksRequest
- type GetTasksResponse
- type GetTimerIndexTasksRequest
- type GetTimerIndexTasksResponse
- type GetTransferTasksRequest
- type GetTransferTasksResponse
- type GetWorkflowExecutionHistoryRequest
- type GetWorkflowExecutionHistoryResponse
- type GetWorkflowExecutionRequest
- type GetWorkflowExecutionResponse
- type HistoryDeserializationError
- type HistoryEventBatch
- type HistoryManager
- type HistorySerializationError
- type HistorySerializer
- type HistorySerializerFactory
- type HistoryVersionCompatibilityError
- type LeaseTaskListRequest
- type LeaseTaskListResponse
- type ListClosedWorkflowExecutionsByStatusRequest
- type ListWorkflowExecutionsByTypeRequest
- type ListWorkflowExecutionsByWorkflowIDRequest
- type ListWorkflowExecutionsRequest
- type ListWorkflowExecutionsResponse
- type MetadataManager
- type RecordWorkflowExecutionClosedRequest
- type RecordWorkflowExecutionStartedRequest
- type RequestCancelInfo
- type SerializedHistoryEventBatch
- type ShardAlreadyExistError
- type ShardInfo
- type ShardManager
- type ShardOwnershipLostError
- type StartChildExecutionTask
- type Task
- type TaskInfo
- type TaskListInfo
- type TaskManager
- type TestBase
- func (s *TestBase) ClearTransferQueue()
- func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error
- func (s *TestBase) CompleteTimerTask(ts time.Time, taskID int64) error
- func (s *TestBase) CompleteTransferTask(taskID int64) error
- func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution, ...) ([]int64, error)
- func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, ...) (string, error)
- func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, ...) (int64, error)
- func (s *TestBase) CreateShard(shardID int, owner string, rangeID int64) error
- func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, ...) (string, error)
- func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, ...) (string, error)
- func (s *TestBase) DeleteCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteCancelInfo int64) error
- func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteChildInfo int64) error
- func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error
- func (s *TestBase) GetCurrentWorkflow(domainID, workflowID string) (string, error)
- func (s *TestBase) GetNextSequenceNumber() int64
- func (s *TestBase) GetReadLevel() int64
- func (s *TestBase) GetShard(shardID int) (*ShardInfo, error)
- func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)
- func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error)
- func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
- func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) (*WorkflowMutableState, error)
- func (s *TestBase) SetupWorkflowStore()
- func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)
- func (s *TestBase) TearDownWorkflowStore()
- func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) error
- func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, ...) error
- func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error
- func (s *TestBase) UpdateWorkflowExecutionForRequestCancel(updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, ...) error
- func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, ...) error
- func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, ...) error
- func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- func (s *TestBase) UpsertRequestCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- type TestBaseOptions
- type TestShardContext
- func (s *TestShardContext) AppendHistoryEvents(request *AppendHistoryEventsRequest) error
- func (s *TestShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
- func (s *TestShardContext) GetExecutionManager() ExecutionManager
- func (s *TestShardContext) GetHistoryManager() HistoryManager
- func (s *TestShardContext) GetLogger() bark.Logger
- func (s *TestShardContext) GetMetricsClient() metrics.Client
- func (s *TestShardContext) GetNextTransferTaskID() (int64, error)
- func (s *TestShardContext) GetRangeID() int64
- func (s *TestShardContext) GetTimeSource() common.TimeSource
- func (s *TestShardContext) GetTimerAckLevel() time.Time
- func (s *TestShardContext) GetTransferAckLevel() int64
- func (s *TestShardContext) GetTransferMaxReadLevel() int64
- func (s *TestShardContext) GetTransferSequenceNumber() int64
- func (s *TestShardContext) Reset()
- func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error
- func (s *TestShardContext) UpdateTransferAckLevel(ackLevel int64) error
- func (s *TestShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error
- type TimeoutError
- type TimerInfo
- type TimerTaskInfo
- type TransferTaskInfo
- type UnknownEncodingTypeError
- type UpdateDomainRequest
- type UpdateShardRequest
- type UpdateTaskListRequest
- type UpdateTaskListResponse
- type UpdateWorkflowExecutionRequest
- type UserTimerTask
- type VisibilityManager
- type WorkflowExecutionInfo
- type WorkflowMutableState
- type WorkflowTimeoutTask
Constants ¶
const ( DomainStatusRegistered = iota DomainStatusDeprecated DomainStatusDeleted )
Domain status
const ( WorkflowStateCreated = iota WorkflowStateRunning WorkflowStateCompleted )
Workflow execution states
const ( WorkflowCloseStatusNone = iota WorkflowCloseStatusCompleted WorkflowCloseStatusFailed WorkflowCloseStatusCanceled WorkflowCloseStatusTerminated WorkflowCloseStatusContinuedAsNew WorkflowCloseStatusTimedOut )
Workflow execution close status
const ( TaskListTypeDecision = iota TaskListTypeActivity )
Types of task lists
const ( TransferTaskTypeDecisionTask = iota TransferTaskTypeActivityTask TransferTaskTypeDeleteExecution TransferTaskTypeCancelExecution TransferTaskTypeStartChildExecution )
Transfer task types
const ( TaskTypeDecisionTimeout = iota TaskTypeActivityTimeout TaskTypeUserTimer TaskTypeWorkflowTimeout TaskTypeDeleteHistoryEvent )
Types of timers
const ( // DefaultEncodingType is the default encoding format for persisted history DefaultEncodingType = common.EncodingTypeJSON )
Variables ¶
This section is empty.
Functions ¶
func GetDefaultHistoryVersion ¶
func GetDefaultHistoryVersion() int
GetDefaultHistoryVersion returns the default history version
func GetMaxSupportedHistoryVersion ¶
func GetMaxSupportedHistoryVersion() int
GetMaxSupportedHistoryVersion returns the max supported version
func GetVisibilityTSFrom ¶
GetVisibilityTSFrom - helper method to get visibility timestamp
func NewHistoryVersionCompatibilityError ¶
NewHistoryVersionCompatibilityError returns a new instance of compatibility error type
func NewUnknownEncodingTypeError ¶
func NewUnknownEncodingTypeError(encodingType common.EncodingType) error
NewUnknownEncodingTypeError returns a new instance of encoding type error
func SetDefaultHistoryVersion ¶
func SetDefaultHistoryVersion(version int)
SetDefaultHistoryVersion resets the default history version only intended for integration test
func SetMaxSupportedHistoryVersion ¶
func SetMaxSupportedHistoryVersion(version int)
SetMaxSupportedHistoryVersion resets the max supported history version this method is only intended for integration test
func SetVisibilityTSFrom ¶
SetVisibilityTSFrom - helper method to set visibility timestamp
Types ¶
type ActivityInfo ¶
type ActivityInfo struct {
ScheduleID int64
ScheduledEvent []byte
StartedID int64
StartedEvent []byte
ActivityID string
RequestID string
Details []byte
ScheduleToStartTimeout int32
ScheduleToCloseTimeout int32
StartToCloseTimeout int32
HeartbeatTimeout int32
CancelRequested bool
CancelRequestID int64
LastHeartBeatUpdatedTime time.Time
}
ActivityInfo details.
type ActivityTask ¶
ActivityTask identifies a transfer task for activity
func (*ActivityTask) GetTaskID ¶
func (a *ActivityTask) GetTaskID() int64
GetTaskID returns the sequence ID of the activity task
func (*ActivityTask) GetType ¶
func (a *ActivityTask) GetType() int
GetType returns the type of the activity task
func (*ActivityTask) SetTaskID ¶
func (a *ActivityTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the activity task
type ActivityTimeoutTask ¶
type ActivityTimeoutTask struct {
VisibilityTimestamp time.Time
TaskID int64
TimeoutType int
EventID int64
}
ActivityTimeoutTask identifies a timeout task.
func (*ActivityTimeoutTask) GetTaskID ¶
func (a *ActivityTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID.
func (*ActivityTimeoutTask) GetType ¶
func (a *ActivityTimeoutTask) GetType() int
GetType returns the type of the timer task
func (*ActivityTimeoutTask) GetVisibilityTimestamp ¶
func (a *ActivityTimeoutTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*ActivityTimeoutTask) SetTaskID ¶
func (a *ActivityTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID.
func (*ActivityTimeoutTask) SetVisibilityTimestamp ¶
func (a *ActivityTimeoutTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp
type AppendHistoryEventsRequest ¶
type AppendHistoryEventsRequest struct {
DomainID string
Execution workflow.WorkflowExecution
FirstEventID int64
RangeID int64
TransactionID int64
Events *SerializedHistoryEventBatch
Overwrite bool
}
AppendHistoryEventsRequest is used to append new events to workflow execution history
type CancelExecutionTask ¶
type CancelExecutionTask struct {
TaskID int64
TargetDomainID string
TargetWorkflowID string
TargetRunID string
ScheduleID int64
}
CancelExecutionTask identifies a transfer task for cancel of execution
func (*CancelExecutionTask) GetTaskID ¶
func (u *CancelExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*CancelExecutionTask) GetType ¶
func (u *CancelExecutionTask) GetType() int
GetType returns the type of the cancel transfer task
func (*CancelExecutionTask) SetTaskID ¶
func (u *CancelExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
type CassandraTestCluster ¶
type CassandraTestCluster struct {
// contains filtered or unexported fields
}
CassandraTestCluster allows executing cassandra operations in testing.
type ChildExecutionInfo ¶
type ChildExecutionInfo struct {
InitiatedID int64
InitiatedEvent []byte
StartedID int64
StartedEvent []byte
CreateRequestID string
}
ChildExecutionInfo has details for pending child executions.
type Closeable ¶
type Closeable interface {
Close()
}
Closeable is an interface for any entity that supports a close operation to release resources
type CompleteTaskRequest ¶
type CompleteTaskRequest struct {
TaskList *TaskListInfo
TaskID int64
}
CompleteTaskRequest is used to complete a task
type CompleteTimerTaskRequest ¶
CompleteTimerTaskRequest is used to complete a task in the timer task queue
type CompleteTransferTaskRequest ¶
type CompleteTransferTaskRequest struct {
TaskID int64
}
CompleteTransferTaskRequest is used to complete a task in the transfer task queue
type ConditionFailedError ¶
type ConditionFailedError struct {
Msg string
}
ConditionFailedError represents a failed conditional put
func (*ConditionFailedError) Error ¶
func (e *ConditionFailedError) Error() string
type CreateDomainRequest ¶
type CreateDomainRequest struct {
Name string
Status int
Description string
OwnerEmail string
Retention int32
EmitMetric bool
}
CreateDomainRequest is used to create the domain
type CreateDomainResponse ¶
type CreateDomainResponse struct {
ID string
}
CreateDomainResponse is the response for CreateDomain
type CreateShardRequest ¶
type CreateShardRequest struct {
ShardInfo *ShardInfo
}
CreateShardRequest is used to create a shard in executions table
type CreateTaskInfo ¶
type CreateTaskInfo struct {
Execution workflow.WorkflowExecution
Data *TaskInfo
TaskID int64
}
CreateTaskInfo describes a task to be created in CreateTasksRequest
type CreateTasksRequest ¶
type CreateTasksRequest struct {
DomainID string
TaskList string
TaskListType int
RangeID int64
Tasks []*CreateTaskInfo
}
CreateTasksRequest is used to create a new task for a workflow exectution
type CreateTasksResponse ¶
type CreateTasksResponse struct {
}
CreateTasksResponse is the response to CreateTasksRequest
type CreateWorkflowExecutionRequest ¶
type CreateWorkflowExecutionRequest struct {
RequestID string
DomainID string
Execution workflow.WorkflowExecution
ParentDomainID string
ParentExecution *workflow.WorkflowExecution
InitiatedID int64
TaskList string
WorkflowTypeName string
DecisionTimeoutValue int32
ExecutionContext []byte
NextEventID int64
LastProcessedEvent int64
TransferTasks []Task
TimerTasks []Task
RangeID int64
DecisionScheduleID int64
DecisionStartedID int64
DecisionStartToCloseTimeout int32
ContinueAsNew bool
}
CreateWorkflowExecutionRequest is used to write a new workflow execution
type CreateWorkflowExecutionResponse ¶
type CreateWorkflowExecutionResponse struct {
TaskID string
}
CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
type DecisionTask ¶
DecisionTask identifies a transfer task for decision
func (*DecisionTask) GetTaskID ¶
func (d *DecisionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the decision task.
func (*DecisionTask) GetType ¶
func (d *DecisionTask) GetType() int
GetType returns the type of the decision task
func (*DecisionTask) SetTaskID ¶
func (d *DecisionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the decision task
type DecisionTimeoutTask ¶
DecisionTimeoutTask identifies a timeout task.
func (*DecisionTimeoutTask) GetTaskID ¶
func (d *DecisionTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID.
func (*DecisionTimeoutTask) GetType ¶
func (d *DecisionTimeoutTask) GetType() int
GetType returns the type of the timer task
func (*DecisionTimeoutTask) GetVisibilityTimestamp ¶
func (d *DecisionTimeoutTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*DecisionTimeoutTask) SetTaskID ¶
func (d *DecisionTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID.
func (*DecisionTimeoutTask) SetVisibilityTimestamp ¶
func (d *DecisionTimeoutTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp
type DeleteDomainByNameRequest ¶
type DeleteDomainByNameRequest struct {
Name string
}
DeleteDomainByNameRequest is used to delete domain entry from domains_by_name table
type DeleteDomainRequest ¶
type DeleteDomainRequest struct {
ID string
}
DeleteDomainRequest is used to delete domain entry from domains table
type DeleteExecutionTask ¶
type DeleteExecutionTask struct {
TaskID int64
}
DeleteExecutionTask identifies a transfer task for deletion of execution
func (*DeleteExecutionTask) GetTaskID ¶
func (a *DeleteExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the delete execution task
func (*DeleteExecutionTask) GetType ¶
func (a *DeleteExecutionTask) GetType() int
GetType returns the type of the delete execution task
func (*DeleteExecutionTask) SetTaskID ¶
func (a *DeleteExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the delete execution task
type DeleteHistoryEventTask ¶
DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution.
func (*DeleteHistoryEventTask) GetTaskID ¶
func (a *DeleteHistoryEventTask) GetTaskID() int64
GetTaskID returns the sequence ID of the delete execution task
func (*DeleteHistoryEventTask) GetType ¶
func (a *DeleteHistoryEventTask) GetType() int
GetType returns the type of the delete execution task
func (*DeleteHistoryEventTask) SetTaskID ¶
func (a *DeleteHistoryEventTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the delete execution task
type DeleteWorkflowExecutionHistoryRequest ¶
type DeleteWorkflowExecutionHistoryRequest struct {
DomainID string
Execution workflow.WorkflowExecution
}
DeleteWorkflowExecutionHistoryRequest is used to delete workflow execution history
type DeleteWorkflowExecutionRequest ¶
type DeleteWorkflowExecutionRequest struct {
ExecutionInfo *WorkflowExecutionInfo
}
DeleteWorkflowExecutionRequest is used to delete a workflow execution
type DomainConfig ¶
DomainConfig describes the domain configuration
type DomainInfo ¶
DomainInfo describes the domain entity
type ExecutionManager ¶
type ExecutionManager interface {
Closeable
CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error
DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
CompleteTransferTask(request *CompleteTransferTaskRequest) error
// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
}
ExecutionManager is used to manage workflow executions
func NewCassandraWorkflowExecutionPersistence ¶
func NewCassandraWorkflowExecutionPersistence(hosts string, dc string, keyspace string, shardID int, logger bark.Logger) (ExecutionManager, error)
NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation
func NewWorkflowExecutionPersistenceClient ¶
func NewWorkflowExecutionPersistenceClient(persistence ExecutionManager, metricClient metrics.Client) ExecutionManager
NewWorkflowExecutionPersistenceClient creates a client to manage executions
type ExecutionManagerFactory ¶
type ExecutionManagerFactory interface {
CreateExecutionManager(shardID int) (ExecutionManager, error)
}
ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
type GetClosedWorkflowExecutionRequest ¶
type GetClosedWorkflowExecutionRequest struct {
DomainUUID string
Execution s.WorkflowExecution
}
GetClosedWorkflowExecutionRequest is used retrieve the record for a specific execution
type GetClosedWorkflowExecutionResponse ¶
type GetClosedWorkflowExecutionResponse struct {
Execution *s.WorkflowExecutionInfo
}
GetClosedWorkflowExecutionResponse is the response to GetClosedWorkflowExecutionRequest
type GetCurrentExecutionRequest ¶
GetCurrentExecutionRequest is used to retrieve the current RunId for an execution
type GetCurrentExecutionResponse ¶
type GetCurrentExecutionResponse struct {
RunID string
}
GetCurrentExecutionResponse is the response to GetCurrentExecution
type GetDomainRequest ¶
GetDomainRequest is used to read domain
type GetDomainResponse ¶
type GetDomainResponse struct {
Info *DomainInfo
Config *DomainConfig
}
GetDomainResponse is the response for GetDomain
type GetShardRequest ¶
type GetShardRequest struct {
ShardID int
}
GetShardRequest is used to get shard information
type GetShardResponse ¶
type GetShardResponse struct {
ShardInfo *ShardInfo
}
GetShardResponse is the response to GetShard
type GetTasksRequest ¶
type GetTasksRequest struct {
DomainID string
TaskList string
TaskType int
ReadLevel int64
MaxReadLevel int64 // inclusive
BatchSize int
RangeID int64
}
GetTasksRequest is used to retrieve tasks of a task list
type GetTasksResponse ¶
type GetTasksResponse struct {
Tasks []*TaskInfo
}
GetTasksResponse is the response to GetTasksRequests
type GetTimerIndexTasksRequest ¶
type GetTimerIndexTasksRequest struct {
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
}
GetTimerIndexTasksRequest is the request for GetTimerIndexTasks TODO: replace this with an iterator that can configure min and max index.
type GetTimerIndexTasksResponse ¶
type GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
}
GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
type GetTransferTasksRequest ¶
GetTransferTasksRequest is used to read tasks from the transfer task queue
type GetTransferTasksResponse ¶
type GetTransferTasksResponse struct {
Tasks []*TransferTaskInfo
}
GetTransferTasksResponse is the response to GetTransferTasksRequest
type GetWorkflowExecutionHistoryRequest ¶
type GetWorkflowExecutionHistoryRequest struct {
DomainID string
Execution workflow.WorkflowExecution
// Get the history events upto NextEventID. Not Inclusive.
NextEventID int64
// Maximum number of history append transactions per page
PageSize int
// Token to continue reading next page of history append transactions. Pass in empty slice for first page
NextPageToken []byte
}
GetWorkflowExecutionHistoryRequest is used to retrieve history of a workflow execution
type GetWorkflowExecutionHistoryResponse ¶
type GetWorkflowExecutionHistoryResponse struct {
// Slice of history append transaction batches
Events []SerializedHistoryEventBatch
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on GetworkflowExecutionHistoryRequest to read the next page.
NextPageToken []byte
}
GetWorkflowExecutionHistoryResponse is the response to GetWorkflowExecutionHistoryRequest
type GetWorkflowExecutionRequest ¶
type GetWorkflowExecutionRequest struct {
DomainID string
Execution workflow.WorkflowExecution
}
GetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
type GetWorkflowExecutionResponse ¶
type GetWorkflowExecutionResponse struct {
State *WorkflowMutableState
}
GetWorkflowExecutionResponse is the response to GetworkflowExecutionRequest
type HistoryDeserializationError ¶
type HistoryDeserializationError struct {
// contains filtered or unexported fields
}
HistoryDeserializationError is an error type that's returned on a history deserialization failure
func (*HistoryDeserializationError) Error ¶
func (e *HistoryDeserializationError) Error() string
type HistoryEventBatch ¶
type HistoryEventBatch struct {
Version int
Events []*workflow.HistoryEvent
}
HistoryEventBatch represents a batch of history events
func NewHistoryEventBatch ¶
func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch
NewHistoryEventBatch returns a new instance of HistoryEventBatch
func (*HistoryEventBatch) String ¶
func (b *HistoryEventBatch) String() string
type HistoryManager ¶
type HistoryManager interface {
Closeable
AppendHistoryEvents(request *AppendHistoryEventsRequest) error
// GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution
GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse,
error)
DeleteWorkflowExecutionHistory(request *DeleteWorkflowExecutionHistoryRequest) error
}
HistoryManager is used to manage Workflow Execution HistoryEventBatch
func NewCassandraHistoryPersistence ¶
func NewCassandraHistoryPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (HistoryManager, error)
NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation
func NewHistoryPersistenceClient ¶
func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metrics.Client) HistoryManager
NewHistoryPersistenceClient creates a HistoryManager client to manage workflow execution history
type HistorySerializationError ¶
type HistorySerializationError struct {
// contains filtered or unexported fields
}
HistorySerializationError is an error type that's returned on a history serialization failure
func (*HistorySerializationError) Error ¶
func (e *HistorySerializationError) Error() string
type HistorySerializer ¶
type HistorySerializer interface {
Serialize(batch *HistoryEventBatch) (*SerializedHistoryEventBatch, error)
Deserialize(batch *SerializedHistoryEventBatch) (*HistoryEventBatch, error)
}
HistorySerializer is used to serialize/deserialize history
func NewJSONHistorySerializer ¶
func NewJSONHistorySerializer() HistorySerializer
NewJSONHistorySerializer returns a JSON HistorySerializer
type HistorySerializerFactory ¶
type HistorySerializerFactory interface {
// Get returns a history serializer corresponding
// to a given encoding type
Get(encodingType common.EncodingType) (HistorySerializer, error)
}
HistorySerializerFactory is a factory that vends HistorySerializers based on encoding type.
func NewHistorySerializerFactory ¶
func NewHistorySerializerFactory() HistorySerializerFactory
NewHistorySerializerFactory creates and returns an instance of HistorySerializerFactory
type HistoryVersionCompatibilityError ¶
type HistoryVersionCompatibilityError struct {
// contains filtered or unexported fields
}
HistoryVersionCompatibilityError is an error type that's returned when history serialization or deserialization cannot proceed due to version incompatibility
func (*HistoryVersionCompatibilityError) Error ¶
func (e *HistoryVersionCompatibilityError) Error() string
type LeaseTaskListRequest ¶
LeaseTaskListRequest is used to request lease of a task list
type LeaseTaskListResponse ¶
type LeaseTaskListResponse struct {
TaskListInfo *TaskListInfo
}
LeaseTaskListResponse is response to LeaseTaskListRequest
type ListClosedWorkflowExecutionsByStatusRequest ¶
type ListClosedWorkflowExecutionsByStatusRequest struct {
ListWorkflowExecutionsRequest
Status s.WorkflowExecutionCloseStatus
}
ListClosedWorkflowExecutionsByStatusRequest is used to list executions that have specific close status
type ListWorkflowExecutionsByTypeRequest ¶
type ListWorkflowExecutionsByTypeRequest struct {
ListWorkflowExecutionsRequest
WorkflowTypeName string
}
ListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain
type ListWorkflowExecutionsByWorkflowIDRequest ¶
type ListWorkflowExecutionsByWorkflowIDRequest struct {
ListWorkflowExecutionsRequest
WorkflowID string
}
ListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have specific WorkflowID in a domain
type ListWorkflowExecutionsRequest ¶
type ListWorkflowExecutionsRequest struct {
DomainUUID string
EarliestStartTime int64
LatestStartTime int64
// Maximum number of workflow executions per page
PageSize int
// Token to continue reading next page of workflow executions.
// Pass in empty slice for first page.
NextPageToken []byte
}
ListWorkflowExecutionsRequest is used to list executions in a domain
type ListWorkflowExecutionsResponse ¶
type ListWorkflowExecutionsResponse struct {
Executions []*s.WorkflowExecutionInfo
// Token to read next page if there are more workflow executions beyond page size.
// Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page.
NextPageToken []byte
}
ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest
type MetadataManager ¶
type MetadataManager interface {
Closeable
CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error)
GetDomain(request *GetDomainRequest) (*GetDomainResponse, error)
UpdateDomain(request *UpdateDomainRequest) error
DeleteDomain(request *DeleteDomainRequest) error
DeleteDomainByName(request *DeleteDomainByNameRequest) error
}
MetadataManager is used to manage metadata CRUD for various entities
func NewCassandraMetadataPersistence ¶
func NewCassandraMetadataPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (MetadataManager, error)
NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation
func NewMetadataPersistenceClient ¶
func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client) MetadataManager
NewMetadataPersistenceClient creates a HistoryManager client to manage workflow execution history
type RecordWorkflowExecutionClosedRequest ¶
type RecordWorkflowExecutionClosedRequest struct {
DomainUUID string
Execution s.WorkflowExecution
WorkflowTypeName string
StartTimestamp int64
CloseTimestamp int64
Status s.WorkflowExecutionCloseStatus
HistoryLength int64
RetentionSeconds int64
}
RecordWorkflowExecutionClosedRequest is used to add a record of a newly closed execution
type RecordWorkflowExecutionStartedRequest ¶
type RecordWorkflowExecutionStartedRequest struct {
DomainUUID string
Execution s.WorkflowExecution
WorkflowTypeName string
StartTimestamp int64
}
RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution
type RequestCancelInfo ¶
RequestCancelInfo has details for pending external workflow cancellations
type SerializedHistoryEventBatch ¶
type SerializedHistoryEventBatch struct {
EncodingType common.EncodingType
Version int
Data []byte
}
SerializedHistoryEventBatch represents a serialized batch of history events
func NewSerializedHistoryEventBatch ¶
func NewSerializedHistoryEventBatch(data []byte, encoding common.EncodingType, version int) *SerializedHistoryEventBatch
NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch
func (*SerializedHistoryEventBatch) String ¶
func (h *SerializedHistoryEventBatch) String() string
type ShardAlreadyExistError ¶
type ShardAlreadyExistError struct {
Msg string
}
ShardAlreadyExistError is returned when conditionally creating a shard fails
func (*ShardAlreadyExistError) Error ¶
func (e *ShardAlreadyExistError) Error() string
type ShardInfo ¶
type ShardInfo struct {
ShardID int
Owner string
RangeID int64
StolenSinceRenew int
UpdatedAt time.Time
TransferAckLevel int64
TimerAckLevel time.Time
}
ShardInfo describes a shard
type ShardManager ¶
type ShardManager interface {
Closeable
CreateShard(request *CreateShardRequest) error
GetShard(request *GetShardRequest) (*GetShardResponse, error)
UpdateShard(request *UpdateShardRequest) error
}
ShardManager is used to manage all shards
func NewCassandraShardPersistence ¶
func NewCassandraShardPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (ShardManager, error)
NewCassandraShardPersistence is used to create an instance of ShardManager implementation
func NewShardPersistenceClient ¶
func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client) ShardManager
NewShardPersistenceClient creates a client to manage shards
type ShardOwnershipLostError ¶
ShardOwnershipLostError is returned when conditional update fails due to RangeID for the shard
func (*ShardOwnershipLostError) Error ¶
func (e *ShardOwnershipLostError) Error() string
type StartChildExecutionTask ¶
type StartChildExecutionTask struct {
TaskID int64
TargetDomainID string
TargetWorkflowID string
InitiatedID int64
}
StartChildExecutionTask identifies a transfer task for starting child execution
func (*StartChildExecutionTask) GetTaskID ¶
func (u *StartChildExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*StartChildExecutionTask) GetType ¶
func (u *StartChildExecutionTask) GetType() int
GetType returns the type of the cancel transfer task
func (*StartChildExecutionTask) SetTaskID ¶
func (u *StartChildExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
type TaskInfo ¶
type TaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
ScheduleID int64
ScheduleToStartTimeout int32
}
TaskInfo describes either activity or decision task
type TaskListInfo ¶
TaskListInfo describes a state of a task list implementation.
type TaskManager ¶
type TaskManager interface {
Closeable
LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error)
GetTasks(request *GetTasksRequest) (*GetTasksResponse, error)
CompleteTask(request *CompleteTaskRequest) error
}
TaskManager is used to manage tasks
func NewCassandraTaskPersistence ¶
func NewCassandraTaskPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (TaskManager, error)
NewCassandraTaskPersistence is used to create an instance of TaskManager implementation
func NewTaskPersistenceClient ¶
func NewTaskPersistenceClient(persistence TaskManager, metricClient metrics.Client) TaskManager
NewTaskPersistenceClient creates a client to manage tasks
type TestBase ¶
type TestBase struct {
ShardMgr ShardManager
ExecutionMgrFactory ExecutionManagerFactory
WorkflowMgr ExecutionManager
TaskMgr TaskManager
HistoryMgr HistoryManager
MetadataManager MetadataManager
VisibilityMgr VisibilityManager
ShardInfo *ShardInfo
ShardContext *TestShardContext
CassandraTestCluster
// contains filtered or unexported fields
}
TestBase wraps the base setup needed to create workflows over engine layer.
func (*TestBase) ClearTransferQueue ¶
func (s *TestBase) ClearTransferQueue()
ClearTransferQueue completes all tasks in transfer queue
func (*TestBase) CompleteTask ¶
func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error
CompleteTask is a utility method to complete a task
func (*TestBase) CompleteTimerTask ¶
CompleteTimerTask is a utility method to complete a timer task
func (*TestBase) CompleteTransferTask ¶
CompleteTransferTask is a utility method to complete a transfer task
func (*TestBase) ContinueAsNewExecution ¶
func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64, newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64) error
ContinueAsNewExecution is a utility method to create workflow executions
func (*TestBase) CreateActivityTasks ¶
func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution, activities map[int64]string) ([]int64, error)
CreateActivityTasks is a utility method to create tasks
func (*TestBase) CreateChildWorkflowExecution ¶
func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleID int64, timerTasks []Task) (string, error)
CreateChildWorkflowExecution is a utility method to create child workflow executions
func (*TestBase) CreateDecisionTask ¶
func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, decisionScheduleID int64) (int64, error)
CreateDecisionTask is a utility method to create a task
func (*TestBase) CreateShard ¶
CreateShard is a utility method to create the shard using persistence layer
func (*TestBase) CreateWorkflowExecution ¶
func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList, wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleID int64, timerTasks []Task) (string, error)
CreateWorkflowExecution is a utility method to create workflow executions
func (*TestBase) CreateWorkflowExecutionManyTasks ¶
func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error)
CreateWorkflowExecutionManyTasks is a utility method to create workflow executions
func (*TestBase) DeleteCancelState ¶
func (s *TestBase) DeleteCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteCancelInfo int64) error
DeleteCancelState is a utility method to delete request cancel state from mutable state
func (*TestBase) DeleteChildExecutionsState ¶
func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteChildInfo int64) error
DeleteChildExecutionsState is a utility method to delete child execution from mutable state
func (*TestBase) DeleteWorkflowExecution ¶
func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error
DeleteWorkflowExecution is a utility method to delete a workflow execution
func (*TestBase) GetCurrentWorkflow ¶
GetCurrentWorkflow returns the workflow state for the given params
func (*TestBase) GetNextSequenceNumber ¶
GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId
func (*TestBase) GetReadLevel ¶
GetReadLevel returns the current read level for shard
func (*TestBase) GetTasks ¶
func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)
GetTasks is a utility method to get tasks from persistence
func (*TestBase) GetTimerIndexTasks ¶
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error)
GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (*TestBase) GetTransferTasks ¶
func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
GetTransferTasks is a utility method to get tasks from transfer task queue
func (*TestBase) GetWorkflowExecutionInfo ¶
func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) ( *WorkflowMutableState, error)
GetWorkflowExecutionInfo is a utility method to retrieve execution info
func (*TestBase) SetupWorkflowStore ¶
func (s *TestBase) SetupWorkflowStore()
SetupWorkflowStore to setup workflow test base
func (*TestBase) SetupWorkflowStoreWithOptions ¶
func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)
SetupWorkflowStoreWithOptions to setup workflow test base
func (*TestBase) TearDownWorkflowStore ¶
func (s *TestBase) TearDownWorkflowStore()
TearDownWorkflowStore to cleanup
func (*TestBase) UpdateShard ¶
UpdateShard is a utility method to update the shard using persistence layer
func (*TestBase) UpdateWorkflowExecution ¶
func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error
UpdateWorkflowExecution is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionAndDelete ¶
func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error
UpdateWorkflowExecutionAndDelete is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionForRequestCancel ¶
func (s *TestBase) UpdateWorkflowExecutionForRequestCancel( updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertRequestCancelInfo []*RequestCancelInfo) error
UpdateWorkflowExecutionForRequestCancel is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionWithRangeID ¶
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64) error
UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionWithTransferTasks ¶
func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks( updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertActivityInfo []*ActivityInfo) error
UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution
func (*TestBase) UpsertChildExecutionsState ¶
func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, upsertChildInfos []*ChildExecutionInfo) error
UpsertChildExecutionsState is a utility method to update mutable state of workflow execution
func (*TestBase) UpsertRequestCancelState ¶
func (s *TestBase) UpsertRequestCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, upsertCancelInfos []*RequestCancelInfo) error
UpsertRequestCancelState is a utility method to update mutable state of workflow execution
type TestBaseOptions ¶
type TestBaseOptions struct {
ClusterHost string
KeySpace string
Datacenter string
DropKeySpace bool
SchemaDir string
}
TestBaseOptions options to configure workflow test base.
type TestShardContext ¶
TestShardContext shard context for testing. TODO: Cleanup, move this out of persistence
func (*TestShardContext) AppendHistoryEvents ¶
func (s *TestShardContext) AppendHistoryEvents(request *AppendHistoryEventsRequest) error
AppendHistoryEvents test implementation
func (*TestShardContext) CreateWorkflowExecution ¶
func (s *TestShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) ( *CreateWorkflowExecutionResponse, error)
CreateWorkflowExecution test implementation
func (*TestShardContext) GetExecutionManager ¶
func (s *TestShardContext) GetExecutionManager() ExecutionManager
GetExecutionManager test implementation
func (*TestShardContext) GetHistoryManager ¶
func (s *TestShardContext) GetHistoryManager() HistoryManager
GetHistoryManager test implementation
func (*TestShardContext) GetLogger ¶
func (s *TestShardContext) GetLogger() bark.Logger
GetLogger test implementation
func (*TestShardContext) GetMetricsClient ¶
func (s *TestShardContext) GetMetricsClient() metrics.Client
GetMetricsClient test implementation
func (*TestShardContext) GetNextTransferTaskID ¶
func (s *TestShardContext) GetNextTransferTaskID() (int64, error)
GetNextTransferTaskID test implementation
func (*TestShardContext) GetRangeID ¶
func (s *TestShardContext) GetRangeID() int64
GetRangeID test implementation
func (*TestShardContext) GetTimeSource ¶
func (s *TestShardContext) GetTimeSource() common.TimeSource
GetTimeSource test implementation
func (*TestShardContext) GetTimerAckLevel ¶
func (s *TestShardContext) GetTimerAckLevel() time.Time
GetTimerAckLevel test implementation
func (*TestShardContext) GetTransferAckLevel ¶
func (s *TestShardContext) GetTransferAckLevel() int64
GetTransferAckLevel test implementation
func (*TestShardContext) GetTransferMaxReadLevel ¶
func (s *TestShardContext) GetTransferMaxReadLevel() int64
GetTransferMaxReadLevel test implementation
func (*TestShardContext) GetTransferSequenceNumber ¶
func (s *TestShardContext) GetTransferSequenceNumber() int64
GetTransferSequenceNumber test implementation
func (*TestShardContext) UpdateTimerAckLevel ¶
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error
UpdateTimerAckLevel test implementation
func (*TestShardContext) UpdateTransferAckLevel ¶
func (s *TestShardContext) UpdateTransferAckLevel(ackLevel int64) error
UpdateTransferAckLevel test implementation
func (*TestShardContext) UpdateWorkflowExecution ¶
func (s *TestShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error
UpdateWorkflowExecution test implementation
type TimeoutError ¶
type TimeoutError struct {
Msg string
}
TimeoutError is returned when a write operation fails due to a timeout
func (*TimeoutError) Error ¶
func (e *TimeoutError) Error() string
type TimerTaskInfo ¶
type TimerTaskInfo struct {
DomainID string
WorkflowID string
RunID string
VisibilityTimestamp time.Time
TaskID int64
TaskType int
TimeoutType int
EventID int64
}
TimerTaskInfo describes a timer task.
type TransferTaskInfo ¶
type TransferTaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
TargetDomainID string
TargetWorkflowID string
TargetRunID string
TaskList string
TaskType int
ScheduleID int64
}
TransferTaskInfo describes a transfer task
type UnknownEncodingTypeError ¶
type UnknownEncodingTypeError struct {
// contains filtered or unexported fields
}
UnknownEncodingTypeError is an error type that's returned when the encoding type provided as input is unknown or unsupported
func (*UnknownEncodingTypeError) Error ¶
func (e *UnknownEncodingTypeError) Error() string
type UpdateDomainRequest ¶
type UpdateDomainRequest struct {
Info *DomainInfo
Config *DomainConfig
}
UpdateDomainRequest is used to update domain
type UpdateShardRequest ¶
UpdateShardRequest is used to update shard information
type UpdateTaskListRequest ¶
type UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
}
UpdateTaskListRequest is used to update task list implementation information
type UpdateTaskListResponse ¶
type UpdateTaskListResponse struct {
}
UpdateTaskListResponse is the response to UpdateTaskList
type UpdateWorkflowExecutionRequest ¶
type UpdateWorkflowExecutionRequest struct {
ExecutionInfo *WorkflowExecutionInfo
TransferTasks []Task
TimerTasks []Task
DeleteTimerTask Task
Condition int64
RangeID int64
ContinueAsNew *CreateWorkflowExecutionRequest
CloseExecution bool
// Mutable state
UpsertActivityInfos []*ActivityInfo
DeleteActivityInfo *int64
UpserTimerInfos []*TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*ChildExecutionInfo
DeleteChildExecutionInfo *int64
UpsertRequestCancelInfos []*RequestCancelInfo
DeleteRequestCancelInfo *int64
}
UpdateWorkflowExecutionRequest is used to update a workflow execution
type UserTimerTask ¶
UserTimerTask identifies a timeout task.
func (*UserTimerTask) GetTaskID ¶
func (u *UserTimerTask) GetTaskID() int64
GetTaskID returns the sequence ID of the timer task.
func (*UserTimerTask) GetType ¶
func (u *UserTimerTask) GetType() int
GetType returns the type of the timer task
func (*UserTimerTask) GetVisibilityTimestamp ¶
func (u *UserTimerTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*UserTimerTask) SetTaskID ¶
func (u *UserTimerTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the timer task.
func (*UserTimerTask) SetVisibilityTimestamp ¶
func (u *UserTimerTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp
type VisibilityManager ¶
type VisibilityManager interface {
Closeable
RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error
ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
GetClosedWorkflowExecution(request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error)
}
VisibilityManager is used to manage the visibility store
func NewCassandraVisibilityPersistence ¶
func NewCassandraVisibilityPersistence( hosts string, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error)
NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation
type WorkflowExecutionInfo ¶
type WorkflowExecutionInfo struct {
DomainID string
WorkflowID string
RunID string
ParentDomainID string
ParentWorkflowID string
ParentRunID string
InitiatedID int64
CompletionEvent []byte
TaskList string
WorkflowTypeName string
DecisionTimeoutValue int32
ExecutionContext []byte
State int
CloseStatus int
NextEventID int64
LastProcessedEvent int64
StartTimestamp time.Time
LastUpdatedTimestamp time.Time
CreateRequestID string
DecisionScheduleID int64
DecisionStartedID int64
DecisionRequestID string
DecisionTimeout int32
CancelRequested bool
CancelRequestID string
}
WorkflowExecutionInfo describes a workflow execution
type WorkflowMutableState ¶
type WorkflowMutableState struct {
ActivitInfos map[int64]*ActivityInfo
TimerInfos map[string]*TimerInfo
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
ExecutionInfo *WorkflowExecutionInfo
}
WorkflowMutableState indicates workflow related state
type WorkflowTimeoutTask ¶
WorkflowTimeoutTask identifies a timeout task.
func (*WorkflowTimeoutTask) GetTaskID ¶
func (u *WorkflowTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*WorkflowTimeoutTask) GetType ¶
func (u *WorkflowTimeoutTask) GetType() int
GetType returns the type of the timeout task.
func (*WorkflowTimeoutTask) GetVisibilityTimestamp ¶
func (u *WorkflowTimeoutTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*WorkflowTimeoutTask) SetTaskID ¶
func (u *WorkflowTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
func (*WorkflowTimeoutTask) SetVisibilityTimestamp ¶
func (u *WorkflowTimeoutTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp