Documentation
¶
Index ¶
- Constants
- Variables
- func DumpBucketsStateAt(deadline time.Time, buckets *Buckets) (string, error)
- func FormatOverflow(l *Leaky, queue *Queue) types.SignalOccurence
- func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error
- func GetKey(bucketCfg BucketFactory, stackkey string) string
- func LeakRoutine(l *Leaky)
- func LoadBucket(g *BucketFactory, dataFolder string) error
- func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) error
- func Pour(l *Leaky, msg types.Event)
- func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error)
- func ShutdownAllBuckets(buckets *Buckets) error
- func TimeMachinePour(l *Leaky, msg types.Event)
- func ValidateFactory(b *BucketFactory) error
- type Blackhole
- type BucketFactory
- type Buckets
- type DumbProcessor
- type HiddenKey
- type Leaky
- type OverflowFilter
- type Processor
- type Queue
- type Trigger
- type Uniq
Constants ¶
const ( LIVE = iota TIMEMACHINE )
Variables ¶
var BucketsCurrentCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cs_buckets", Help: "Number of buckets that currently exist.", }, []string{"name"}, )
var BucketsInstanciation = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_created_total", Help: "Total buckets were instanciated.", }, []string{"name"}, )
var BucketsOverflow = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_overflowed_total", Help: "Total buckets overflowed.", }, []string{"name"}, )
var BucketsPour = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_poured_total", Help: "Total events were poured in bucket.", }, []string{"source", "name"}, )
var BucketsUnderflow = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_underflowed_total", Help: "Total buckets underflowed.", }, []string{"name"}, )
var LeakyRoutineCount int64
Functions ¶
func DumpBucketsStateAt ¶
func FormatOverflow ¶
func FormatOverflow(l *Leaky, queue *Queue) types.SignalOccurence
func GarbageCollectBuckets ¶
The leaky routines lifecycle are based on "real" time. But when we are running in time-machine mode, the reference time is in logs and not "real" time. Thus we need to garbage collect them to avoid a skyrocketing memory usage.
func GetKey ¶
func GetKey(bucketCfg BucketFactory, stackkey string) string
func LeakRoutine ¶
func LeakRoutine(l *Leaky)
for now mimic a leak routine
LeakRoutine us the life of a bucket. It dies when the bucket underflows or overflows
func LoadBucket ¶
func LoadBucket(g *BucketFactory, dataFolder string) error
Init recursively process yaml files from a directory and loads them as BucketFactory
func LoadBucketsState ¶
func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) error
func PourItemToHolders ¶
func ShutdownAllBuckets ¶ added in v0.2.0
func TimeMachinePour ¶
func ValidateFactory ¶
func ValidateFactory(b *BucketFactory) error
Types ¶
type Blackhole ¶
type Blackhole struct {
DumbProcessor
// contains filtered or unexported fields
}
func NewBlackhole ¶
func NewBlackhole(g *BucketFactory) (*Blackhole, error)
func (*Blackhole) OnBucketOverflow ¶
func (bl *Blackhole) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
type BucketFactory ¶
type BucketFactory struct {
FormatVersion string `yaml:"format"`
Author string `yaml:"author"`
Description string `yaml:"description"`
References []string `yaml:"references"`
Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique
Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity
LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket
Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time
Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result)
Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically
Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow
Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain
CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket
Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc.
OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
BucketName string `yaml:"-"`
Filename string `yaml:"-"`
RunTimeFilter *vm.Program `json:"-"`
ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression
RunTimeGroupBy *vm.Program `json:"-"`
Data []*types.DataSource `yaml:"data,omitempty"`
// contains filtered or unexported fields
}
BucketFactory struct holds all fields for any bucket configuration. This is to have a generic struct for buckets. This can be seen as a bucket factory.
func LoadBucketDir ¶
func LoadBuckets ¶
type Buckets ¶
Buckets is the struct used to hold buckets in the context of main.go the idea is to have one struct to rule them all
type DumbProcessor ¶
type DumbProcessor struct {
}
func (*DumbProcessor) OnBucketInit ¶
func (d *DumbProcessor) OnBucketInit(b *BucketFactory) error
func (*DumbProcessor) OnBucketOverflow ¶
func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
func (*DumbProcessor) OnBucketPour ¶
func (d *DumbProcessor) OnBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event
type Leaky ¶
type Leaky struct {
Name string
Mode int //LIVE or TIMEMACHINE
//the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects
Limiter rate.RateLimiter `json:"-"`
SerializedState rate.Lstate
//Queue is used to held the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer.
Queue *Queue
//Leaky buckets are receiving message through a chan
In chan types.Event `json:"-"`
//Leaky buckets are pushing their overflows through a chan
Out chan *Queue `json:"-"`
// shared for all buckets (the idea is to kill this afterwards)
AllOut chan types.Event `json:"-"`
KillSwitch chan bool `json:"-"`
//max capacity (for burst)
Capacity int
//CacheRatio is the number of elements that should be kept in memory (compared to capacity)
CacheSize int
//the unique identifier of the bucket (a hash)
Mapkey string
// chan for signaling
Signal chan bool `json:"-"`
Reprocess bool
Uuid string
First_ts time.Time
Last_ts time.Time
Ovflw_ts time.Time
Total_count int
Leakspeed time.Duration
BucketConfig *BucketFactory
Duration time.Duration
Pour func(*Leaky, types.Event) `json:"-"`
//Profiling when set to true enables profiling of bucket
Profiling bool
// contains filtered or unexported fields
}
Leaky represents one instance of a bucket
func FromFactory ¶
func FromFactory(g BucketFactory) *Leaky
func NewLeaky ¶
func NewLeaky(g BucketFactory) *Leaky
Newleaky creates a new leaky bucket from a BucketFactory Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory The leaky bucket implementation is based on rate limiter (see https://godoc.org/golang.org/x/time/rate) There's a trick to have an event said when the bucket gets empty to allow its destruction
func NewTimeMachine ¶
func NewTimeMachine(g BucketFactory) *Leaky
type OverflowFilter ¶
type OverflowFilter struct {
Filter string
FilterRuntime *vm.Program
DumbProcessor
}
func NewOverflowFilter ¶
func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error)
func (*OverflowFilter) OnBucketOverflow ¶
func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
type Processor ¶
type Processor interface {
OnBucketInit(Bucket *BucketFactory) error
OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
}
type Queue ¶
Queue holds a limited size queue
type Uniq ¶
func (*Uniq) OnBucketInit ¶
func (u *Uniq) OnBucketInit(Bucket *BucketFactory) error
func (*Uniq) OnBucketOverflow ¶
func (u *Uniq) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)