leakybucket

package
v1.7.7-rc1
Published: Mar 23, 2026 License: MIT Imports: 37 Imported by: 1

README

Leakybuckets

Concepts

Leakybuckets are used for decision-making. Under certain conditions, enriched events are poured into buckets. When a bucket reaches its threshold, it emits a new event (an overflow) and is then destroyed.

There are several bucket types, and we welcome contributions of new ones.

A single bucket configuration typically creates many bucket instances, distinguished by a stackkey. Events with the same stackkey value are poured into the same instance.

The main purpose is to detect clients that exceed a given rate of attempts (SSH logins, HTTP auth failures, etc.). In practice, stackkey is often source_ip.

Standard leaky buckets

Default buckets have two main configuration options:

  • capacity: number of events the bucket can hold. When the capacity is reached and a new event is poured, the bucket overflows and emits an overflow event.

  • leakspeed: how long it takes for one event to leak out of the bucket. When an event leaks, it is removed from the bucket.

Trigger

A Trigger has capacity: 0. Any poured event causes an overflow.

Uniq

Uniq behaves like a standard bucket, except it enforces uniqueness: a filter extracts a property from each event, and only one occurrence of a given value is allowed. If the value is already present, the event is ignored.

Counter

A Counter has infinite capacity and infinite leakspeed (it never overflows and never leaks). Instead, it emits an event after a fixed duration.

Bayesian

A Bayesian bucket runs Bayesian inference instead of counting events. Each condition specifies likelihoods via prob_given_benign and prob_given_evil. The bucket evaluates events until the posterior exceeds bayesian_threshold (overflow) or until leakspeed expires.

Configuration

Common fields
  • type (required): one of "leaky", "trigger", "uniq", "counter", "bayesian".

  • name (required): tags events emitted by this bucket. Any value is accepted.

  • filter (required): expression evaluated to decide whether an event matches. Must return a boolean. Expression language: https://github.com/antonmedv/expr.

  • stackkey (required): selects the bucket instance to pour into. When a new stackkey value is seen, a new bucket instance is created.

  • on_overflow (optional): action when the bucket overflows. Currently: "ban,1h", "Reprocess", "Delete". Reprocess sends the emitted event back to the event pool to be matched again.

Standard bucket fields
  • capacity (currently required): size of the bucket. When an event is poured into a full bucket, it overflows.

  • leakspeed: duration parsed by time.ParseDuration: https://pkg.go.dev/time#ParseDuration. After each interval, an event leaks from the bucket.

Uniq fields
  • uniq_filter: expr (in the Expression language) that must return a string. All strings returned for a given bucket instance must be unique; events generating a string that has already been seen are ignored.
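
A sketch of a uniq scenario using the field names documented in this section (the scenario name, filter, and values are illustrative, not taken from a real hub scenario):

```yaml
# at most one event per distinct username per source_ip (illustrative)
- type: uniq
  name: ssh_uniq_users
  filter: "Meta.log_type == 'ssh_failed-auth'"
  uniq_filter: "Meta.target_user"
  leakspeed: "10s"
  capacity: 5
  stackkey: "source_ip"
  on_overflow: ban,1h
```
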
Trigger fields

capacity and leakspeed do not apply.

Counter fields
  • duration: how long the Counter exists before it emits its event and is destroyed. Parsed by time.ParseDuration. Counters are configured with infinite capacity (capacity: -1 for now) and infinite leakspeed.
Bayesian fields
  • bayesian_prior: the prior to start with
  • bayesian_threshold: the threshold for the posterior to trigger the overflow.
  • bayesian_conditions: list of Bayesian conditions with likelihoods

Bayesian Conditions are built from:

  • condition: expr that must evaluate to true/false.
  • prob_given_evil: likelihood the condition holds given the IP is malicious.
  • prob_given_benign: likelihood the condition holds given the IP is benign.
  • guillotine: if true, stop evaluating this condition after it becomes true once (useful for expensive conditions).
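
The inference step can be illustrated with a textbook Bayes-rule update; this is a sketch of the mechanism the section describes, and the package's exact update formula may differ:

```go
package main

import "fmt"

// bayesUpdate applies one step of Bayes' rule: given the current belief that
// the source is malicious (the prior) and the likelihood of the observed
// condition under each hypothesis, it returns the posterior.
// Sketch only; the package's actual update may differ in detail.
func bayesUpdate(prior, probGivenEvil, probGivenBenign float64, condTrue bool) float64 {
	pEvil, pBenign := probGivenEvil, probGivenBenign
	if !condTrue {
		pEvil, pBenign = 1-probGivenEvil, 1-probGivenBenign
	}
	return prior * pEvil / (prior*pEvil + (1-prior)*pBenign)
}

func main() {
	p := 0.5 // bayesian_prior
	for i := 1; i <= 3; i++ {
		// each event where the condition holds pushes the posterior up
		p = bayesUpdate(p, 0.8, 0.2, true)
		fmt.Printf("posterior after event %d: %.3f\n", i, p)
	}
	// the bucket overflows once the posterior exceeds bayesian_threshold
}
```

With prob_given_evil 0.8 and prob_given_benign 0.2, three matching events are enough to push a 0.5 prior past a 0.95 threshold.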

Examples

# ssh bruteforce
- type: leaky
  name: ssh_bruteforce
  filter: "Meta.log_type == 'ssh_failed-auth'"
  leakspeed: "10s"
  capacity: 5
  stackkey: "source_ip"
  on_overflow: ban,1h

# reporting of src_ip,dest_port seen
- type: counter
  name: counter
  filter: "Meta.service == 'tcp' && Event.new_connection == 'true'"
  distinct: "Meta.source_ip + ':' + Meta.dest_port"
  duration: 5m
  capacity: -1

- type: trigger
  name: "New connection"
  filter: "Meta.service == 'tcp' && Event.new_connection == 'true'"
  on_overflow: Reprocess

Implementation notes

The leaky-bucket pipeline is driven by runPour() in cmd/crowdsec/pour.go, which calls Leaky.PourItemToHolders().

Buckets are managed by a BucketStore, which owns creation and lookup. Each bucket is addressed by a deterministic key computed from the bucket’s configured filter and the event stackkey, so the same (filter, stackkey) pair always maps to the same bucket instance.
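
The addressing property can be sketched as follows. The hash construction here is hypothetical (the package's actual key derivation is not shown on this page); only the property that the same (filter, stackkey) pair yields the same key is taken from the text:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// bucketKey sketches deterministic bucket addressing: hashing the configured
// filter together with the event's stackkey value. Hypothetical construction;
// it illustrates the property, not the package's real implementation.
func bucketKey(filter, stackkey string) string {
	sum := sha256.Sum256([]byte(filter + "|" + stackkey))
	return fmt.Sprintf("%x", sum[:8])
}

func main() {
	a := bucketKey("Meta.log_type == 'ssh_failed-auth'", "192.0.2.1")
	b := bucketKey("Meta.log_type == 'ssh_failed-auth'", "192.0.2.1")
	c := bucketKey("Meta.log_type == 'ssh_failed-auth'", "192.0.2.2")
	fmt.Println(a == b) // same (filter, stackkey) -> same bucket instance
	fmt.Println(a == c) // different stackkey -> different bucket instance
}
```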

The default implementation is the standard leaky bucket. Each bucket runs its own goroutine (bucket.go) responsible for the bucket's lifecycle (processing poured events, timing/leaking, and cleanup).

The behavior of special buckets is wired via hooks during initialization in manager_load.go. The bucket goroutine invokes these hooks at the appropriate time, typically when events are poured and/or when the bucket overflows.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EventsFromQueue added in v1.0.0

func EventsFromQueue(queue *pipeline.Queue) []*models.Event

EventsFromQueue iterates over the queue to collect and prepare the metadata for an alert

func GarbageCollectBuckets

func GarbageCollectBuckets(deadline time.Time, bucketStore *BucketStore)

The lifecycle of the leaky routines is based on "real" time. But when running in time-machine mode, the reference time comes from the logs rather than "real" time, so the buckets must be garbage collected to avoid skyrocketing memory usage.

func NewAlert added in v1.0.0

func NewAlert(leaky *Leaky, queue *pipeline.Queue) (pipeline.RuntimeAlert, error)

NewAlert will generate a RuntimeAlert and its APIAlert(s) from a bucket that overflowed

func Pour

func Pour(l *Leaky, gate pourGate, msg pipeline.Event)

TODO: Pour cannot be made a method of *Leaky because a field has the same name

func PourItemToBucket added in v1.2.3

func PourItemToBucket(
	ctx context.Context,
	bucket *Leaky,
	holder *BucketFactory,
	bucketStore *BucketStore,
	parsed *pipeline.Event,
	collector *PourCollector,
) error

func PourItemToHolders

func PourItemToHolders(
	ctx context.Context,
	parsed pipeline.Event,
	holders []BucketFactory,
	buckets *BucketStore,
	collector *PourCollector,
) (bool, error)

func SourceFromEvent added in v1.0.0

func SourceFromEvent(evt pipeline.Event, leaky *Leaky) (map[string]models.Source, error)

SourceFromEvent extracts and formats a valid models.Source object from an Event

func TimeMachinePour

func TimeMachinePour(l *Leaky, _ pourGate, msg pipeline.Event)

Types

type BayesianEvent added in v1.5.3

type BayesianEvent struct {
	// contains filtered or unexported fields
}

type BayesianProcessor

type BayesianProcessor struct {
	DumbProcessor
	// contains filtered or unexported fields
}

func (*BayesianProcessor) AfterBucketPour

func (p *BayesianProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event

func (*BayesianProcessor) OnBucketInit

func (p *BayesianProcessor) OnBucketInit(f *BucketFactory) error

type BayesianType

type BayesianType struct{}

func (BayesianType) BuildProcessors

func (BayesianType) BuildProcessors(_ *BucketFactory) []Processor

func (BayesianType) Validate

func (BayesianType) Validate(f *BucketFactory) error

type BlackholeProcessor

type BlackholeProcessor struct {
	DumbProcessor
	// contains filtered or unexported fields
}

func NewBlackholeProcessor

func NewBlackholeProcessor(f *BucketFactory) (*BlackholeProcessor, error)

func (*BlackholeProcessor) OnBucketOverflow

func (p *BlackholeProcessor) OnBucketOverflow(
	_ *BucketFactory,
	leaky *Leaky,
	alert pipeline.RuntimeAlert,
	queue *pipeline.Queue,
) (pipeline.RuntimeAlert, *pipeline.Queue)

type BucketFactory

type BucketFactory struct {
	Spec BucketSpec

	BucketName     string
	Filename       string
	RunTimeFilter  *vm.Program `json:"-"`
	RunTimeGroupBy *vm.Program `json:"-"`
	DataDir        string

	Simulated bool // Set to true if the scenario instantiating the bucket was in the exclusion list
	// contains filtered or unexported fields
}

BucketFactory is the compiled/validated, reusable template produced from a BucketSpec.

func LoadBuckets

func LoadBuckets(
	cscfg *csconfig.CrowdsecServiceCfg,
	hub *cwhub.Hub,
	scenarios []*cwhub.Item,
	orderEvent bool,
) ([]BucketFactory, chan pipeline.Event, error)

func (*BucketFactory) BucketKey

func (f *BucketFactory) BucketKey(stackkey string) string

func (*BucketFactory) LoadBucket

func (f *BucketFactory) LoadBucket() error

LoadBucket validates and prepares a BucketFactory for runtime use (compile expressions, init processors, init data).

func (*BucketFactory) Validate

func (f *BucketFactory) Validate() error

type BucketSpec

type BucketSpec struct {
	FormatVersion       string                     `yaml:"format"`
	Description         string                     `yaml:"description"`
	References          []string                   `yaml:"references"`
	Type                string                     `yaml:"type"`                // Type can be: leaky, counter, trigger. It determines the main bucket characteristics
	Name                string                     `yaml:"name"`                // Name of the bucket, used later in logs and user messages. Should be unique
	Capacity            int                        `yaml:"capacity"`            // Capacity applies to leaky buckets and determines the "burst" capacity
	LeakSpeed           string                     `yaml:"leakspeed"`           // LeakSpeed is a duration (time.ParseDuration format); one event leaks out of the bucket per interval
	Filter              string                     `yaml:"filter"`              // Filter is an expr that determines whether an event is eligible for the bucket. It is evaluated against the Event struct
	GroupBy             string                     `yaml:"groupby,omitempty"`   // GroupBy is an expr that determines the partitions of the bucket. A common example is the source_ip
	Distinct            string                     `yaml:"distinct"`            // Distinct, when present, adds a `Pour()` processor that only pours unique items (based on the distinct expr result)
	Debug               bool                       `yaml:"debug"`               // Debug, when true, enables debugging for _this_ scenario specifically
	Labels              map[string]any             `yaml:"labels"`              // Labels is a K:V list providing context to the overflow
	Blackhole           string                     `yaml:"blackhole,omitempty"` // Blackhole is a duration that, if present, prevents the same bucket partition from overflowing more often than once per $duration
	ScopeType           ScopeType                  `yaml:"scope,omitempty"`     // ScopeType enforces a remediation other than blocking an IP. Defaults to IP
	Reprocess           bool                       `yaml:"reprocess"`           // Reprocess, if true, forces the overflow event to be re-injected into the processing chain
	Data                []*enrichment.DataProvider `yaml:"data,omitempty"`
	ConditionalOverflow string                     `yaml:"condition"`           // condition, if present, is an expression that must return true for the bucket to overflow
	CacheSize           int                        `yaml:"cache_size"`          // CacheSize, if > 0, limits the size of the bucket's in-memory cache
	CancelOnFilter      string                     `yaml:"cancel_on,omitempty"` // CancelOnFilter is a filter that, if matched, kills the bucket
	BayesianPrior       float32                    `yaml:"bayesian_prior"`
	BayesianThreshold   float32                    `yaml:"bayesian_threshold"`
	BayesianConditions  []RawBayesianCondition     `yaml:"bayesian_conditions"` // conditions for the bayesian bucket
	OverflowFilter      string                     `yaml:"overflow_filter"`     // OverflowFilter, if present, is a filter that must return true for the overflow to go through
	Duration            string                     `yaml:"duration"`            // Duration gives 'counter' buckets a fixed lifetime
	ScenarioVersion     string                     `yaml:"version,omitempty"`
}

BucketSpec is the declarative YAML config for a scenario bucket. It holds all the possible user-provided fields.

type BucketStore

type BucketStore struct {
	// contains filtered or unexported fields
}

BucketStore is the struct used to hold buckets during the lifecycle of the app (i.e. between reloads).

func NewBucketStore

func NewBucketStore() *BucketStore

func (*BucketStore) BeginPour

func (b *BucketStore) BeginPour() (end func())

BeginPour blocks while a dump/snapshot is in progress.

The returned function *must* be called exactly once, usually deferred, after the event has been poured.
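
The documented BeginPour/FreezePours contract maps naturally onto a sync.RWMutex: pours hold the read lock, a freeze holds the write lock. The sketch below illustrates those semantics with a stand-in type; it is not the package's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// pourGate is a stand-in illustrating the BeginPour/FreezePours semantics.
type pourGate struct{ mu sync.RWMutex }

// BeginPour blocks while a freeze is in progress. The returned func must be
// called exactly once, usually deferred, after the event has been poured.
func (g *pourGate) BeginPour() (end func()) {
	g.mu.RLock()
	return g.mu.RUnlock
}

// FreezePours prevents new pours from starting and waits for in-flight pours.
// The returned func must be called exactly once to allow pouring again.
func (g *pourGate) FreezePours() (resume func()) {
	g.mu.Lock()
	return g.mu.Unlock
}

func main() {
	var g pourGate
	done := make(chan struct{})
	go func() {
		end := g.BeginPour()
		defer end()
		// ... pour the event into its bucket here ...
		close(done)
	}()
	<-done
	resume := g.FreezePours() // e.g. before dumping buckets to disk
	// ... snapshot safely: no pour is in flight ...
	resume()
	fmt.Println("snapshot done")
}
```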

func (*BucketStore) Delete

func (b *BucketStore) Delete(key string)

func (*BucketStore) FreezePours

func (b *BucketStore) FreezePours() (resume func())

FreezePours prevents new pours to start and waits for in-flight pours to finish.

The returned function *must* be called exactly once, usually deferred, to allow pouring again.

func (*BucketStore) Len

func (b *BucketStore) Len() int

func (*BucketStore) Load

func (b *BucketStore) Load(key string) (*Leaky, bool)

func (*BucketStore) LoadOrStore

func (b *BucketStore) LoadOrStore(key string, val *Leaky) (*Leaky, bool)

func (*BucketStore) Snapshot

func (b *BucketStore) Snapshot() map[string]*Leaky

type BucketType

type BucketType interface {
	Validate(f *BucketFactory) error
	BuildProcessors(f *BucketFactory) []Processor
}

type CancelProcessor

type CancelProcessor struct {
	CancelOnFilter *vm.Program
	Debug          bool
}

func (*CancelProcessor) AfterBucketPour

func (*CancelProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event

func (*CancelProcessor) OnBucketInit

func (p *CancelProcessor) OnBucketInit(f *BucketFactory) error

func (*CancelProcessor) OnBucketOverflow

func (*CancelProcessor) OnBucketPour

func (p *CancelProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event

type ConditionalProcessor

type ConditionalProcessor struct {
	ConditionalFilter        string
	ConditionalFilterRuntime *vm.Program
	DumbProcessor
}

func (*ConditionalProcessor) AfterBucketPour

func (p *ConditionalProcessor) AfterBucketPour(f *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event

func (*ConditionalProcessor) OnBucketInit

func (p *ConditionalProcessor) OnBucketInit(f *BucketFactory) error

type ConditionalType

type ConditionalType struct{}

func (ConditionalType) BuildProcessors

func (ConditionalType) BuildProcessors(_ *BucketFactory) []Processor

func (ConditionalType) Validate

func (ConditionalType) Validate(f *BucketFactory) error

type CounterType

type CounterType struct{}

func (CounterType) BuildProcessors

func (CounterType) BuildProcessors(_ *BucketFactory) []Processor

func (CounterType) Validate

func (CounterType) Validate(f *BucketFactory) error

type DumbProcessor

type DumbProcessor struct{}

func (*DumbProcessor) AfterBucketPour added in v1.5.0

func (*DumbProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event

func (*DumbProcessor) OnBucketInit

func (*DumbProcessor) OnBucketInit(_ *BucketFactory) error

func (*DumbProcessor) OnBucketOverflow

func (*DumbProcessor) OnBucketPour

func (*DumbProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event

type Leaky

type Leaky struct {
	Mode int // LIVE or TIMEMACHINE
	// the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects
	Limiter         rate.RateLimiter `json:"-"`
	SerializedState rate.Lstate
	// Queue is used to hold the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer.
	Queue *pipeline.Queue
	// Leaky buckets are receiving message through a chan
	In chan *pipeline.Event `json:"-"`
	// Leaky buckets are pushing their overflows through a chan
	Out chan *pipeline.Queue `json:"-"`
	// shared for all buckets (the idea is to kill this afterward)
	AllOut chan pipeline.Event `json:"-"`
	// the unique identifier of the bucket (a hash)
	Mapkey string

	Suicide     chan bool `json:"-"`
	Uuid        string
	First_ts    time.Time
	Last_ts     time.Time
	Ovflw_ts    time.Time
	Total_count int
	Factory     *BucketFactory
	Duration    time.Duration
	Pour        func(*Leaky, pourGate, pipeline.Event) `json:"-"`
	// contains filtered or unexported fields
}

Leaky represents one instance of a bucket

func LoadOrStoreBucketFromHolder added in v1.2.3

func LoadOrStoreBucketFromHolder(
	ctx context.Context,
	partitionKey string,
	buckets *BucketStore,
	holder *BucketFactory,
	expectMode int,
) (*Leaky, error)

func NewLeakyFromFactory

func NewLeakyFromFactory(f *BucketFactory) *Leaky

NewLeakyFromFactory creates a new leaky bucket from a BucketFactory. Events created by the bucket (overflow, bucket empty) are sent to a chan defined by the BucketFactory. The leaky bucket implementation is based on a rate limiter (see https://godoc.org/golang.org/x/time/rate). A trick is used to emit an event when the bucket becomes empty, allowing its destruction.

func NewTimeMachine

func NewTimeMachine(f *BucketFactory) *Leaky

func (*Leaky) LeakRoutine

func (l *Leaky) LeakRoutine(ctx context.Context, gate pourGate)

LeakRoutine is the life of a bucket. It dies when the bucket underflows or overflows.

type LeakyType

type LeakyType struct{}

func (LeakyType) BuildProcessors

func (LeakyType) BuildProcessors(_ *BucketFactory) []Processor

func (LeakyType) Validate

func (LeakyType) Validate(f *BucketFactory) error

type OverflowProcessor

type OverflowProcessor struct {
	Filter        string
	FilterRuntime *vm.Program
	DumbProcessor
}

func NewOverflowProcessor

func NewOverflowProcessor(f *BucketFactory) (*OverflowProcessor, error)

func (*OverflowProcessor) OnBucketOverflow

type PourCollector added in v1.7.5

type PourCollector struct {
	// contains filtered or unexported fields
}

func NewPourCollector added in v1.7.5

func NewPourCollector() *PourCollector

func (*PourCollector) Add added in v1.7.5

func (c *PourCollector) Add(key string, evt pipeline.Event)

func (*PourCollector) DumpYAML added in v1.7.5

func (c *PourCollector) DumpYAML() ([]byte, error)

func (*PourCollector) Snapshot added in v1.7.5

func (c *PourCollector) Snapshot() map[string][]pipeline.Event

Snapshot returns a shallow copy of the map and slices. The caller must not mutate the events.

type Processor

type Processor interface {
	OnBucketInit(f *BucketFactory) error
	OnBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
	OnBucketOverflow(f *BucketFactory, leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)

	AfterBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
}
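
To show how these hooks compose, here is a self-contained sketch with stand-in types (the real pipeline.Event, BucketFactory, and Leaky live elsewhere, and OnBucketOverflow is omitted for brevity). The pattern of embedding a no-op base so a concrete processor overrides only the hooks it needs mirrors how DumbProcessor is used above:

```go
package main

import "fmt"

// Stand-ins for the real pipeline/bucket types, for illustration only.
type Event struct{ Meta map[string]string }
type Factory struct{ Name string }

// Processor mirrors the hook interface (minus OnBucketOverflow).
type Processor interface {
	OnBucketInit(f *Factory) error
	OnBucketPour(f *Factory, e *Event) *Event // return nil to drop the event
	AfterBucketPour(f *Factory, e *Event) *Event
}

// NopProcessor plays the role of DumbProcessor: a no-op base to embed.
type NopProcessor struct{}

func (NopProcessor) OnBucketInit(*Factory) error                 { return nil }
func (NopProcessor) OnBucketPour(_ *Factory, e *Event) *Event    { return e }
func (NopProcessor) AfterBucketPour(_ *Factory, e *Event) *Event { return e }

// dropLocal overrides only OnBucketPour: it drops loopback traffic.
type dropLocal struct{ NopProcessor }

func (dropLocal) OnBucketPour(_ *Factory, e *Event) *Event {
	if e.Meta["source_ip"] == "127.0.0.1" {
		return nil
	}
	return e
}

func main() {
	var p Processor = dropLocal{}
	local := &Event{Meta: map[string]string{"source_ip": "127.0.0.1"}}
	remote := &Event{Meta: map[string]string{"source_ip": "198.51.100.7"}}
	fmt.Println(p.OnBucketPour(nil, local) == nil)  // true: dropped
	fmt.Println(p.OnBucketPour(nil, remote) != nil) // true: kept
}
```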

type RawBayesianCondition added in v1.5.3

type RawBayesianCondition struct {
	ConditionalFilterName string  `yaml:"condition"`
	ProbGivenEvil         float32 `yaml:"prob_given_evil"`
	ProbGivenBenign       float32 `yaml:"prob_given_benign"`
	Guillotine            bool    `yaml:"guillotine,omitempty"`
}

type ScopeType added in v1.7.4

type ScopeType struct {
	Scope         string `yaml:"type"`
	Filter        string `yaml:"expression"`
	RunTimeFilter *vm.Program
}

func (*ScopeType) CompileFilter

func (s *ScopeType) CompileFilter() error

type SimulationChecker

type SimulationChecker interface {
	IsSimulated(scenario string) bool
}

type TriggerProcessor

type TriggerProcessor struct {
	DumbProcessor
}

func (*TriggerProcessor) OnBucketPour

func (*TriggerProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event

type TriggerType

type TriggerType struct{}

func (TriggerType) BuildProcessors

func (TriggerType) BuildProcessors(_ *BucketFactory) []Processor

func (TriggerType) Validate

func (TriggerType) Validate(f *BucketFactory) error

type UniqProcessor

type UniqProcessor struct {
	DistinctCompiled *vm.Program
	KeyCache         map[string]bool
	CacheMutex       sync.Mutex
}

func (*UniqProcessor) AfterBucketPour

func (*UniqProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event

func (*UniqProcessor) OnBucketInit

func (p *UniqProcessor) OnBucketInit(f *BucketFactory) error

func (*UniqProcessor) OnBucketOverflow

func (*UniqProcessor) OnBucketPour

func (p *UniqProcessor) OnBucketPour(f *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
