pluginmanager

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BaseVersion = "0.1.0" // will be overwritten through ldflags at compile time

StatisticsConfigJson, AlarmConfigJson

View Source
var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
View Source
var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
View Source
var CheckPointManager checkPointManager
View Source
var DisabledLogtailConfig = make(map[string]*LogstoreConfig)
View Source
var DisabledLogtailConfigLock sync.Mutex

Configs that were disabled because of slow or hang config.

View Source
var ErrCheckPointNotInit = errors.New("checkpoint db not init")
View Source
var FetchAllInterval = time.Second * time.Duration(24*60*60)

24h

View Source
var FirstFetchAllInterval = time.Second * time.Duration(30*60)

30min

View Source
var LastLogtailConfig map[string]*LogstoreConfig
View Source
var LogtailConfig map[string]*LogstoreConfig

Following variables are exported so that tests of main package can reference them.

View Source
var LogtailGlobalConfig = newGlobalConfig()

LogtailGlobalConfig is the singleton instance of GlobalConfig.

View Source
var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")
View Source
var UserAgent = fmt.Sprintf("ilogtail/%v (%v)", BaseVersion, runtime.GOOS) // set in global config

Functions

func CollectConfigResult added in v1.4.0

func CollectConfigResult(logGroup *protocol.LogGroup)

func CollectContainers added in v1.4.0

func CollectContainers(logGroup *protocol.LogGroup)

func CollectDeleteContainers added in v1.4.0

func CollectDeleteContainers(logGroup *protocol.LogGroup)

func GetConfigFluhsers added in v1.4.0

func GetConfigFluhsers(runner PluginRunner) []pipeline.Flusher

func GetFlushCancelToken added in v1.4.0

func GetFlushCancelToken(runner PluginRunner) <-chan struct{}

func GetFlushStoreLen added in v1.4.0

func GetFlushStoreLen(runner PluginRunner) int

func GetPluginPriority

func GetPluginPriority(pluginName string) int

func HoldOn

func HoldOn(exitFlag bool) error

HoldOn stops all config instance and checkpoint manager so that it is ready to load new configs or quit. For user-defined config, timeoutStop is used to avoid hanging.

func Init

func Init() (err error)

Init initializes plugin manager.

func LoadGlobalConfig

func LoadGlobalConfig(jsonStr string) int

LoadGlobalConfig updates LogtailGlobalConfig according to jsonStr (only once).

func LoadLogstoreConfig

func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) error

func Resume

func Resume() error

Resume starts all configs.

func TimerFetchFuction added in v1.4.0

func TimerFetchFuction()

Types

type AggregatorWrapper

type AggregatorWrapper struct {
	Aggregator    pipeline.AggregatorV1
	Config        *LogstoreConfig
	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
}

AggregatorWrapper wrappers Aggregator. It implements LogGroupQueue interface, and is passed to associated Aggregator. Aggregator uses Add function to pass log groups to wrapper, and then wrapper passes log groups to associated LogstoreConfig through channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.

func (*AggregatorWrapper) Add

func (p *AggregatorWrapper) Add(loggroup *protocol.LogGroup) error

Add inserts @loggroup to LogGroupsChan if @loggroup is not empty. It is called by associated Aggregator. It returns errAggAdd when queue is full.

func (*AggregatorWrapper) AddWithWait

func (p *AggregatorWrapper) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error

AddWithWait inserts @loggroup to LogGroupsChan, and it waits at most @duration. It works like Add but adds a timeout policy when log group queue is full. It returns errAggAdd when queue is full and timeout. NOTE: no body calls it now.

func (*AggregatorWrapper) Run

func (p *AggregatorWrapper) Run(control *pipeline.AsyncControl)

Run calls periodically Aggregator.Flush to get log groups from associated aggregator and pass them to LogstoreConfig through LogGroupsChan.

type AlwaysOnlineManager

type AlwaysOnlineManager struct {
	// contains filtered or unexported fields
}

AlwaysOnlineManager is used to manage the plugins that do not want to stop when config reloading

func GetAlwaysOnlineManager

func GetAlwaysOnlineManager() *AlwaysOnlineManager

GetAlwaysOnlineManager get a AlwaysOnlineManager instance

func (*AlwaysOnlineManager) AddCachedConfig

func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)

AddCachedConfig add cached config into manager, manager will stop and delete this config when timeout

func (*AlwaysOnlineManager) GetCachedConfig

func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)

GetCachedConfig get cached config from manager and delete this item, so manager will not close this config

func (*AlwaysOnlineManager) GetDeletedConfigs

func (aom *AlwaysOnlineManager) GetDeletedConfigs(
	existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig

GetDeletedConfigs returns cached configs not in @existConfigs.

type ConfigVersion added in v1.4.0

type ConfigVersion string

type ContextImp

type ContextImp struct {
	StringMetrics  map[string]pipeline.StringMetric
	CounterMetrics map[string]pipeline.CounterMetric
	LatencyMetrics map[string]pipeline.LatencyMetric
	// contains filtered or unexported fields
}

func (*ContextImp) AddPlugin

func (p *ContextImp) AddPlugin(name string)

func (*ContextImp) GetCheckPoint

func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)

func (*ContextImp) GetCheckPointObject

func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)

func (*ContextImp) GetConfigName

func (p *ContextImp) GetConfigName() string

func (*ContextImp) GetLogstore

func (p *ContextImp) GetLogstore() string

func (*ContextImp) GetProject

func (p *ContextImp) GetProject() string

func (*ContextImp) GetRuntimeContext

func (p *ContextImp) GetRuntimeContext() context.Context

func (*ContextImp) InitContext

func (p *ContextImp) InitContext(project, logstore, configName string)

func (*ContextImp) MetricSerializeToPB

func (p *ContextImp) MetricSerializeToPB(log *protocol.Log)

func (*ContextImp) RegisterCounterMetric

func (p *ContextImp) RegisterCounterMetric(metric pipeline.CounterMetric)

func (*ContextImp) RegisterLatencyMetric

func (p *ContextImp) RegisterLatencyMetric(metric pipeline.LatencyMetric)

func (*ContextImp) RegisterStringMetric

func (p *ContextImp) RegisterStringMetric(metric pipeline.StringMetric)

func (*ContextImp) SaveCheckPoint

func (p *ContextImp) SaveCheckPoint(key string, value []byte) error

func (*ContextImp) SaveCheckPointObject

func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error

type FlushData added in v1.4.0

type FlushData interface {
	protocol.LogGroup | models.PipelineGroupEvents
}

type FlushOutStore added in v1.4.0

type FlushOutStore[T FlushData] struct {
	// contains filtered or unexported fields
}

func NewFlushOutStore added in v1.4.0

func NewFlushOutStore[T FlushData]() *FlushOutStore[T]

func (*FlushOutStore[T]) Add added in v1.4.0

func (s *FlushOutStore[T]) Add(data ...*T)

func (*FlushOutStore[T]) Get added in v1.4.0

func (s *FlushOutStore[T]) Get() []*T

func (*FlushOutStore[T]) Len added in v1.4.0

func (s *FlushOutStore[T]) Len() int

func (*FlushOutStore[T]) Merge added in v1.4.0

func (s *FlushOutStore[T]) Merge(in *FlushOutStore[T])

func (*FlushOutStore[T]) Reset added in v1.4.0

func (s *FlushOutStore[T]) Reset()

func (*FlushOutStore[T]) Write added in v1.4.0

func (s *FlushOutStore[T]) Write(ch chan *T)

type FlusherWrapper

type FlusherWrapper struct {
	Flusher       pipeline.FlusherV1
	Config        *LogstoreConfig
	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
}

type GlobalConfig

type GlobalConfig struct {
	InputIntervalMs          int
	AggregatIntervalMs       int
	FlushIntervalMs          int
	DefaultLogQueueSize      int
	DefaultLogGroupQueueSize int
	Tags                     map[string]string
	// Directory to store logtail data, such as checkpoint, etc.
	LogtailSysConfDir string
	// Network identification from logtail.
	HostIP       string
	Hostname     string
	AlwaysOnline bool
	DelayStopSec int
}

GlobalConfig represents global configurations of plugin system.

type InputAlarm

type InputAlarm struct {
	// contains filtered or unexported fields
}

func (*InputAlarm) Collect

func (r *InputAlarm) Collect(collector pipeline.Collector) error

func (*InputAlarm) Description

func (r *InputAlarm) Description() string

func (*InputAlarm) Init

func (r *InputAlarm) Init(context pipeline.Context) (int, error)

type InputContainer added in v1.4.0

type InputContainer struct {
	// contains filtered or unexported fields
}

func (*InputContainer) Collect added in v1.4.0

func (r *InputContainer) Collect(collector pipeline.Collector) error

func (*InputContainer) Description added in v1.4.0

func (r *InputContainer) Description() string

func (*InputContainer) Init added in v1.4.0

func (r *InputContainer) Init(context pipeline.Context) (int, error)

type InputStatistics

type InputStatistics struct {
	// contains filtered or unexported fields
}

func (*InputStatistics) Collect

func (r *InputStatistics) Collect(collector pipeline.Collector) error

func (*InputStatistics) Description

func (r *InputStatistics) Description() string

func (*InputStatistics) Init

func (r *InputStatistics) Init(context pipeline.Context) (int, error)

type LogstoreConfig

type LogstoreConfig struct {
	// common fields
	ProjectName  string
	LogstoreName string
	ConfigName   string
	LogstoreKey  int64
	FlushOutFlag bool
	// Each LogstoreConfig can have its independent GlobalConfig if the "global" field
	//   is offered in configuration, see build-in StatisticsConfig and AlarmConfig.
	GlobalConfig *GlobalConfig

	Version      ConfigVersion
	Context      pipeline.Context
	Statistics   LogstoreStatistics
	PluginRunner PluginRunner

	K8sLabelSet           map[string]struct{}
	ContainerLabelSet     map[string]struct{}
	EnvSet                map[string]struct{}
	CollectContainersFlag bool
	// contains filtered or unexported fields
}
var AlarmConfig *LogstoreConfig
var ContainerConfig *LogstoreConfig
var StatisticsConfig *LogstoreConfig

Two built-in logtail configs to report statistics and alarm (from system and other logtail configs).

func (*LogstoreConfig) ProcessLog added in v1.1.1

func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string, tags []byte) int

func (*LogstoreConfig) ProcessRawLog

func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int

func (*LogstoreConfig) ProcessRawLogV2

func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int

ProcessRawLogV2 ... V1 -> V2: enable topic field, and use tags field to pass more tags. unsafe parameter: rawLog,packID and tags safe parameter: topic

func (*LogstoreConfig) Start

func (lc *LogstoreConfig) Start()

Start initializes plugin instances in config and starts them. Procedures:

  1. Start flusher goroutine and push FlushOutLogGroups inherited from last config instance to LogGroupsChan, so that they can be flushed to flushers.
  2. Start aggregators, allocate new goroutine for each one.
  3. Start processor goroutine to process logs from LogsChan.
  4. Start inputs (including metrics and services), just like aggregator, each input has its own goroutine.

func (*LogstoreConfig) Stop

func (lc *LogstoreConfig) Stop(exitFlag bool) error

Stop stops plugin instances and corresponding goroutines of config. @exitFlag passed from Logtail, indicates that if Logtail will quit after this. Procedures: 1. SetUrgent to all flushers to indicate them current state. 2. Stop all input plugins, stop generating logs. 3. Stop processor goroutine, pass all existing logs to aggregator. 4. Stop all aggregator plugins, make all logs to LogGroups. 5. Set stopping flag, stop flusher goroutine. 6. If Logtail is exiting and there are remaining data, try to flush once. 7. Stop flusher plugins.

type LogstoreStatistics

type LogstoreStatistics struct {
	CollecLatencytMetric pipeline.LatencyMetric
	RawLogMetric         pipeline.CounterMetric
	SplitLogMetric       pipeline.CounterMetric
	FlushLogMetric       pipeline.CounterMetric
	FlushLogGroupMetric  pipeline.CounterMetric
	FlushReadyMetric     pipeline.CounterMetric
	FlushLatencyMetric   pipeline.LatencyMetric
}

func (*LogstoreStatistics) Init

func (p *LogstoreStatistics) Init(context pipeline.Context)

type MetricWrapper

type MetricWrapper struct {
	Input    pipeline.MetricInputV1
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan      chan *pipeline.LogWithContext
	LatencyMetric pipeline.LatencyMetric
}

func (*MetricWrapper) AddData

func (p *MetricWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)

func (*MetricWrapper) AddDataArray

func (p *MetricWrapper) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*MetricWrapper) AddDataArrayWithContext added in v1.1.2

func (p *MetricWrapper) AddDataArrayWithContext(tags map[string]string,
	columns []string,
	values []string,
	ctx map[string]interface{},
	t ...time.Time)

func (*MetricWrapper) AddDataWithContext added in v1.1.2

func (p *MetricWrapper) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time)

func (*MetricWrapper) AddRawLog

func (p *MetricWrapper) AddRawLog(log *protocol.Log)

func (*MetricWrapper) AddRawLogWithContext added in v1.1.2

func (p *MetricWrapper) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})

func (*MetricWrapper) Run

func (p *MetricWrapper) Run(control *pipeline.AsyncControl)

type PluginRunner added in v1.4.0

type PluginRunner interface {
	Init(inputQueueSize int, aggrQueueSize int) error

	Initialized() error

	ReceiveRawLog(log *pipeline.LogWithContext)

	AddPlugin(pluginName string, category pluginCategory, plugin interface{}, config map[string]interface{}) error

	Run()

	RunPlugins(category pluginCategory, control *pipeline.AsyncControl)

	Merge(p PluginRunner)

	Stop(exit bool) error
}

type ProcessorWrapper

type ProcessorWrapper struct {
	Processor pipeline.ProcessorV1
	Config    *LogstoreConfig
	LogsChan  chan *pipeline.LogWithContext
	Priority  int
}

type ProcessorWrapperArray

type ProcessorWrapperArray []*ProcessorWrapper

func (ProcessorWrapperArray) Len

func (c ProcessorWrapperArray) Len() int

func (ProcessorWrapperArray) Less

func (c ProcessorWrapperArray) Less(i, j int) bool

func (ProcessorWrapperArray) Swap

func (c ProcessorWrapperArray) Swap(i, j int)

type ServiceWrapper

type ServiceWrapper struct {
	Input    pipeline.ServiceInputV1
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan chan *pipeline.LogWithContext
}

func (*ServiceWrapper) AddData

func (p *ServiceWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)

func (*ServiceWrapper) AddDataArray

func (p *ServiceWrapper) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*ServiceWrapper) AddDataArrayWithContext added in v1.1.2

func (p *ServiceWrapper) AddDataArrayWithContext(tags map[string]string,
	columns []string,
	values []string,
	ctx map[string]interface{},
	t ...time.Time)

func (*ServiceWrapper) AddDataWithContext added in v1.1.2

func (p *ServiceWrapper) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time)

func (*ServiceWrapper) AddRawLog

func (p *ServiceWrapper) AddRawLog(log *protocol.Log)

func (*ServiceWrapper) AddRawLogWithContext added in v1.1.2

func (p *ServiceWrapper) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})

func (*ServiceWrapper) Run

func (p *ServiceWrapper) Run(cc *pipeline.AsyncControl)

func (*ServiceWrapper) Stop

func (p *ServiceWrapper) Stop() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL