Documentation
¶
Index ¶
- func EventsFromQueue(queue *pipeline.Queue) []*models.Event
- func GarbageCollectBuckets(deadline time.Time, bucketStore *BucketStore)
- func NewAlert(leaky *Leaky, queue *pipeline.Queue) (pipeline.RuntimeAlert, error)
- func Pour(l *Leaky, gate pourGate, msg pipeline.Event)
- func PourItemToBucket(ctx context.Context, bucket *Leaky, holder *BucketFactory, ...) error
- func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, ...) (bool, error)
- func SourceFromEvent(evt pipeline.Event, leaky *Leaky) (map[string]models.Source, error)
- func TimeMachinePour(l *Leaky, _ pourGate, msg pipeline.Event)
- type BayesianEvent
- type BayesianProcessor
- type BayesianType
- type BlackholeProcessor
- type BucketFactory
- type BucketSpec
- type BucketStore
- func (b *BucketStore) BeginPour() (end func())
- func (b *BucketStore) Delete(key string)
- func (b *BucketStore) FreezePours() (resume func())
- func (b *BucketStore) Len() int
- func (b *BucketStore) Load(key string) (*Leaky, bool)
- func (b *BucketStore) LoadOrStore(key string, val *Leaky) (*Leaky, bool)
- func (b *BucketStore) Snapshot() map[string]*Leaky
- type BucketType
- type CancelProcessor
- func (*CancelProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
- func (p *CancelProcessor) OnBucketInit(f *BucketFactory) error
- func (*CancelProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
- func (p *CancelProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
- type ConditionalProcessor
- type ConditionalType
- type CounterType
- type DumbProcessor
- func (*DumbProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
- func (*DumbProcessor) OnBucketInit(_ *BucketFactory) error
- func (*DumbProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
- func (*DumbProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
- type Leaky
- type LeakyType
- type OverflowProcessor
- type PourCollector
- type Processor
- type RawBayesianCondition
- type ScopeType
- type SimulationChecker
- type TriggerProcessor
- type TriggerType
- type UniqProcessor
- func (*UniqProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
- func (p *UniqProcessor) OnBucketInit(f *BucketFactory) error
- func (*UniqProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
- func (p *UniqProcessor) OnBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EventsFromQueue ¶ added in v1.0.0
EventsFromQueue iterates over the queue to collect and prepare metadata from the alert.
func GarbageCollectBuckets ¶
func GarbageCollectBuckets(deadline time.Time, bucketStore *BucketStore)
The lifecycle of the leaky routines is based on "real" time. But when we are running in time-machine mode, the reference time comes from the logs and not from "real" time. Thus we need to garbage collect the buckets to avoid skyrocketing memory usage.
func NewAlert ¶ added in v1.0.0
NewAlert will generate a RuntimeAlert and its APIAlert(s) from a bucket that overflowed
func PourItemToBucket ¶ added in v1.2.3
func PourItemToBucket( ctx context.Context, bucket *Leaky, holder *BucketFactory, bucketStore *BucketStore, parsed *pipeline.Event, collector *PourCollector, ) error
func PourItemToHolders ¶
func PourItemToHolders( ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *BucketStore, collector *PourCollector, ) (bool, error)
func SourceFromEvent ¶ added in v1.0.0
SourceFromEvent extracts and formats a valid models.Source object from an Event
func TimeMachinePour ¶
Types ¶
type BayesianEvent ¶ added in v1.5.3
type BayesianEvent struct {
// contains filtered or unexported fields
}
type BayesianProcessor ¶
type BayesianProcessor struct {
DumbProcessor
// contains filtered or unexported fields
}
func (*BayesianProcessor) AfterBucketPour ¶
func (p *BayesianProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event
func (*BayesianProcessor) OnBucketInit ¶
func (p *BayesianProcessor) OnBucketInit(f *BucketFactory) error
type BayesianType ¶
type BayesianType struct{}
func (BayesianType) BuildProcessors ¶
func (BayesianType) BuildProcessors(_ *BucketFactory) []Processor
func (BayesianType) Validate ¶
func (BayesianType) Validate(f *BucketFactory) error
type BlackholeProcessor ¶
type BlackholeProcessor struct {
DumbProcessor
// contains filtered or unexported fields
}
func NewBlackholeProcessor ¶
func NewBlackholeProcessor(f *BucketFactory) (*BlackholeProcessor, error)
func (*BlackholeProcessor) OnBucketOverflow ¶
func (p *BlackholeProcessor) OnBucketOverflow( _ *BucketFactory, leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue, ) (pipeline.RuntimeAlert, *pipeline.Queue)
type BucketFactory ¶
type BucketFactory struct {
Spec BucketSpec
BucketName string
Filename string
RunTimeFilter *vm.Program `json:"-"`
RunTimeGroupBy *vm.Program `json:"-"`
DataDir string
Simulated bool // Set to true if the scenario instantiating the bucket was in the exclusion list
// contains filtered or unexported fields
}
BucketFactory is the compiled/validated, reusable template produced from a BucketSpec.
func LoadBuckets ¶
func (*BucketFactory) BucketKey ¶
func (f *BucketFactory) BucketKey(stackkey string) string
func (*BucketFactory) LoadBucket ¶
func (f *BucketFactory) LoadBucket() error
LoadBucket validates and prepares a BucketFactory for runtime use (compile expressions, init processors, init data).
func (*BucketFactory) Validate ¶
func (f *BucketFactory) Validate() error
type BucketSpec ¶
type BucketSpec struct {
FormatVersion string `yaml:"format"`
Description string `yaml:"description"`
References []string `yaml:"references"`
Type string `yaml:"type"` // Type can be : leaky, counter, trigger. It determines the main bucket characteristics
Name string `yaml:"name"` // Name of the bucket, used later in log and user-messages. Should be unique
Capacity int `yaml:"capacity"` // Capacity is applicable to leaky buckets and determines the "burst" capacity
LeakSpeed string `yaml:"leakspeed"` // Leakspeed is a float representing how many events per second leak out of the bucket
Filter string `yaml:"filter"` // Filter is an expr that determines if an event is eligible for said bucket. Filter is evaluated against the Event struct
GroupBy string `yaml:"groupby,omitempty"` // GroupBy is an expr that determines the partitions of the bucket. A common example is the source_ip
Distinct string `yaml:"distinct"` // Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
Debug bool `yaml:"debug"` // Debug, when set to true, will enable debugging for _this_ scenario specifically
Labels map[string]any `yaml:"labels"` // Labels is a K:V list aimed at providing context for the overflow
Blackhole string `yaml:"blackhole,omitempty"` // Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration
ScopeType ScopeType `yaml:"scope,omitempty"` // to enforce a different remediation than blocking an IP. Will default this to IP
Reprocess bool `yaml:"reprocess"` // Reprocess, if true, will force the bucket to be re-injected into the processing chain
Data []*enrichment.DataProvider `yaml:"data,omitempty"`
ConditionalOverflow string `yaml:"condition"` // condition if present, is an expression that must return true for the bucket to overflow
CacheSize int `yaml:"cache_size"` // CacheSize, if > 0, limits the size of in-memory cache of the bucket
CancelOnFilter string `yaml:"cancel_on,omitempty"` // a filter that, if matched, kills the bucket
BayesianPrior float32 `yaml:"bayesian_prior"`
BayesianThreshold float32 `yaml:"bayesian_threshold"`
BayesianConditions []RawBayesianCondition `yaml:"bayesian_conditions"` // conditions for the bayesian bucket
OverflowFilter string `yaml:"overflow_filter"` // OverflowFilter if present, is a filter that must return true for the overflow to go through
Duration string `yaml:"duration"` // Duration allows 'counter' buckets to have a fixed life-time
ScenarioVersion string `yaml:"version,omitempty"`
}
BucketSpec is the declarative YAML config for a scenario bucket. It holds all the possible user-provided fields.
type BucketStore ¶
type BucketStore struct {
// contains filtered or unexported fields
}
BucketStore is the struct used to hold buckets during the lifecycle of the app (i.e. between reloads).
func NewBucketStore ¶
func NewBucketStore() *BucketStore
func (*BucketStore) BeginPour ¶
func (b *BucketStore) BeginPour() (end func())
BeginPour blocks while a dump/snapshot is in progress.
The returned function *must* be called exactly once, usually deferred, after the event has been poured.
func (*BucketStore) Delete ¶
func (b *BucketStore) Delete(key string)
func (*BucketStore) FreezePours ¶
func (b *BucketStore) FreezePours() (resume func())
FreezePours prevents new pours from starting and waits for in-flight pours to finish.
The returned function *must* be called exactly once, usually deferred, to allow pouring again.
func (*BucketStore) Len ¶
func (b *BucketStore) Len() int
func (*BucketStore) LoadOrStore ¶
func (b *BucketStore) LoadOrStore(key string, val *Leaky) (*Leaky, bool)
func (*BucketStore) Snapshot ¶
func (b *BucketStore) Snapshot() map[string]*Leaky
type BucketType ¶
type BucketType interface {
Validate(f *BucketFactory) error
BuildProcessors(f *BucketFactory) []Processor
}
type CancelProcessor ¶
func (*CancelProcessor) AfterBucketPour ¶
func (*CancelProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
func (*CancelProcessor) OnBucketInit ¶
func (p *CancelProcessor) OnBucketInit(f *BucketFactory) error
func (*CancelProcessor) OnBucketOverflow ¶
func (*CancelProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
func (*CancelProcessor) OnBucketPour ¶
func (p *CancelProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
type ConditionalProcessor ¶
type ConditionalProcessor struct {
ConditionalFilter string
ConditionalFilterRuntime *vm.Program
DumbProcessor
}
func (*ConditionalProcessor) AfterBucketPour ¶
func (p *ConditionalProcessor) AfterBucketPour(f *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event
func (*ConditionalProcessor) OnBucketInit ¶
func (p *ConditionalProcessor) OnBucketInit(f *BucketFactory) error
type ConditionalType ¶
type ConditionalType struct{}
func (ConditionalType) BuildProcessors ¶
func (ConditionalType) BuildProcessors(_ *BucketFactory) []Processor
func (ConditionalType) Validate ¶
func (ConditionalType) Validate(f *BucketFactory) error
type CounterType ¶
type CounterType struct{}
func (CounterType) BuildProcessors ¶
func (CounterType) BuildProcessors(_ *BucketFactory) []Processor
func (CounterType) Validate ¶
func (CounterType) Validate(f *BucketFactory) error
type DumbProcessor ¶
type DumbProcessor struct{}
func (*DumbProcessor) AfterBucketPour ¶ added in v1.5.0
func (*DumbProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
func (*DumbProcessor) OnBucketInit ¶
func (*DumbProcessor) OnBucketInit(_ *BucketFactory) error
func (*DumbProcessor) OnBucketOverflow ¶
func (*DumbProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
func (*DumbProcessor) OnBucketPour ¶
func (*DumbProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
type Leaky ¶
type Leaky struct {
Mode int // LIVE or TIMEMACHINE
// the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects
Limiter rate.RateLimiter `json:"-"`
SerializedState rate.Lstate
// Queue is used to hold the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer.
Queue *pipeline.Queue
// Leaky buckets receive messages through a chan
In chan *pipeline.Event `json:"-"`
// Leaky buckets are pushing their overflows through a chan
Out chan *pipeline.Queue `json:"-"`
// shared for all buckets (the idea is to kill this afterward)
AllOut chan pipeline.Event `json:"-"`
// the unique identifier of the bucket (a hash)
Mapkey string
Suicide chan bool `json:"-"`
Uuid string
First_ts time.Time
Last_ts time.Time
Ovflw_ts time.Time
Total_count int
Factory *BucketFactory
Duration time.Duration
Pour func(*Leaky, pourGate, pipeline.Event) `json:"-"`
// contains filtered or unexported fields
}
Leaky represents one instance of a bucket
func LoadOrStoreBucketFromHolder ¶ added in v1.2.3
func LoadOrStoreBucketFromHolder( ctx context.Context, partitionKey string, buckets *BucketStore, holder *BucketFactory, expectMode int, ) (*Leaky, error)
func NewLeakyFromFactory ¶
func NewLeakyFromFactory(f *BucketFactory) *Leaky
NewLeakyFromFactory creates a new leaky bucket from a BucketFactory. Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory. The leaky bucket implementation is based on a rate limiter (see https://godoc.org/golang.org/x/time/rate). There's a trick to have an event sent when the bucket gets empty, to allow its destruction.
func NewTimeMachine ¶
func NewTimeMachine(f *BucketFactory) *Leaky
func (*Leaky) LeakRoutine ¶
For now, this mimics a leak routine. LeakRoutine is the life of a bucket. It dies when the bucket underflows or overflows.
type LeakyType ¶
type LeakyType struct{}
func (LeakyType) BuildProcessors ¶
func (LeakyType) BuildProcessors(_ *BucketFactory) []Processor
func (LeakyType) Validate ¶
func (LeakyType) Validate(f *BucketFactory) error
type OverflowProcessor ¶
type OverflowProcessor struct {
Filter string
FilterRuntime *vm.Program
DumbProcessor
}
func NewOverflowProcessor ¶
func NewOverflowProcessor(f *BucketFactory) (*OverflowProcessor, error)
func (*OverflowProcessor) OnBucketOverflow ¶
func (u *OverflowProcessor) OnBucketOverflow(f *BucketFactory, l *Leaky, s pipeline.RuntimeAlert, q *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
type PourCollector ¶ added in v1.7.5
type PourCollector struct {
// contains filtered or unexported fields
}
func NewPourCollector ¶ added in v1.7.5
func NewPourCollector() *PourCollector
func (*PourCollector) Add ¶ added in v1.7.5
func (c *PourCollector) Add(key string, evt pipeline.Event)
func (*PourCollector) DumpYAML ¶ added in v1.7.5
func (c *PourCollector) DumpYAML() ([]byte, error)
type Processor ¶
type Processor interface {
OnBucketInit(f *BucketFactory) error
OnBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
OnBucketOverflow(f *BucketFactory, leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
AfterBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
}
type RawBayesianCondition ¶ added in v1.5.3
type ScopeType ¶ added in v1.7.4
type ScopeType struct {
Scope string `yaml:"type"`
Filter string `yaml:"expression"`
RunTimeFilter *vm.Program
}
func (*ScopeType) CompileFilter ¶
type SimulationChecker ¶
type TriggerProcessor ¶
type TriggerProcessor struct {
DumbProcessor
}
func (*TriggerProcessor) OnBucketPour ¶
func (*TriggerProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event
type TriggerType ¶
type TriggerType struct{}
func (TriggerType) BuildProcessors ¶
func (TriggerType) BuildProcessors(_ *BucketFactory) []Processor
func (TriggerType) Validate ¶
func (TriggerType) Validate(f *BucketFactory) error
type UniqProcessor ¶
type UniqProcessor struct {
DistinctCompiled *vm.Program
KeyCache map[string]bool
CacheMutex sync.Mutex
}
func (*UniqProcessor) AfterBucketPour ¶
func (*UniqProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event
func (*UniqProcessor) OnBucketInit ¶
func (p *UniqProcessor) OnBucketInit(f *BucketFactory) error
func (*UniqProcessor) OnBucketOverflow ¶
func (*UniqProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
func (*UniqProcessor) OnBucketPour ¶
func (p *UniqProcessor) OnBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event