Documentation

Index
- Constants
- Variables
- func CreateCronTask(ctx context.Context, executorID task.TaskCode, ...) error
- func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, enableSqlWriter bool) table.WriterFactory
- func InitCronExpr(ctx context.Context, duration time.Duration) error
- func InitMerge(ctx context.Context, SV *config.ObservabilityParameters) error
- func LongRunETLMerge(ctx context.Context, task task.AsyncTask, logger *log.MOLogger, ...) error
- func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error
- func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
- func SubStringPrefixLimit(str string, length int) string
- type Cache
- type ContentReader
- type ETLReader
- type ETLWriter
- type FileMeta
- type MOCollector
  - func (c *MOCollector) Collect(ctx context.Context, item batchpipe.HasName) error
  - func (c *MOCollector) DiscardableCollect(ctx context.Context, item batchpipe.HasName) error
  - func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)
  - func (c *MOCollector) Start() bool
  - func (c *MOCollector) Stop(graceful bool) error
- type MOCollectorOption
- type Merge
- type MergeOption
- type PipeImplHolder
- type SliceCache
Constants

const BatchReadRows = 4000
    BatchReadRows ~= the rows in a 20MB rawlog file, which holds about 3700+ rows
const LoggerNameContentReader = "ETLContentReader"
const LoggerNameETLMerge = "ETLMerge"
const LoggerNameMOCollector = "MOCollector"
const MAX_MERGE_INSERT_TIME = 10 * time.Second
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
const MergeTaskCronExprEveryMin = "0 * * * * *"
const MergeTaskCronExprYesterday = "0 5 0 * * *"
const MergeTaskToday = "today"
const MergeTaskYesterday = "yesterday"
const ParamSeparator = " "
Variables

var ETLMergeTaskPool *mpool.MPool
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour
    MergeTaskCronExpr supports second-level cron expressions.
    Deprecated
Functions

func CreateCronTask
func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error
func GetWriterFactory added in v0.7.0
func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, enableSqlWriter bool) table.WriterFactory
func InitCronExpr
func InitCronExpr(ctx context.Context, duration time.Duration) error
InitCronExpr supports a minimum interval of 5 minutes and a maximum of 12 hours.
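For illustration, a minimal sketch of picking the merge cadence at startup. The surrounding function, the 4-hour value, and the error comment are assumptions; only InitCronExpr comes from this package:

	import (
		"context"
		"time"
	)

	// setupMergeCron is a hypothetical startup hook.
	func setupMergeCron(ctx context.Context) error {
		// InitCronExpr rejects intervals below 5 minutes or above 12 hours.
		if err := InitCronExpr(ctx, 4*time.Hour); err != nil {
			return err // interval outside the supported range
		}
		// Assumption: the chosen interval determines the cron expression
		// later used when the merge cron task is created.
		return nil
	}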
func InitMerge
func InitMerge(ctx context.Context, SV *config.ObservabilityParameters) error
func LongRunETLMerge added in v0.8.0
func LongRunETLMerge(ctx context.Context, task task.AsyncTask, logger *log.MOLogger, ...) error
func MergeTaskExecutorFactory
func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error
func MergeTaskMetadata
func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
MergeTaskMetadata handles args in the form: "{db_tbl_name} [date, default: today]"
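For example (a sketch; the executor ID and table name are placeholders, not package values):

	// Metadata for merging a table's files from today. Per the format above,
	// args[0] is the "{db_tbl_name}" and the optional args[1] is the date,
	// defaulting to today when omitted.
	meta := MergeTaskMetadata(mergeExecutorID, "system.rawlog", MergeTaskToday)
	_ = meta // hand this to the task service when scheduling the merge task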
func SubStringPrefixLimit added in v0.7.0
func SubStringPrefixLimit(str string, length int) string
Types

type ContentReader
type ContentReader struct {
	// contains filtered or unexported fields
}
func NewContentReader
func NewContentReader(ctx context.Context, path string, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader
func (*ContentReader) Close
func (s *ContentReader) Close()
func (*ContentReader) ReadLine
func (s *ContentReader) ReadLine() ([]string, error)
type ETLReader added in v0.7.0
type ETLReader interface {
	// ReadRow reads one line as a table.Row.
	ReadRow(row *table.Row) error
	// ReadLine reads one line of raw data from the file.
	ReadLine() ([]string, error)
	// Close closes the files and releases all content.
	Close()
}
func NewCSVReader
func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (ETLReader, error)
NewCSVReader creates a new csv reader. On success it returns (reader, nil); on failure it returns (nil, error).
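A usage sketch; the end-of-file behavior of ReadLine is an assumption, since the doc above only fixes the success/failure return shapes:

	// readAll streams one file through the ETLReader returned by NewCSVReader.
	func readAll(ctx context.Context, fs fileservice.FileService, path string) error {
		r, err := NewCSVReader(ctx, fs, path)
		if err != nil {
			return err // failed case: nil reader, non-nil error
		}
		defer r.Close()
		for {
			fields, err := r.ReadLine()
			if err != nil {
				break // assumption: an error (io.EOF or similar) ends the stream
			}
			_ = fields // one CSV record as a []string of column values
		}
		return nil
	}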
type ETLWriter added in v0.7.0
type ETLWriter interface {
	// WriteRow writes a table.Row as one line into the file.
	WriteRow(row *table.Row) error
	// WriteStrings writes a record as one line into the file.
	WriteStrings(record []string) error
	// FlushAndClose flushes the buffer and closes the writer.
	FlushAndClose() (int, error)
}
ETLWriter handles serialization logic for formats such as csv and tae files.
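The two interfaces compose; a minimal sketch that pipes every line of an ETLReader into an ETLWriter (the meaning of the returned int and the end-of-stream condition are assumptions):

	// copyLines drains r into w, then flushes and closes w.
	func copyLines(r ETLReader, w ETLWriter) (int, error) {
		defer r.Close()
		for {
			record, err := r.ReadLine()
			if err != nil {
				break // assumption: a read error (or EOF sentinel) ends the stream
			}
			if err := w.WriteStrings(record); err != nil {
				return 0, err
			}
		}
		return w.FlushAndClose() // int is presumably the byte count written
	}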
type MOCollector
type MOCollector struct {
	motrace.BatchProcessor
	// contains filtered or unexported fields
}
MOCollector handles all bufferPipes.
func NewMOCollector
func NewMOCollector(ctx context.Context, opts ...MOCollectorOption) *MOCollector
func (*MOCollector) DiscardableCollect added in v1.0.0
DiscardableCollect implements motrace.DiscardableCollector. It cooperates with the logutil.Discardable() field.
func (*MOCollector) Register added in v0.7.0
func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)
func (*MOCollector) Start
func (c *MOCollector) Start() bool
Start launches all goroutine workers, including the collector, generator, and exporter workers.
func (*MOCollector) Stop
func (c *MOCollector) Stop(graceful bool) error
type MOCollectorOption added in v0.7.0
type MOCollectorOption func(*MOCollector)
func WithCollectorCntP added in v1.1.0
func WithCollectorCntP(p int) MOCollectorOption
func WithExporterCntP added in v1.1.0
func WithExporterCntP(p int) MOCollectorOption
func WithGeneratorCntP added in v1.1.0
func WithGeneratorCntP(p int) MOCollectorOption
func WithOBCollectorConfig added in v0.8.0
func WithOBCollectorConfig(cfg *config.OBCollectorConfig) MOCollectorOption
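Putting the collector API together; a lifecycle sketch in which item, impl, and the parallelism values are placeholders chosen by the caller:

	// runCollector configures, starts, feeds, and stops a collector.
	func runCollector(ctx context.Context, item batchpipe.HasName, impl motrace.PipeImpl) error {
		c := NewMOCollector(ctx,
			WithCollectorCntP(2), // worker-count knobs; the values are arbitrary
			WithGeneratorCntP(2),
			WithExporterCntP(1),
		)
		c.Register(item, impl)
		if !c.Start() {
			return nil // assumption: false reports the collector already started
		}
		_ = c.Collect(ctx, item)            // normal, lossless path
		_ = c.DiscardableCollect(ctx, item) // droppable path for Discardable() logs
		return c.Stop(true)                 // graceful stop; assumed to flush first
	}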
type Merge
type Merge struct {
	// MaxFileSize is the total file size that triggers doMergeFiles(), default: 32 MB
	// Deprecated
	MaxFileSize int64 // set by WithMaxFileSize
	// MaxMergeJobs is the number of merge jobs allowed to run, default: 1
	MaxMergeJobs int64 // set by WithMaxMergeJobs
	// contains filtered or unexported fields
}
Merge works like a compaction: it merges input files into one/two/... files.

- NewMergeService inits merge as a service, with serviceInited to avoid multiple inits.
- MergeTaskExecutorFactory is driven by the Cron TaskService.
- NewMerge handles merge object init.
- Merge.Start() runs as the service loop and triggers Merge.Main().
- Merge.Main() handles the main job:
  - for each account, build `rootPath` with the tuple {account, date, Table}
  - call Merge.doMergeFiles() with all files in `rootPath` to do the merge job
- Merge.doMergeFiles handles one job flow: read each file, merge in cache, write into file. A wiring sketch follows this list.
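The cron-driven entry points above can be wired as in this sketch, where ctx, fs, executorID, and taskService are placeholders owned by the caller:

	// Build the executor that the Cron TaskService will drive, then register
	// the cron task itself. How the executor is bound to executorID inside
	// the task framework is elided here.
	executor := MergeTaskExecutorFactory(WithFileService(fs), WithMaxMergeJobs(1))
	_ = executor
	if err := CreateCronTask(ctx, executorID, taskService); err != nil {
		// handle scheduling failure
	}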
func NewMergeService
type MergeOption
type MergeOption func(*Merge)
func WithFileService
func WithFileService(fs fileservice.FileService) MergeOption
func WithMaxFileSize
func WithMaxFileSize(filesize int64) MergeOption
func WithMaxMergeJobs
func WithMaxMergeJobs(jobs int64) MergeOption
func WithTable
func WithTable(tbl *table.Table) MergeOption
func WithTask added in v1.0.0
func WithTask(task task.AsyncTask) MergeOption
func (MergeOption) Apply
func (opt MergeOption) Apply(m *Merge)
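Options follow the functional-options pattern; a sketch applying them to a *Merge value m (construction via NewMerge/NewMergeService is not shown in this doc):

	opts := []MergeOption{
		WithFileService(fs), // fs fileservice.FileService, from the caller
		WithMaxMergeJobs(1), // matches the documented default
		WithTable(tbl),      // tbl *table.Table, the table whose files are merged
	}
	for _, opt := range opts {
		opt.Apply(m)
	}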
type PipeImplHolder added in v0.7.0
type PipeImplHolder struct {
	// contains filtered or unexported fields
}
func (*PipeImplHolder) Get added in v0.7.0
func (h *PipeImplHolder) Get(name string) (motrace.PipeImpl, bool)
func (*PipeImplHolder) Put added in v0.7.0
func (h *PipeImplHolder) Put(name string, impl motrace.PipeImpl) bool
func (*PipeImplHolder) Size added in v0.7.0
func (h *PipeImplHolder) Size() int
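PipeImplHolder acts as a name-to-PipeImpl registry; a sketch built only from the documented methods (the holder h is assumed to be supplied by the collector internals):

	// getOrPut returns the implementation registered under name,
	// registering impl first when the name is absent.
	func getOrPut(h *PipeImplHolder, name string, impl motrace.PipeImpl) motrace.PipeImpl {
		if existing, ok := h.Get(name); ok {
			return existing
		}
		_ = h.Put(name, impl) // the bool presumably reports an overwrite
		return impl
	}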
type SliceCache
type SliceCache struct {
	// contains filtered or unexported fields
}
func (*SliceCache) IsEmpty
func (c *SliceCache) IsEmpty() bool
func (*SliceCache) Put
func (c *SliceCache) Put(r *table.Row)
func (*SliceCache) Reset
func (c *SliceCache) Reset()
func (*SliceCache) Size
func (c *SliceCache) Size() int64
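SliceCache backs the merge flow above ("read each file, merge in cache, write into file"); a sketch checking the cache against a byte budget, using only the documented methods (reading Size as a byte count is an assumption):

	// cacheRow adds one row and reports whether the cache hit the budget.
	func cacheRow(c *SliceCache, r *table.Row, maxBytes int64) (full bool) {
		c.Put(r)
		return c.Size() >= maxBytes
	}

	// e.g. cacheRow(c, row, 32<<20) mirrors the deprecated 32 MB MaxFileSize
	// default; once full, the flow writes the cached rows out and calls c.Reset().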