shard

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Executor

type Executor interface {
	Start()
	Stop()
}

type ExternalQueue

type ExternalQueue interface {
	Push(action *api.Action) error
	Poll(actionName string, batchSize int) (*api.Actions, error)
}

type FlowEngine

type FlowEngine struct {
	// contains filtered or unexported fields
}

func NewFlowEngine

func NewFlowEngine(storage Storage, metadadataService metadata.MetadataService, stateMachineContainer *FlowStateMachineContainer) *FlowEngine

func (*FlowEngine) DelayAction

func (f *FlowEngine) DelayAction(wfName string, flowId string, actionName string, actionId int, delay time.Duration)

func (*FlowEngine) ExecuteAction

func (f *FlowEngine) ExecuteAction(wfName string, wfId string, event string, actionId int, data map[string]any)

func (*FlowEngine) ExecuteDelay

func (f *FlowEngine) ExecuteDelay(wfName string, flowId string, actionId int)

func (*FlowEngine) ExecuteResume

func (f *FlowEngine) ExecuteResume(wfName string, flowId string, event string)

func (*FlowEngine) ExecuteResumeAfterWait

func (f *FlowEngine) ExecuteResumeAfterWait(wfName string, flowId string, event string) error

func (*FlowEngine) ExecuteRetry

func (f *FlowEngine) ExecuteRetry(wfName string, flowId string, actionId int)

func (*FlowEngine) ExecuteSystemAction

func (f *FlowEngine) ExecuteSystemAction(wfName string, flowId string, actionId int)

func (*FlowEngine) Init

func (f *FlowEngine) Init(wfName string, flowId string, input map[string]any) error

func (*FlowEngine) MarkComplete

func (f *FlowEngine) MarkComplete(wfName string, flowId string, successhandler flow.Statehandler)

func (*FlowEngine) MarkFailed

func (f *FlowEngine) MarkFailed(wfName string, flowId string)

func (*FlowEngine) MarkPaused

func (f *FlowEngine) MarkPaused(wfName string, flowId string)

func (*FlowEngine) MarkRunning

func (f *FlowEngine) MarkRunning(wfName string, flowId string)

func (*FlowEngine) MarkWaitingDelay

func (f *FlowEngine) MarkWaitingDelay(wfName string, flowId string)

func (*FlowEngine) MarkWaitingEvent

func (f *FlowEngine) MarkWaitingEvent(wfName string, flowId string, event string)

func (*FlowEngine) RetryFailedAction

func (f *FlowEngine) RetryFailedAction(wfName string, flowId string, actionName string, actionId int)

func (*FlowEngine) RetryTimedoutAction

func (f *FlowEngine) RetryTimedoutAction(wfName string, flowId string, actionName string, actionId int)

type FlowStateMachine

type FlowStateMachine struct {
	// contains filtered or unexported fields
}

func NewFlowStateMachine

func NewFlowStateMachine(workflow string, id string, context *model.FlowContext, flow *flow.Flow) *FlowStateMachine

func (*FlowStateMachine) ChangeState

func (fm *FlowStateMachine) ChangeState(state model.FlowState)

func (*FlowStateMachine) MoveForward

func (fm *FlowStateMachine) MoveForward(actionId int, event string, dataMap map[string]any) (bool, []int)

func (*FlowStateMachine) SaveEvent

func (fm *FlowStateMachine) SaveEvent(event string)

func (*FlowStateMachine) Validate

func (fm *FlowStateMachine) Validate(actionId int) error

type FlowStateMachineContainer

type FlowStateMachineContainer struct {
	// contains filtered or unexported fields
}

func NewFlowStateMachineContainer

func NewFlowStateMachineContainer(storage Storage, metadataService metadata.MetadataService) *FlowStateMachineContainer

func (*FlowStateMachineContainer) Delete

func (sc *FlowStateMachineContainer) Delete(wf string, flowId string)

func (*FlowStateMachineContainer) Get

func (*FlowStateMachineContainer) Init

func (sc *FlowStateMachineContainer) Init(wf string, flowId string, input map[string]any) (*FlowStateMachine, error)

func (*FlowStateMachineContainer) Store

func (sc *FlowStateMachineContainer) Store(wf string, flowId string, m *FlowStateMachine)

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

func NewShard

func NewShard(id string, externalQueue ExternalQueue, storage Storage, engine *FlowEngine, stateHandler *StateHandlerContainer) *Shard

func (*Shard) GetEngine

func (s *Shard) GetEngine() *FlowEngine

func (*Shard) GetExternalQueue

func (s *Shard) GetExternalQueue() ExternalQueue

func (*Shard) GetShardId

func (s *Shard) GetShardId() string

func (*Shard) GetStorage

func (s *Shard) GetStorage() Storage

func (*Shard) RegisterExecutor

func (s *Shard) RegisterExecutor(name string, executor Executor)

func (*Shard) Start

func (s *Shard) Start()

func (*Shard) Stop

func (s *Shard) Stop()

type StateHandlerContainer

type StateHandlerContainer struct {
	// contains filtered or unexported fields
}

func NewStateHandlerContainer

func NewStateHandlerContainer(storage Storage) *StateHandlerContainer

func (*StateHandlerContainer) GetHandler

func (s *StateHandlerContainer) GetHandler(st flow.Statehandler) func(wfName string, wfId string) error

type Storage

type Storage interface {
	SaveFlowContext(wfName string, flowId string, flowCtx *model.FlowContext) error
	GetFlowContext(wfName string, flowId string) (*model.FlowContext, error)
	DeleteFlowContext(wfName string, flowId string) error

	SaveFlowContextAndDispatchAction(wfName string, flowId string, flowCtx *model.FlowContext, actions []model.ActionExecutionRequest) error
	PollAction(actionType string, batchSize int) ([]model.ActionExecutionRequest, error)
	Ack(actionType string, actions []model.ActionExecutionRequest) error
	Retry(wfName string, flowId string, actionName string, actionId int, delay time.Duration) error
	PollRetry() ([]model.ActionExecutionRequest, error)
	Delay(wfName string, flowId string, actionName string, actionId int, delay time.Duration) error
	PollDelay() ([]model.ActionExecutionRequest, error)
	Timeout(wfName string, flowId string, actionName string, actionId int, delay time.Duration) error
	PollTimeout() ([]model.ActionExecutionRequest, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL