limiters

package module v0.0.0-...-4924559
Published: Feb 25, 2026 License: MIT Imports: 11 Imported by: 0

README

Limiters

This is a fork of mennanov/limiters.

It was created using go-fsck extract/restore to group one struct per file.

Changes:

  • Removed the Concurrent Buffer and its related registry
  • Removed all etcd, Consul, and DynamoDB implementations
  • Added an errorsWrap function that uses the standard library errors package
  • Updated the code to use stdlib errors instead of github.com/pkg/errors
  • Removed SystemClock.Sleep
  • Added per-key local storage constructors
  • Used redis.UniversalClient instead of *redis.Client to support cluster deployments
  • Adjusted RedisLock to use the Lock/UnlockContext functions from redsync
  • Wrote a test for RedisLock

Ultimately the fork contains redis and local implementations for:

  • Token bucket
  • Leaky bucket
  • Fixed window counter
  • Sliding window counter

It only imports the redis and redsync dependencies, which are used to implement the distributed rate limiters and to provide a distributed locking mechanism for the limiters that require it.

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrLimitExhausted is returned by the Limiter in case the number of requests overflows the capacity of a Limiter.
	ErrLimitExhausted = errors.New("requests limit exhausted")

	// ErrRaceCondition is returned when there is a race condition while saving a state of a rate limiter.
	ErrRaceCondition = errors.New("race condition detected")
)

Functions

This section is empty.

Types

type Clock

type Clock interface {
	// Now returns the current system time.
	Now() time.Time
}

Clock encapsulates a system clock. It is used to mock time in tests.

type ConcurrentBufferBackend

type ConcurrentBufferBackend interface {
	// Add adds the request with the given key to the buffer and returns the total number of requests in it.
	Add(ctx context.Context, key string) (int64, error)
	// Remove removes the request from the buffer.
	Remove(ctx context.Context, key string) error
}

ConcurrentBufferBackend wraps the Add and Remove methods.

type DistLocker

type DistLocker interface {
	// Lock locks the locker.
	Lock(ctx context.Context) error
	// Unlock unlocks the previously successfully locked lock.
	Unlock(ctx context.Context) error
}

DistLocker is a context aware distributed locker (interface is similar to sync.Locker).

type FixedWindow

type FixedWindow struct {
	// contains filtered or unexported fields
}

FixedWindow implements a Fixed Window rate limiting algorithm.

A simple and memory-efficient algorithm that does not need a distributed lock. However, it may be lenient when many requests arrive around the boundary between two adjacent windows.

func NewFixedWindow

func NewFixedWindow(capacity int64, rate time.Duration, fixedWindowIncrementer FixedWindowIncrementer, clock Clock) *FixedWindow

NewFixedWindow creates a new instance of FixedWindow. Capacity is the maximum number of requests allowed per window. Rate is the window size.

func (*FixedWindow) Limit

func (f *FixedWindow) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. It returns ErrLimitExhausted if the request overflows the window's capacity.

type FixedWindowInMemory

type FixedWindowInMemory struct {
	// contains filtered or unexported fields
}

FixedWindowInMemory is an in-memory implementation of FixedWindowIncrementer.

func LocalFixedWindow

func LocalFixedWindow(key string) *FixedWindowInMemory

func NewFixedWindowInMemory

func NewFixedWindowInMemory() *FixedWindowInMemory

NewFixedWindowInMemory creates a new instance of FixedWindowInMemory.

func (*FixedWindowInMemory) Increment

func (f *FixedWindowInMemory) Increment(ctx context.Context, window time.Time, _ time.Duration) (int64, error)

Increment increments the window's counter.

type FixedWindowIncrementer

type FixedWindowIncrementer interface {
	// Increment increments the request counter for the window and returns the counter value.
	// TTL is the time duration before the next window.
	Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)
}

FixedWindowIncrementer wraps the Increment method.

type FixedWindowRedis

type FixedWindowRedis struct {
	// contains filtered or unexported fields
}

FixedWindowRedis implements FixedWindow in Redis.

func NewFixedWindowRedis

func NewFixedWindowRedis(cli redis.UniversalClient, prefix string) *FixedWindowRedis

NewFixedWindowRedis returns a new instance of FixedWindowRedis. Prefix is the key prefix used to store all the keys used in this implementation in Redis.

func (*FixedWindowRedis) Increment

func (f *FixedWindowRedis) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error)

Increment increments the window's counter in Redis.

type LeakyBucket

type LeakyBucket struct {
	// contains filtered or unexported fields
}

LeakyBucket implements the https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue algorithm.

func NewLeakyBucket

func NewLeakyBucket(capacity int64, rate time.Duration, locker DistLocker, leakyBucketStateBackend LeakyBucketStateBackend, clock Clock, logger Logger) *LeakyBucket

NewLeakyBucket creates a new instance of LeakyBucket.

func (*LeakyBucket) Limit

func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. If the last request happened earlier than the rate, this method returns a zero duration. It returns ErrLimitExhausted if the request overflows the bucket's capacity; in that case the returned duration is how long it would have taken for the request to be processed if the bucket were not overflowed.

type LeakyBucketInMemory

type LeakyBucketInMemory struct {
	// contains filtered or unexported fields
}

LeakyBucketInMemory is an in-memory implementation of LeakyBucketStateBackend.

func LocalLeakyBucket

func LocalLeakyBucket(key string) *LeakyBucketInMemory

func NewLeakyBucketInMemory

func NewLeakyBucketInMemory() *LeakyBucketInMemory

NewLeakyBucketInMemory creates a new instance of LeakyBucketInMemory.

func (*LeakyBucketInMemory) SetState

func (l *LeakyBucketInMemory) SetState(ctx context.Context, state LeakyBucketState) error

SetState sets the current state of the bucket.

func (*LeakyBucketInMemory) State

func (l *LeakyBucketInMemory) State(ctx context.Context) (LeakyBucketState, error)

State gets the current state of the bucket.

type LeakyBucketRedis

type LeakyBucketRedis struct {
	// contains filtered or unexported fields
}

LeakyBucketRedis is a Redis implementation of a LeakyBucketStateBackend.

func NewLeakyBucketRedis

func NewLeakyBucketRedis(cli redis.UniversalClient, prefix string, ttl time.Duration, raceCheck bool) *LeakyBucketRedis

NewLeakyBucketRedis creates a new LeakyBucketRedis instance. Prefix is the key prefix used to store all the keys used in this implementation in Redis. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Redis are modified between the State() and SetState() calls, ErrRaceCondition is returned.

func (*LeakyBucketRedis) SetState

func (t *LeakyBucketRedis) SetState(ctx context.Context, state LeakyBucketState) error

SetState updates the state in Redis. The provided fencing token is checked on the Redis side before saving the keys.

func (*LeakyBucketRedis) State

func (t *LeakyBucketRedis) State(ctx context.Context) (LeakyBucketState, error)

State gets the bucket's state from Redis.

type LeakyBucketState

type LeakyBucketState struct {
	// Last is the Unix timestamp in nanoseconds of the most recent request.
	Last int64
}

LeakyBucketState represents the state of a LeakyBucket.

func (LeakyBucketState) IzZero

func (s LeakyBucketState) IzZero() bool

IzZero returns true if the bucket state is zero valued.

type LeakyBucketStateBackend

type LeakyBucketStateBackend interface {
	// State gets the current state of the LeakyBucket.
	State(ctx context.Context) (LeakyBucketState, error)
	// SetState sets (persists) the current state of the LeakyBucket.
	SetState(ctx context.Context, state LeakyBucketState) error
}

LeakyBucketStateBackend interface encapsulates the logic of retrieving and persisting the state of a LeakyBucket.

type LockNoop

type LockNoop struct {
}

LockNoop is a no-op implementation of the DistLocker interface. It should only be used with the in-memory backends as they are already thread-safe and don't need distributed locks.

func NewLockNoop

func NewLockNoop() *LockNoop

NewLockNoop creates a new LockNoop.

func (LockNoop) Lock

func (n LockNoop) Lock(ctx context.Context) error

Lock imitates locking.

func (LockNoop) Unlock

func (n LockNoop) Unlock(_ context.Context) error

Unlock does nothing.

type LockRedis

type LockRedis struct {
	// contains filtered or unexported fields
}

LockRedis is a wrapper around github.com/go-redsync/redsync that implements the DistLocker interface.

func NewLockRedis

func NewLockRedis(pool redsyncredis.Pool, mutexName string) *LockRedis

NewLockRedis creates a new instance of LockRedis.

func (*LockRedis) Lock

func (l *LockRedis) Lock(ctx context.Context) error

Lock locks the lock in Redis.

func (*LockRedis) Unlock

func (l *LockRedis) Unlock(ctx context.Context) error

Unlock unlocks the lock in Redis.

type Logger

type Logger interface {
	// Log logs the given arguments.
	Log(v ...interface{})
}

Logger wraps the Log method for logging.

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow implements a Sliding Window rate limiting algorithm.

It does not require a distributed lock and uses a minimal amount of memory; however, it will disallow all requests while a client is flooding the service. It is the client's responsibility to handle a disallowed request and wait before retrying.

func NewSlidingWindow

func NewSlidingWindow(capacity int64, rate time.Duration, slidingWindowIncrementer SlidingWindowIncrementer, clock Clock, epsilon float64) *SlidingWindow

NewSlidingWindow creates a new instance of SlidingWindow. Capacity is the maximum number of requests allowed per window. Rate is the window size. Epsilon is the maximum allowed difference when comparing the current weighted number of requests with the capacity.

func (*SlidingWindow) Limit

func (s *SlidingWindow) Limit(ctx context.Context) (time.Duration, error)

Limit returns the time duration to wait before the request can be processed. It returns ErrLimitExhausted if the request overflows the capacity.

type SlidingWindowInMemory

type SlidingWindowInMemory struct {
	// contains filtered or unexported fields
}

SlidingWindowInMemory is an in-memory implementation of SlidingWindowIncrementer.

func LocalSlidingWindow

func LocalSlidingWindow(key string) *SlidingWindowInMemory

func NewSlidingWindowInMemory

func NewSlidingWindowInMemory() *SlidingWindowInMemory

NewSlidingWindowInMemory creates a new instance of SlidingWindowInMemory.

func (*SlidingWindowInMemory) Increment

func (s *SlidingWindowInMemory) Increment(ctx context.Context, prev, curr time.Time, _ time.Duration) (int64, int64, error)

Increment increments the current window's counter and returns the number of requests in the previous window and the current one.

type SlidingWindowIncrementer

type SlidingWindowIncrementer interface {
	// Increment increments the request counter for the current window and returns the counter values for the previous
	// window and the current one.
	// TTL is the time duration before the next window.
	Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (prevCount, currCount int64, err error)
}

SlidingWindowIncrementer wraps the Increment method.

type SlidingWindowRedis

type SlidingWindowRedis struct {
	// contains filtered or unexported fields
}

SlidingWindowRedis implements SlidingWindow in Redis.

func NewSlidingWindowRedis

func NewSlidingWindowRedis(cli redis.UniversalClient, prefix string) *SlidingWindowRedis

NewSlidingWindowRedis creates a new instance of SlidingWindowRedis.

func (*SlidingWindowRedis) Increment

func (s *SlidingWindowRedis) Increment(ctx context.Context, prev, curr time.Time, ttl time.Duration) (int64, int64, error)

Increment increments the current window's counter in Redis and returns the number of requests in the previous window and the current one.

type StdLogger

type StdLogger struct{}

StdLogger implements the Logger interface.

func NewStdLogger

func NewStdLogger() *StdLogger

NewStdLogger creates a new instance of StdLogger.

func (*StdLogger) Log

func (l *StdLogger) Log(v ...interface{})

Log delegates the logging to the std logger.

type SystemClock

type SystemClock struct {
}

SystemClock implements the Clock interface by using the real system clock.

func NewSystemClock

func NewSystemClock() *SystemClock

NewSystemClock creates a new instance of SystemClock.

func (*SystemClock) Now

func (c *SystemClock) Now() time.Time

Now returns the current system time.

type TokenBucket

type TokenBucket struct {
	// contains filtered or unexported fields
}

TokenBucket implements the https://en.wikipedia.org/wiki/Token_bucket algorithm.

func NewTokenBucket

func NewTokenBucket(capacity int64, refillRate time.Duration, locker DistLocker, tokenBucketStateBackend TokenBucketStateBackend, clock Clock, logger Logger) *TokenBucket

NewTokenBucket creates a new instance of TokenBucket.

func (*TokenBucket) Limit

func (t *TokenBucket) Limit(ctx context.Context) (time.Duration, error)

Limit takes 1 token from the bucket.

func (*TokenBucket) Take

func (t *TokenBucket) Take(ctx context.Context, tokens int64) (time.Duration, error)

Take takes tokens from the bucket.

It returns a zero duration and a nil error if the bucket contains a sufficient number of tokens.

It returns ErrLimitExhausted if the amount of available tokens is less than requested. In this case the returned duration is the amount of time to wait to retry the request.

type TokenBucketInMemory

type TokenBucketInMemory struct {
	// contains filtered or unexported fields
}

TokenBucketInMemory is an in-memory implementation of TokenBucketStateBackend.

The state is not shared nor persisted, so it won't survive restarts or failures. Because the state is local, the rate at which some endpoints are accessed can't be reliably predicted or limited across instances.

It can still be used as an approximate global rate limiter behind a round-robin load balancer.

func LocalTokenBucket

func LocalTokenBucket(key string) *TokenBucketInMemory

func NewTokenBucketInMemory

func NewTokenBucketInMemory() *TokenBucketInMemory

NewTokenBucketInMemory creates a new instance of TokenBucketInMemory.

func (*TokenBucketInMemory) SetState

func (t *TokenBucketInMemory) SetState(ctx context.Context, state TokenBucketState) error

SetState sets the current bucket's state.

func (*TokenBucketInMemory) State

func (t *TokenBucketInMemory) State(ctx context.Context) (TokenBucketState, error)

State returns the current bucket's state.

type TokenBucketRedis

type TokenBucketRedis struct {
	// contains filtered or unexported fields
}

TokenBucketRedis is a Redis implementation of a TokenBucketStateBackend.

Redis is an in-memory key-value store that also supports persistence. It is a better choice for high-load cases than etcd because it does not keep old values of the keys and therefore needs no compaction/defragmentation.

Depending on the persistence and cluster configuration, however, some data might be lost on failure, resulting in under-limiting access to the service.

func NewTokenBucketRedis

func NewTokenBucketRedis(cli redis.UniversalClient, prefix string, ttl time.Duration, raceCheck bool) *TokenBucketRedis

NewTokenBucketRedis creates a new TokenBucketRedis instance. Prefix is the key prefix used to store all the keys used in this implementation in Redis. TTL is the TTL of the stored keys.

If raceCheck is true and the keys in Redis are modified between the State() and SetState() calls, ErrRaceCondition is returned. This adds extra overhead, since a Lua script has to be executed on the Redis side, and it locks the entire database.

func (*TokenBucketRedis) SetState

func (t *TokenBucketRedis) SetState(ctx context.Context, state TokenBucketState) error

SetState updates the state in Redis.

func (*TokenBucketRedis) State

func (t *TokenBucketRedis) State(ctx context.Context) (TokenBucketState, error)

State gets the bucket's state from Redis.

type TokenBucketState

type TokenBucketState struct {
	// Last is the last time the state was updated (Unix timestamp in nanoseconds).
	Last int64
	// Available is the number of available tokens in the bucket.
	Available int64
}

TokenBucketState represents a state of a token bucket.

type TokenBucketStateBackend

type TokenBucketStateBackend interface {
	// State gets the current state of the TokenBucket.
	State(ctx context.Context) (TokenBucketState, error)
	// SetState sets (persists) the current state of the TokenBucket.
	SetState(ctx context.Context, state TokenBucketState) error
}

TokenBucketStateBackend interface encapsulates the logic of retrieving and persisting the state of a TokenBucket.
