Documentation
¶
Index ¶
- Constants
- Variables
- func AddMetrics(pipelineName string, msg *Msg)
- func HashConfig(config string) string
- type AfterMsgCommitFunc
- type AsynchronousOutput
- type DDLMsg
- type DMLMsg
- type DMLOp
- type Emitter
- type IFilter
- type IFilterFactory
- type IMatcher
- type IMatcherFactory
- type IMatcherGroup
- type Input
- type Metrics
- type Msg
- type MsgAcker
- type MsgSubmitter
- type MsgType
- type Output
- type Scheduler
- type SynchronousOutput
- type TaskReportStage
- type TaskReportStatus
Constants ¶
View Source
// PipelineTag is the label name used to partition this package's
// Prometheus histogram vectors by pipeline.
const PipelineTag = "pipeline"
Variables ¶
View Source
var ( MsgCreateToEmitDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_create_to_emit_duration", Help: "Bucketed histogram of processing time (s) from msg create to msg emit", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgEmitToSubmitDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_emit_to_submit_duration", Help: "Bucketed histogram of processing time (s) from msg emit to submit", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgSubmitToAckDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_submit_to_ack_duration", Help: "Bucketed histogram of processing time (s) from msg submit to ack", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgCreateToAckDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_create_to_ack_duration", Help: "Bucketed histogram of processing time (s) from msg create to ack", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) )
Functions ¶
func AddMetrics ¶
func HashConfig ¶
Types ¶
type AfterMsgCommitFunc ¶
type AsynchronousOutput ¶
type IFilterFactory ¶
// IFilterFactory is implemented by types that can produce an IFilter.
type IFilterFactory interface {
// NewFilter returns an IFilter.
// NOTE(review): presumably a fresh instance per call — confirm against implementations.
NewFilter() IFilter
}
type IMatcherFactory ¶
// IMatcherFactory is implemented by types that can produce an IMatcher.
type IMatcherFactory interface {
// NewMatcher returns an IMatcher.
// NOTE(review): presumably a fresh instance per call — confirm against implementations.
NewMatcher() IMatcher
}
type IMatcherGroup ¶
// IMatcherGroup is a slice of IMatcher values that can be evaluated together
// against a single message through its Match method.
type IMatcherGroup []IMatcher
func (IMatcherGroup) Match ¶
func (matcherGroup IMatcherGroup) Match(msg *Msg) bool
Match returns true if all matchers return true.
type Input ¶
// Input is the interface an input plugin implements. It is started with the
// Emitter it should use, exposes its position store, and can be closed and
// waited on.
type Input interface {
// Start starts the input with the given Emitter and returns an error.
Start(emitter Emitter) error
// Close takes no arguments and returns nothing.
// NOTE(review): presumably stops the input — confirm whether it blocks until shutdown completes.
Close()
// Stage returns the input's current stages.InputStage.
Stage() stages.InputStage
// TODO position store can be hidden by input plugin
// or we should use a configuration dedicated for position store
// NewPositionStore returns a position store or an error.
// NOTE(review): its relationship to PositionStore() is not visible here — confirm.
NewPositionStore() (position_store.PositionStore, error)
// PositionStore returns the position store associated with this input.
PositionStore() position_store.PositionStore
// Done returns a channel carrying a position_store.Position.
// NOTE(review): presumably signals completion with the final position — confirm.
Done() chan position_store.Position
SendDeadSignal() error // for test only
// Wait takes no arguments and returns nothing.
// NOTE(review): presumably blocks until the input has finished — confirm.
Wait()
// Identity returns a uint32 identifier for this input.
Identity() uint32
}
type Metrics ¶
// Metrics holds one timestamp per message stage: create, emit, submit and ack.
// NOTE(review): the *DurationHistogram variables in this package appear to
// measure the intervals between these timestamps — confirm in AddMetrics.
type Metrics struct {
// MsgCreateTime is the create-stage timestamp.
MsgCreateTime time.Time
// MsgEmitTime is the emit-stage timestamp.
MsgEmitTime time.Time
// MsgSubmitTime is the submit-stage timestamp.
MsgSubmitTime time.Time
// MsgAckTime is the ack-stage timestamp.
MsgAckTime time.Time
}
metrics definitions
type Msg ¶
// Msg is the message value passed to matchers and metrics helpers in this
// package. It embeds Metrics, so the per-stage timestamps are accessible
// directly on the message.
type Msg struct {
// Metrics carries the create/emit/submit/ack timestamps.
Metrics
// Type is the message's MsgType.
Type MsgType
// Host, Database and Table are plain strings.
// NOTE(review): presumably identify where the change originated — confirm.
Host string
Database string
Table string
// DdlMsg and DmlMsg are optional payload pointers.
// NOTE(review): presumably only the one matching Type is set — confirm.
DdlMsg *DDLMsg
DmlMsg *DMLMsg
//
// Timestamp, TimeZone, Oplog will be deprecated.
//
Timestamp time.Time
TimeZone *time.Location
Oplog *gtm.Op
// InputStreamKey and OutputStreamKey are optional (pointer) stream keys.
InputStreamKey *string
OutputStreamKey *string
// Done is a signal-only channel.
// NOTE(review): who closes or sends on it is not visible here — confirm.
Done chan struct{}
// InputSequence is an optional (pointer) sequence number.
InputSequence *int64
// InputContext is an untyped value.
// NOTE(review): presumably opaque data owned by the input — confirm.
InputContext interface{}
// AfterCommitCallback is an AfterMsgCommitFunc attached to this message.
AfterCommitCallback AfterMsgCommitFunc
}
type MsgSubmitter ¶
type Scheduler ¶
// Scheduler combines the MsgSubmitter and MsgAcker interfaces and adds
// lifecycle and health methods. It is started with the Output it works with.
type Scheduler interface {
MsgSubmitter
MsgAcker
// Healthy reports a boolean health status.
Healthy() bool
// Start starts the scheduler with the given Output and returns an error.
Start(output Output) error
// Close takes no arguments and returns nothing.
// NOTE(review): presumably stops the scheduler — confirm whether it blocks.
Close()
}
type SynchronousOutput ¶
type TaskReportStage ¶
// TaskReportStage is the string-valued stage recorded in the Stage field of
// TaskReportStatus.
type TaskReportStage string

// Known TaskReportStage values.
const (
	ReportStageFull        TaskReportStage = "Full"
	ReportStageIncremental TaskReportStage = "Incremental"
)
type TaskReportStatus ¶
// TaskReportStatus is a task status record with JSON field tags; the JSON
// keys are name, configHash, position, stage and version.
type TaskReportStatus struct {
// Name is encoded under the JSON key "name".
Name string `json:"name"`
// ConfigHash is encoded under the JSON key "configHash".
// NOTE(review): presumably produced by HashConfig — confirm.
ConfigHash string `json:"configHash"`
// Position is a string encoded under the JSON key "position".
Position string `json:"position"`
// Stage is the task's TaskReportStage.
Stage TaskReportStage `json:"stage"`
// Version is encoded under the JSON key "version".
Version string `json:"version"`
}
Source Files
¶
Click to show internal directories.
Click to hide internal directories.