Documentation
¶
Index ¶
- Constants
- Variables
- func CorrectsConfigKeysByJson(configs map[string]interface{}, jsonFilePath string) error
- func GetConfLoc() (string, error)
- func GetDataLoc() (string, error)
- func GetLoc(subdir string) (string, error)
- func GetLocalZone() int
- func GetLogLoc() (string, error)
- func GetNow() time.Time
- func GetNowInMilli() int64
- func GetPluginsLoc() (string, error)
- func GetTicker(duration int64) *clock.Ticker
- func GetTimer(duration int64) *clock.Timer
- func GetTimerByTime(t time.Time) *clock.Timer
- func InitClock()
- func InitConf()
- func LoadConfig(c interface{}) error
- func LoadConfigByName(name string, c interface{}) error
- func LoadConfigFromPath(p string, c interface{}) error
- func NewSqliteKVStore(table string) (*sqlKVStore, error)
- func Printable(m map[string]interface{}) map[string]interface{}
- func ProcessPath(p string) (string, error)
- func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error
- func SetConsoleAndFileLog(consoleLog, fileLog bool) error
- func SetLogFormat(disableTimestamp bool)
- func SetLogLevel(level string, debug bool)
- func ValidateRuleOption(option *api.RuleOption) error
- type ConSelector
- type ConfKeysOperator
- type ConfigKeys
- func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) ClearConfKeys()
- func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
- func (c *ConfigKeys) DeleteConfKey(confKey string)
- func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
- func (c *ConfigKeys) GetConfKeys() (keys []string)
- func (c *ConfigKeys) GetPluginName() string
- func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
- func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
- func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
- type ConfigOperator
- func NewConfigOperatorForConnection(pluginName string) ConfigOperator
- func NewConfigOperatorForSink(pluginName string) ConfigOperator
- func NewConfigOperatorForSource(pluginName string) ConfigOperator
- func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
- type ConnectionConfigKeysOps
- type JsonPathEval
- type KuiperConf
- type PathConfigure
- type SQLConf
- type SinkConf
- type SinkConfigKeysOps
- type SourceConf
- type SourceConfigKeysOps
Constants ¶
const ( ConfFileName = "kuiper.yaml" DebugLogLevel = "debug" InfoLogLevel = "info" WarnLogLevel = "warn" ErrorLogLevel = "error" FatalLogLevel = "fatal" PanicLogLevel = "panic" )
const (
KuiperBaseKey = "KuiperBaseKey"
)
const Separator = "__"
Variables ¶
var ( Config *KuiperConf IsTesting bool TestId string )
var Clock clock.Clock
var CloseLogger = logger.CloseLogger
var FuncMap template.FuncMap
var (
Log = logger.Log
)
Functions ¶
func GetConfLoc ¶
func GetDataLoc ¶
func GetLocalZone ¶
func GetLocalZone() int
func GetNowInMilli ¶
func GetNowInMilli() int64
func GetPluginsLoc ¶
func LoadConfig ¶
func LoadConfig(c interface{}) error
func LoadConfigByName ¶
func LoadConfigFromPath ¶
func NewSqliteKVStore ¶
func ProcessPath ¶
func RedisStorageConSelectorApply ¶
func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error
func SetConsoleAndFileLog ¶
func SetLogFormat ¶
func SetLogFormat(disableTimestamp bool)
func SetLogLevel ¶
func ValidateRuleOption ¶
func ValidateRuleOption(option *api.RuleOption) error
Types ¶
type ConSelector ¶
type ConSelector struct {
ConnSelectorStr string
Type string // mqtt edgex
CfgKey string // config key
}
func (*ConSelector) Init ¶
func (c *ConSelector) Init() error
func (*ConSelector) ReadCfgFromYaml ¶
func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error)
type ConfKeysOperator ¶
type ConfKeysOperator interface {
GetPluginName() string
GetConfContentByte() ([]byte, error)
// CopyConfContent get the configurations in etc and data folder
CopyConfContent() map[string]map[string]interface{}
// CopyReadOnlyConfContent get the configurations in etc folder
CopyReadOnlyConfContent() map[string]map[string]interface{}
// CopyUpdatableConfContent get the configurations in data folder
CopyUpdatableConfContent() map[string]map[string]interface{}
// CopyUpdatableConfContentFor get the configuration for the specific configKeys
CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
// LoadConfContent load the configurations into data configuration part
LoadConfContent(cf map[string]map[string]interface{})
GetConfKeys() (keys []string)
GetReadOnlyConfKeys() (keys []string)
GetUpdatableConfKeys() (keys []string)
DeleteConfKey(confKey string)
DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
AddConfKey(confKey string, reqField map[string]interface{}) error
AddConfKeyField(confKey string, reqField map[string]interface{}) error
ClearConfKeys()
}
ConfKeysOperator define interface to query/add/update/delete the configs in memory
type ConfigKeys ¶
type ConfigKeys struct {
// contains filtered or unexported fields
}
ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml Hold the connection configs for each connection type in etcCfg field Provide method to query/add/update/delete the configs
func (*ConfigKeys) AddConfKey ¶
func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) AddConfKeyField ¶
func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) ClearConfKeys ¶
func (c *ConfigKeys) ClearConfKeys()
func (*ConfigKeys) CopyConfContent ¶
func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyReadOnlyConfContent ¶
func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContent ¶
func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContentFor ¶
func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
func (*ConfigKeys) DeleteConfKey ¶
func (c *ConfigKeys) DeleteConfKey(confKey string)
func (*ConfigKeys) DeleteConfKeyField ¶
func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) GetConfContentByte ¶
func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
func (*ConfigKeys) GetConfKeys ¶
func (c *ConfigKeys) GetConfKeys() (keys []string)
func (*ConfigKeys) GetPluginName ¶
func (c *ConfigKeys) GetPluginName() string
func (*ConfigKeys) GetReadOnlyConfKeys ¶
func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
func (*ConfigKeys) GetUpdatableConfKeys ¶
func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
func (*ConfigKeys) LoadConfContent ¶
func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
type ConfigOperator ¶
type ConfigOperator interface {
ConfKeysOperator
SaveCfgToStorage() error
}
ConfigOperator define interface to query/add/update/delete the configs in disk
func NewConfigOperatorForConnection ¶
func NewConfigOperatorForConnection(pluginName string) ConfigOperator
NewConfigOperatorForConnection construct function
func NewConfigOperatorForSink ¶
func NewConfigOperatorForSink(pluginName string) ConfigOperator
NewConfigOperatorForSink construct function
func NewConfigOperatorForSource ¶
func NewConfigOperatorForSource(pluginName string) ConfigOperator
NewConfigOperatorForSource construct function
func NewConfigOperatorFromConnectionStorage ¶
func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromConnectionStorage construct function, Load the configs from et/connections/connection.yaml
func NewConfigOperatorFromSinkStorage ¶
func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSinkStorage construct function, Load the configs from etc/sources/xx.yaml
func NewConfigOperatorFromSourceStorage ¶
func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSourceStorage construct function, Load the configs from etc/sources/xx.yaml
type ConnectionConfigKeysOps ¶
type ConnectionConfigKeysOps struct {
*ConfigKeys
}
ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
func (*ConnectionConfigKeysOps) SaveCfgToStorage ¶
func (p *ConnectionConfigKeysOps) SaveCfgToStorage() error
type JsonPathEval ¶
type JsonPathEval interface {
Eval(data interface{}) (interface{}, error)
}
func GetJsonPathEval ¶
func GetJsonPathEval(jsonpath string) (JsonPathEval, error)
type KuiperConf ¶
type KuiperConf struct {
Basic struct {
LogLevel string `yaml:"logLevel"`
Debug bool `yaml:"debug"`
ConsoleLog bool `yaml:"consoleLog"`
FileLog bool `yaml:"fileLog"`
LogDisableTimestamp bool `yaml:"logDisableTimestamp"`
Syslog *syslogConf `yaml:"syslog"`
RotateTime int `yaml:"rotateTime"`
MaxAge int `yaml:"maxAge"`
RotateSize int64 `yaml:"rotateSize"`
RotateCount int `yaml:"rotateCount"`
TimeZone string `yaml:"timezone"`
Ip string `yaml:"ip"`
Port int `yaml:"port"`
RestIp string `yaml:"restIp"`
RestPort int `yaml:"restPort"`
RestTls *tlsConf `yaml:"restTls"`
Prometheus bool `yaml:"prometheus"`
PrometheusPort int `yaml:"prometheusPort"`
PluginHosts string `yaml:"pluginHosts"`
Authentication bool `yaml:"authentication"`
IgnoreCase bool `yaml:"ignoreCase"`
SQLConf *SQLConf `yaml:"sql"`
RulePatrolInterval string `yaml:"rulePatrolInterval"`
CfgStorageType string `yaml:"cfgStorageType"`
EnableOpenZiti bool `yaml:"enableOpenZiti"`
}
Rule api.RuleOption
Sink *SinkConf
Source *SourceConf
Store struct {
Type string `yaml:"type"`
ExtStateType string `yaml:"extStateType"`
Redis struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Password string `yaml:"password"`
Timeout int `yaml:"timeout"`
ConnectionSelector string `yaml:"connectionSelector"`
}
Sqlite struct {
Name string `yaml:"name"`
}
Fdb struct {
Path string `yaml:"path"`
}
}
Portable struct {
PythonBin string `yaml:"pythonBin"`
InitTimeout int `yaml:"initTimeout"`
}
}
type PathConfigure ¶
var ( PathConfig PathConfigure AbsoluteMapping = map[string]string{ // contains filtered or unexported fields } )
type SinkConf ¶
type SinkConf struct {
MemoryCacheThreshold int `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"`
MaxDiskCache int `json:"maxDiskCache" yaml:"maxDiskCache"`
BufferPageSize int `json:"bufferPageSize" yaml:"bufferPageSize"`
EnableCache bool `json:"enableCache" yaml:"enableCache"`
ResendInterval int `json:"resendInterval" yaml:"resendInterval"`
CleanCacheAtStop bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
ResendAlterQueue bool `json:"resendAlterQueue" yaml:"resendAlterQueue"`
ResendPriority int `json:"resendPriority" yaml:"resendPriority"`
ResendIndicatorField string `json:"resendIndicatorField" yaml:"resendIndicatorField"`
}
type SinkConfigKeysOps ¶
type SinkConfigKeysOps struct {
*ConfigKeys
}
SinkConfigKeysOps implement ConfOperator interface, load the configs from data/sinks/xx.yaml
func (*SinkConfigKeysOps) SaveCfgToStorage ¶
func (c *SinkConfigKeysOps) SaveCfgToStorage() error
type SourceConf ¶
type SourceConf struct {
HttpServerIp string `json:"httpServerIp" yaml:"httpServerIp"`
HttpServerPort int `json:"httpServerPort" yaml:"httpServerPort"`
HttpServerTls *tlsConf `json:"httpServerTls" yaml:"httpServerTls"`
}
func (*SourceConf) Validate ¶
func (sc *SourceConf) Validate() error
type SourceConfigKeysOps ¶
type SourceConfigKeysOps struct {
*ConfigKeys
}
SourceConfigKeysOps implement ConfOperator interface, load the configs from etc/sources/xx.yaml
func (*SourceConfigKeysOps) SaveCfgToStorage ¶
func (c *SourceConfigKeysOps) SaveCfgToStorage() error