spider

package
v1.3.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LazyBootstrapWorker

func LazyBootstrapWorker(actionID string, h func(c InputMessageContext, m InputMessage) error) error

Types

type AddActionRequest

// AddActionRequest is the request payload taken by
// WorkflowStorageAdapter.AddAction.
//
// Every field is serialized to JSON under the snake_case key in its tag. Meta
// is tagged omitempty, so it is left out of the encoded object when empty;
// Config and Map carry no such option. Map values are Mapper entries (a mode
// plus a string value).
//
// NOTE(review): Key and ActionID are distinct identifiers — presumably Key
// names the step inside the workflow and ActionID selects the worker action
// to run. Confirm against the storage implementation.
type AddActionRequest struct {
	TenantID   string            `json:"tenant_id"`
	WorkflowID string            `json:"workflow_id"`
	Key        string            `json:"key"`
	ActionID   string            `json:"action_id"`
	Config     map[string]string `json:"config"`
	Map        map[string]Mapper `json:"map"`
	Meta       map[string]string `json:"meta,omitempty"`
}

type CreateFlowRequest

// CreateFlowRequest is the request payload taken by
// WorkflowStorageAdapter.CreateFlow.
//
// It uses the same id, tenant_id, name, trigger_type and meta JSON keys as
// Flow but has no Status or Version field. Meta is tagged omitempty and is
// left out of the encoded object when empty.
type CreateFlowRequest struct {
	ID          string            `json:"id"`
	TenantID    string            `json:"tenant_id"`
	Name        string            `json:"name"`
	TriggerType FlowTriggerType   `json:"trigger_type"`
	Meta        map[string]string `json:"meta,omitempty"`
}

type Flow

// Flow is the JSON representation of a flow, as returned by the CreateFlow,
// GetFlow and UpdateFlow methods of WorkflowStorageAdapter.
//
// MDFlow declares the same set of fields with bson tags instead of json tags.
// Meta is tagged omitempty and is left out of the encoded object when empty.
//
// NOTE(review): how Version changes over a flow's lifetime is not visible in
// this listing.
type Flow struct {
	ID          string            `json:"id"`
	TenantID    string            `json:"tenant_id"`
	Name        string            `json:"name"`
	TriggerType FlowTriggerType   `json:"trigger_type"`
	Meta        map[string]string `json:"meta,omitempty"`
	Status      FlowStatus        `json:"status"`
	Version     uint64            `json:"version"`
}

type FlowListResponse

// FlowListResponse is the result type of WorkflowStorageAdapter.ListFlows.
//
// Flows holds WorkflowInfo entries. Page and PageSize share their names with
// the page and pageSize arguments of ListFlows.
//
// NOTE(review): presumably Page and PageSize echo the request and Total is
// the number of flows across all pages rather than len(Flows) — confirm.
type FlowListResponse struct {
	Flows    []WorkflowInfo `json:"flows"`
	Total    int64          `json:"total"`
	Page     int            `json:"page"`
	PageSize int            `json:"page_size"`
}

type FlowStatus

// FlowStatus is the string-typed status carried by Flow, MDFlow, WorkflowInfo
// and UpdateFlowRequest.
type FlowStatus string

// Known FlowStatus values: "draft" and "active".
//
// NOTE(review): these are declared with var rather than const, so other
// packages can reassign them; consider whether const was intended.
var (
	FlowStatusDraft  FlowStatus = "draft"
	FlowStatusActive FlowStatus = "active"
)

type FlowTriggerType

// FlowTriggerType is the string-typed trigger kind carried by Flow, MDFlow,
// WorkflowInfo, CreateFlowRequest and UpdateFlowRequest.
type FlowTriggerType string

// Known FlowTriggerType values: "event" and "schedule".
//
// NOTE(review): these are declared with var rather than const, so other
// packages can reassign them; consider whether const was intended.
var (
	FlowTriggerTypeEvent    FlowTriggerType = "event"
	FlowTriggerTypeSchedule FlowTriggerType = "schedule"
)

type InitMongodDBWorkflowStorageAdapterOpt

// InitMongodDBWorkflowStorageAdapterOpt is an options struct with a single
// boolean switch, BetaAutoSetupSchema.
//
// NOTE(review): the function that consumes these options is not part of this
// listing. Going by the names, the flag presumably asks the MongoDB workflow
// storage adapter to set up its schema during initialization, and the "Beta"
// prefix suggests the behavior is not yet stable — confirm.
type InitMongodDBWorkflowStorageAdapterOpt struct {
	BetaAutoSetupSchema bool
}

type InitNATSWorkerMessengerAdapterOpt

// InitNATSWorkerMessengerAdapterOpt is an options struct with a single
// boolean switch, BetaAutoSetupNATS.
//
// NOTE(review): the function that consumes these options is not part of this
// listing. Going by the names, the flag presumably asks the NATS worker
// messenger adapter to provision its NATS resources during initialization,
// and the "Beta" prefix suggests the behavior is not yet stable — confirm.
type InitNATSWorkerMessengerAdapterOpt struct {
	BetaAutoSetupNATS bool
}

type InitNATSWorkflowMessengerAdapterOpt

// InitNATSWorkflowMessengerAdapterOpt is an options struct with a single
// boolean switch, BetaAutoSetupNATS.
//
// NOTE(review): the function that consumes these options is not part of this
// listing. Going by the names, the flag presumably asks the NATS workflow
// messenger adapter to provision its NATS resources during initialization,
// and the "Beta" prefix suggests the behavior is not yet stable — confirm.
type InitNATSWorkflowMessengerAdapterOpt struct {
	BetaAutoSetupNATS bool
}

type InputMessage

// InputMessage is the message passed to worker handlers: it is the second
// argument of the callbacks taken by LazyBootstrapWorker, Worker.Run and
// WorkerMessengerAdapter.ListenInputMessages, and the argument of
// WorkflowMessengerAdapter.SendInputMessage.
//
// It carries no struct tags. NatsInputMessage declares the same fields with
// JSON tags and converts to this type through ToInputMessage.
//
// NOTE(review): Values is a plain string; its encoding is not visible in this
// listing.
type InputMessage struct {
	SessionID  string
	TaskID     string
	TenantID   string
	WorkflowID string
	// TODO: the field below is commented out and is not part of the struct.
	// WorkflowActionID string
	Key      string
	ActionID string
	Values   string
}

func (*InputMessage) ToOutputMessage

func (m *InputMessage) ToOutputMessage(metaOutput, values string) OutputMessage

type InputMessageContext

// InputMessageContext is the first argument of the input-message handler
// callbacks taken by LazyBootstrapWorker, Worker.Run and
// WorkerMessengerAdapter.ListenInputMessages.
//
// It bundles a context.Context, a timestamp and a SendOutput callback that
// accepts a metaOutput string and a values string and returns an error.
//
// NOTE(review): presumably Timestamp is the time associated with the incoming
// message and SendOutput publishes an OutputMessage on behalf of the message
// being handled — confirm in the messenger adapter.
type InputMessageContext struct {
	Context    context.Context
	Timestamp  time.Time
	SendOutput func(metaOutput string, values string) error
}

type MDFlow

// MDFlow is the bson-tagged counterpart of Flow: it declares the same fields,
// with ID stored under the "_id" key and the others under snake_case keys.
// Meta is tagged omitempty.
//
// NOTE(review): the MD prefix presumably marks MongoDB document types used by
// MongodDBWorkflowStorageAdapter — confirm.
type MDFlow struct {
	ID          string            `bson:"_id"`
	Version     uint64            `bson:"version"`
	Name        string            `bson:"name"`
	TenantID    string            `bson:"tenant_id"`
	TriggerType FlowTriggerType   `bson:"trigger_type"`
	Meta        map[string]string `bson:"meta,omitempty"`
	Status      FlowStatus        `bson:"status"`
}

type MDWorkflowAction

// MDWorkflowAction is the bson-tagged counterpart of WorkflowAction: it
// declares the same fields, with ID stored under the "_id" key. Meta is
// tagged omitempty.
//
// The inline comments mark Key, TenantID and WorkflowID as members of one
// composite unique index.
//
// NOTE(review): the index itself is not created by anything in this listing;
// the MD prefix presumably marks MongoDB document types — confirm.
type MDWorkflowAction struct {
	ID         string            `bson:"_id"`
	Key        string            `bson:"key"`         // Composite unique index
	TenantID   string            `bson:"tenant_id"`   // Composite unique index
	WorkflowID string            `bson:"workflow_id"` // Composite unique index
	ActionID   string            `bson:"action_id"`
	Config     map[string]string `bson:"config"`
	Map        map[string]Mapper `bson:"map"`
	Meta       map[string]string `bson:"meta,omitempty"`
	Disabled   bool              `bson:"disabled"`
}

type MDWorkflowActionDep

// MDWorkflowActionDep is a bson-tagged record linking a (Key, MetaOutput)
// pair to a DepKey within a workflow. Its WorkflowID, Key, MetaOutput and
// DepKey fields share names with the workflowID, key, metaOutput and depKey
// arguments of MongodDBWorkflowStorageAdapter.AddDep.
//
// The inline comments mark all four of those fields as members of one
// composite unique index. The struct has no tenant field, although AddDep
// also takes a tenantID.
//
// NOTE(review): presumably DepKey is the key of the action that runs after
// Key emits MetaOutput — confirm against QueryWorkflowActionDependencies.
type MDWorkflowActionDep struct {
	ID         string `bson:"_id"`
	WorkflowID string `bson:"workflow_id"` // Composite unique index
	Key        string `bson:"key"`         // Composite unique index
	MetaOutput string `bson:"meta_output"` // Composite unique index
	DepKey     string `bson:"dep_key"`     // Composite unique index
}

type MDWorkflowSessionContext

// MDWorkflowSessionContext is a bson-tagged record holding a session context
// value. WorkflowID, SessionID and TaskID share names with the arguments of
// the CreateSessionContext, GetSessionContext and DeleteSessionContext
// storage methods, and Value has the same two-level map type those methods
// accept and return.
//
// The inline comments mark WorkflowID, SessionID and TaskID as members of one
// composite unique index.
//
// NOTE(review): the meaning of the outer and inner map keys of Value is not
// visible in this listing.
type MDWorkflowSessionContext struct {
	ID         string                            `bson:"_id"`
	WorkflowID string                            `bson:"workflow_id"` // Composite unique index
	SessionID  string                            `bson:"session_id"`  // Composite unique index
	TaskID     string                            `bson:"task_id"`     // Composite unique index
	Value      map[string]map[string]interface{} `bson:"value"`
}

type Mapper

// Mapper pairs a MapperMode with a string Value. It is the value type of the
// Map fields on AddActionRequest, UpdateActionRequest, WorkflowAction and
// MDWorkflowAction.
//
// NOTE(review): how Value is interpreted for each mode is not visible in this
// listing. Mapper declares only json tags even though MDWorkflowAction embeds
// it in a bson-tagged document, so its BSON field names come from the
// driver's defaults — confirm that is intended.
type Mapper struct {
	Mode  MapperMode `json:"mode"`
	Value string     `json:"value"`
}

type MapperMode

// MapperMode is the string-typed mode stored in Mapper.Mode.
type MapperMode string

// Known MapperMode values: "fixed", "key" and "expression".
//
// NOTE(review): these are declared with var rather than const, so other
// packages can reassign them; consider whether const was intended. What each
// mode does with Mapper.Value is not visible in this listing.
var (
	MapperModeFixed      MapperMode = "fixed"
	MapperModeKey        MapperMode = "key"
	MapperModeExpression MapperMode = "expression"
)

type MongodDBWorkerStorageAdapter

// MongodDBWorkerStorageAdapter is an opaque type obtained from
// InitMongodDBWorkerStorageAdapter or NewMongodDBWorkerStorageAdapter (the
// latter takes a *mongo.Client and a *mongo.Database).
//
// The listing shows GetAllConfigs and Close methods on it, the same method
// names WorkerStorageAdapter requires.
//
// NOTE(review): the Close signature is not shown here, so satisfaction of
// WorkerStorageAdapter is presumed rather than verified.
type MongodDBWorkerStorageAdapter struct {
	// contains filtered or unexported fields
}

func InitMongodDBWorkerStorageAdapter

func InitMongodDBWorkerStorageAdapter(ctx context.Context) (*MongodDBWorkerStorageAdapter, error)

func NewMongodDBWorkerStorageAdapter

func NewMongodDBWorkerStorageAdapter(client *mongo.Client, db *mongo.Database) *MongodDBWorkerStorageAdapter

func (*MongodDBWorkerStorageAdapter) Close

func (*MongodDBWorkerStorageAdapter) GetAllConfigs

func (w *MongodDBWorkerStorageAdapter) GetAllConfigs(ctx context.Context, actionID string) ([]WorkerConfig, error)

type MongodDBWorkflowStorageAdapter

// MongodDBWorkflowStorageAdapter is an opaque type obtained from
// NewMongodDBWorkflowStorageAdapter, which takes a *mongo.Client and a
// *mongo.Database.
//
// The listing shows a method on it for every method name declared by
// WorkflowStorageAdapter.
//
// NOTE(review): the signatures of AddAction, Close, CreateFlow, UpdateAction
// and UpdateFlow are not shown here, so satisfaction of
// WorkflowStorageAdapter is presumed rather than verified.
type MongodDBWorkflowStorageAdapter struct {
	// contains filtered or unexported fields
}

func NewMongodDBWorkflowStorageAdapter

func NewMongodDBWorkflowStorageAdapter(client *mongo.Client, db *mongo.Database) *MongodDBWorkflowStorageAdapter

func (*MongodDBWorkflowStorageAdapter) AddAction

func (*MongodDBWorkflowStorageAdapter) AddDep

func (w *MongodDBWorkflowStorageAdapter) AddDep(
	ctx context.Context,
	tenantID,
	workflowID,
	key,
	metaOutput,
	depKey string,
) error

func (*MongodDBWorkflowStorageAdapter) Close

func (*MongodDBWorkflowStorageAdapter) CreateFlow

func (*MongodDBWorkflowStorageAdapter) CreateSessionContext

func (w *MongodDBWorkflowStorageAdapter) CreateSessionContext(ctx context.Context, workflowID, sessionID, taskID string, value map[string]map[string]interface{}) error

func (*MongodDBWorkflowStorageAdapter) DeleteAction

func (w *MongodDBWorkflowStorageAdapter) DeleteAction(ctx context.Context, tenantID, workflowID, key string) error

func (*MongodDBWorkflowStorageAdapter) DeleteAllActions

func (w *MongodDBWorkflowStorageAdapter) DeleteAllActions(ctx context.Context, tenantID, workflowID string) error

func (*MongodDBWorkflowStorageAdapter) DeleteAllDeps

func (w *MongodDBWorkflowStorageAdapter) DeleteAllDeps(ctx context.Context, tenantID, workflowID string) error

func (*MongodDBWorkflowStorageAdapter) DeleteFlow

func (w *MongodDBWorkflowStorageAdapter) DeleteFlow(ctx context.Context, tenantID, flowID string) error

func (*MongodDBWorkflowStorageAdapter) DeleteSessionContext

func (w *MongodDBWorkflowStorageAdapter) DeleteSessionContext(ctx context.Context, workflowID, sessionID, taskID string) error

func (*MongodDBWorkflowStorageAdapter) DisableWorkflowAction

func (w *MongodDBWorkflowStorageAdapter) DisableWorkflowAction(ctx context.Context, tenantID, workflowID, key string) error

func (*MongodDBWorkflowStorageAdapter) GetFlow

func (w *MongodDBWorkflowStorageAdapter) GetFlow(ctx context.Context, tenantID, flowID string) (*Flow, error)

func (*MongodDBWorkflowStorageAdapter) GetSessionContext

func (w *MongodDBWorkflowStorageAdapter) GetSessionContext(ctx context.Context, workflowID, sessionID, taskID string) (map[string]map[string]interface{}, error)

func (*MongodDBWorkflowStorageAdapter) GetWorkflowActions

func (w *MongodDBWorkflowStorageAdapter) GetWorkflowActions(ctx context.Context, tenantID, workflowID string) ([]WorkflowAction, error)

func (*MongodDBWorkflowStorageAdapter) GetWorkflowPeers added in v1.3.7

func (w *MongodDBWorkflowStorageAdapter) GetWorkflowPeers(ctx context.Context, tenantID, workflowID string) ([]WorkflowPeer, error)

func (*MongodDBWorkflowStorageAdapter) ListFlows

func (w *MongodDBWorkflowStorageAdapter) ListFlows(ctx context.Context, tenantID string, page, pageSize int) (*FlowListResponse, error)

func (*MongodDBWorkflowStorageAdapter) QueryWorkflowAction

func (w *MongodDBWorkflowStorageAdapter) QueryWorkflowAction(ctx context.Context, tenantID, workflowID, key string) (*WorkflowAction, error)

func (*MongodDBWorkflowStorageAdapter) QueryWorkflowActionDependencies

func (w *MongodDBWorkflowStorageAdapter) QueryWorkflowActionDependencies(ctx context.Context, tenantID, workflowID, key, metaOutput string) ([]WorkflowAction, error)

func (*MongodDBWorkflowStorageAdapter) UpdateAction

func (*MongodDBWorkflowStorageAdapter) UpdateFlow

type NATSWorkerMessengerAdapter

// NATSWorkerMessengerAdapter is an opaque type exposing ListenInputMessages,
// SendOutputMessage, SendTriggerMessage and Close, the same method names
// WorkerMessengerAdapter requires.
//
// NOTE(review): no constructor and no Close signature appear in this listing,
// so how it is created and whether it satisfies WorkerMessengerAdapter are
// presumed rather than verified.
type NATSWorkerMessengerAdapter struct {
	// contains filtered or unexported fields
}

func (*NATSWorkerMessengerAdapter) Close

func (*NATSWorkerMessengerAdapter) ListenInputMessages

func (m *NATSWorkerMessengerAdapter) ListenInputMessages(ctx context.Context, h func(c InputMessageContext, message InputMessage) error) error

func (*NATSWorkerMessengerAdapter) SendOutputMessage

func (m *NATSWorkerMessengerAdapter) SendOutputMessage(ctx context.Context, message OutputMessage) error

func (*NATSWorkerMessengerAdapter) SendTriggerMessage

func (m *NATSWorkerMessengerAdapter) SendTriggerMessage(ctx context.Context, message TriggerMessage) error

type NATSWorkflowMessengerAdapter

// NATSWorkflowMessengerAdapter is an opaque type exposing
// ListenOutputMessages, ListenTriggerMessages, SendInputMessage and Close,
// the same method names WorkflowMessengerAdapter requires.
//
// NOTE(review): no constructor and no Close signature appear in this listing,
// so how it is created and whether it satisfies WorkflowMessengerAdapter are
// presumed rather than verified.
type NATSWorkflowMessengerAdapter struct {
	// contains filtered or unexported fields
}

func (*NATSWorkflowMessengerAdapter) Close

func (*NATSWorkflowMessengerAdapter) ListenOutputMessages

func (m *NATSWorkflowMessengerAdapter) ListenOutputMessages(ctx context.Context, h func(c OutputMessageContext, message OutputMessage) error) error

func (*NATSWorkflowMessengerAdapter) ListenTriggerMessages

func (m *NATSWorkflowMessengerAdapter) ListenTriggerMessages(ctx context.Context, h func(c TriggerMessageContext, message TriggerMessage) error) error

func (*NATSWorkflowMessengerAdapter) SendInputMessage

func (m *NATSWorkflowMessengerAdapter) SendInputMessage(ctx context.Context, message InputMessage) error

type NatsInputMessage

// NatsInputMessage is the JSON-tagged form of InputMessage: it declares the
// same fields under snake_case keys and converts to InputMessage through
// ToInputMessage.
//
// NOTE(review): presumably this is the payload encoding used on NATS by the
// NATS messenger adapters — confirm.
type NatsInputMessage struct {
	SessionID  string `json:"session_id"`
	TaskID     string `json:"task_id"`
	WorkflowID string `json:"workflow_id"`
	TenantID   string `json:"tenant_id"`
	// TODO: the field below is commented out and is not part of the struct.
	// WorkflowActionID string `json:"workflow_action_id"`
	Key      string `json:"key"`
	ActionID string `json:"action_id"`
	Values   string `json:"values"`
}

func (*NatsInputMessage) ToInputMessage

func (n *NatsInputMessage) ToInputMessage() InputMessage

type NatsOutputMessage

// NatsOutputMessage is the JSON-tagged form of OutputMessage: it declares the
// same fields under snake_case keys. FromOutputMessage takes an OutputMessage
// and returns a NatsOutputMessage; ToOutputMessage returns an OutputMessage.
//
// NOTE(review): presumably this is the payload encoding used on NATS by the
// NATS messenger adapters — confirm.
type NatsOutputMessage struct {
	SessionID  string `json:"session_id"`
	TaskID     string `json:"task_id"`
	WorkflowID string `json:"workflow_id"`
	TenantID   string `json:"tenant_id"`
	// TODO: the field below is commented out and is not part of the struct.
	// WorkflowActionID string `json:"workflow_action_id"`
	MetaOutput string `json:"meta_output"`
	Key        string `json:"key"`
	ActionID   string `json:"action_id"`
	Values     string `json:"values"`
}

func (NatsOutputMessage) FromOutputMessage

func (n NatsOutputMessage) FromOutputMessage(message OutputMessage) NatsOutputMessage

func (*NatsOutputMessage) ToOutputMessage

func (n *NatsOutputMessage) ToOutputMessage() OutputMessage

type NatsTriggerMessage

// NatsTriggerMessage is the JSON-tagged form of TriggerMessage: it declares
// the same fields under snake_case keys. FromTriggerMessage takes a
// TriggerMessage and returns a NatsTriggerMessage; ToTriggerMessage returns a
// TriggerMessage.
//
// NOTE(review): presumably this is the payload encoding used on NATS by the
// NATS messenger adapters — confirm.
type NatsTriggerMessage struct {
	WorkflowID string `json:"workflow_id"`
	TenantID   string `json:"tenant_id"`
	// TODO: the field below is commented out and is not part of the struct.
	// WorkflowActionID string `json:"workflow_action_id"`
	MetaOutput string `json:"meta_output"`
	Key        string `json:"key"`
	ActionID   string `json:"action_id"`
	Values     string `json:"values"`
}

func (NatsTriggerMessage) FromTriggerMessage

func (n NatsTriggerMessage) FromTriggerMessage(message TriggerMessage) NatsTriggerMessage

func (*NatsTriggerMessage) ToTriggerMessage

func (n *NatsTriggerMessage) ToTriggerMessage() TriggerMessage

type OutputMessage

// OutputMessage has the fields of InputMessage plus MetaOutput. It is the
// return type of InputMessage.ToOutputMessage, the argument of
// WorkerMessengerAdapter.SendOutputMessage, and the second argument of the
// handler taken by WorkflowMessengerAdapter.ListenOutputMessages.
//
// It carries no struct tags; NatsOutputMessage is the JSON-tagged form.
//
// NOTE(review): the encoding of Values and the set of valid MetaOutput values
// are not visible in this listing.
type OutputMessage struct {
	SessionID  string
	TaskID     string
	TenantID   string
	WorkflowID string
	// TODO: the field below is commented out and is not part of the struct.
	// WorkflowActionID string
	Key        string
	ActionID   string
	MetaOutput string
	Values     string
}

type OutputMessageContext

// OutputMessageContext is the first argument of the handler taken by
// WorkflowMessengerAdapter.ListenOutputMessages. It bundles a context.Context
// with a timestamp.
//
// NOTE(review): presumably Timestamp is the time associated with the incoming
// message — confirm in the messenger adapter.
type OutputMessageContext struct {
	Context   context.Context
	Timestamp time.Time
}

type TriggerMessage

// TriggerMessage has the fields of OutputMessage except SessionID and TaskID.
// It is the argument of Worker.SendTriggerMessage and
// WorkerMessengerAdapter.SendTriggerMessage, and the second argument of the
// handler taken by WorkflowMessengerAdapter.ListenTriggerMessages.
//
// It carries no struct tags; NatsTriggerMessage is the JSON-tagged form.
//
// NOTE(review): presumably a trigger starts a new session, which would
// explain the missing SessionID and TaskID — confirm.
type TriggerMessage struct {
	TenantID   string
	WorkflowID string
	// TODO: the field below is commented out and is not part of the struct.
	// WorkflowActionID string
	Key        string
	ActionID   string
	MetaOutput string
	Values     string
}

type TriggerMessageContext

// TriggerMessageContext is the first argument of the handler taken by
// WorkflowMessengerAdapter.ListenTriggerMessages. It bundles a
// context.Context with a timestamp.
//
// NOTE(review): presumably Timestamp is the time associated with the incoming
// message — confirm in the messenger adapter.
type TriggerMessageContext struct {
	Context   context.Context
	Timestamp time.Time
}

type UpdateActionRequest

// UpdateActionRequest is the request payload taken by
// WorkflowStorageAdapter.UpdateAction.
//
// It has the fields of AddActionRequest except ActionID. Meta is tagged
// omitempty and is left out of the encoded object when empty.
//
// NOTE(review): presumably TenantID, WorkflowID and Key select the action to
// update while Config, Map and Meta are the new values — confirm against the
// storage implementation.
type UpdateActionRequest struct {
	TenantID   string            `json:"tenant_id"`
	WorkflowID string            `json:"workflow_id"`
	Key        string            `json:"key"`
	Config     map[string]string `json:"config"`
	Map        map[string]Mapper `json:"map"`
	Meta       map[string]string `json:"meta,omitempty"`
}

type UpdateFlowRequest

// UpdateFlowRequest is the request payload taken by
// WorkflowStorageAdapter.UpdateFlow.
//
// Unlike CreateFlowRequest it names the flow with FlowID (JSON key "flow_id")
// instead of ID and also carries a Status. It has no Version field. Meta is
// tagged omitempty and is left out of the encoded object when empty.
//
// NOTE(review): presumably TenantID and FlowID select the flow and the other
// fields are the new values — confirm against the storage implementation.
type UpdateFlowRequest struct {
	TenantID    string            `json:"tenant_id"`
	FlowID      string            `json:"flow_id"`
	Name        string            `json:"name"`
	TriggerType FlowTriggerType   `json:"trigger_type"`
	Meta        map[string]string `json:"meta,omitempty"`
	Status      FlowStatus        `json:"status"`
}

type Worker

// Worker is an opaque type returned by InitDefaultWorker, which takes a
// context and an actionID. It exposes Run (taking an input-message handler),
// GetAllConfigs, SendTriggerMessage and Close.
//
// NOTE(review): presumably it combines a WorkerMessengerAdapter and a
// WorkerStorageAdapter for the given actionID — its fields are not visible
// here, so confirm.
type Worker struct {
	// contains filtered or unexported fields
}

func InitDefaultWorker

func InitDefaultWorker(
	ctx context.Context,
	actionID string,
) (*Worker, error)

func (*Worker) Close

func (w *Worker) Close(ctx context.Context) error

func (*Worker) GetAllConfigs

func (w *Worker) GetAllConfigs(ctx context.Context) ([]WorkerConfig, error)

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, h func(c InputMessageContext, m InputMessage) error) error

func (*Worker) SendTriggerMessage

func (w *Worker) SendTriggerMessage(ctx context.Context, m TriggerMessage) error

type WorkerConfig

// WorkerConfig is the element type returned by the GetAllConfigs methods of
// Worker, WorkerStorageAdapter and MongodDBWorkerStorageAdapter.
//
// Fields are serialized to JSON under the snake_case keys in their tags; Meta
// is tagged omitempty. Compared with WorkflowAction it has WorkflowActionID
// in place of ID and omits ActionID, Map and Disabled.
//
// NOTE(review): presumably WorkflowActionID holds the ID of the
// WorkflowAction the config was read from — confirm.
type WorkerConfig struct {
	WorkflowActionID string            `json:"workflow_action_id"`
	TenantID         string            `json:"tenant_id"`
	WorkflowID       string            `json:"workflow_id"`
	Key              string            `json:"key"`
	Config           map[string]string `json:"config"`
	Meta             map[string]string `json:"meta,omitempty"`
}

type WorkerMessengerAdapter

// WorkerMessengerAdapter is the messaging contract on the worker side: it
// receives InputMessage values and sends TriggerMessage and OutputMessage
// values. NATSWorkerMessengerAdapter declares methods with the same names.
type WorkerMessengerAdapter interface {
	// ListenInputMessages takes a handler h that receives an
	// InputMessageContext and an InputMessage.
	// NOTE(review): presumably h runs once per incoming message and the call
	// blocks until ctx is done or listening fails — confirm.
	ListenInputMessages(ctx context.Context, h func(c InputMessageContext, message InputMessage) error) error
	// SendTriggerMessage sends message, reporting failure through the
	// returned error.
	SendTriggerMessage(ctx context.Context, message TriggerMessage) error
	// SendOutputMessage sends message, reporting failure through the
	// returned error.
	SendOutputMessage(ctx context.Context, message OutputMessage) error
	// Close takes a context and returns an error.
	// NOTE(review): presumably releases the underlying connection — confirm.
	Close(ctx context.Context) error
}

type WorkerStorageAdapter

// WorkerStorageAdapter is the storage contract on the worker side.
// MongodDBWorkerStorageAdapter declares methods with the same names.
type WorkerStorageAdapter interface {
	// GetAllConfigs returns WorkerConfig values for the given actionID.
	// NOTE(review): presumably one entry per workflow action that uses
	// actionID, across all tenants — confirm.
	GetAllConfigs(ctx context.Context, actionID string) ([]WorkerConfig, error)
	// Close takes a context and returns an error.
	// NOTE(review): presumably releases the underlying connection — confirm.
	Close(ctx context.Context) error
}

type Workflow

// Workflow is an opaque type built by InitWorkflow from a
// WorkflowMessengerAdapter and a WorkflowStorageAdapter, or by
// InitDefaultWorkflow from a context alone. The Messenger and Storage methods
// return values of those two interface types; Run and Close each take a
// context and return an error.
//
// NOTE(review): presumably Run listens for trigger and output messages and
// dispatches input messages to the dependent actions — its body is not
// visible here, so confirm.
type Workflow struct {
	// contains filtered or unexported fields
}

func InitDefaultWorkflow

func InitDefaultWorkflow(
	ctx context.Context,
) (*Workflow, error)

func InitWorkflow

func InitWorkflow(
	messenger WorkflowMessengerAdapter,
	storage WorkflowStorageAdapter,
) *Workflow

func (*Workflow) Close

func (w *Workflow) Close(ctx context.Context) error

func (*Workflow) Messenger

func (w *Workflow) Messenger() WorkflowMessengerAdapter

func (*Workflow) Run

func (w *Workflow) Run(ctx context.Context) error

func (*Workflow) Storage

func (w *Workflow) Storage() WorkflowStorageAdapter

type WorkflowAction

// WorkflowAction is the JSON representation of an action within a workflow.
// WorkflowStorageAdapter returns it from AddAction, UpdateAction,
// QueryWorkflowAction, QueryWorkflowActionDependencies and
// GetWorkflowActions.
//
// MDWorkflowAction declares the same fields with bson tags. Meta is tagged
// omitempty and is left out of the encoded object when empty.
//
// NOTE(review): Disabled shares its name with the DisableWorkflowAction
// storage method, which presumably sets it — confirm.
type WorkflowAction struct {
	ID         string            `json:"id"`
	Key        string            `json:"key"`
	TenantID   string            `json:"tenant_id"`
	WorkflowID string            `json:"workflow_id"`
	ActionID   string            `json:"action_id"`
	Config     map[string]string `json:"config"`
	Map        map[string]Mapper `json:"map"`
	Meta       map[string]string `json:"meta,omitempty"`
	Disabled   bool              `json:"disabled"`
}

type WorkflowInfo

// WorkflowInfo is the element type of FlowListResponse.Flows. It declares the
// same fields and JSON keys as Flow, in a different order. Meta is tagged
// omitempty and is left out of the encoded object when empty.
type WorkflowInfo struct {
	ID          string            `json:"id"`
	TenantID    string            `json:"tenant_id"`
	Name        string            `json:"name"`
	TriggerType FlowTriggerType   `json:"trigger_type"`
	Status      FlowStatus        `json:"status"`
	Version     uint64            `json:"version"`
	Meta        map[string]string `json:"meta,omitempty"`
}

type WorkflowMessengerAdapter

// WorkflowMessengerAdapter is the messaging contract on the workflow side: it
// receives TriggerMessage and OutputMessage values and sends InputMessage
// values. InitWorkflow accepts it and Workflow.Messenger returns it;
// NATSWorkflowMessengerAdapter declares methods with the same names.
type WorkflowMessengerAdapter interface {
	// ListenTriggerMessages takes a handler h that receives a
	// TriggerMessageContext and a TriggerMessage.
	// NOTE(review): presumably h runs once per incoming message and the call
	// blocks until ctx is done or listening fails — confirm.
	ListenTriggerMessages(ctx context.Context, h func(c TriggerMessageContext, message TriggerMessage) error) error
	// ListenOutputMessages takes a handler h that receives an
	// OutputMessageContext and an OutputMessage.
	// NOTE(review): presumably h runs once per incoming message and the call
	// blocks until ctx is done or listening fails — confirm.
	ListenOutputMessages(ctx context.Context, h func(c OutputMessageContext, message OutputMessage) error) error
	// SendInputMessage sends message, reporting failure through the returned
	// error.
	SendInputMessage(ctx context.Context, message InputMessage) error
	// Close takes a context and returns an error.
	// NOTE(review): presumably releases the underlying connection — confirm.
	Close(ctx context.Context) error
}

type WorkflowPeer added in v1.3.7

// WorkflowPeer is the element type returned by
// WorkflowStorageAdapter.GetWorkflowPeers. It names a parent key, a meta
// output and a child key, serialized to JSON as parent_key, meta_output and
// child_key.
//
// NOTE(review): presumably each value describes one dependency edge, with
// ParentKey, MetaOutput and ChildKey corresponding to the Key, MetaOutput and
// DepKey fields of MDWorkflowActionDep — confirm.
type WorkflowPeer struct {
	ParentKey  string `json:"parent_key"`
	MetaOutput string `json:"meta_output"`
	ChildKey   string `json:"child_key"`
}

type WorkflowStorageAdapter

// WorkflowStorageAdapter is the persistence contract on the workflow side. It
// covers workflow actions, the dependencies between them, per-session context
// values and flows. InitWorkflow accepts it and Workflow.Storage returns it;
// MongodDBWorkflowStorageAdapter declares methods with the same names.
//
// NOTE(review): semantics beyond what the signatures show (not-found
// behavior, uniqueness errors, tenant scoping) are not visible in this
// listing and are hedged below where mentioned.
type WorkflowStorageAdapter interface {
	// QueryWorkflowAction returns a single WorkflowAction addressed by
	// tenantID, workflowID and key.
	// NOTE(review): whether a missing action yields (nil, nil) or an error
	// is not visible here.
	QueryWorkflowAction(ctx context.Context, tenantID, workflowID, key string) (*WorkflowAction, error)
	// QueryWorkflowActionDependencies returns the WorkflowAction values
	// associated with the given key and metaOutput.
	// NOTE(review): presumably these are the actions registered through
	// AddDep as depKey for that (key, metaOutput) pair — confirm.
	QueryWorkflowActionDependencies(ctx context.Context, tenantID, workflowID, key, metaOutput string) ([]WorkflowAction, error)
	// AddAction takes an AddActionRequest and returns a WorkflowAction.
	AddAction(ctx context.Context, req *AddActionRequest) (*WorkflowAction, error)
	// DeleteAction takes the tenantID, workflowID and key of one action.
	DeleteAction(ctx context.Context, tenantID, workflowID, key string) error
	// DeleteAllActions takes a tenantID and workflowID.
	// NOTE(review): presumably removes every action of that workflow.
	DeleteAllActions(ctx context.Context, tenantID, workflowID string) error
	// AddDep takes key, metaOutput and depKey for a workflow. The last
	// parameter is named depKey to match the implementation on
	// MongodDBWorkflowStorageAdapter and the DepKey field of
	// MDWorkflowActionDep.
	// NOTE(review): presumably records that depKey follows key when key
	// emits metaOutput — confirm.
	AddDep(ctx context.Context, tenantID, workflowID, key, metaOutput, depKey string) error
	// DeleteAllDeps takes a tenantID and workflowID.
	// NOTE(review): presumably removes every dependency of that workflow.
	DeleteAllDeps(ctx context.Context, tenantID, workflowID string) error
	// GetSessionContext returns the two-level map stored for workflowID,
	// sessionID and taskID. It takes no tenantID.
	GetSessionContext(ctx context.Context, workflowID, sessionID, taskID string) (map[string]map[string]interface{}, error)
	// CreateSessionContext takes a two-level map value for workflowID,
	// sessionID and taskID. It takes no tenantID.
	CreateSessionContext(ctx context.Context, workflowID, sessionID, taskID string, value map[string]map[string]interface{}) error
	// DeleteSessionContext takes workflowID, sessionID and taskID. It takes
	// no tenantID.
	DeleteSessionContext(ctx context.Context, workflowID, sessionID, taskID string) error
	// DisableWorkflowAction takes the tenantID, workflowID and key of one
	// action.
	// NOTE(review): presumably sets the action's Disabled flag — confirm.
	DisableWorkflowAction(ctx context.Context, tenantID, workflowID, key string) error
	// ListFlows returns a FlowListResponse for tenantID given page and
	// pageSize.
	// NOTE(review): whether page is zero- or one-based is not visible here.
	ListFlows(ctx context.Context, tenantID string, page, pageSize int) (*FlowListResponse, error)
	// GetWorkflowActions returns the WorkflowAction values for tenantID and
	// workflowID.
	GetWorkflowActions(ctx context.Context, tenantID, workflowID string) ([]WorkflowAction, error)
	// GetWorkflowPeers returns the WorkflowPeer values for tenantID and
	// workflowID. The listing marks it as added in v1.3.7.
	GetWorkflowPeers(ctx context.Context, tenantID, workflowID string) ([]WorkflowPeer, error)
	// UpdateAction takes an UpdateActionRequest and returns a
	// WorkflowAction.
	UpdateAction(ctx context.Context, req *UpdateActionRequest) (*WorkflowAction, error)
	// CreateFlow takes a CreateFlowRequest and returns a Flow.
	CreateFlow(ctx context.Context, req *CreateFlowRequest) (*Flow, error)
	// GetFlow returns the Flow addressed by tenantID and flowID.
	// NOTE(review): whether a missing flow yields (nil, nil) or an error is
	// not visible here.
	GetFlow(ctx context.Context, tenantID, flowID string) (*Flow, error)
	// UpdateFlow takes an UpdateFlowRequest and returns a Flow.
	UpdateFlow(ctx context.Context, req *UpdateFlowRequest) (*Flow, error)
	// DeleteFlow takes the tenantID and flowID of one flow.
	DeleteFlow(ctx context.Context, tenantID, flowID string) error
	// Close takes a context and returns an error.
	// NOTE(review): presumably releases the underlying connection — confirm.
	Close(ctx context.Context) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL