Documentation
Index
- Constants
- type AggregatorBase
- func (p *AggregatorBase) Add(log *protocol.Log, ctx map[string]interface{}) error
- func (*AggregatorBase) Description() string
- func (p *AggregatorBase) Flush() []*protocol.LogGroup
- func (p *AggregatorBase) Init(context ilogtail.Context, que ilogtail.LogGroupQueue) (int, error)
- func (p *AggregatorBase) InitInner(packFlag bool, packString string, lock *sync.Mutex, logstore string, ...)
- func (p *AggregatorBase) Reset()
Constants
const (
    MaxLogCount     = 1024
    MaxLogGroupSize = 3 * 1024 * 1024
)
Variables
This section is empty.
Functions
This section is empty.
Types
type AggregatorBase
type AggregatorBase struct {
    MaxLogGroupCount int    // the maximum number of log groups that triggers a flush operation
    MaxLogCount      int    // the maximum number of logs in a log group
    PackFlag         bool   // whether to add config name as a tag
    Topic            string // the output topic
    Lock             *sync.Mutex
    // contains filtered or unexported fields
}
Other aggregators can use AggregatorBase as their base aggregator.
For internal use, note the following. AggregatorBase has a quick-flush path, implemented in the Add method (search for p.queue.Add in the source file), so not every LogGroup is returned through Flush. If your own aggregator needs to post-process the LogGroups produced by AggregatorBase (for example, to add tags), it must also handle the LogGroups that leave through the quick-flush path; see the sample code in doc.go.
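One way to cover the quick-flush path is to hand AggregatorBase a queue that post-processes every LogGroup before forwarding it. The sketch below only illustrates that idea and is not the sample from doc.go: LogGroup, LogGroupQueue, sliceQueue, taggingQueue and tagLogGroup are simplified, hypothetical stand-ins, not the real protocol.LogGroup or ilogtail.LogGroupQueue types.

```go
package main

import "fmt"

// LogGroup is a simplified stand-in for protocol.LogGroup (assumption:
// the real type has more fields and a different tag representation).
type LogGroup struct {
	Tags map[string]string
	Logs []string
}

// LogGroupQueue is a simplified stand-in for ilogtail.LogGroupQueue.
type LogGroupQueue interface {
	Add(lg *LogGroup) error
}

// sliceQueue is a hypothetical in-memory queue used only for this demo.
type sliceQueue struct{ groups []*LogGroup }

func (q *sliceQueue) Add(lg *LogGroup) error {
	q.groups = append(q.groups, lg)
	return nil
}

// tagLogGroup sets one tag on a log group, creating the map if needed.
func tagLogGroup(lg *LogGroup, key, value string) {
	if lg.Tags == nil {
		lg.Tags = map[string]string{}
	}
	lg.Tags[key] = value
}

// taggingQueue decorates another queue: every log group that passes
// through Add (the quick-flush path) is tagged before being forwarded.
type taggingQueue struct {
	inner      LogGroupQueue
	key, value string
}

func (q *taggingQueue) Add(lg *LogGroup) error {
	tagLogGroup(lg, q.key, q.value)
	return q.inner.Add(lg)
}

func main() {
	sink := &sliceQueue{}
	q := &taggingQueue{inner: sink, key: "source", value: "my_aggregator"}

	// A base aggregator initialised with q would call q.Add on quick flush.
	if err := q.Add(&LogGroup{Logs: []string{"quick-flushed log"}}); err != nil {
		fmt.Println("add failed:", err)
		return
	}
	fmt.Println(sink.groups[0].Tags["source"])
}
```

LogGroups returned by Flush would still need the same tagging step (here, a call to tagLogGroup) in the outer aggregator's own Flush, since they do not pass through the queue.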
func NewAggregatorBase
func NewAggregatorBase() *AggregatorBase
NewAggregatorBase creates an aggregator with default values.
func (*AggregatorBase) Add
func (p *AggregatorBase) Add(log *protocol.Log, ctx map[string]interface{}) error
Add adds @log to the aggregator. It uses defaultLogGroup to store log groups, each of which contains logs, as follows:
defaultLogGroup => [LG1: log1->log2->log3] -> [LG2: log1->log2->log3] -> ..
The last log group is nowLogGroup. @log is appended to nowLogGroup as long as the size and log count of that log group do not exceed the limits (MaxLogGroupSize and MaxLogCount). When nowLogGroup would exceed a limit, Add creates a new log group, switches nowLogGroup to it, and appends @log to it.
When the number of log groups reaches MaxLogGroupCount, the first log group is popped from the defaultLogGroup list and added to the queue (after the pack_id tag is added).
Add returns any error encountered; nil means success.
@return error. **For internal use, this error must be handled.**
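The rollover and quick-flush behaviour described for Add can be modelled with a small, self-contained sketch. It is not the package's implementation: the aggregator and logGroup types, the tiny limits, the bounded queue, and the exact order of the checks are all assumptions made for illustration, and the pack_id tagging step is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// Tiny stand-ins for MaxLogCount (1024) and MaxLogGroupSize (3 MiB),
// shrunk so the rollover is easy to observe.
const (
	maxLogCount     = 3
	maxLogGroupSize = 64
)

var errQueueFull = errors.New("queue is full")

type logGroup struct {
	logs []string
	size int // accumulated byte size of logs
}

// aggregator is a hypothetical model of the documented behaviour.
type aggregator struct {
	maxLogGroupCount int
	groups           []*logGroup // plays the role of defaultLogGroup
	now              *logGroup   // plays the role of nowLogGroup
	queue            []*logGroup // stand-in for the LogGroupQueue
	queueCap         int
}

func (a *aggregator) add(log string) error {
	full := a.now == nil ||
		len(a.now.logs) >= maxLogCount ||
		a.now.size+len(log) > maxLogGroupSize
	if full {
		// Quick flush: once the list holds maxLogGroupCount groups,
		// pop the oldest one into the queue before opening a new group.
		if len(a.groups) >= a.maxLogGroupCount {
			if len(a.queue) >= a.queueCap {
				return errQueueFull // assumption: the log is dropped here
			}
			a.queue = append(a.queue, a.groups[0])
			a.groups = a.groups[1:]
		}
		a.now = &logGroup{}
		a.groups = append(a.groups, a.now)
	}
	a.now.logs = append(a.now.logs, log)
	a.now.size += len(log)
	return nil
}

func main() {
	a := &aggregator{maxLogGroupCount: 2, queueCap: 1}
	for i := 0; i < 10; i++ {
		// Callers must check the error, as the Add documentation stresses.
		if err := a.add("x"); err != nil {
			fmt.Printf("log %d rejected: %v\n", i+1, err)
		}
	}
	fmt.Println("queued groups:", len(a.queue), "cached groups:", len(a.groups))
}
```

In this model the error surfaces only when the downstream queue cannot accept a quick-flushed log group, which is why a caller that ignores the return value could silently lose logs.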
func (*AggregatorBase) Description
func (*AggregatorBase) Description() string
func (*AggregatorBase) Init
func (p *AggregatorBase) Init(context ilogtail.Context, que ilogtail.LogGroupQueue) (int, error)
Init is called before the aggregator starts working.
1. context stores the metadata of this Logstore config.
2. que is the transfer channel used to flush LogGroups when the cache reaches its maximum.