Documentation ¶
Index ¶
- func LazyBootstrapWorker(actionID string, h func(c InputMessageContext, m InputMessage) error) error
- type AddActionRequest
- type CreateFlowRequest
- type Flow
- type FlowListResponse
- type FlowStatus
- type FlowTriggerType
- type InitMongodDBWorkflowStorageAdapterOpt
- type InitNATSWorkerMessengerAdapterOpt
- type InitNATSWorkflowMessengerAdapterOpt
- type InputMessage
- type InputMessageContext
- type MDFlow
- type MDWorkflowAction
- type MDWorkflowActionDep
- type MDWorkflowSessionContext
- type Mapper
- type MapperMode
- type MongodDBWorkerStorageAdapter
- type MongodDBWorkflowStorageAdapter
- func (w *MongodDBWorkflowStorageAdapter) AddAction(ctx context.Context, req *AddActionRequest) (*WorkflowAction, error)
- func (w *MongodDBWorkflowStorageAdapter) AddDep(ctx context.Context, tenantID, workflowID, key, metaOutput, depKey string) error
- func (w *MongodDBWorkflowStorageAdapter) Close(ctx context.Context) error
- func (w *MongodDBWorkflowStorageAdapter) CreateFlow(ctx context.Context, req *CreateFlowRequest) (*Flow, error)
- func (w *MongodDBWorkflowStorageAdapter) CreateSessionContext(ctx context.Context, workflowID, sessionID, taskID string, value map[string]map[string]interface{}) error
- func (w *MongodDBWorkflowStorageAdapter) DeleteAction(ctx context.Context, tenantID, workflowID, key string) error
- func (w *MongodDBWorkflowStorageAdapter) DeleteAllActions(ctx context.Context, tenantID, workflowID string) error
- func (w *MongodDBWorkflowStorageAdapter) DeleteAllDeps(ctx context.Context, tenantID, workflowID string) error
- func (w *MongodDBWorkflowStorageAdapter) DeleteFlow(ctx context.Context, tenantID, flowID string) error
- func (w *MongodDBWorkflowStorageAdapter) DeleteSessionContext(ctx context.Context, workflowID, sessionID, taskID string) error
- func (w *MongodDBWorkflowStorageAdapter) DisableWorkflowAction(ctx context.Context, tenantID, workflowID, key string) error
- func (w *MongodDBWorkflowStorageAdapter) GetFlow(ctx context.Context, tenantID, flowID string) (*Flow, error)
- func (w *MongodDBWorkflowStorageAdapter) GetSessionContext(ctx context.Context, workflowID, sessionID, taskID string) (map[string]map[string]interface{}, error)
- func (w *MongodDBWorkflowStorageAdapter) GetWorkflowActions(ctx context.Context, tenantID, workflowID string) ([]WorkflowAction, error)
- func (w *MongodDBWorkflowStorageAdapter) GetWorkflowPeers(ctx context.Context, tenantID, workflowID string) ([]WorkflowPeer, error)
- func (w *MongodDBWorkflowStorageAdapter) ListFlows(ctx context.Context, tenantID string, page, pageSize int) (*FlowListResponse, error)
- func (w *MongodDBWorkflowStorageAdapter) QueryWorkflowAction(ctx context.Context, tenantID, workflowID, key string) (*WorkflowAction, error)
- func (w *MongodDBWorkflowStorageAdapter) QueryWorkflowActionDependencies(ctx context.Context, tenantID, workflowID, key, metaOutput string) ([]WorkflowAction, error)
- func (w *MongodDBWorkflowStorageAdapter) UpdateAction(ctx context.Context, req *UpdateActionRequest) (*WorkflowAction, error)
- func (w *MongodDBWorkflowStorageAdapter) UpdateFlow(ctx context.Context, req *UpdateFlowRequest) (*Flow, error)
- type NATSWorkerMessengerAdapter
- func (m *NATSWorkerMessengerAdapter) Close(ctx context.Context) error
- func (m *NATSWorkerMessengerAdapter) ListenInputMessages(ctx context.Context, h func(c InputMessageContext, message InputMessage) error) error
- func (m *NATSWorkerMessengerAdapter) SendOutputMessage(ctx context.Context, message OutputMessage) error
- func (m *NATSWorkerMessengerAdapter) SendTriggerMessage(ctx context.Context, message TriggerMessage) error
- type NATSWorkflowMessengerAdapter
- func (m *NATSWorkflowMessengerAdapter) Close(ctx context.Context) error
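- func (m *NATSWorkflowMessengerAdapter) ListenOutputMessages(ctx context.Context, h func(c OutputMessageContext, message OutputMessage) error) error
- func (m *NATSWorkflowMessengerAdapter) ListenTriggerMessages(ctx context.Context, h func(c TriggerMessageContext, message TriggerMessage) error) error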
- func (m *NATSWorkflowMessengerAdapter) SendInputMessage(ctx context.Context, message InputMessage) error
- type NatsInputMessage
- type NatsOutputMessage
- type NatsTriggerMessage
- type OutputMessage
- type OutputMessageContext
- type TriggerMessage
- type TriggerMessageContext
- type UpdateActionRequest
- type UpdateFlowRequest
- type Worker
- func (w *Worker) Close(ctx context.Context) error
- func (w *Worker) GetAllConfigs(ctx context.Context) ([]WorkerConfig, error)
- func (w *Worker) Run(ctx context.Context, h func(c InputMessageContext, m InputMessage) error) error
- func (w *Worker) SendTriggerMessage(ctx context.Context, m TriggerMessage) error
- type WorkerConfig
- type WorkerMessengerAdapter
- type WorkerStorageAdapter
- type Workflow
- type WorkflowAction
- type WorkflowInfo
- type WorkflowMessengerAdapter
- type WorkflowPeer
- type WorkflowStorageAdapter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func LazyBootstrapWorker ¶
func LazyBootstrapWorker(actionID string, h func(c InputMessageContext, m InputMessage) error) error
Types ¶
type AddActionRequest ¶
type CreateFlowRequest ¶
type Flow ¶
type Flow struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Name string `json:"name"`
TriggerType FlowTriggerType `json:"trigger_type"`
Meta map[string]string `json:"meta,omitempty"`
Status FlowStatus `json:"status"`
Version uint64 `json:"version"`
}
type FlowListResponse ¶
type FlowListResponse struct {
Flows []WorkflowInfo `json:"flows"`
Total int64 `json:"total"`
Page int `json:"page"`
PageSize int `json:"page_size"`
}
type FlowStatus ¶
type FlowStatus string
var (
FlowStatusDraft FlowStatus = "draft"
FlowStatusActive FlowStatus = "active"
)
type FlowTriggerType ¶
type FlowTriggerType string
var (
FlowTriggerTypeEvent FlowTriggerType = "event"
FlowTriggerTypeSchedule FlowTriggerType = "schedule"
)
type InitMongodDBWorkflowStorageAdapterOpt ¶
type InitMongodDBWorkflowStorageAdapterOpt struct {
BetaAutoSetupSchema bool
}
type InitNATSWorkerMessengerAdapterOpt ¶
type InitNATSWorkerMessengerAdapterOpt struct {
BetaAutoSetupNATS bool
}
type InitNATSWorkflowMessengerAdapterOpt ¶
type InitNATSWorkflowMessengerAdapterOpt struct {
BetaAutoSetupNATS bool
}
type InputMessage ¶
type InputMessage struct {
SessionID string
TaskID string
TenantID string
WorkflowID string
// TODO
// WorkflowActionID string
Key string
ActionID string
Values string
}
func (*InputMessage) ToOutputMessage ¶
func (m *InputMessage) ToOutputMessage(metaOutput, values string) OutputMessage
type InputMessageContext ¶
type MDFlow ¶
type MDFlow struct {
ID string `bson:"_id"`
Version uint64 `bson:"version"`
Name string `bson:"name"`
TenantID string `bson:"tenant_id"`
TriggerType FlowTriggerType `bson:"trigger_type"`
Meta map[string]string `bson:"meta,omitempty"`
Status FlowStatus `bson:"status"`
}
type MDWorkflowAction ¶
type MDWorkflowAction struct {
ID string `bson:"_id"`
Key string `bson:"key"` // Composite unique index
TenantID string `bson:"tenant_id"` // Composite unique index
WorkflowID string `bson:"workflow_id"` // Composite unique index
ActionID string `bson:"action_id"`
Config map[string]string `bson:"config"`
Map map[string]Mapper `bson:"map"`
Meta map[string]string `bson:"meta,omitempty"`
Disabled bool `bson:"disabled"`
}
type MDWorkflowActionDep ¶
type MDWorkflowSessionContext ¶
type MDWorkflowSessionContext struct {
ID string `bson:"_id"`
WorkflowID string `bson:"workflow_id"` // Composite unique index
SessionID string `bson:"session_id"` // Composite unique index
TaskID string `bson:"task_id"` // Composite unique index
Value map[string]map[string]interface{} `bson:"value"`
}
type Mapper ¶
type Mapper struct {
Mode MapperMode `json:"mode"`
Value string `json:"value"`
}
type MapperMode ¶
type MapperMode string
var (
MapperModeFixed MapperMode = "fixed"
MapperModeKey MapperMode = "key"
MapperModeExpression MapperMode = "expression"
)
type MongodDBWorkerStorageAdapter ¶
type MongodDBWorkerStorageAdapter struct {
// contains filtered or unexported fields
}
func InitMongodDBWorkerStorageAdapter ¶
func InitMongodDBWorkerStorageAdapter(ctx context.Context) (*MongodDBWorkerStorageAdapter, error)
func NewMongodDBWorkerStorageAdapter ¶
func NewMongodDBWorkerStorageAdapter(client *mongo.Client, db *mongo.Database) *MongodDBWorkerStorageAdapter
func (*MongodDBWorkerStorageAdapter) Close ¶
func (w *MongodDBWorkerStorageAdapter) Close(ctx context.Context) error
func (*MongodDBWorkerStorageAdapter) GetAllConfigs ¶
func (w *MongodDBWorkerStorageAdapter) GetAllConfigs(ctx context.Context, actionID string) ([]WorkerConfig, error)
type MongodDBWorkflowStorageAdapter ¶
type MongodDBWorkflowStorageAdapter struct {
// contains filtered or unexported fields
}
func InitMongodDBWorkflowStorageAdapter ¶
func InitMongodDBWorkflowStorageAdapter(ctx context.Context, opt InitMongodDBWorkflowStorageAdapterOpt) (*MongodDBWorkflowStorageAdapter, error)
func NewMongodDBWorkflowStorageAdapter ¶
func NewMongodDBWorkflowStorageAdapter(client *mongo.Client, db *mongo.Database) *MongodDBWorkflowStorageAdapter
func (*MongodDBWorkflowStorageAdapter) AddAction ¶
func (w *MongodDBWorkflowStorageAdapter) AddAction(ctx context.Context, req *AddActionRequest) (*WorkflowAction, error)
func (*MongodDBWorkflowStorageAdapter) AddDep ¶
func (w *MongodDBWorkflowStorageAdapter) AddDep(ctx context.Context, tenantID, workflowID, key, metaOutput, depKey string) error
func (*MongodDBWorkflowStorageAdapter) Close ¶
func (w *MongodDBWorkflowStorageAdapter) Close(ctx context.Context) error
func (*MongodDBWorkflowStorageAdapter) CreateFlow ¶
func (w *MongodDBWorkflowStorageAdapter) CreateFlow(ctx context.Context, req *CreateFlowRequest) (*Flow, error)
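func (*MongodDBWorkflowStorageAdapter) CreateSessionContext ¶
func (w *MongodDBWorkflowStorageAdapter) CreateSessionContext(ctx context.Context, workflowID, sessionID, taskID string, value map[string]map[string]interface{}) error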
func (*MongodDBWorkflowStorageAdapter) DeleteAction ¶
func (w *MongodDBWorkflowStorageAdapter) DeleteAction(ctx context.Context, tenantID, workflowID, key string) error
func (*MongodDBWorkflowStorageAdapter) DeleteAllActions ¶
func (w *MongodDBWorkflowStorageAdapter) DeleteAllActions(ctx context.Context, tenantID, workflowID string) error
func (*MongodDBWorkflowStorageAdapter) DeleteAllDeps ¶
func (w *MongodDBWorkflowStorageAdapter) DeleteAllDeps(ctx context.Context, tenantID, workflowID string) error
func (*MongodDBWorkflowStorageAdapter) DeleteFlow ¶
func (w *MongodDBWorkflowStorageAdapter) DeleteFlow(ctx context.Context, tenantID, flowID string) error
func (*MongodDBWorkflowStorageAdapter) DeleteSessionContext ¶
func (w *MongodDBWorkflowStorageAdapter) DeleteSessionContext(ctx context.Context, workflowID, sessionID, taskID string) error
func (*MongodDBWorkflowStorageAdapter) DisableWorkflowAction ¶
func (w *MongodDBWorkflowStorageAdapter) DisableWorkflowAction(ctx context.Context, tenantID, workflowID, key string) error
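func (*MongodDBWorkflowStorageAdapter) GetSessionContext ¶
func (w *MongodDBWorkflowStorageAdapter) GetSessionContext(ctx context.Context, workflowID, sessionID, taskID string) (map[string]map[string]interface{}, error)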
func (*MongodDBWorkflowStorageAdapter) GetWorkflowActions ¶
func (w *MongodDBWorkflowStorageAdapter) GetWorkflowActions(ctx context.Context, tenantID, workflowID string) ([]WorkflowAction, error)
func (*MongodDBWorkflowStorageAdapter) GetWorkflowPeers ¶ added in v1.3.7
func (w *MongodDBWorkflowStorageAdapter) GetWorkflowPeers(ctx context.Context, tenantID, workflowID string) ([]WorkflowPeer, error)
func (*MongodDBWorkflowStorageAdapter) ListFlows ¶
func (w *MongodDBWorkflowStorageAdapter) ListFlows(ctx context.Context, tenantID string, page, pageSize int) (*FlowListResponse, error)
func (*MongodDBWorkflowStorageAdapter) QueryWorkflowAction ¶
func (w *MongodDBWorkflowStorageAdapter) QueryWorkflowAction(ctx context.Context, tenantID, workflowID, key string) (*WorkflowAction, error)
func (*MongodDBWorkflowStorageAdapter) QueryWorkflowActionDependencies ¶
func (w *MongodDBWorkflowStorageAdapter) QueryWorkflowActionDependencies(ctx context.Context, tenantID, workflowID, key, metaOutput string) ([]WorkflowAction, error)
func (*MongodDBWorkflowStorageAdapter) UpdateAction ¶
func (w *MongodDBWorkflowStorageAdapter) UpdateAction(ctx context.Context, req *UpdateActionRequest) (*WorkflowAction, error)
func (*MongodDBWorkflowStorageAdapter) UpdateFlow ¶
func (w *MongodDBWorkflowStorageAdapter) UpdateFlow(ctx context.Context, req *UpdateFlowRequest) (*Flow, error)
type NATSWorkerMessengerAdapter ¶
type NATSWorkerMessengerAdapter struct {
// contains filtered or unexported fields
}
func InitNATSWorkerMessengerAdapter ¶
func InitNATSWorkerMessengerAdapter(ctx context.Context, actionID string, opt InitNATSWorkerMessengerAdapterOpt) (*NATSWorkerMessengerAdapter, error)
func (*NATSWorkerMessengerAdapter) Close ¶
func (m *NATSWorkerMessengerAdapter) Close(ctx context.Context) error
func (*NATSWorkerMessengerAdapter) ListenInputMessages ¶
func (m *NATSWorkerMessengerAdapter) ListenInputMessages(ctx context.Context, h func(c InputMessageContext, message InputMessage) error) error
func (*NATSWorkerMessengerAdapter) SendOutputMessage ¶
func (m *NATSWorkerMessengerAdapter) SendOutputMessage(ctx context.Context, message OutputMessage) error
func (*NATSWorkerMessengerAdapter) SendTriggerMessage ¶
func (m *NATSWorkerMessengerAdapter) SendTriggerMessage(ctx context.Context, message TriggerMessage) error
type NATSWorkflowMessengerAdapter ¶
type NATSWorkflowMessengerAdapter struct {
// contains filtered or unexported fields
}
func InitNATSWorkflowMessengerAdapter ¶
func InitNATSWorkflowMessengerAdapter(ctx context.Context, opt InitNATSWorkflowMessengerAdapterOpt) (*NATSWorkflowMessengerAdapter, error)
func (*NATSWorkflowMessengerAdapter) Close ¶
func (m *NATSWorkflowMessengerAdapter) Close(ctx context.Context) error
func (*NATSWorkflowMessengerAdapter) ListenOutputMessages ¶
func (m *NATSWorkflowMessengerAdapter) ListenOutputMessages(ctx context.Context, h func(c OutputMessageContext, message OutputMessage) error) error
func (*NATSWorkflowMessengerAdapter) ListenTriggerMessages ¶
func (m *NATSWorkflowMessengerAdapter) ListenTriggerMessages(ctx context.Context, h func(c TriggerMessageContext, message TriggerMessage) error) error
func (*NATSWorkflowMessengerAdapter) SendInputMessage ¶
func (m *NATSWorkflowMessengerAdapter) SendInputMessage(ctx context.Context, message InputMessage) error
type NatsInputMessage ¶
type NatsInputMessage struct {
SessionID string `json:"session_id"`
TaskID string `json:"task_id"`
WorkflowID string `json:"workflow_id"`
TenantID string `json:"tenant_id"`
// TODO
// WorkflowActionID string `json:"workflow_action_id"`
Key string `json:"key"`
ActionID string `json:"action_id"`
Values string `json:"values"`
}
func (*NatsInputMessage) ToInputMessage ¶
func (n *NatsInputMessage) ToInputMessage() InputMessage
type NatsOutputMessage ¶
type NatsOutputMessage struct {
SessionID string `json:"session_id"`
TaskID string `json:"task_id"`
WorkflowID string `json:"workflow_id"`
TenantID string `json:"tenant_id"`
// TODO
// WorkflowActionID string `json:"workflow_action_id"`
MetaOutput string `json:"meta_output"`
Key string `json:"key"`
ActionID string `json:"action_id"`
Values string `json:"values"`
}
func (NatsOutputMessage) FromOutputMessage ¶
func (n NatsOutputMessage) FromOutputMessage(message OutputMessage) NatsOutputMessage
func (*NatsOutputMessage) ToOutputMessage ¶
func (n *NatsOutputMessage) ToOutputMessage() OutputMessage
type NatsTriggerMessage ¶
type NatsTriggerMessage struct {
WorkflowID string `json:"workflow_id"`
TenantID string `json:"tenant_id"`
// TODO
// WorkflowActionID string `json:"workflow_action_id"`
MetaOutput string `json:"meta_output"`
Key string `json:"key"`
ActionID string `json:"action_id"`
Values string `json:"values"`
}
func (NatsTriggerMessage) FromTriggerMessage ¶
func (n NatsTriggerMessage) FromTriggerMessage(message TriggerMessage) NatsTriggerMessage
func (*NatsTriggerMessage) ToTriggerMessage ¶
func (n *NatsTriggerMessage) ToTriggerMessage() TriggerMessage
type OutputMessage ¶
type OutputMessageContext ¶
type TriggerMessage ¶
type TriggerMessageContext ¶
type UpdateActionRequest ¶
type UpdateFlowRequest ¶
type UpdateFlowRequest struct {
TenantID string `json:"tenant_id"`
FlowID string `json:"flow_id"`
Name string `json:"name"`
TriggerType FlowTriggerType `json:"trigger_type"`
Meta map[string]string `json:"meta,omitempty"`
Status FlowStatus `json:"status"`
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func InitDefaultWorker ¶
func (*Worker) GetAllConfigs ¶
func (w *Worker) GetAllConfigs(ctx context.Context) ([]WorkerConfig, error)
func (*Worker) Run ¶
func (w *Worker) Run(ctx context.Context, h func(c InputMessageContext, m InputMessage) error) error
func (*Worker) SendTriggerMessage ¶
func (w *Worker) SendTriggerMessage(ctx context.Context, m TriggerMessage) error
type WorkerConfig ¶
type WorkerMessengerAdapter ¶
type WorkerMessengerAdapter interface {
ListenInputMessages(ctx context.Context, h func(c InputMessageContext, message InputMessage) error) error
SendTriggerMessage(ctx context.Context, message TriggerMessage) error
SendOutputMessage(ctx context.Context, message OutputMessage) error
Close(ctx context.Context) error
}
type WorkerStorageAdapter ¶
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
func InitWorkflow ¶
func InitWorkflow(messenger WorkflowMessengerAdapter, storage WorkflowStorageAdapter) *Workflow
func (*Workflow) Messenger ¶
func (w *Workflow) Messenger() WorkflowMessengerAdapter
func (*Workflow) Storage ¶
func (w *Workflow) Storage() WorkflowStorageAdapter
type WorkflowAction ¶
type WorkflowAction struct {
ID string `json:"id"`
Key string `json:"key"`
TenantID string `json:"tenant_id"`
WorkflowID string `json:"workflow_id"`
ActionID string `json:"action_id"`
Config map[string]string `json:"config"`
Map map[string]Mapper `json:"map"`
Meta map[string]string `json:"meta,omitempty"`
Disabled bool `json:"disabled"`
}
type WorkflowInfo ¶
type WorkflowInfo struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Name string `json:"name"`
TriggerType FlowTriggerType `json:"trigger_type"`
Status FlowStatus `json:"status"`
Version uint64 `json:"version"`
Meta map[string]string `json:"meta,omitempty"`
}
type WorkflowMessengerAdapter ¶
type WorkflowMessengerAdapter interface {
ListenTriggerMessages(ctx context.Context, h func(c TriggerMessageContext, message TriggerMessage) error) error
ListenOutputMessages(ctx context.Context, h func(c OutputMessageContext, message OutputMessage) error) error
SendInputMessage(ctx context.Context, message InputMessage) error
Close(ctx context.Context) error
}
type WorkflowPeer ¶ added in v1.3.7
type WorkflowStorageAdapter ¶
type WorkflowStorageAdapter interface {
QueryWorkflowAction(ctx context.Context, tenantID, workflowID, key string) (*WorkflowAction, error)
QueryWorkflowActionDependencies(ctx context.Context, tenantID, workflowID, key, metaOutput string) ([]WorkflowAction, error)
AddAction(ctx context.Context, req *AddActionRequest) (*WorkflowAction, error)
DeleteAction(ctx context.Context, tenantID, workflowID, key string) error
DeleteAllActions(ctx context.Context, tenantID, workflowID string) error
AddDep(ctx context.Context, tenantID, workflowID, key, metaOutput, key2 string) error
DeleteAllDeps(ctx context.Context, tenantID, workflowID string) error
GetSessionContext(ctx context.Context, workflowID, sessionID, taskID string) (map[string]map[string]interface{}, error)
CreateSessionContext(ctx context.Context, workflowID, sessionID, taskID string, value map[string]map[string]interface{}) error
DeleteSessionContext(ctx context.Context, workflowID, sessionID, taskID string) error
DisableWorkflowAction(ctx context.Context, tenantID, workflowID, key string) error
ListFlows(ctx context.Context, tenantID string, page, pageSize int) (*FlowListResponse, error)
GetWorkflowActions(ctx context.Context, tenantID, workflowID string) ([]WorkflowAction, error)
GetWorkflowPeers(ctx context.Context, tenantID, workflowID string) ([]WorkflowPeer, error)
UpdateAction(ctx context.Context, req *UpdateActionRequest) (*WorkflowAction, error)
CreateFlow(ctx context.Context, req *CreateFlowRequest) (*Flow, error)
GetFlow(ctx context.Context, tenantID, flowID string) (*Flow, error)
UpdateFlow(ctx context.Context, req *UpdateFlowRequest) (*Flow, error)
DeleteFlow(ctx context.Context, tenantID, flowID string) error
Close(ctx context.Context) error
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.