Documentation
¶
Index ¶
- Constants
- Variables
- func AddMetrics(pipelineName string, msg *Msg)
- func HashConfig(config string) string
- func SafeEncodeString(s string) string
- type AfterMsgCommitFunc
- type AsynchronousOutput
- type DDLMsg
- type DMLMsg
- type DMLOp
- type Emitter
- type EmptyRouter
- type IFilter
- type IFilterFactory
- type IMatcher
- type IMatcherFactory
- type IMatcherGroup
- type Input
- type Metrics
- type Msg
- type MsgAcker
- type MsgSubmitter
- type MsgType
- type Output
- type PositionCacheCreator
- type Router
- type Scheduler
- type SynchronousOutput
- type TaskReportStage
- type TaskReportStatus
Constants ¶
View Source
const (
// PipelineTag is the single label name passed to every histogram vector
// declared in this package's var block.
PipelineTag = "pipeline"
)
Variables ¶
View Source
var ( MsgCreateToEmitDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_create_to_emit_duration", Help: "Bucketed histogram of processing time (s) from msg create to msg emit", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgEmitToSubmitDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_emit_to_submit_duration", Help: "Bucketed histogram of processing time (s) from msg emit to submit", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgSubmitToAckDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_submit_to_ack_duration", Help: "Bucketed histogram of processing time (s) from msg submit to ack", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgCreateToAckDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_create_to_ack_duration", Help: "Bucketed histogram of processing time (s) from msg create to ack", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) )
Functions ¶
func AddMetrics ¶
func HashConfig ¶
func SafeEncodeString ¶ added in v0.9.1
Types ¶
type AfterMsgCommitFunc ¶
type AsynchronousOutput ¶
type EmptyRouter ¶ added in v0.9.17
// EmptyRouter is a field-less struct; its only documented method is
// Exists(msg *Msg) bool.
// NOTE(review): presumably a no-op implementation of Router — confirm
// against the Router interface, which is not shown here.
type EmptyRouter struct{}
func (EmptyRouter) Exists ¶ added in v0.9.17
func (EmptyRouter) Exists(msg *Msg) bool
type IFilterFactory ¶
// IFilterFactory is implemented by types that can construct an IFilter.
type IFilterFactory interface {
// NewFilter returns an IFilter. The method takes no arguments and has no
// error return.
NewFilter() IFilter
}
type IMatcherFactory ¶
// IMatcherFactory is implemented by types that can construct an IMatcher.
type IMatcherFactory interface {
// NewMatcher returns an IMatcher. The method takes no arguments and has no
// error return.
NewMatcher() IMatcher
}
type IMatcherGroup ¶
// IMatcherGroup is a slice of IMatcher values that can be evaluated together
// through its Match method.
type IMatcherGroup []IMatcher
func (IMatcherGroup) Match ¶
func (matcherGroup IMatcherGroup) Match(msg *Msg) bool
Match returns true if all matchers return true.
type Input ¶
// Input declares the lifecycle and status methods an input must provide.
type Input interface {
// Start receives the Emitter, Router and position cache for this input and
// reports a start-up failure through the returned error.
Start(emitter Emitter, router Router, positionCache position_store.PositionCacheInterface) error
// Close takes no arguments and returns nothing.
// NOTE(review): presumably stops the input — confirm in implementations.
Close()
// Stage returns the input's config.InputMode.
Stage() config.InputMode
// Done returns a channel that carries position_store.Position values.
// NOTE(review): when a value is sent is not visible here — confirm.
Done() chan position_store.Position
SendDeadSignal() error // for test only
// Wait takes no arguments and returns nothing.
// NOTE(review): presumably blocks until the input has finished — confirm.
Wait()
}
type Metrics ¶
// Metrics groups four timestamps of a message: create, emit, submit and ack.
// It is embedded in Msg.
// NOTE(review): presumably the source of the observations recorded in the
// Msg*DurationHistogram vectors — confirm in AddMetrics.
type Metrics struct {
// MsgCreateTime is the create timestamp.
MsgCreateTime time.Time
// MsgEmitTime is the emit timestamp.
MsgEmitTime time.Time
// MsgSubmitTime is the submit timestamp.
MsgSubmitTime time.Time
// MsgAckTime is the ack timestamp.
MsgAckTime time.Time
}
Metrics holds the timestamps of a message's create, emit, submit, and ack stages.
type Msg ¶
// Msg is the message type used by this package's interfaces and helpers
// (for example AddMetrics, EmptyRouter.Exists and IMatcherGroup.Match all
// take a *Msg). It embeds Metrics, so the four timestamps are promoted
// fields of Msg.
type Msg struct {
Metrics
// Type is the message's MsgType.
Type MsgType
// Host, Database and Table are plain string fields.
// NOTE(review): presumably they identify where the message came from — confirm.
Host string
Database string
Table string
// DdlMsg and DmlMsg are optional (pointer) payloads.
// NOTE(review): presumably which one is set depends on Type — confirm.
DdlMsg *DDLMsg
DmlMsg *DMLMsg
//
// Timestamp, TimeZone, Oplog will be deprecated.
//
Timestamp time.Time
TimeZone *time.Location
Oplog *gtm.Op
// InputStreamKey and OutputStreamKey are optional (pointer) string keys.
InputStreamKey *string
OutputStreamKey *string
// Done is a signal-only channel (element type struct{}).
// NOTE(review): who closes or sends on it is not visible here — confirm.
Done chan struct{}
// InputSequence is an optional (pointer) 64-bit sequence number.
InputSequence *int64
// InputContext is an untyped value carried with the message.
InputContext interface{}
// AfterCommitCallback is a hook of type AfterMsgCommitFunc.
// NOTE(review): presumably invoked after the message is committed — confirm.
AfterCommitCallback AfterMsgCommitFunc
}
type MsgSubmitter ¶
type PositionCacheCreator ¶ added in v0.9.17
// PositionCacheCreator is implemented by types that can construct a position
// cache.
type PositionCacheCreator interface {
// NewPositionCache returns a position_store.PositionCacheInterface, or an
// error if one could not be created.
NewPositionCache() (position_store.PositionCacheInterface, error)
}
type Scheduler ¶
// Scheduler combines the MsgSubmitter and MsgAcker interfaces with
// health-check and lifecycle methods.
type Scheduler interface {
// MsgSubmitter and MsgAcker are embedded, so their methods are part of
// Scheduler's method set.
MsgSubmitter
MsgAcker
// Healthy reports the scheduler's health as a bool.
Healthy() bool
// Start receives the Output for this scheduler and reports a start-up
// failure through the returned error.
Start(output Output) error
// Close takes no arguments and returns nothing.
// NOTE(review): presumably stops the scheduler — confirm in implementations.
Close()
}
type SynchronousOutput ¶
type TaskReportStage ¶
// TaskReportStage is the string-typed stage value carried in
// TaskReportStatus.Stage.
type TaskReportStage string

// Known TaskReportStage values.
const (
	// ReportStageFull is the "Full" stage.
	ReportStageFull TaskReportStage = "Full"
	// ReportStageIncremental is the "Incremental" stage.
	ReportStageIncremental TaskReportStage = "Incremental"
)
type TaskReportStatus ¶
// TaskReportStatus is a status record with JSON field tags. It is encoded
// with the keys "name", "configHash", "position", "stage" and "version".
type TaskReportStatus struct {
// Name is encoded as "name".
Name string `json:"name"`
// ConfigHash is encoded as "configHash".
// NOTE(review): presumably the value produced by HashConfig — confirm.
ConfigHash string `json:"configHash"`
// Position is encoded as "position"; it is carried as a string.
Position string `json:"position"`
// Stage is encoded as "stage" and holds a TaskReportStage value.
Stage TaskReportStage `json:"stage"`
// Version is encoded as "version".
Version string `json:"version"`
}
Source Files
¶
Click to show internal directories.
Click to hide internal directories.