Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
 - Variables
 - func NewRunner(ctx context.Context, rt *dbutils.Runtime, catalog *catalog.Catalog, ...) *runner
 - type CheckpointEntry
 - func (e *CheckpointEntry) CheckPrintTime() bool
 - func (e *CheckpointEntry) GCEntry(fs *objectio.ObjectFS) error
 - func (e *CheckpointEntry) GCMetadata(fs *objectio.ObjectFS) error
 - func (e *CheckpointEntry) GetByTableID(ctx context.Context, fs *objectio.ObjectFS, tid uint64) (ins, del, cnIns, segDel *api.Batch, err error)
 - func (e *CheckpointEntry) GetEnd() types.TS
 - func (e *CheckpointEntry) GetLocation() objectio.Location
 - func (e *CheckpointEntry) GetStart() types.TS
 - func (e *CheckpointEntry) GetState() State
 - func (e *CheckpointEntry) GetVersion() uint32
 - func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool
 - func (e *CheckpointEntry) IsCommitted() bool
 - func (e *CheckpointEntry) IsFinished() bool
 - func (e *CheckpointEntry) IsIncremental() bool
 - func (e *CheckpointEntry) IsPendding() bool
 - func (e *CheckpointEntry) IsRunning() bool
 - func (e *CheckpointEntry) LessEq(ts types.TS) bool
 - func (e *CheckpointEntry) Prefetch(ctx context.Context, fs *objectio.ObjectFS, data *logtail.CheckpointData) (err error)
 - func (e *CheckpointEntry) PrefetchMetaIdx(ctx context.Context, fs *objectio.ObjectFS) (data *logtail.CheckpointData, err error)
 - func (e *CheckpointEntry) Read(ctx context.Context, fs *objectio.ObjectFS, data *logtail.CheckpointData) (err error)
 - func (e *CheckpointEntry) ReadMetaIdx(ctx context.Context, fs *objectio.ObjectFS, data *logtail.CheckpointData) (err error)
 - func (e *CheckpointEntry) SetLocation(cn, tn objectio.Location)
 - func (e *CheckpointEntry) SetPrintTime()
 - func (e *CheckpointEntry) SetState(state State) (ok bool)
 - func (e *CheckpointEntry) String() string
 
- type DirtyCtx
 - type EntryType
 - type Observer
 - type Option
 - func WithCheckpointBlockRows(rows int) Option
 - func WithCollectInterval(interval time.Duration) Option
 - func WithFlushInterval(interval time.Duration) Option
 - func WithForceFlushCheckInterval(interval time.Duration) Option
 - func WithForceFlushTimeout(to time.Duration) Option
 - func WithGlobalMinCount(count int) Option
 - func WithGlobalVersionInterval(interval time.Duration) Option
 - func WithMinCount(count int) Option
 - func WithMinIncrementalInterval(interval time.Duration) Option
 - func WithObserver(o Observer) Option
 
- type Runner
 - type RunnerReader
 - type State
 - type TestRunner
 
Constants ¶
      View Source
      
  
    // Consecutive uint16 values numbered from 0 with iota. Only PrefetchData
    // spells out the type and expression; the following specs repeat it
    // implicitly, so all four constants are typed uint16.
    // NOTE(review): the names suggest the stages of loading a checkpoint
    // (prefetch vs. read, meta index vs. data) — confirm against the callers.
    const (
        PrefetchData uint16 = iota
        PrefetchMetaIdx
        ReadMetaIdx
        ReadData
    )
      View Source
      
  
    // String constants: three name prefixes and a directory path.
    // NOTE(review): presumably used to build checkpoint object names under
    // CheckpointDir — the code that consumes them is not shown here.
    const (
        PrefixIncremental = "incremental"
        PrefixGlobal      = "global"
        PrefixMetadata    = "meta"
        CheckpointDir     = "ckp/"
    )
      View Source
      
  
// Column names of the checkpoint schema (the same identifiers are listed, in
// this order, by CheckpointSchemaAttr), followed by the column count of the
// V1 layout.
const (
	CheckpointAttr_StartTS      = "start_ts"
	CheckpointAttr_EndTS        = "end_ts"
	CheckpointAttr_MetaLocation = "meta_location"
	CheckpointAttr_EntryType    = "entry_type"
	CheckpointAttr_Version      = "version"
	CheckpointAttr_AllLocations = "all_locations"

	CheckpointSchemaColumnCountV1 = 5 // start, end, loc, type, ver
)
Variables ¶
      View Source
      
  
    var ( CheckpointSchemaAttr = []string{ CheckpointAttr_StartTS, CheckpointAttr_EndTS, CheckpointAttr_MetaLocation, CheckpointAttr_EntryType, CheckpointAttr_Version, CheckpointAttr_AllLocations, } CheckpointSchemaTypes = []types.Type{ types.New(types.T_TS, 0, 0), types.New(types.T_TS, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_bool, 0, 0), types.New(types.T_uint32, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), } )
      View Source
      
  
// CheckpointSchema is a package-level *catalog.Schema. It is declared without
// an initializer, so it is nil until something assigns it.
// NOTE(review): the assignment is not visible here — presumably it is built
// from CheckpointSchemaAttr and CheckpointSchemaTypes at init time; confirm.
var CheckpointSchema *catalog.Schema
    Functions ¶
Types ¶
type CheckpointEntry ¶ added in v0.6.0
func NewCheckpointEntry ¶ added in v0.6.0
func NewCheckpointEntry(start, end types.TS, typ EntryType) *CheckpointEntry
func (*CheckpointEntry) CheckPrintTime ¶ added in v0.8.0
func (e *CheckpointEntry) CheckPrintTime() bool
func (*CheckpointEntry) GCEntry ¶ added in v0.7.0
func (e *CheckpointEntry) GCEntry(fs *objectio.ObjectFS) error
func (*CheckpointEntry) GCMetadata ¶ added in v0.7.0
func (e *CheckpointEntry) GCMetadata(fs *objectio.ObjectFS) error
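func (*CheckpointEntry) GetByTableID ¶ added in v0.6.0
func (e *CheckpointEntry) GetByTableID(ctx context.Context, fs *objectio.ObjectFS, tid uint64) (ins, del, cnIns, segDel *api.Batch, err error)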
func (*CheckpointEntry) GetEnd ¶ added in v0.6.0
func (e *CheckpointEntry) GetEnd() types.TS
func (*CheckpointEntry) GetLocation ¶ added in v0.6.0
func (e *CheckpointEntry) GetLocation() objectio.Location
func (*CheckpointEntry) GetStart ¶ added in v0.6.0
func (e *CheckpointEntry) GetStart() types.TS
func (*CheckpointEntry) GetState ¶ added in v0.6.0
func (e *CheckpointEntry) GetState() State
func (*CheckpointEntry) GetVersion ¶ added in v1.0.0
func (e *CheckpointEntry) GetVersion() uint32
func (*CheckpointEntry) HasOverlap ¶ added in v0.6.0
func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool
func (*CheckpointEntry) IsCommitted ¶ added in v0.6.0
func (e *CheckpointEntry) IsCommitted() bool
func (*CheckpointEntry) IsFinished ¶ added in v0.6.0
func (e *CheckpointEntry) IsFinished() bool
func (*CheckpointEntry) IsIncremental ¶ added in v0.6.0
func (e *CheckpointEntry) IsIncremental() bool
func (*CheckpointEntry) IsPendding ¶ added in v0.6.0
func (e *CheckpointEntry) IsPendding() bool
func (*CheckpointEntry) IsRunning ¶ added in v0.6.0
func (e *CheckpointEntry) IsRunning() bool
func (*CheckpointEntry) Prefetch ¶ added in v0.8.0
func (e *CheckpointEntry) Prefetch( ctx context.Context, fs *objectio.ObjectFS, data *logtail.CheckpointData, ) (err error)
func (*CheckpointEntry) PrefetchMetaIdx ¶ added in v1.0.0
func (e *CheckpointEntry) PrefetchMetaIdx( ctx context.Context, fs *objectio.ObjectFS, ) (data *logtail.CheckpointData, err error)
func (*CheckpointEntry) Read ¶ added in v0.6.0
func (e *CheckpointEntry) Read( ctx context.Context, fs *objectio.ObjectFS, data *logtail.CheckpointData, ) (err error)
func (*CheckpointEntry) ReadMetaIdx ¶ added in v1.0.0
func (e *CheckpointEntry) ReadMetaIdx( ctx context.Context, fs *objectio.ObjectFS, data *logtail.CheckpointData, ) (err error)
func (*CheckpointEntry) SetLocation ¶ added in v0.6.0
func (e *CheckpointEntry) SetLocation(cn, tn objectio.Location)
func (*CheckpointEntry) SetPrintTime ¶ added in v0.8.0
func (e *CheckpointEntry) SetPrintTime()
func (*CheckpointEntry) SetState ¶ added in v0.6.0
func (e *CheckpointEntry) SetState(state State) (ok bool)
func (*CheckpointEntry) String ¶ added in v0.6.0
func (e *CheckpointEntry) String() string
type Option ¶ added in v0.6.0
// Option is a function that receives a *runner. Each With* constructor in this
// package returns an Option, and Runner.DebugUpdateOptions accepts a variadic
// list of them.
// NOTE(review): presumably each Option sets one configuration field on the
// runner — the runner type is unexported and not shown here.
type Option func(*runner)
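func WithCheckpointBlockRows ¶ added in v1.0.0
func WithCheckpointBlockRows(rows int) Option
func WithCollectInterval ¶ added in v0.6.0
func WithCollectInterval(interval time.Duration) Option
func WithFlushInterval ¶ added in v0.6.0
func WithFlushInterval(interval time.Duration) Option
func WithForceFlushCheckInterval ¶ added in v0.6.0
func WithForceFlushCheckInterval(interval time.Duration) Option
func WithForceFlushTimeout ¶ added in v0.6.0
func WithForceFlushTimeout(to time.Duration) Option
func WithGlobalMinCount ¶ added in v0.7.0
func WithGlobalMinCount(count int) Option
func WithGlobalVersionInterval ¶ added in v0.7.0
func WithGlobalVersionInterval(interval time.Duration) Option
func WithMinCount ¶ added in v0.6.0
func WithMinCount(count int) Option
func WithMinIncrementalInterval ¶ added in v0.6.0
func WithMinIncrementalInterval(interval time.Duration) Option
func WithObserver ¶ added in v0.6.0
func WithObserver(o Observer) Option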
type Runner ¶ added in v0.6.0
type Runner interface {
	TestRunner
	RunnerReader
	Start()
	Stop()
	String() string
	EnqueueWait(any) error
	Replay(catalog.DataFactory) (types.TS, error)
	FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error
	GCByTS(ctx context.Context, ts types.TS) error
	// for test, delete in next phase
	DebugUpdateOptions(opts ...Option)
	GetAllCheckpoints() []*CheckpointEntry
}
    type RunnerReader ¶ added in v0.7.0
// RunnerReader groups the query methods of a runner: listing and counting
// checkpoint entries, collecting checkpoints for a timestamp range, and
// reporting the maximum LSN.
// NOTE(review): the name suggests these methods do not modify runner state —
// the implementations are not shown here, so confirm before relying on it.
type RunnerReader interface {
	// Incremental checkpoints.
	// NOTE(review): "ICKP" presumably abbreviates "incremental checkpoint".
	GetAllIncrementalCheckpoints() []*CheckpointEntry
	GetPenddingIncrementalCount() int
	ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry

	// Global checkpoints.
	GetAllGlobalCheckpoints() []*CheckpointEntry
	GetGlobalCheckpointCount() int

	// CollectCheckpointsInRange returns a location string, the last end
	// timestamp, and an error for the range [start, end] it is given.
	// NOTE(review): whether the bounds are inclusive is not visible here.
	CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)

	MaxLSN() uint64
}
    type TestRunner ¶ added in v0.7.0
// TestRunner collects the runner methods for toggling checkpointing, forcing
// checkpoints and flushes, and inspecting the latest checkpoint state.
// NOTE(review): the name suggests these exist mainly for tests — confirm
// whether production code is expected to call them.
type TestRunner interface {
	// Switching checkpointing on and off, and dropping pending work.
	EnableCheckpoint()
	DisableCheckpoint()
	CleanPenddingCheckpoint()

	// Forcing checkpoints and flushes.
	ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error
	ForceIncrementalCheckpoint(end types.TS) error
	// NOTE(review): ctx is the second parameter here rather than the first;
	// reordering it would break every implementer and caller, so it is kept.
	ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) (err error)

	// Inspection.
	IsAllChangesFlushed(start, end types.TS, printTree bool) bool
	ExistPendingEntryToGC() bool
	MaxLSNInRange(end types.TS) uint64
	MaxGlobalCheckpoint() *CheckpointEntry
	MaxCheckpoint() *CheckpointEntry
	GetDirtyCollector() logtail.Collector
}
     Click to show internal directories. 
   Click to hide internal directories.