Documentation ¶
Index ¶
- Constants
- Variables
- func GetLookupSource() api.Source
- func GetSink() api.Sink
- func GetSource() api.Source
- type SQLConf
- type SQLSinkConnector
- func (s *SQLSinkConnector) Close(ctx api.StreamContext) error
- func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple) (err error)
- func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error)
- func (s *SQLSinkConnector) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error
- func (s *SQLSinkConnector) Consume(props map[string]any)
- func (s *SQLSinkConnector) Ping(ctx api.StreamContext, props map[string]any) error
- func (s *SQLSinkConnector) Provision(ctx api.StreamContext, configs map[string]any) error
- type SQLSourceConnector
- func (s *SQLSourceConnector) Close(ctx api.StreamContext) error
- func (s *SQLSourceConnector) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error
- func (s *SQLSourceConnector) GetOffset() (interface{}, error)
- func (s *SQLSourceConnector) Ping(ctx api.StreamContext, m map[string]any) error
- func (s *SQLSourceConnector) Provision(ctx api.StreamContext, props map[string]any) error
- func (s *SQLSourceConnector) Pull(ctx api.StreamContext, recvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest)
- func (s *SQLSourceConnector) ResetOffset(input map[string]interface{}) error
- func (s *SQLSourceConnector) Rewind(offset interface{}) error
- type SqlLookupSource
- func (s *SqlLookupSource) Close(ctx api.StreamContext) error
- func (s *SqlLookupSource) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error
- func (s *SqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []string, values []any) ([]map[string]any, error)
- func (s *SqlLookupSource) Ping(ctx api.StreamContext, m map[string]any) error
- func (s *SqlLookupSource) Provision(ctx api.StreamContext, configs map[string]any) error
Constants ¶
View Source
const ( LblRecon = "recon" LblQuery = "query" LblScan = "scan" LblWait = "wait" LblPrepare = "prepare" LblScanInto = "scanInto" LblPull = "pull" LblReconn = "reconn" LblSql = "sql" LblException = "exception" LblReq = "req" )
View Source
const ( LblInsert = "insert" LblUpdate = "update" LblDel = "del" )
Variables ¶
View Source
var ( SqlSourceCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "kuiper", Subsystem: "sql_source", Name: "counter", Help: "counter of SQL Source IO", }, []string{metrics.LblType, metrics.LblRuleIDType, metrics.LblOpIDType}) SqlSourceQueryDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "kuiper", Subsystem: "sql_source", Name: "query_duration_microseconds", Help: "SQL Source Historgram Duration of IO", Buckets: prometheus.ExponentialBuckets(10, 2, 22), }, []string{metrics.LblType, metrics.LblRuleIDType, metrics.LblOpIDType}) SqlSourceGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "kuiper", Subsystem: "sql_source", Name: "gauge", Help: "Gauge of SQL Source IO", }, []string{metrics.LblType, metrics.LblRuleIDType, metrics.LblOpIDType}) )
Functions ¶
func GetLookupSource ¶
Types ¶
type SQLConf ¶
type SQLConf struct {
Interval cast.DurationConf `json:"interval"`
DBUrl string `json:"dburl"`
URL string `json:"url,omitempty"`
Datasource string `json:"datasource"`
TemplateSqlQueryCfg *sqlgen.TemplateSqlQueryCfg `json:"templateSqlQueryCfg"`
}
type SQLSinkConnector ¶
type SQLSinkConnector struct {
// contains filtered or unexported fields
}
func (*SQLSinkConnector) Close ¶
func (s *SQLSinkConnector) Close(ctx api.StreamContext) error
func (*SQLSinkConnector) Collect ¶
func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple) (err error)
func (*SQLSinkConnector) CollectList ¶
func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error)
func (*SQLSinkConnector) Connect ¶
func (s *SQLSinkConnector) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error
func (*SQLSinkConnector) Consume ¶ added in v2.1.1
func (s *SQLSinkConnector) Consume(props map[string]any)
Consume is run after Provision. It discards the common configurations that are handled only by the sink itself.
func (*SQLSinkConnector) Ping ¶
func (s *SQLSinkConnector) Ping(ctx api.StreamContext, props map[string]any) error
func (*SQLSinkConnector) Provision ¶
func (s *SQLSinkConnector) Provision(ctx api.StreamContext, configs map[string]any) error
type SQLSourceConnector ¶
type SQLSourceConnector struct {
Query sqlgen.SqlQueryGenerator
// contains filtered or unexported fields
}
func (*SQLSourceConnector) Close ¶
func (s *SQLSourceConnector) Close(ctx api.StreamContext) error
func (*SQLSourceConnector) Connect ¶
func (s *SQLSourceConnector) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error
func (*SQLSourceConnector) GetOffset ¶
func (s *SQLSourceConnector) GetOffset() (interface{}, error)
func (*SQLSourceConnector) Ping ¶
func (s *SQLSourceConnector) Ping(ctx api.StreamContext, m map[string]any) error
func (*SQLSourceConnector) Provision ¶
func (s *SQLSourceConnector) Provision(ctx api.StreamContext, props map[string]any) error
func (*SQLSourceConnector) Pull ¶
func (s *SQLSourceConnector) Pull(ctx api.StreamContext, recvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest)
func (*SQLSourceConnector) ResetOffset ¶
func (s *SQLSourceConnector) ResetOffset(input map[string]interface{}) error
func (*SQLSourceConnector) Rewind ¶
func (s *SQLSourceConnector) Rewind(offset interface{}) error
type SqlLookupSource ¶
type SqlLookupSource struct {
// contains filtered or unexported fields
}
func (*SqlLookupSource) Close ¶
func (s *SqlLookupSource) Close(ctx api.StreamContext) error
func (*SqlLookupSource) Connect ¶
func (s *SqlLookupSource) Connect(ctx api.StreamContext, sc api.StatusChangeHandler) error
func (*SqlLookupSource) Lookup ¶
func (s *SqlLookupSource) Lookup(ctx api.StreamContext, fields []string, keys []string, values []any) ([]map[string]any, error)
func (*SqlLookupSource) Ping ¶
func (s *SqlLookupSource) Ping(ctx api.StreamContext, m map[string]any) error
func (*SqlLookupSource) Provision ¶
func (s *SqlLookupSource) Provision(ctx api.StreamContext, configs map[string]any) error
Click to show internal directories.
Click to hide internal directories.