Documentation
¶
Index ¶
- Constants
- Variables
- func ClearKVStorage() error
- func CorrectsConfigKeysByJson(configs map[string]interface{}, jsonFilePath string) error
- func DropCfgKeyFromStorage(typ string, plugin string, confKey string) error
- func GetAllConnConfigs() (map[string]map[string]map[string]any, error)
- func GetCfgFromKVStorage(typ string, plugin string, confKey string) (map[string]map[string]interface{}, error)
- func GetConfLoc() (s string, err error)
- func GetDataLoc() (s string, err error)
- func GetEnv() map[string]string
- func GetLoc(subdir string) (string, error)
- func GetLogLoc() (string, error)
- func GetMetricsLoc() (string, error)
- func GetPluginsLoc() (s string, err error)
- func GetYamlConfigAllKeys(typ string) (map[string]struct{}, error)
- func InitConf()
- func InitMetricsFolder() error
- func LoadCfgKeyKV(key string) (map[string]interface{}, error)
- func LoadConfig(c interface{}) error
- func LoadConfigByName(name string, c interface{}) error
- func LoadConfigFromPath(p string, c interface{}) error
- func NewSqliteKVStore(table string) (*sqlKVStore, error)
- func Printable(m map[string]interface{}) map[string]interface{}
- func SaveCfgKeyToKV(key string, cfg map[string]interface{}) error
- func SetConsoleAndFileLog(consoleLog, fileLog bool) error
- func SetLogFormat(disableTimestamp bool)
- func SetLogLevel(level string, debug bool)
- func SetupEnv()
- func ValidateRuleOption(option *def.RuleOption) error
- func WriteCfgIntoKVStorage(typ string, plugin string, confKey string, confData map[string]interface{}) error
- type ConfKeysOperator
- type ConfigKeys
- func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) ClearConfKeys()
- func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
- func (c *ConfigKeys) DeleteConfKey(confKey string)
- func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
- func (c *ConfigKeys) GetConfKeys() (keys []string)
- func (c *ConfigKeys) GetPluginName() string
- func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
- func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
- func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
- type ConfigOperator
- func NewConfigOperatorForConnection(pluginName string) ConfigOperator
- func NewConfigOperatorForSink(pluginName string) ConfigOperator
- func NewConfigOperatorForSource(pluginName string) ConfigOperator
- func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
- type ConnectionConfigKeysOps
- type EnvManager
- type JsonPathEval
- type PathConfigure
- type SinkConfigKeysOps
- type SourceConfigKeysOps
Constants ¶
const ( ConfFileName = "kuiper.yaml" DebugLogLevel = "debug" InfoLogLevel = "info" WarnLogLevel = "warn" ErrorLogLevel = "error" FatalLogLevel = "fatal" PanicLogLevel = "panic" )
const (
KuiperBaseKey = "KuiperBaseKey"
)
const Separator = "__"
Variables ¶
var ( Config *model.KuiperConf IsTesting bool TestId string )
var CloseLogger = logger.CloseLogger
var FuncMap template.FuncMap
var LoadConfigCache map[string]map[string]interface{}
var (
Log = logger.Log
)
Functions ¶
func DropCfgKeyFromStorage ¶
DropCfgKeyFromStorage ...
func GetAllConnConfigs ¶
GetAllConnConfigs return connections' plugin -> confKey -> props
func GetCfgFromKVStorage ¶
func GetCfgFromKVStorage(typ string, plugin string, confKey string) (map[string]map[string]interface{}, error)
GetCfgFromKVStorage ...
func GetConfLoc ¶
func GetDataLoc ¶
func GetMetricsLoc ¶ added in v2.1.0
func GetPluginsLoc ¶
func GetYamlConfigAllKeys ¶
GetYamlConfigAllKeys get all plugin keys about sources/sinks/connections
func InitMetricsFolder ¶ added in v2.1.0
func InitMetricsFolder() error
func LoadCfgKeyKV ¶
func LoadConfig ¶
func LoadConfig(c interface{}) error
func LoadConfigByName ¶
func LoadConfigFromPath ¶
func NewSqliteKVStore ¶
func SaveCfgKeyToKV ¶
SaveCfgKeyToKV ...
func SetConsoleAndFileLog ¶
func SetLogFormat ¶
func SetLogFormat(disableTimestamp bool)
func SetLogLevel ¶
func ValidateRuleOption ¶
func ValidateRuleOption(option *def.RuleOption) error
Types ¶
type ConfKeysOperator ¶
type ConfKeysOperator interface {
GetPluginName() string
GetConfContentByte() ([]byte, error)
// CopyConfContent get the configurations in etc and data folder
CopyConfContent() map[string]map[string]interface{}
// CopyReadOnlyConfContent get the configurations in etc folder
CopyReadOnlyConfContent() map[string]map[string]interface{}
// CopyUpdatableConfContent get the configurations in data folder
CopyUpdatableConfContent() map[string]map[string]interface{}
// CopyUpdatableConfContentFor get the configuration for the specific configKeys
CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
// LoadConfContent load the configurations into data configuration part
LoadConfContent(cf map[string]map[string]interface{})
GetConfKeys() (keys []string)
GetReadOnlyConfKeys() (keys []string)
GetUpdatableConfKeys() (keys []string)
DeleteConfKey(confKey string)
DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
AddConfKey(confKey string, reqField map[string]interface{}) error
AddConfKeyField(confKey string, reqField map[string]interface{}) error
ClearConfKeys()
}
ConfKeysOperator define interface to query/add/update/delete the configs in memory
type ConfigKeys ¶
type ConfigKeys struct {
// contains filtered or unexported fields
}
ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml Hold the connection configs for each connection type in etcCfg field Provide method to query/add/update/delete the configs
func (*ConfigKeys) AddConfKey ¶
func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) AddConfKeyField ¶
func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) ClearConfKeys ¶
func (c *ConfigKeys) ClearConfKeys()
func (*ConfigKeys) CopyConfContent ¶
func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyReadOnlyConfContent ¶
func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContent ¶
func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContentFor ¶
func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
func (*ConfigKeys) DeleteConfKey ¶
func (c *ConfigKeys) DeleteConfKey(confKey string)
func (*ConfigKeys) DeleteConfKeyField ¶
func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) GetConfContentByte ¶
func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
func (*ConfigKeys) GetConfKeys ¶
func (c *ConfigKeys) GetConfKeys() (keys []string)
func (*ConfigKeys) GetPluginName ¶
func (c *ConfigKeys) GetPluginName() string
func (*ConfigKeys) GetReadOnlyConfKeys ¶
func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
func (*ConfigKeys) GetUpdatableConfKeys ¶
func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
func (*ConfigKeys) LoadConfContent ¶
func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
type ConfigOperator ¶
type ConfigOperator interface {
ConfKeysOperator
SaveCfgToStorage() error
}
ConfigOperator define interface to query/add/update/delete the configs in disk
func NewConfigOperatorForConnection ¶
func NewConfigOperatorForConnection(pluginName string) ConfigOperator
NewConfigOperatorForConnection construct function
func NewConfigOperatorForSink ¶
func NewConfigOperatorForSink(pluginName string) ConfigOperator
NewConfigOperatorForSink construct function
func NewConfigOperatorForSource ¶
func NewConfigOperatorForSource(pluginName string) ConfigOperator
NewConfigOperatorForSource construct function
func NewConfigOperatorFromConnectionStorage ¶
func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromConnectionStorage construct function, Load the configs from et/connections/connection.yaml
func NewConfigOperatorFromSinkStorage ¶
func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSinkStorage construct function, Load the configs from etc/sources/xx.yaml
func NewConfigOperatorFromSourceStorage ¶
func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSourceStorage construct function, Load the configs from etc/sources/xx.yaml
type ConnectionConfigKeysOps ¶
type ConnectionConfigKeysOps struct {
*ConfigKeys
}
ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
func (*ConnectionConfigKeysOps) SaveCfgToStorage ¶
func (p *ConnectionConfigKeysOps) SaveCfgToStorage() error
type EnvManager ¶
type EnvManager struct {
// contains filtered or unexported fields
}
func (*EnvManager) GetEnv ¶
func (e *EnvManager) GetEnv() map[string]string
func (*EnvManager) Setup ¶
func (e *EnvManager) Setup()
type JsonPathEval ¶
type JsonPathEval interface {
Eval(data interface{}) (interface{}, error)
}
func GetJsonPathEval ¶
func GetJsonPathEval(jsonpath string) (JsonPathEval, error)
type PathConfigure ¶
var ( PathConfig PathConfigure AbsoluteMapping = map[string]string{ // contains filtered or unexported fields } )
type SinkConfigKeysOps ¶
type SinkConfigKeysOps struct {
*ConfigKeys
}
SinkConfigKeysOps implement ConfOperator interface, load the configs from data/sinks/xx.yaml
func (*SinkConfigKeysOps) SaveCfgToStorage ¶
func (c *SinkConfigKeysOps) SaveCfgToStorage() error
type SourceConfigKeysOps ¶
type SourceConfigKeysOps struct {
*ConfigKeys
}
SourceConfigKeysOps implement ConfOperator interface, load the configs from etc/sources/xx.yaml
func (*SourceConfigKeysOps) SaveCfgToStorage ¶
func (c *SourceConfigKeysOps) SaveCfgToStorage() error