Documentation

Index
- Constants
- Variables
- func CreateCronTask(ctx context.Context, executorID task.TaskCode, ...) error
- func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, ext string, ...) table.WriterFactory
- func InitCronExpr(ctx context.Context, duration time.Duration) error
- func InitMerge(ctx context.Context, SV *config.ObservabilityParameters) error
- func LongRunETLMerge(ctx context.Context, task task.Task, logger *log.MOLogger, opts ...MergeOption) error
- func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error
- func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
- func SubStringPrefixLimit(str string, length int) string
- type Cache
- type ContentReader
- type ContentWriter
- type ETLReader
- type ETLWriter
- type FileMeta
- type MOCollector
- type MOCollectorOption
- type Merge
- type MergeOption
- type PipeImplHolder
- type SliceCache
 
Constants

const BatchReadRows = 4000
    A roughly 20 MB rawlog file holds about 3,700+ rows, so BatchReadRows approximates one such file per read batch.

const LoggerNameContentReader = "ETLContentReader"

const LoggerNameETLMerge = "ETLMerge"

const LoggerNameMOCollector = "MOCollector"

const MAX_MERGE_INSERT_TIME = 10 * time.Second

const MergeTaskCronExprEvery05Min = "0 */5 * * * *"

const MergeTaskCronExprEvery15Min = "0 */15 * * * *"

const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"

const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"

const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"

const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"

const MergeTaskCronExprEveryMin = "0 * * * * *"

const MergeTaskCronExprYesterday = "0 5 0 * * *"

const MergeTaskToday = "today"

const MergeTaskYesterday = "yesterday"

const ParamSeparator = " "

Variables

var ETLMergeTaskPool *mpool.MPool

var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour
    MergeTaskCronExpr supports second-level scheduling: the expressions are 6-field cron specs with a leading seconds field.

Functions

func CreateCronTask
func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error
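
    Example (sketch). A minimal startup flow, assuming the caller already holds the observability parameters, the merge executor's TaskCode, and a taskservice.TaskService. Only the InitMerge and CreateCronTask signatures come from this page; the package name, the helper name, and the import paths are assumptions.

package example

import (
	"context"

	"github.com/matrixorigin/matrixone/pkg/config"      // assumed import path
	"github.com/matrixorigin/matrixone/pkg/pb/task"     // assumed import path
	"github.com/matrixorigin/matrixone/pkg/taskservice" // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// setupMergeCron applies the observability parameters via InitMerge, then
// registers the cron task that will drive the merge executor.
func setupMergeCron(ctx context.Context, sv *config.ObservabilityParameters,
	executorID task.TaskCode, ts taskservice.TaskService) error {
	if err := export.InitMerge(ctx, sv); err != nil {
		return err
	}
	return export.CreateCronTask(ctx, executorID, ts)
}
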
func GetWriterFactory (added in v0.7.0)
func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, ext string, enableSqlWriter bool) table.WriterFactory

func InitCronExpr
func InitCronExpr(ctx context.Context, duration time.Duration) error
    InitCronExpr derives the merge cron expression from the given duration. The supported interval is at least 5 minutes and at most 12 hours.
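
    Example (sketch). Picking the merge cadence at startup; the helper name, the import path, and the assumption that an out-of-range duration is reported through the returned error are not from this page.

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// configureMergeCadence derives the cron expression for the given merge
// cycle and prints the expression the package settled on (MergeTaskCronExpr).
func configureMergeCadence(ctx context.Context, cycle time.Duration) error {
	if err := export.InitCronExpr(ctx, cycle); err != nil {
		return err // assumption: e.g. the duration falls outside 5 min .. 12 h
	}
	fmt.Println("merge cron expression:", export.MergeTaskCronExpr)
	return nil
}
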
func InitMerge
func InitMerge(ctx context.Context, SV *config.ObservabilityParameters) error

func LongRunETLMerge (added in v0.8.0)
func LongRunETLMerge(ctx context.Context, task task.Task, logger *log.MOLogger, opts ...MergeOption) error

func MergeTaskExecutorFactory
func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error

func MergeTaskMetadata
func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata
    MergeTaskMetadata builds the task metadata; args follow the convention "{db_tbl_name} [date, default: today]".
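
    Example (sketch). The documented argument convention: the first arg names the table, the optional second arg picks the date window (MergeTaskToday or MergeTaskYesterday). Helper name and import paths are assumptions.

package example

import (
	"github.com/matrixorigin/matrixone/pkg/pb/task"     // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// buildMergeMetadata builds the metadata for a merge task that covers
// today's files of the given table, e.g. "system.rawlog".
func buildMergeMetadata(id task.TaskCode, dbTblName string) task.TaskMetadata {
	return export.MergeTaskMetadata(id, dbTblName, export.MergeTaskToday)
}
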
func SubStringPrefixLimit (added in v0.7.0)
func SubStringPrefixLimit(str string, length int) string

Types
type ContentReader
type ContentReader struct {
	// contains filtered or unexported fields
}

func NewContentReader
func NewContentReader(ctx context.Context, path string, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader

func (*ContentReader) Close
func (s *ContentReader) Close()

func (*ContentReader) ReadLine
func (s *ContentReader) ReadLine() ([]string, error)

type ContentWriter
type ContentWriter struct {
	// contains filtered or unexported fields
}

func NewContentWriter
func NewContentWriter(writer io.StringWriter, buffer []byte) *ContentWriter

func (*ContentWriter) FlushAndClose
func (w *ContentWriter) FlushAndClose() (int, error)

func (*ContentWriter) WriteRow (added in v0.7.0)
func (w *ContentWriter) WriteRow(row *table.Row) error

func (*ContentWriter) WriteStrings
func (w *ContentWriter) WriteStrings(record []string) error
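
    Example (sketch). The ContentWriter flow with a strings.Builder standing in for the io.StringWriter target. Only the three signatures above come from this page; the buffer size, the helper name, and the import path are assumptions.

package example

import (
	"strings"

	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// writeRecords serializes the records through a ContentWriter backed by an
// in-memory strings.Builder (any io.StringWriter works) and returns the
// serialized content after FlushAndClose.
func writeRecords(records [][]string) (string, error) {
	var sb strings.Builder
	w := export.NewContentWriter(&sb, make([]byte, 0, 4096))
	for _, rec := range records {
		if err := w.WriteStrings(rec); err != nil {
			return "", err
		}
	}
	if _, err := w.FlushAndClose(); err != nil {
		return "", err
	}
	return sb.String(), nil
}
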
type ETLReader (added in v0.7.0)
type ETLReader interface {
	// ReadRow reads one line as a table.Row.
	ReadRow(row *table.Row) error
	// ReadLine reads one raw record from the file.
	ReadLine() ([]string, error)
	// Close closes the files and releases all cached content.
	Close()
}

func NewCSVReader
func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (ETLReader, error)
    NewCSVReader creates a new CSV reader. On success it returns a usable reader and a nil error; on failure it returns a nil reader and a non-nil error.
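
    Example (sketch). A read loop over one file; how the reader signals end-of-file (io.EOF vs a nil line) is an assumption here, the rest follows the signatures above. Helper name and import paths are assumptions.

package example

import (
	"context"
	"io"

	"github.com/matrixorigin/matrixone/pkg/fileservice" // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// readAllLines opens one CSV file through the file service and drains it
// line by line, always releasing the reader with Close.
func readAllLines(ctx context.Context, fs fileservice.FileService, path string) ([][]string, error) {
	r, err := export.NewCSVReader(ctx, fs, path)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	var rows [][]string
	for {
		line, err := r.ReadLine()
		if err == io.EOF || (err == nil && line == nil) { // end-of-file handling is an assumption
			return rows, nil
		}
		if err != nil {
			return nil, err
		}
		rows = append(rows, line)
	}
}
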
type ETLWriter (added in v0.7.0)
type ETLWriter interface {
	// WriteRow writes a table.Row as one line into the file.
	WriteRow(row *table.Row) error
	// WriteStrings writes the record as one line into the file.
	WriteStrings(record []string) error
	// FlushAndClose flushes the buffer and closes the writer.
	FlushAndClose() (int, error)
}
    ETLWriter handles the serialization logic for output formats such as CSV and TAE files.
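
    Example (sketch). The write side of the contract: push every row, then call FlushAndClose exactly once. Obtaining a concrete writer goes through GetWriterFactory or NewContentWriter; the helper name, the import paths, and the meaning of the returned count are assumptions.

package example

import (
	"github.com/matrixorigin/matrixone/pkg/util/export"       // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export/table" // assumed import path
)

// copyRows writes every row through the ETLWriter, then flushes and closes
// it, returning whatever count FlushAndClose reports.
func copyRows(w export.ETLWriter, rows []*table.Row) (int, error) {
	for _, row := range rows {
		if err := w.WriteRow(row); err != nil {
			return 0, err
		}
	}
	return w.FlushAndClose()
}
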
type MOCollector
type MOCollector struct {
	motrace.BatchProcessor
	// contains filtered or unexported fields
}
    MOCollector handles all bufferPipes.

func NewMOCollector
func NewMOCollector(ctx context.Context, opts ...MOCollectorOption) *MOCollector

func (*MOCollector) Register (added in v0.7.0)
func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)

func (*MOCollector) Start
func (c *MOCollector) Start() bool
    Start launches all worker goroutines, including the collectors, generators, and exporters.

func (*MOCollector) Stop
func (c *MOCollector) Stop(graceful bool) error
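
    Example (sketch). The collector lifecycle: construct with option functions, start the collector/generator/exporter goroutines, and stop on shutdown. Register calls are omitted because they need concrete motrace.PipeImpl values; the worker counts, the meaning of Start's bool result, and the import path are assumptions.

package example

import (
	"context"

	"github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path
)

// runCollector starts the collector workers and stops them gracefully when
// the stop channel is closed.
func runCollector(ctx context.Context, stop <-chan struct{}) error {
	c := export.NewMOCollector(ctx,
		export.WithCollectorCnt(2), // worker counts are illustrative only
		export.WithGeneratorCnt(2),
		export.WithExporterCnt(1),
	)
	// c.Register(...) calls for the concrete pipes would go here.
	if !c.Start() { // assumption: false means the collector was already started
		return nil
	}
	<-stop
	return c.Stop(true) // graceful shutdown
}
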
type MOCollectorOption (added in v0.7.0)
type MOCollectorOption func(*MOCollector)

func WithCollectorCnt (added in v0.7.0)
func WithCollectorCnt(cnt int) MOCollectorOption

func WithExporterCnt (added in v0.7.0)
func WithExporterCnt(cnt int) MOCollectorOption

func WithGeneratorCnt (added in v0.7.0)
func WithGeneratorCnt(cnt int) MOCollectorOption

func WithOBCollectorConfig (added in v0.8.0)
func WithOBCollectorConfig(cfg *config.OBCollectorConfig) MOCollectorOption

type Merge
type Merge struct {
	Task  task.Task
	Table *table.Table            // WithTable
	FS    fileservice.FileService // WithFileService
	// MaxFileSize limits the maximum size of a merged file, default: 128 MB
	MaxFileSize int64 // WithMaxFileSize
	// MaxMergeJobs limits how many merge jobs may run concurrently, default: 16
	MaxMergeJobs int64 // WithMaxMergeJobs
	// MinFilesMerge sets the minimum number of files required for a merge, default: 2
	//
	// Deprecated: no longer used now that Merge combines all input into a single file
	MinFilesMerge int // WithMinFilesMerge
	// FileCacheSize limits how much file data may be cached during a merge, default: 32 MB
	FileCacheSize int64
	// contains filtered or unexported fields
}
    Merge works like a compaction: it merges the input files into one (or a few) output files.
- `NewMergeService` initializes merge as a service; the `serviceInited` flag guards against repeated initialization.
- `MergeTaskExecutorFactory` is driven by the cron TaskService (see the sketch after this list).
- `NewMerge` handles initialization of the Merge object.
- `Merge::Start` runs the service loop and triggers `Merge::Main` each cycle.
- `Merge::Main` handles one job: 1. for each account, build the `rootPath` from the tuple {account, date, Table}; 2. call `Merge::doMergeFiles` with all files under `rootPath` to perform the merge.
- `Merge::doMergeFiles` handles one job flow: read each file, merge the rows in the cache, and write the result into a new file.
 
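    Example (sketch). Wiring the cron-driven path mentioned above: the factory captures the merge options and returns the executor the task framework invokes for each scheduled task. The option values mirror the documented field defaults; the helper name and import paths are assumptions.

package example

import (
	"context"

	"github.com/matrixorigin/matrixone/pkg/fileservice"       // assumed import path
	"github.com/matrixorigin/matrixone/pkg/pb/task"           // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export"       // assumed import path
	"github.com/matrixorigin/matrixone/pkg/util/export/table" // assumed import path
)

// newMergeExecutor builds the executor that merges the ETL files of one
// table whenever the cron task fires.
func newMergeExecutor(fs fileservice.FileService, tbl *table.Table) func(context.Context, task.Task) error {
	return export.MergeTaskExecutorFactory(
		export.WithTable(tbl),
		export.WithFileService(fs),
		export.WithMaxFileSize(128*1024*1024), // 128 MB, the documented default
		export.WithMaxMergeJobs(16),           // the documented default
	)
}
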
func NewMergeService

func (*Merge) ListRange (added in v0.8.0)
    ListRange lists all accounts and all dates that belong to m.Table.GetName().

type MergeOption
type MergeOption func(*Merge)

func WithFileService
func WithFileService(fs fileservice.FileService) MergeOption

func WithMaxFileSize
func WithMaxFileSize(filesize int64) MergeOption

func WithMaxMergeJobs
func WithMaxMergeJobs(jobs int64) MergeOption

func WithMinFilesMerge
func WithMinFilesMerge(files int) MergeOption

func WithTable
func WithTable(tbl *table.Table) MergeOption

func (MergeOption) Apply
func (opt MergeOption) Apply(m *Merge)
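
    Example (sketch). Apply lets option functions mutate an existing Merge outside its constructor; the helper name and import path are assumptions.

package example

import "github.com/matrixorigin/matrixone/pkg/util/export" // assumed import path

// applyOptions mutates an existing Merge in place with the given options.
func applyOptions(m *export.Merge, opts ...export.MergeOption) {
	for _, opt := range opts {
		opt.Apply(m)
	}
}
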
type PipeImplHolder (added in v0.7.0)
type PipeImplHolder struct {
	// contains filtered or unexported fields
}

func (*PipeImplHolder) Get (added in v0.7.0)
func (h *PipeImplHolder) Get(name string) (motrace.PipeImpl, bool)

func (*PipeImplHolder) Put (added in v0.7.0)
func (h *PipeImplHolder) Put(name string, impl motrace.PipeImpl) bool

func (*PipeImplHolder) Size (added in v0.7.0)
func (h *PipeImplHolder) Size() int

type SliceCache
type SliceCache struct {
	// contains filtered or unexported fields
}

func (*SliceCache) IsEmpty
func (c *SliceCache) IsEmpty() bool

func (*SliceCache) Put
func (c *SliceCache) Put(r *table.Row)

func (*SliceCache) Reset
func (c *SliceCache) Reset()

func (*SliceCache) Size
func (c *SliceCache) Size() int64