Documentation
¶
Index ¶
- Constants
- Variables
- func AesDecryptECB(encrypted string) string
- func AesEncryptECB(origData string) string
- func CompareClickHouseVersion(v1, v2 string) int
- func EnvBoolVar(value *bool, key string)
- func EnvIntVar(value *int, key string)
- func EnvStringVar(value *string, key string)
- func GenTraceId() string
- func GetIP4Byname(host string) (ips []string, err error)
- func GetNetAddrPort(addr net.Addr) (port int)
- func GetOutboundIP() (ip net.IP, err error)
- func GetShift(s int) (shift uint)
- func GetSourceName(parser, name string) (sourcename string)
- func GetSpareTCPPort(portBegin int) int
- func InitLogger(newLogPaths []string)
- func JksToPem(jksPath, jksPassword string, overwrite bool) (certPemPath, keyPemPath string, err error)
- func LogTrace(traceId string, kind string, fields ...zapcore.Field)
- func NewTLSConfig(caCertFiles, clientCertFile, clientKeyFile string, insecureSkipVerify bool) (*tls.Config, error)
- func Run(appName string, initFunc, jobFunc, cleanupFunc func() error)
- func SetLogLevel(newLogLevel string)
- func SetLogTrace(enabled bool)
- func StringContains(arr []string, str string) bool
- func TrySetValue(v1, v2 interface{}) bool
- type CmdOptions
- type Credentials
- type Gosypt
- type RecordSize
- type WorkerPool
Constants ¶
const ( TraceKindFetchStart string = "fetch start" TraceKindFetchEnd string = "fetch end" TraceKindProcessStart string = "process start" TraceKindProcessEnd string = "process end" TraceKindWriteStart string = "loopwrite start" TraceKindWriteEnd string = "loopwrite end" TraceKindProcessing string = "process continue" )
const ( GosyptPrefixDefault = "ENC(" GosyptSuffxiDefault = ")" GosyptAlgorithm = "AESWITHHEXANDBASE64" )
const ( HttpPortBase = 10000 MaxPort = 65535 )
const ( StateRunning uint32 = 0 StateStopped uint32 = 1 )
Variables ¶
var ( // ErrStopped when stopped ErrStopped = errors.New("WorkerPool already stopped") )
var Gsypt = &Gosypt{ prefix: GosyptPrefixDefault, suffix: GosyptSuffxiDefault, algorithm: GosyptAlgorithm, }
var (
Logger *zap.Logger
)
Functions ¶
func AesDecryptECB ¶ added in v0.5.0
select aes_decrypt(unhex("E310E892E56801CED9ED98AA177F18E6"), unhex("656f6974656b")); => 123456
func AesEncryptECB ¶ added in v0.5.0
select hex(aes_encrypt("123456", unhex("656f6974656b"))); => E310E892E56801CED9ED98AA177F18E6
func CompareClickHouseVersion ¶ added in v0.5.0
func EnvBoolVar ¶
func EnvStringVar ¶
func GenTraceId ¶ added in v0.5.0
func GenTraceId() string
func GetIP4Byname ¶
func GetNetAddrPort ¶
func GetOutboundIP ¶
GetOutboundIP gets the preferred outbound IP of this machine. See https://stackoverflow.com/questions/23558425/how-do-i-get-the-local-ip-address-in-go.
func GetSourceName ¶
GetSourceName returns the field name in message for the given ClickHouse column
func GetSpareTCPPort ¶
GetSpareTCPPort finds a spare TCP port.
func InitLogger ¶
func InitLogger(newLogPaths []string)
func JksToPem ¶
func JksToPem(jksPath, jksPassword string, overwrite bool) (certPemPath, keyPemPath string, err error)
JksToPem converts JKS to PEM. Refer to: https://serverfault.com/questions/715827/how-to-generate-key-and-crt-file-from-jks-file-for-httpd-apache-server
func NewTLSConfig ¶
func NewTLSConfig(caCertFiles, clientCertFile, clientKeyFile string, insecureSkipVerify bool) (*tls.Config, error)
Refer to: https://medium.com/processone/using-tls-authentication-for-your-go-kafka-client-3c5841f2a625, https://github.com/denji/golang-tls, and https://www.baeldung.com/java-keystore-truststore-difference
func SetLogLevel ¶
func SetLogLevel(newLogLevel string)
func SetLogTrace ¶ added in v0.5.0
func SetLogTrace(enabled bool)
func StringContains ¶
StringContains checks whether the array contains the given string.
func TrySetValue ¶ added in v0.5.0
func TrySetValue(v1, v2 interface{}) bool
TrySetValue assigns v2 to v1 if v1 has not been bound to any value. FIXME: what about the case where v1 is bound to a default value?
Types ¶
type CmdOptions ¶ added in v0.5.0
type CmdOptions struct {
ShowVer bool
LogLevel string // "debug", "info", "warn", "error", "dpanic", "panic", "fatal"
LogPaths string // comma-separated paths. "stdout" means the console stdout
// HTTPHost to bind to. If empty, outbound ip of machine
// is automatically determined and used.
HTTPHost string
HTTPPort int // 0 means a randomly chosen port.
PushGatewayAddrs string
PushInterval int
LocalCfgFile string
NacosAddr string
NacosNamespaceID string
NacosGroup string
NacosUsername string
NacosPassword string
NacosDataID string
NacosServiceName string // participate in assignment management if not empty
Encrypt string
Credentials
}
type Credentials ¶ added in v0.5.0
type Gosypt ¶ added in v0.5.0
type Gosypt struct {
// contains filtered or unexported fields
}
Gosypt (Golang Simple Encrypt) simulates jasypt.
func (*Gosypt) SetAttribution ¶ added in v0.5.0
type RecordSize ¶ added in v0.5.0
type RecordSize struct {
// contains filtered or unexported fields
}
var Rs RecordSize
func (*RecordSize) Allow ¶ added in v0.5.0
func (rs *RecordSize) Allow() bool
func (*RecordSize) Dec ¶ added in v0.5.0
func (rs *RecordSize) Dec(size int64)
func (*RecordSize) Get ¶ added in v0.5.0
func (rs *RecordSize) Get() int64
func (*RecordSize) Inc ¶ added in v0.5.0
func (rs *RecordSize) Inc(size int64)
func (*RecordSize) Reset ¶ added in v0.5.0
func (rs *RecordSize) Reset()
func (*RecordSize) SetPoolSize ¶ added in v0.5.0
func (rs *RecordSize) SetPoolSize(size int64)
type WorkerPool ¶
WorkerPool is a blocking worker pool inspired by https://github.com/gammazero/workerpool/
func NewWorkerPool ¶
func NewWorkerPool(maxWorkers int, queueSize int) *WorkerPool
NewWorkerPool creates and starts a pool of worker goroutines.
func (*WorkerPool) Resize ¶
func (w *WorkerPool) Resize(maxWorkers int)
Resize ensures the number of workers matches the expected one.
func (*WorkerPool) Restart ¶
func (w *WorkerPool) Restart()
func (*WorkerPool) StopWait ¶
func (w *WorkerPool) StopWait()
StopWait stops the worker pool and waits for all queued tasks to complete.
func (*WorkerPool) Submit ¶
func (w *WorkerPool) Submit(fn func()) (err error)
Submit enqueues a function for a worker to execute. Submit will block if there are no free workers.