Documentation
¶
Index ¶
- Constants
- Variables
- type Consumer
- func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData)
- func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err error)
- func (c *Consumer) LoadState(state *consumerStateObj) error
- func (c *Consumer) SaveState() (*consumerStateObj, error)
- func (c *Consumer) SetOutput(output output.Output)
- func (c *Consumer) SetStorage(s *storage.Storage)
- func (c *Consumer) Start()
- func (c *Consumer) Stop()
- func (c *Consumer) Update(o *Consumer)
- type ConsumerStat
- type DataAccumulator
- type DataNode
- type DataNodeImpl
- type DelayCalculator
- type DryRunExecutor
- type DryRunRequest
- type DryRunResponse
- type EmitContext
- type GroupResult
- type Input
- type Input_Plain
- type Input_Read
- type LogConsumer
- type LogConsumerManager
- type LogContext
- type LogGroup
- type LogParser
- type LogPathDetector
- type LogPipeline
- func (p *LogPipeline) Key() string
- func (p *LogPipeline) LoadState(store transfer.StateStore) error
- func (p *LogPipeline) SetupConsumer(st *api.SubTask) error
- func (p *LogPipeline) Start() error
- func (p *LogPipeline) Stop()
- func (p *LogPipeline) StopAndSaveState(store transfer.StateStore) error
- func (p *LogPipeline) Update(f func(api.Pipeline))
- func (p *LogPipeline) View(f func(api.Pipeline))
- type LogSource
- type LogTaskPipeline
- type ParsedConf
- type ParsedPatternConf
- type PeriodStatus
- type PullLogSource
- type RunInLock
- type SubConsumer
- type TimeParser
- type XElect
- type XGroupBy
- type XSelect
- type XTransformFilter
- type XWhere
- type XWindow
Constants ¶
View Source
const ( TypeAuto = "auto" TypeProcessTime = "processTime" TypeElect = "elect" FormatUnix = "unix" FormatUnixMilli = "unixMilli" FormatGolangLayout = "golangLayout" FormatAuto = "auto" TimeParseError int64 = -2 )
View Source
const (
// ReportTaskInfoInterval indicates the time interval of reporting task info.
ReportTaskInfoInterval = 10
)
Variables ¶
View Source
var ErrIndexOutOfBound = errors.New("index ouf of bound")
View Source
var (
LogParseNotMatched = errors.New("LogParseNotMatched")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
Select XSelect
Where XWhere
GroupBy XGroupBy
Window *XWindow
LogParser LogParser
TimeParser TimeParser
BeforeParseWhere XWhere
// contains filtered or unexported fields
}
Consumer 的设计是不用锁的, 它完全靠上层(Pipeline)来调度, 由 Pipeline 负责保证 Consumer 的调用是安全的
func (*Consumer) AddBatchDetailDatus ¶
func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData)
func (*Consumer) Consume ¶
func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err error)
func (*Consumer) SetStorage ¶
type ConsumerStat ¶
type ConsumerStat struct {
IoTotal int32
IoError int32
// IO read empty count
IoEmpty int32
// IO read bytes
Bytes int64
// Read log lines
Lines int32
// Read log groups
Groups int32
Broken bool
// File is missing
Miss bool
Processed int32
FilterBeforeParseWhere int32
FilterLogParseError int32
FilterTimeParseError int32
FilterWhere int32
FilterGroup int32
FilterGroupMaxKeys int32
FilterIgnore int32
FilterMultiline int32
FilterDelay int32
Emit int32
EmitSuccess int32
EmitError int32
// error count when agg
AggWhereError int32
// error count when select
SelectError int32
ZeroBytes int
}
ConsumerStat holds consumer running stats. All fields are public in order to be encoded by gob.
type DataAccumulator ¶
type DataAccumulator interface {
AddBatchDetailDatus([]*model.DetailData)
}
type DataNodeImpl ¶
func (*DataNodeImpl) GetCount ¶
func (d *DataNodeImpl) GetCount() int32
func (*DataNodeImpl) GetNumber ¶
func (d *DataNodeImpl) GetNumber() float64
func (*DataNodeImpl) GetString ¶
func (d *DataNodeImpl) GetString() string
type DelayCalculator ¶
type DelayCalculator struct{}
func NewDelayCalculator ¶
func NewDelayCalculator() *DelayCalculator
type DryRunExecutor ¶
func NewDryRunExecutor ¶
func NewDryRunExecutor(request *DryRunRequest) (*DryRunExecutor, error)
func (*DryRunExecutor) Run ¶
func (e *DryRunExecutor) Run() *DryRunResponse
type DryRunRequest ¶
type DryRunRequest struct {
Task *collecttask.CollectTask
Input *Input `json:"input,omitempty"`
}
TODO 请求体改造, 考虑这个是prod发给reg的
type DryRunResponse ¶
type DryRunResponse struct {
Event *event.Event `json:"event,omitempty"`
GroupResult []*GroupResult `json:"groupResult,omitempty"`
}
type EmitContext ¶
type EmitContext struct{}
type GroupResult ¶
type GroupResult struct {
// 一个 group 表示一组相关的日志, 比如一个错误堆栈是一组日志
// 对于非多行日志场景, 一个group只会包含一行日志
GroupLines []string `json:"groupLines,omitempty"`
// 是否满足(黑白名单)过滤条件, 如果为false, 那么 selectedValues 和 groupBy 都会是空的
Paas bool `json:"paas,omitempty"`
// 数值的切分结果
SelectedValues map[string]interface{} `json:"selectedValues,omitempty"`
// 维度的切分结果
GroupBy map[string]interface{} `json:"groupBy,omitempty"`
}
type Input ¶
type Input struct {
// plain: 上层直接提供日志原文
// read: 需要去pod上真实读取日志原文
Type string `json:"type,omitempty"`
Plain *Input_Plain `json:"plain,omitempty"`
Read *Input_Read `json:"read,omitempty"`
}
type Input_Plain ¶
type Input_Read ¶
type Input_Read struct {
// 从末尾的最多读多少行
MaxLines int `json:"maxLines,omitempty"`
}
type LogConsumer ¶
type LogConsumer interface{}
type LogConsumerManager ¶
type LogConsumerManager struct {
// contains filtered or unexported fields
}
type LogContext ¶
type LogContext struct {
// contains filtered or unexported fields
}
日志上下文
func (*LogContext) GetColumnByIndex ¶
func (c *LogContext) GetColumnByIndex(index int) (string, error)
func (*LogContext) GetColumnByName ¶
func (c *LogContext) GetColumnByName(name string) (interface{}, error)
func (*LogContext) GetLine ¶
func (c *LogContext) GetLine() string
type LogParser ¶
type LogParser interface {
Parse(ctx *LogContext) error
}
type LogPathDetector ¶
type LogPathDetector struct {
// contains filtered or unexported fields
}
检测匹配哪些路径。TODO: format 要特殊处理下, 它和其他的不太一样: 其他的都是 lazy 感应的, 而 format 需要实时感应。
func NewLogDetector ¶
func NewLogDetector(key string, from *collectconfig.From, target *collecttask.CollectTarget) *LogPathDetector
TODO 这东西会变化... 如果当时 pod 不可用, 那么这个方法会失败从而忽略它的路径; 如果后来 pod 变得可用了, 由于该方法已经执行过, 因此不会再重试...
func (*LogPathDetector) Detect ¶
func (ld *LogPathDetector) Detect() []filematch.FatPath
type LogPipeline ¶
type LogPipeline struct {
// contains filtered or unexported fields
}
LogPipeline is responsible for detecting log inputs (see inputsManager), scheduling log-pulling tasks, and delivering logs to the consumer.
func NewPipeline ¶
func (*LogPipeline) Key ¶
func (p *LogPipeline) Key() string
func (*LogPipeline) LoadState ¶
func (p *LogPipeline) LoadState(store transfer.StateStore) error
func (*LogPipeline) SetupConsumer ¶
func (p *LogPipeline) SetupConsumer(st *api.SubTask) error
func (*LogPipeline) Start ¶
func (p *LogPipeline) Start() error
func (*LogPipeline) Stop ¶
func (p *LogPipeline) Stop()
func (*LogPipeline) StopAndSaveState ¶
func (p *LogPipeline) StopAndSaveState(store transfer.StateStore) error
func (*LogPipeline) Update ¶
func (p *LogPipeline) Update(f func(api.Pipeline))
func (*LogPipeline) View ¶
func (p *LogPipeline) View(f func(api.Pipeline))
type LogTaskPipeline ¶
type LogTaskPipeline struct {
Source LogSource
ConsumerManager LogConsumerManager
}
type ParsedConf ¶
type ParsedConf struct {
*collectconfig.LogAnalysisConf
ParsedPatterns []*ParsedPatternConf
}
type ParsedPatternConf ¶
type ParsedPatternConf struct {
*collectconfig.LogAnalysisPatternConf
// contains filtered or unexported fields
}
type PeriodStatus ¶
type PeriodStatus struct {
Stat ConsumerStat
EmitSuccess bool
EmitError int
Watermark int64
}
PeriodStatus holds stats for that time period
type RunInLock ¶
type RunInLock func(func())
RunInLock runs a func while holding the write lock of the pipeline.
type SubConsumer ¶
type SubConsumer interface {
Update(f func())
ProcessGroup(iw *inputWrapper, ctx *LogContext, maxTs *int64)
// Emit emits data with timestamp equal to expectedTs
// Returns true if data is not empty
Emit(expectedTs int64) bool
MaybeFlush()
// contains filtered or unexported methods
}
子消费者, 是一种实际的日志处理
type TimeParser ¶
type TimeParser interface {
Parse(*LogContext) (int64, error)
}
type XElect ¶
type XElect interface {
Init()
Elect(ctx *LogContext) (interface{}, error)
ElectString(ctx *LogContext) (string, error)
ElectNumber(ctx *LogContext) (float64, error)
}
type XGroupBy ¶
type XGroupBy interface {
// TODO groupBy的结果存在哪里
GroupNames() []string
Execute(ctx *LogContext) ([]string, error)
MaxKeySize() int
}
type XSelect ¶
type XSelect interface {
Select(ctx *LogContext) ([]DataNode, error)
}
Execute select
type XTransformFilter ¶
type XTransformFilter interface {
// Init initializes this filter instance, and returns error during initialization.
Init() error
// Filter transforms value
Filter(ctx *LogContext) (interface{}, error)
}
An XTransformFilter is used to transform a value.
type XWhere ¶
type XWhere interface {
Test(ctx *LogContext) (bool, error)
}
Source Files
¶
- consumer.go
- consumer_log_multiline.go
- consumer_log_sub.go
- consumer_log_sub_analysis.go
- consumer_log_sub_detail.go
- consumer_log_sub_stat.go
- consumer_parser.go
- delay.go
- elect.go
- elect_context.go
- elect_leftright.go
- elect_line.go
- elect_pathvar.go
- elect_refindex.go
- elect_refmeta.go
- elect_refname.go
- elect_refvar.go
- elect_regexp.go
- elect_transform.go
- executor_dryrun.go
- executor_log.go
- from.go
- from_log.go
- group.go
- inputsmanager.go
- log_detector.go
- logparser.go
- logparser_grok.go
- logparser_json.go
- logparser_regexp.go
- multiline.go
- pipeline.go
- pool.go
- select.go
- time.go
- time_elect_auto.go
- time_elect_golanglayout.go
- time_elect_unix.go
- time_elect_unix_milli.go
- time_processtime.go
- transform.go
- transform_append_v1.go
- transform_cleanurl_v1.go
- transform_const.go
- transform_discard.go
- transform_mapping_v1.go
- transform_regexp_v1.go
- transform_substring_v1.go
- transform_switchcase_v1.go
- v1.go
- v2.go
- vars.go
- where.go
- where_alwaystrue.go
- where_boolean.go
- where_contains.go
- where_in.go
- where_numberop.go
- where_regexp.go
- window.go
Click to show internal directories.
Click to hide internal directories.