Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
 - Variables
 - func ContextWithStatement(parent context.Context, s *StatementInfo) context.Context
 - func DebugMode(debug bool) tracerProviderOption
 - func DefaultContext() context.Context
 - func DefaultSpanContext() *trace.SpanContext
 - func DisableLogErrorReport(disable bool)
 - func EnableTracer(enable bool) tracerProviderOption
 - func GetLongQueryTime() time.Duration
 - func GetNodeResource() *trace.MONodeResource
 - func GetSchemaForAccount(ctx context.Context, account string) []string
 - func Init(ctx context.Context, opts ...TracerProviderOption) error
 - func InitSchema(ctx context.Context, sqlExecutor func() ie.InternalExecutor) error
 - func InitSchemaByInnerExecutor(ctx context.Context, ieFactory func() ie.InternalExecutor) error
 - func InitWithConfig(ctx context.Context, SV *config.ObservabilityParameters, ...) error
 - func NewBatchSpanProcessor(exporter BatchProcessor) trace.SpanProcessor
 - func NewBufferPipe2CSVWorker(opt ...BufferOption) bp.PipeImpl[bp.HasName, any]
 - func NewItemBuffer(opts ...BufferOption) *itemBuffer
 - func ReportError(ctx context.Context, err error, depth int)
 - func ReportZap(jsonEncoder zapcore.Encoder, entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error)
 - func SetDefaultContext(ctx context.Context)
 - func SetDefaultSpanContext(sc *trace.SpanContext)
 - func SetTracerProvider(p *MOTracerProvider)
 - func Shutdown(ctx context.Context) error
 - func StatementInfoFilter(i Item) bool
 - func StatementInfoUpdate(existing, new Item)
 - func Time2DatetimeString(t time.Time) string
 - func WithAggregatorDisable(disable bool) tracerProviderOption
 - func WithAggregatorWindow(window time.Duration) tracerProviderOption
 - func WithBatchProcessMode(mode string) tracerProviderOption
 - func WithBatchProcessor(p BatchProcessor) tracerProviderOption
 - func WithBufferSizeThreshold(size int64) tracerProviderOption
 - func WithExportInterval(secs int) tracerProviderOption
 - func WithFSWriterFactory(f table.WriterFactory) tracerProviderOption
 - func WithInitAction(init bool) tracerProviderOption
 - func WithLongQueryTime(secs float64) tracerProviderOption
 - func WithLongSpanTime(d time.Duration) tracerProviderOption
 - func WithNode(uuid string, t string) tracerProviderOption
 - func WithSQLExecutor(f func() ie.InternalExecutor) tracerProviderOption
 - func WithSQLWriterDisable(disable bool) tracerProviderOption
 - func WithSelectThreshold(window time.Duration) tracerProviderOption
 - func WithSkipRunningStmt(skip bool) tracerProviderOption
 - func WithSpanDisable(disable bool) tracerProviderOption
 - func WithStmtMergeEnable(enable bool) tracerProviderOption
 - type Aggregator
 - type BatchProcessor
 - type BufferOption
 - type IBuffer2SqlItem
 - type Item
 - type ItemSyncer
 - func (s *ItemSyncer) FillRow(ctx context.Context, row *table.Row)
 - func (s *ItemSyncer) Free()
 - func (s *ItemSyncer) GetCheckWriteHook() table.CheckWriteHook
 - func (s *ItemSyncer) GetName() string
 - func (s *ItemSyncer) GetTable() *table.Table
 - func (s *ItemSyncer) NeedCheckWrite() bool
 - func (s *ItemSyncer) NeedSyncWrite() bool
 - func (s *ItemSyncer) Size() int64
 - func (s *ItemSyncer) Wait()
 
- type Key
 - type MOErrorHolder
 - type MOSpan
 - func (s *MOSpan) AddExtraFields(fields ...zap.Field)
 - func (s *MOSpan) End(options ...trace.SpanEndOption)
 - func (s *MOSpan) FillRow(ctx context.Context, row *table.Row)
 - func (s *MOSpan) Free()
 - func (s *MOSpan) GetName() string
 - func (s *MOSpan) GetTable() *table.Table
 - func (s *MOSpan) ParentSpanContext() trace.SpanContext
 - func (s *MOSpan) Size() int64
 - func (s *MOSpan) SpanContext() trace.SpanContext
 
- type MOTracer
 - type MOTracerProvider
 - type MOZapLog
 - type NamedItemRow
 - type NoopBatchProcessor
 - type PipeImpl
 - type SerializableExecPlan
 - type SerializeExecPlanFunc
 - type StatementInfo
 - func (s *StatementInfo) ExecPlan2Json(ctx context.Context) []byte
 - func (s *StatementInfo) ExecPlan2Stats(ctx context.Context) []byte
 - func (s *StatementInfo) FillRow(ctx context.Context, row *table.Row)
 - func (s *StatementInfo) Free()
 - func (s *StatementInfo) GetName() string
 - func (s *StatementInfo) GetTable() *table.Table
 - func (s *StatementInfo) IsZeroTxnID() bool
 - func (s *StatementInfo) Key(duration time.Duration) interface{}
 - func (s *StatementInfo) Report(ctx context.Context)
 - func (s *StatementInfo) SetSerializableExecPlan(execPlan SerializableExecPlan)
 - func (s *StatementInfo) SetTxnID(id []byte)
 - func (s *StatementInfo) Size() int64
 
- type StatementInfoStatus
 - type StatementOption
 - type StatementOptionFunc
 - type Statistic
 - type TracerProviderOption
 - type WriteFactoryConfig
 
Constants ¶
const ( InternalExecutor = "InternalExecutor" FileService = "FileService" )
const ( MOStatementType = "statement" MOSpanType = "span" MOLogType = "log" MOErrorType = "error" MORawLogType = "rawlog" )
const ( SystemDBConst = "system" StatsDatabase = SystemDBConst )
const (
	RawLogTbl = "rawlog"
)
    Variables ¶
var ( SingleStatementTable = &table.Table{ Account: table.AccountSys, Database: StatsDatabase, Table: statementInfoTbl, Columns: []table.Column{ stmtIDCol, txnIDCol, sesIDCol, accountCol, userCol, hostCol, dbCol, stmtCol, stmtTagCol, stmtFgCol, nodeUUIDCol, nodeTypeCol, reqAtCol, respAtCol, durationCol, statusCol, errCodeCol, errorCol, execPlanCol, rowsReadCol, bytesScanCol, statsCol, stmtTypeCol, queryTypeCol, roleIdCol, sqlTypeCol, aggrCntCol, resultCntCol, }, PrimaryKeyColumn: nil, ClusterBy: []table.Column{reqAtCol, accountCol}, Engine: table.NormalTableEngine, Comment: "record each statement and stats info", PathBuilder: table.NewAccountDatePathBuilder(), AccountColumn: &accountCol, SupportUserAccess: true, SupportConstAccess: true, } SingleRowLogTable = &table.Table{ Account: table.AccountSys, Database: StatsDatabase, Table: RawLogTbl, Columns: []table.Column{ rawItemCol, nodeUUIDCol, nodeTypeCol, spanIDCol, traceIDCol, loggerNameCol, timestampCol, levelCol, callerCol, messageCol, extraCol, errCodeCol, errorCol, stackCol, spanNameCol, parentSpanIDCol, startTimeCol, endTimeCol, durationCol, resourceCol, spanKindCol, }, PrimaryKeyColumn: nil, ClusterBy: []table.Column{timestampCol, rawItemCol}, Engine: table.NormalTableEngine, Comment: "read merge data from log, error, span", PathBuilder: table.NewAccountDatePathBuilder(), AccountColumn: nil, SupportUserAccess: false, SupportConstAccess: true, } )
var EndStatement = func(ctx context.Context, err error, sentRows int64) { if !GetTracerProvider().IsEnable() { return } s := StatementFromContext(ctx) if s == nil { panic(moerr.NewInternalError(ctx, "no statement info in context")) } s.mux.Lock() defer s.mux.Unlock() if !s.end { s.end = true s.ResultCount = sentRows s.AggrCount = 0 s.ResponseAt = time.Now() s.Duration = s.ResponseAt.Sub(s.RequestAt) s.Status = StatementStatusSuccess if err != nil { s.Error = err s.Status = StatementStatusFailed } if !s.reported || s.exported { s.exported = false s.Report(ctx) } } }
var ErrFilteredOut = moerr.NewInternalError(context.Background(), "filtered out")
    var NilSesID [16]byte
    var NilStmtID [16]byte
    var NilTxnID [16]byte
    var ReportStatement = func(ctx context.Context, s *StatementInfo) error { if !GetTracerProvider().IsEnable() { return nil } if s.User == db_holder.MOLoggerUser { return nil } if s.User == "internal" { if s.StatementType == "Commit" || s.StatementType == "Start Transaction" || s.StatementType == "Use" { return nil } } return GetGlobalBatchProcessor().Collect(ctx, s) }
Functions ¶
func ContextWithStatement ¶
func ContextWithStatement(parent context.Context, s *StatementInfo) context.Context
func DefaultContext ¶
func DefaultSpanContext ¶
func DefaultSpanContext() *trace.SpanContext
func DisableLogErrorReport ¶
func DisableLogErrorReport(disable bool)
func EnableTracer ¶
func EnableTracer(enable bool) tracerProviderOption
func GetLongQueryTime ¶ added in v0.8.0
func GetNodeResource ¶
func GetNodeResource() *trace.MONodeResource
func GetSchemaForAccount ¶
GetSchemaForAccount return account's table, and view's schema
func InitSchema ¶
func InitSchema(ctx context.Context, sqlExecutor func() ie.InternalExecutor) error
InitSchema PS: only in standalone or CN node can init schema
func InitSchemaByInnerExecutor ¶
func InitSchemaByInnerExecutor(ctx context.Context, ieFactory func() ie.InternalExecutor) error
InitSchemaByInnerExecutor init schema, which can access db by io.InternalExecutor on any Node.
func InitWithConfig ¶
func InitWithConfig(ctx context.Context, SV *config.ObservabilityParameters, opts ...TracerProviderOption) error
func NewBatchSpanProcessor ¶
func NewBatchSpanProcessor(exporter BatchProcessor) trace.SpanProcessor
func NewBufferPipe2CSVWorker ¶
func NewItemBuffer ¶ added in v0.8.0
func NewItemBuffer(opts ...BufferOption) *itemBuffer
func ReportError ¶
ReportError send to BatchProcessor
func SetDefaultContext ¶
func SetDefaultSpanContext ¶
func SetDefaultSpanContext(sc *trace.SpanContext)
func SetTracerProvider ¶
func SetTracerProvider(p *MOTracerProvider)
func StatementInfoFilter ¶ added in v0.8.0
func StatementInfoUpdate ¶ added in v0.8.0
func StatementInfoUpdate(existing, new Item)
func Time2DatetimeString ¶
func WithAggregatorDisable ¶ added in v0.8.0
func WithAggregatorDisable(disable bool) tracerProviderOption
func WithAggregatorWindow ¶ added in v0.8.0
func WithBatchProcessMode ¶
func WithBatchProcessMode(mode string) tracerProviderOption
func WithBatchProcessor ¶
func WithBatchProcessor(p BatchProcessor) tracerProviderOption
func WithBufferSizeThreshold ¶ added in v0.8.0
func WithBufferSizeThreshold(size int64) tracerProviderOption
func WithExportInterval ¶
func WithExportInterval(secs int) tracerProviderOption
func WithFSWriterFactory ¶
func WithFSWriterFactory(f table.WriterFactory) tracerProviderOption
func WithInitAction ¶
func WithInitAction(init bool) tracerProviderOption
func WithLongQueryTime ¶
func WithLongQueryTime(secs float64) tracerProviderOption
func WithLongSpanTime ¶ added in v0.8.0
func WithSQLExecutor ¶
func WithSQLExecutor(f func() ie.InternalExecutor) tracerProviderOption
func WithSQLWriterDisable ¶ added in v0.8.0
func WithSQLWriterDisable(disable bool) tracerProviderOption
func WithSelectThreshold ¶ added in v0.8.0
func WithSkipRunningStmt ¶ added in v0.8.0
func WithSkipRunningStmt(skip bool) tracerProviderOption
func WithSpanDisable ¶ added in v0.8.0
func WithSpanDisable(disable bool) tracerProviderOption
func WithStmtMergeEnable ¶ added in v0.8.0
func WithStmtMergeEnable(enable bool) tracerProviderOption
Types ¶
type Aggregator ¶ added in v0.8.0
type Aggregator struct {
	Grouped     map[interface{}]Item
	WindowSize  time.Duration
	NewItemFunc func(i Item, ctx context.Context) Item
	UpdateFunc  func(existing, new Item)
	FilterFunc  func(i Item) bool
	// contains filtered or unexported fields
}
    func NewAggregator ¶ added in v0.8.0
func (*Aggregator) Close ¶ added in v0.8.0
func (a *Aggregator) Close()
func (*Aggregator) GetResults ¶ added in v0.8.0
func (a *Aggregator) GetResults() []Item
type BatchProcessor ¶
type BatchProcessor interface {
	Collect(context.Context, batchpipe.HasName) error
	Start() bool
	Stop(graceful bool) error
	Register(name batchpipe.HasName, impl PipeImpl)
}
    func GetGlobalBatchProcessor ¶
func GetGlobalBatchProcessor() BatchProcessor
type BufferOption ¶
type BufferOption interface {
	// contains filtered or unexported methods
}
    func BufferWithFilterItemFunc ¶
func BufferWithFilterItemFunc(f filterItemFunc) BufferOption
func BufferWithGenBatchFunc ¶
func BufferWithGenBatchFunc(f genBatchFunc) BufferOption
func BufferWithReminder ¶
func BufferWithReminder(reminder bp.Reminder) BufferOption
func BufferWithSizeThreshold ¶
func BufferWithSizeThreshold(size int64) BufferOption
func BufferWithType ¶
func BufferWithType(name string) BufferOption
type IBuffer2SqlItem ¶
type ItemSyncer ¶ added in v0.8.0
type ItemSyncer struct {
	// contains filtered or unexported fields
}
    func NewItemSyncer ¶ added in v0.8.0
func NewItemSyncer(item NamedItemRow) *ItemSyncer
func (*ItemSyncer) FillRow ¶ added in v0.8.0
func (s *ItemSyncer) FillRow(ctx context.Context, row *table.Row)
FillRow implements table.RowField
func (*ItemSyncer) Free ¶ added in v0.8.0
func (s *ItemSyncer) Free()
Free implements IBuffer2SqlItem
func (*ItemSyncer) GetCheckWriteHook ¶ added in v0.8.0
func (s *ItemSyncer) GetCheckWriteHook() table.CheckWriteHook
GetCheckWriteHook implements NeedCheckWrite and NeedSyncWrite
func (*ItemSyncer) GetName ¶ added in v0.8.0
func (s *ItemSyncer) GetName() string
GetName implements IBuffer2SqlItem and batchpipe.HasName
func (*ItemSyncer) GetTable ¶ added in v0.8.0
func (s *ItemSyncer) GetTable() *table.Table
GetTable implements table.RowField
func (*ItemSyncer) NeedCheckWrite ¶ added in v0.8.0
func (s *ItemSyncer) NeedCheckWrite() bool
NeedCheckWrite implements NeedCheckWrite
func (*ItemSyncer) NeedSyncWrite ¶ added in v0.8.0
func (s *ItemSyncer) NeedSyncWrite() bool
NeedSyncWrite implements NeedSyncWrite
func (*ItemSyncer) Size ¶ added in v0.8.0
func (s *ItemSyncer) Size() int64
Size implements IBuffer2SqlItem
func (*ItemSyncer) Wait ¶ added in v0.8.0
func (s *ItemSyncer) Wait()
Wait cooperate with NeedSyncWrite and NeedSyncWrite
type Key ¶ added in v0.8.0
type Key struct {
	SessionID     [16]byte
	StatementType string
	Window        time.Time
	Status        StatementInfoStatus
}
    type MOErrorHolder ¶
MOErrorHolder implement export.IBuffer2SqlItem and export.CsvFields
func (*MOErrorHolder) FillRow ¶
func (h *MOErrorHolder) FillRow(ctx context.Context, row *table.Row)
func (*MOErrorHolder) Free ¶
func (h *MOErrorHolder) Free()
func (*MOErrorHolder) GetName ¶
func (h *MOErrorHolder) GetName() string
func (*MOErrorHolder) GetTable ¶
func (h *MOErrorHolder) GetTable() *table.Table
func (*MOErrorHolder) Size ¶
func (h *MOErrorHolder) Size() int64
type MOSpan ¶
type MOSpan struct {
	trace.SpanConfig
	Name      string    `json:"name"`
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `jons:"end_time"`
	// Duration
	Duration time.Duration `json:"duration"`
	// ExtraFields
	ExtraFields []zap.Field `json:"extra"`
	// contains filtered or unexported fields
}
    MOSpan implement export.IBuffer2SqlItem and export.CsvFields
func (*MOSpan) AddExtraFields ¶ added in v0.8.0
func (*MOSpan) End ¶
func (s *MOSpan) End(options ...trace.SpanEndOption)
End record span which meets the following condition If set Deadline in ctx, which specified at the MOTracer.Start, just check if encounters the deadline. If not set, check condition: duration > span.GetLongTimeThreshold()
func (*MOSpan) ParentSpanContext ¶
func (s *MOSpan) ParentSpanContext() trace.SpanContext
func (*MOSpan) SpanContext ¶
func (s *MOSpan) SpanContext() trace.SpanContext
type MOTracer ¶
type MOTracer struct {
	trace.TracerConfig
	// contains filtered or unexported fields
}
    MOTracer is the creator of Spans.
type MOTracerProvider ¶
type MOTracerProvider struct {
	// contains filtered or unexported fields
}
    func GetTracerProvider ¶
func GetTracerProvider() *MOTracerProvider
func (*MOTracerProvider) GetSqlExecutor ¶
func (cfg *MOTracerProvider) GetSqlExecutor() func() ie.InternalExecutor
func (*MOTracerProvider) Tracer ¶
func (p *MOTracerProvider) Tracer(instrumentationName string, opts ...trace.TracerOption) trace.Tracer
type MOZapLog ¶
type MOZapLog struct {
	Level       zapcore.Level      `json:"Level"`
	SpanContext *trace.SpanContext `json:"span"`
	Timestamp   time.Time          `json:"timestamp"`
	LoggerName  string
	Caller      string `json:"caller"` // like "util/trace/trace.go:666"
	Message     string `json:"message"`
	Extra       string `json:"extra"` // like json text
	Stack       string `json:"stack"`
}
    MOZapLog implement export.IBuffer2SqlItem and export.CsvFields
type NamedItemRow ¶ added in v0.8.0
type NamedItemRow interface {
	IBuffer2SqlItem
	table.RowField
}
    type NoopBatchProcessor ¶
type NoopBatchProcessor struct {
}
    func (NoopBatchProcessor) Register ¶
func (n NoopBatchProcessor) Register(batchpipe.HasName, PipeImpl)
func (NoopBatchProcessor) Start ¶
func (n NoopBatchProcessor) Start() bool
func (NoopBatchProcessor) Stop ¶
func (n NoopBatchProcessor) Stop(bool) error
type SerializableExecPlan ¶ added in v0.8.0
type SerializeExecPlanFunc ¶
type StatementInfo ¶
type StatementInfo struct {
	StatementID          [16]byte `json:"statement_id"`
	TransactionID        [16]byte `json:"transaction_id"`
	SessionID            [16]byte `jons:"session_id"`
	Account              string   `json:"account"`
	User                 string   `json:"user"`
	Host                 string   `json:"host"`
	RoleId               uint32   `json:"role_id"`
	Database             string   `json:"database"`
	Statement            string   `json:"statement"`
	StmtBuilder          strings.Builder
	StatementFingerprint string    `json:"statement_fingerprint"`
	StatementTag         string    `json:"statement_tag"`
	SqlSourceType        string    `json:"sql_source_type"`
	RequestAt            time.Time `json:"request_at"` // see WithRequestAt
	StatementType string `json:"statement_type"`
	QueryType     string `json:"query_type"`
	// after
	Status     StatementInfoStatus `json:"status"`
	Error      error               `json:"error"`
	ResponseAt time.Time           `json:"response_at"`
	Duration   time.Duration       `json:"duration"` // unit: ns
	// new ExecPlan
	ExecPlan SerializableExecPlan `json:"-"` // set by SetSerializableExecPlan
	// RowsRead, BytesScan generated from ExecPlan
	RowsRead  int64 `json:"rows_read"`  // see ExecPlan2Json
	BytesScan int64 `json:"bytes_scan"` // see ExecPlan2Json
	AggrCount int64 `json:"aggr_count"` // see EndStatement
	ResultCount int64 `json:"result_count"` // see EndStatement
	// contains filtered or unexported fields
}
    func NewStatementInfo ¶ added in v0.8.0
func NewStatementInfo() *StatementInfo
func StatementFromContext ¶
func StatementFromContext(ctx context.Context) *StatementInfo
func (*StatementInfo) ExecPlan2Json ¶
func (s *StatementInfo) ExecPlan2Json(ctx context.Context) []byte
ExecPlan2Json return ExecPlan Serialized json-str // please used in s.mux.Lock()
func (*StatementInfo) ExecPlan2Stats ¶ added in v0.8.0
func (s *StatementInfo) ExecPlan2Stats(ctx context.Context) []byte
ExecPlan2Stats return Stats Serialized int array str and set RowsRead, BytesScan from ExecPlan
func (*StatementInfo) FillRow ¶
func (s *StatementInfo) FillRow(ctx context.Context, row *table.Row)
func (*StatementInfo) Free ¶
func (s *StatementInfo) Free()
func (*StatementInfo) GetName ¶
func (s *StatementInfo) GetName() string
func (*StatementInfo) GetTable ¶
func (s *StatementInfo) GetTable() *table.Table
func (*StatementInfo) IsZeroTxnID ¶
func (s *StatementInfo) IsZeroTxnID() bool
func (*StatementInfo) Key ¶ added in v0.8.0
func (s *StatementInfo) Key(duration time.Duration) interface{}
func (*StatementInfo) Report ¶
func (s *StatementInfo) Report(ctx context.Context)
func (*StatementInfo) SetSerializableExecPlan ¶ added in v0.8.0
func (s *StatementInfo) SetSerializableExecPlan(execPlan SerializableExecPlan)
func (*StatementInfo) SetTxnID ¶
func (s *StatementInfo) SetTxnID(id []byte)
func (*StatementInfo) Size ¶
func (s *StatementInfo) Size() int64
type StatementInfoStatus ¶
type StatementInfoStatus int
const ( StatementStatusRunning StatementInfoStatus = iota StatementStatusSuccess StatementStatusFailed )
func (StatementInfoStatus) String ¶
func (s StatementInfoStatus) String() string
type StatementOption ¶
type StatementOption interface {
	Apply(*StatementInfo)
}
    type StatementOptionFunc ¶
type StatementOptionFunc func(*StatementInfo)
type TracerProviderOption ¶
type TracerProviderOption interface {
	// contains filtered or unexported methods
}
    TracerProviderOption configures a TracerProvider.
type WriteFactoryConfig ¶
type WriteFactoryConfig struct {
	Account     string
	Ts          time.Time
	PathBuilder table.PathBuilder
}