Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Action ¶
type Action struct {
// DataReuse 是否复用上层Function数据
DataReuse bool
// 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
// ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
ForceEntryNext bool
// JumpFunc 跳转到指定Function继续执行
JumpFunc string
// Abort 终止Flow的执行
Abort bool
}
Action KisFlow执行流程Actions
func LoadActions ¶
func LoadActions(acts []ActionFunc) Action
LoadActions 加载Actions,依次执行ActionFunc操作函数
type ActionFunc ¶
type ActionFunc func(ops *Action)
ActionFunc KisFlow Functional Option 类型
func ActionJumpFunc ¶
func ActionJumpFunc(funcName string) ActionFunc
ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc (注意:容易出现Flow循环调用,导致死循环)
type Connector ¶
type Connector interface {
// Init 初始化Connector所关联的存储引擎链接等
Init() error
// Call 调用Connector 外挂存储逻辑的读写操作
Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
// GetId 获取Connector的ID
GetId() string
// GetName 获取Connector的名称
GetName() string
// GetConfig 获取Connector的配置信息
GetConfig() *config.KisConnConfig
// GetMetaData 得到当前Connector的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Connector的临时数据
SetMetaData(key string, value interface{})
}
type FaaS ¶
type FaaS interface{}
将 type FaaS func(context.Context, Flow) error 改为 type FaaS func(context.Context, Flow, ...interface{}) error 可以通过可变参数的任意输入类型进行数据传递
type FaaSDesc ¶
type FaaSDesc struct {
Serialize // 当前Function的数据输入输出序列化实现
FnName string // Function名称
ArgsType []reflect.Type // 函数参数类型(集合)
ArgNum int // 函数参数个数
FuncType reflect.Type // 函数类型
FuncValue reflect.Value // 函数值(函数地址)
// contains filtered or unexported fields
}
FaaSDesc FaaS 回调计算业务函数 描述
type Flow ¶
type Flow interface {
// Run 调度Flow,依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接, 同时Flow的配置也会更新
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// AppendNewFunction 将一个新的Function追加到到Flow中
AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交)
// row: Must be a slice
CommitRowBatch(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
GetName() string
// GetThisFunction 得到当前正在执行的Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
// GetCacheData 得到当前Flow的缓存数据
GetCacheData(key string) interface{}
// SetCacheData 设置当前Flow的缓存数据
SetCacheData(key string, value interface{}, Exp time.Duration)
// GetMetaData 得到当前Flow的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Flow的临时数据
SetMetaData(key string, value interface{})
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
GetFuncParam(key string) string
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
GetFuncParamAll() config.FParam
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value
GetFuncParamsAllFuncs() map[string]config.FParam
// Fork 得到Flow的一个副本(深拷贝)
Fork(ctx context.Context) Flow
// GetId 得到Flow的Id
GetId() string
}
type Function ¶
type Function interface {
// Call 执行流式计算逻辑
Call(ctx context.Context, flow Flow) error
// SetConfig 给当前Function实例配置策略
SetConfig(s *config.KisFuncConfig) error
// GetConfig 获取当前Function实例配置策略
GetConfig() *config.KisFuncConfig
// SetFlow 给当前Function实例设置所依赖的Flow实例
SetFlow(f Flow) error
// GetFlow 获取当前Functioin实力所依赖的Flow
GetFlow() Flow
// AddConnector 给当前Function实例添加一个Connector
AddConnector(conn Connector) error
// GetConnector 获取当前Function实例所关联的Connector
GetConnector() Connector
// CreateId 给当前Funciton实力生成一个随机的实例KisID
CreateId()
// GetId 获取当前Function的FID
GetId() string
// GetPrevId 获取当前Function上一个Function节点FID
GetPrevId() string
// GetNextId 获取当前Function下一个Function节点FID
GetNextId() string
// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
Next() Function
// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
Prev() Function
// SetN 设置下一层Function实例
SetN(f Function)
// SetP 设置上一层Function实例
SetP(f Function)
// GetMetaData 得到当前Function的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Function的临时数据
SetMetaData(key string, value interface{})
}
Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,
任意个KisFunction可以组合成一个KisFlow
Click to show internal directories.
Click to hide internal directories.