kis

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ActionAbort

func ActionAbort(action *Action)

ActionAbort 终止Flow的执行

func ActionDataReuse

func ActionDataReuse(act *Action)

ActionDataReuse Next复用上层Function数据Option

func ActionForceEntryNext

func ActionForceEntryNext(act *Action)

ActionForceEntryNext 强制进入下一层

func Pool

func Pool() *kisPool

Pool 单例构造

Types

type Action

type Action struct {
	// DataReuse 是否复用上层Function数据
	DataReuse bool

	// 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
	// ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
	ForceEntryNext bool

	// JumpFunc 跳转到指定Function继续执行
	JumpFunc string

	// Abort 终止Flow的执行
	Abort bool
}

Action KisFlow执行流程Actions

func LoadActions

func LoadActions(acts []ActionFunc) Action

LoadActions 加载Actions,依次执行ActionFunc操作函数

type ActionFunc

type ActionFunc func(ops *Action)

ActionFunc KisFlow Functional Option 类型

func ActionJumpFunc

func ActionJumpFunc(funcName string) ActionFunc

ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc (注意:容易出现Flow循环调用,导致死循环)

type CaaS

type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error)
Connector Call

CaaS Connector的存储读取业务实现

type ConnInit

type ConnInit func(conn Connector) error
Connector Init

ConnInit Connector 第三方挂载存储初始化

type Connector

type Connector interface {
	// Init 初始化Connector所关联的存储引擎链接等
	Init() error
	// Call 调用Connector 外挂存储逻辑的读写操作
	Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
	// GetId 获取Connector的ID
	GetId() string
	// GetName 获取Connector的名称
	GetName() string
	// GetConfig 获取Connector的配置信息
	GetConfig() *config.KisConnConfig
	// GetMetaData 得到当前Connector的临时数据
	GetMetaData(key string) interface{}
	// SetMetaData 设置当前Connector的临时数据
	SetMetaData(key string, value interface{})
}

type FaaS

type FaaS interface{}

将 type FaaS func(context.Context, Flow) error 改为 type FaaS func(context.Context, Flow, ...interface{}) error 可以通过可变参数的任意输入类型进行数据传递

type FaaSDesc

type FaaSDesc struct {
	Serialize        // 当前Function的数据输入输出序列化实现
	FnName    string // Function名称

	ArgsType  []reflect.Type // 函数参数类型(集合)
	ArgNum    int            // 函数参数个数
	FuncType  reflect.Type   // 函数类型
	FuncValue reflect.Value  // 函数值(函数地址)
	// contains filtered or unexported fields
}

FaaSDesc FaaS 回调计算业务函数 描述

func NewFaaSDesc

func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error)

NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例

type Flow

type Flow interface {
	// Run 调度Flow,依次调度Flow中的Function并且执行
	Run(ctx context.Context) error
	// Link 将Flow中的Function按照配置文件中的配置进行连接, 同时Flow的配置也会更新
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// AppendNewFunction 将一个新的Function追加到到Flow中
	AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow 提交Flow数据到即将执行的Function层
	CommitRow(row interface{}) error
	// CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交)
	// row: Must be a slice
	CommitRowBatch(row interface{}) error
	// Input 得到flow当前执行Function的输入源数据
	Input() common.KisRowArr
	// GetName 得到Flow的名称
	GetName() string
	// GetThisFunction 得到当前正在执行的Function
	GetThisFunction() Function
	// GetThisFuncConf 得到当前正在执行的Function的配置
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector 得到当前正在执行的Function的Connector
	GetConnector() (Connector, error)
	// GetConnConf 得到当前正在执行的Function的Connector的配置
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig 得到当前Flow的配置
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName 得到当前Flow的配置
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
	Next(acts ...ActionFunc) error
	// GetCacheData 得到当前Flow的缓存数据
	GetCacheData(key string) interface{}
	// SetCacheData 设置当前Flow的缓存数据
	SetCacheData(key string, value interface{}, Exp time.Duration)
	// GetMetaData 得到当前Flow的临时数据
	GetMetaData(key string) interface{}
	// SetMetaData 设置当前Flow的临时数据
	SetMetaData(key string, value interface{})
	// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
	GetFuncParam(key string) string
	// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
	GetFuncParamAll() config.FParam
	// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value
	GetFuncParamsAllFuncs() map[string]config.FParam
	// Fork 得到Flow的一个副本(深拷贝)
	Fork(ctx context.Context) Flow
	// GetId 得到Flow的Id
	GetId() string
}

type Function

type Function interface {
	// Call 执行流式计算逻辑
	Call(ctx context.Context, flow Flow) error

	// SetConfig 给当前Function实例配置策略
	SetConfig(s *config.KisFuncConfig) error
	// GetConfig 获取当前Function实例配置策略
	GetConfig() *config.KisFuncConfig

	// SetFlow 给当前Function实例设置所依赖的Flow实例
	SetFlow(f Flow) error
	// GetFlow 获取当前Functioin实力所依赖的Flow
	GetFlow() Flow

	// AddConnector 给当前Function实例添加一个Connector
	AddConnector(conn Connector) error
	// GetConnector 获取当前Function实例所关联的Connector
	GetConnector() Connector

	// CreateId 给当前Funciton实力生成一个随机的实例KisID
	CreateId()
	// GetId 获取当前Function的FID
	GetId() string
	// GetPrevId 获取当前Function上一个Function节点FID
	GetPrevId() string
	// GetNextId 获取当前Function下一个Function节点FID
	GetNextId() string

	// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
	Next() Function
	// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
	Prev() Function
	// SetN 设置下一层Function实例
	SetN(f Function)
	// SetP 设置上一层Function实例
	SetP(f Function)
	// GetMetaData 得到当前Function的临时数据
	GetMetaData(key string) interface{}
	// SetMetaData 设置当前Function的临时数据
	SetMetaData(key string, value interface{})
}

Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,

任意个KisFunction可以组合成一个KisFlow

type Serialize

type Serialize interface {
	// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
	UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
	// Marshal 用于将指定类型的值序列化为 KisRowArr。
	Marshal(interface{}) (common.KisRowArr, error)
}

Serialize 数据序列化接口

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL