Documentation
¶
Index ¶
- Variables
- func CollectConfigResult(logGroup *protocol.LogGroup)
- func CollectContainers(logGroup *protocol.LogGroup)
- func CollectDeleteContainers(logGroup *protocol.LogGroup)
- func GetConfigFluhsers(runner PluginRunner) []pipeline.Flusher
- func GetFlushCancelToken(runner PluginRunner) <-chan struct{}
- func GetFlushStoreLen(runner PluginRunner) int
- func GetPluginPriority(pluginName string) int
- func HoldOn(exitFlag bool) error
- func Init() (err error)
- func LoadGlobalConfig(jsonStr string) int
- func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, ...) error
- func Resume() error
- func TimerFetchFuction()
- type AggregatorWrapper
- type AlwaysOnlineManager
- func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)
- func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)
- func (aom *AlwaysOnlineManager) GetDeletedConfigs(existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig
- type ConfigVersion
- type ContextImp
- func (p *ContextImp) AddPlugin(name string)
- func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)
- func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)
- func (p *ContextImp) GetConfigName() string
- func (p *ContextImp) GetLogstore() string
- func (p *ContextImp) GetProject() string
- func (p *ContextImp) GetRuntimeContext() context.Context
- func (p *ContextImp) InitContext(project, logstore, configName string)
- func (p *ContextImp) MetricSerializeToPB(log *protocol.Log)
- func (p *ContextImp) RegisterCounterMetric(metric pipeline.CounterMetric)
- func (p *ContextImp) RegisterLatencyMetric(metric pipeline.LatencyMetric)
- func (p *ContextImp) RegisterStringMetric(metric pipeline.StringMetric)
- func (p *ContextImp) SaveCheckPoint(key string, value []byte) error
- func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error
- type FlushData
- type FlushOutStore
- type FlusherWrapper
- type GlobalConfig
- type InputAlarm
- type InputContainer
- type InputStatistics
- type LogstoreConfig
- func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string, tags []byte) int
- func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int
- func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int
- func (lc *LogstoreConfig) Start()
- func (lc *LogstoreConfig) Stop(exitFlag bool) error
- type LogstoreStatistics
- type MetricWrapper
- func (p *MetricWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *MetricWrapper) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *MetricWrapper) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ...)
- func (p *MetricWrapper) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, ...)
- func (p *MetricWrapper) AddRawLog(log *protocol.Log)
- func (p *MetricWrapper) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
- func (p *MetricWrapper) Run(control *pipeline.AsyncControl)
- type PluginRunner
- type ProcessorWrapper
- type ProcessorWrapperArray
- type ServiceWrapper
- func (p *ServiceWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *ServiceWrapper) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *ServiceWrapper) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ...)
- func (p *ServiceWrapper) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, ...)
- func (p *ServiceWrapper) AddRawLog(log *protocol.Log)
- func (p *ServiceWrapper) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
- func (p *ServiceWrapper) Run(cc *pipeline.AsyncControl)
- func (p *ServiceWrapper) Stop() error
Constants ¶
This section is empty.
Variables ¶
var BaseVersion = "0.1.0" // will be overwritten through ldflags at compile time
StatisticsConfigJson, AlarmConfigJson
var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
var CheckPointManager checkPointManager
var DisabledLogtailConfig = make(map[string]*LogstoreConfig)
var DisabledLogtailConfigLock sync.Mutex
Configs that were disabled because they were slow or hung.
var ErrCheckPointNotInit = errors.New("checkpoint db not init")
var FetchAllInterval = time.Second * time.Duration(24*60*60)
24h
var FirstFetchAllInterval = time.Second * time.Duration(30*60)
30min
var LastLogtailConfig map[string]*LogstoreConfig
var LogtailConfig map[string]*LogstoreConfig
The following variables are exported so that tests of the main package can reference them.
var LogtailGlobalConfig = newGlobalConfig()
LogtailGlobalConfig is the singleton instance of GlobalConfig.
var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")
var UserAgent = fmt.Sprintf("ilogtail/%v (%v)", BaseVersion, runtime.GOOS) // set in global config
Functions ¶
func CollectConfigResult ¶ added in v1.4.0
func CollectContainers ¶ added in v1.4.0
func CollectDeleteContainers ¶ added in v1.4.0
func GetConfigFluhsers ¶ added in v1.4.0
func GetConfigFluhsers(runner PluginRunner) []pipeline.Flusher
func GetFlushCancelToken ¶ added in v1.4.0
func GetFlushCancelToken(runner PluginRunner) <-chan struct{}
func GetFlushStoreLen ¶ added in v1.4.0
func GetFlushStoreLen(runner PluginRunner) int
func GetPluginPriority ¶
func HoldOn ¶
HoldOn stops all config instances and the checkpoint manager so that the system is ready to load new configs or quit. For user-defined configs, timeoutStop is used to avoid hanging.
func LoadGlobalConfig ¶
LoadGlobalConfig updates LogtailGlobalConfig according to jsonStr (only once).
func LoadLogstoreConfig ¶
func TimerFetchFuction ¶ added in v1.4.0
func TimerFetchFuction()
Types ¶
type AggregatorWrapper ¶
type AggregatorWrapper struct {
Aggregator pipeline.AggregatorV1
Config *LogstoreConfig
LogGroupsChan chan *protocol.LogGroup
Interval time.Duration
}
AggregatorWrapper wraps Aggregator. It implements the LogGroupQueue interface and is passed to the associated Aggregator. The Aggregator uses the Add function to pass log groups to the wrapper, and the wrapper then passes the log groups to the associated LogstoreConfig through the channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.
func (*AggregatorWrapper) Add ¶
func (p *AggregatorWrapper) Add(loggroup *protocol.LogGroup) error
Add inserts @loggroup to LogGroupsChan if @loggroup is not empty. It is called by associated Aggregator. It returns errAggAdd when queue is full.
func (*AggregatorWrapper) AddWithWait ¶
AddWithWait inserts @loggroup into LogGroupsChan, waiting at most @duration. It works like Add but adds a timeout policy for when the log group queue is full. It returns errAggAdd when the queue is full and the timeout expires. NOTE: nobody calls it now.
func (*AggregatorWrapper) Run ¶
func (p *AggregatorWrapper) Run(control *pipeline.AsyncControl)
Run periodically calls Aggregator.Flush to get log groups from the associated aggregator and passes them to LogstoreConfig through LogGroupsChan.
type AlwaysOnlineManager ¶
type AlwaysOnlineManager struct {
// contains filtered or unexported fields
}
AlwaysOnlineManager is used to manage the plugins that do not want to stop when configs are reloaded.
func GetAlwaysOnlineManager ¶
func GetAlwaysOnlineManager() *AlwaysOnlineManager
GetAlwaysOnlineManager returns an AlwaysOnlineManager instance.
func (*AlwaysOnlineManager) AddCachedConfig ¶
func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)
AddCachedConfig adds a cached config to the manager; the manager will stop and delete this config when the timeout expires.
func (*AlwaysOnlineManager) GetCachedConfig ¶
func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)
GetCachedConfig gets a cached config from the manager and deletes the item, so the manager will not close this config.
func (*AlwaysOnlineManager) GetDeletedConfigs ¶
func (aom *AlwaysOnlineManager) GetDeletedConfigs( existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig
GetDeletedConfigs returns cached configs not in @existConfigs.
type ConfigVersion ¶ added in v1.4.0
type ConfigVersion string
type ContextImp ¶
type ContextImp struct {
StringMetrics map[string]pipeline.StringMetric
CounterMetrics map[string]pipeline.CounterMetric
LatencyMetrics map[string]pipeline.LatencyMetric
// contains filtered or unexported fields
}
func (*ContextImp) AddPlugin ¶
func (p *ContextImp) AddPlugin(name string)
func (*ContextImp) GetCheckPoint ¶
func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)
func (*ContextImp) GetCheckPointObject ¶
func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)
func (*ContextImp) GetConfigName ¶
func (p *ContextImp) GetConfigName() string
func (*ContextImp) GetLogstore ¶
func (p *ContextImp) GetLogstore() string
func (*ContextImp) GetProject ¶
func (p *ContextImp) GetProject() string
func (*ContextImp) GetRuntimeContext ¶
func (p *ContextImp) GetRuntimeContext() context.Context
func (*ContextImp) InitContext ¶
func (p *ContextImp) InitContext(project, logstore, configName string)
func (*ContextImp) MetricSerializeToPB ¶
func (p *ContextImp) MetricSerializeToPB(log *protocol.Log)
func (*ContextImp) RegisterCounterMetric ¶
func (p *ContextImp) RegisterCounterMetric(metric pipeline.CounterMetric)
func (*ContextImp) RegisterLatencyMetric ¶
func (p *ContextImp) RegisterLatencyMetric(metric pipeline.LatencyMetric)
func (*ContextImp) RegisterStringMetric ¶
func (p *ContextImp) RegisterStringMetric(metric pipeline.StringMetric)
func (*ContextImp) SaveCheckPoint ¶
func (p *ContextImp) SaveCheckPoint(key string, value []byte) error
func (*ContextImp) SaveCheckPointObject ¶
func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error
type FlushData ¶ added in v1.4.0
type FlushData interface {
protocol.LogGroup | models.PipelineGroupEvents
}
type FlushOutStore ¶ added in v1.4.0
type FlushOutStore[T FlushData] struct { // contains filtered or unexported fields }
func NewFlushOutStore ¶ added in v1.4.0
func NewFlushOutStore[T FlushData]() *FlushOutStore[T]
func (*FlushOutStore[T]) Add ¶ added in v1.4.0
func (s *FlushOutStore[T]) Add(data ...*T)
func (*FlushOutStore[T]) Get ¶ added in v1.4.0
func (s *FlushOutStore[T]) Get() []*T
func (*FlushOutStore[T]) Len ¶ added in v1.4.0
func (s *FlushOutStore[T]) Len() int
func (*FlushOutStore[T]) Merge ¶ added in v1.4.0
func (s *FlushOutStore[T]) Merge(in *FlushOutStore[T])
func (*FlushOutStore[T]) Reset ¶ added in v1.4.0
func (s *FlushOutStore[T]) Reset()
func (*FlushOutStore[T]) Write ¶ added in v1.4.0
func (s *FlushOutStore[T]) Write(ch chan *T)
type FlusherWrapper ¶
type GlobalConfig ¶
type GlobalConfig struct {
InputIntervalMs int
AggregatIntervalMs int
FlushIntervalMs int
DefaultLogQueueSize int
DefaultLogGroupQueueSize int
Tags map[string]string
// Directory to store logtail data, such as checkpoint, etc.
LogtailSysConfDir string
// Network identification from logtail.
HostIP string
Hostname string
AlwaysOnline bool
DelayStopSec int
}
GlobalConfig represents global configurations of plugin system.
type InputAlarm ¶
type InputAlarm struct {
// contains filtered or unexported fields
}
func (*InputAlarm) Description ¶
func (r *InputAlarm) Description() string
type InputContainer ¶ added in v1.4.0
type InputContainer struct {
// contains filtered or unexported fields
}
func (*InputContainer) Collect ¶ added in v1.4.0
func (r *InputContainer) Collect(collector pipeline.Collector) error
func (*InputContainer) Description ¶ added in v1.4.0
func (r *InputContainer) Description() string
type InputStatistics ¶
type InputStatistics struct {
// contains filtered or unexported fields
}
func (*InputStatistics) Collect ¶
func (r *InputStatistics) Collect(collector pipeline.Collector) error
func (*InputStatistics) Description ¶
func (r *InputStatistics) Description() string
type LogstoreConfig ¶
type LogstoreConfig struct {
// common fields
ProjectName string
LogstoreName string
ConfigName string
LogstoreKey int64
FlushOutFlag bool
// Each LogstoreConfig can have its independent GlobalConfig if the "global" field
// is offered in configuration, see built-in StatisticsConfig and AlarmConfig.
GlobalConfig *GlobalConfig
Version ConfigVersion
Context pipeline.Context
Statistics LogstoreStatistics
PluginRunner PluginRunner
K8sLabelSet map[string]struct{}
ContainerLabelSet map[string]struct{}
EnvSet map[string]struct{}
CollectContainersFlag bool
// contains filtered or unexported fields
}
var AlarmConfig *LogstoreConfig
var ContainerConfig *LogstoreConfig
var StatisticsConfig *LogstoreConfig
Two built-in logtail configs to report statistics and alarm (from system and other logtail configs).
func (*LogstoreConfig) ProcessLog ¶ added in v1.1.1
func (*LogstoreConfig) ProcessRawLog ¶
func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int
func (*LogstoreConfig) ProcessRawLogV2 ¶
func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int
ProcessRawLogV2 ... V1 -> V2: enable the topic field, and use the tags field to pass more tags. Unsafe parameters: rawLog, packID and tags. Safe parameter: topic.
func (*LogstoreConfig) Start ¶
func (lc *LogstoreConfig) Start()
Start initializes plugin instances in config and starts them. Procedures:
- Start flusher goroutine and push FlushOutLogGroups inherited from last config instance to LogGroupsChan, so that they can be flushed to flushers.
- Start aggregators, allocate new goroutine for each one.
- Start processor goroutine to process logs from LogsChan.
- Start inputs (including metrics and services), just like aggregator, each input has its own goroutine.
func (*LogstoreConfig) Stop ¶
func (lc *LogstoreConfig) Stop(exitFlag bool) error
Stop stops plugin instances and corresponding goroutines of config. @exitFlag, passed from Logtail, indicates whether Logtail will quit after this. Procedures: 1. Call SetUrgent on all flushers to inform them of the current state. 2. Stop all input plugins, so that no more logs are generated. 3. Stop the processor goroutine, passing all existing logs to the aggregators. 4. Stop all aggregator plugins, turning all logs into LogGroups. 5. Set the stopping flag and stop the flusher goroutine. 6. If Logtail is exiting and there is remaining data, try to flush once. 7. Stop flusher plugins.
type LogstoreStatistics ¶
type LogstoreStatistics struct {
CollecLatencytMetric pipeline.LatencyMetric
RawLogMetric pipeline.CounterMetric
SplitLogMetric pipeline.CounterMetric
FlushLogMetric pipeline.CounterMetric
FlushLogGroupMetric pipeline.CounterMetric
FlushReadyMetric pipeline.CounterMetric
FlushLatencyMetric pipeline.LatencyMetric
}
func (*LogstoreStatistics) Init ¶
func (p *LogstoreStatistics) Init(context pipeline.Context)
type MetricWrapper ¶
type MetricWrapper struct {
Input pipeline.MetricInputV1
Config *LogstoreConfig
Tags map[string]string
Interval time.Duration
LogsChan chan *pipeline.LogWithContext
LatencyMetric pipeline.LatencyMetric
}
func (*MetricWrapper) AddDataArray ¶
func (*MetricWrapper) AddDataArrayWithContext ¶ added in v1.1.2
func (*MetricWrapper) AddDataWithContext ¶ added in v1.1.2
func (*MetricWrapper) AddRawLog ¶
func (p *MetricWrapper) AddRawLog(log *protocol.Log)
func (*MetricWrapper) AddRawLogWithContext ¶ added in v1.1.2
func (p *MetricWrapper) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
func (*MetricWrapper) Run ¶
func (p *MetricWrapper) Run(control *pipeline.AsyncControl)
type PluginRunner ¶ added in v1.4.0
type PluginRunner interface {
Init(inputQueueSize int, aggrQueueSize int) error
Initialized() error
ReceiveRawLog(log *pipeline.LogWithContext)
AddPlugin(pluginName string, category pluginCategory, plugin interface{}, config map[string]interface{}) error
Run()
RunPlugins(category pluginCategory, control *pipeline.AsyncControl)
Merge(p PluginRunner)
Stop(exit bool) error
}
type ProcessorWrapper ¶
type ProcessorWrapper struct {
Processor pipeline.ProcessorV1
Config *LogstoreConfig
LogsChan chan *pipeline.LogWithContext
Priority int
}
type ProcessorWrapperArray ¶
type ProcessorWrapperArray []*ProcessorWrapper
func (ProcessorWrapperArray) Len ¶
func (c ProcessorWrapperArray) Len() int
func (ProcessorWrapperArray) Less ¶
func (c ProcessorWrapperArray) Less(i, j int) bool
func (ProcessorWrapperArray) Swap ¶
func (c ProcessorWrapperArray) Swap(i, j int)
type ServiceWrapper ¶
type ServiceWrapper struct {
Input pipeline.ServiceInputV1
Config *LogstoreConfig
Tags map[string]string
Interval time.Duration
LogsChan chan *pipeline.LogWithContext
}
func (*ServiceWrapper) AddDataArray ¶
func (*ServiceWrapper) AddDataArrayWithContext ¶ added in v1.1.2
func (*ServiceWrapper) AddDataWithContext ¶ added in v1.1.2
func (*ServiceWrapper) AddRawLog ¶
func (p *ServiceWrapper) AddRawLog(log *protocol.Log)
func (*ServiceWrapper) AddRawLogWithContext ¶ added in v1.1.2
func (p *ServiceWrapper) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
func (*ServiceWrapper) Run ¶
func (p *ServiceWrapper) Run(cc *pipeline.AsyncControl)
func (*ServiceWrapper) Stop ¶
func (p *ServiceWrapper) Stop() error
Source Files
¶
- aggregator_wrapper.go
- always_online_manager.go
- checkpoint_manager.go
- container_config_manager.go
- context_imp.go
- flusher_out_store.go
- flusher_wrapper.go
- global_config.go
- logstore_config.go
- metric_wrapper.go
- plugin_manager.go
- plugin_runner.go
- plugin_runner_helper.go
- plugin_runner_v1.go
- plugin_runner_v2.go
- processor_wrapper.go
- self_telemetry_alarm.go
- self_telemetry_container_config.go
- self_telemetry_statistics.go
- service_wrapper.go