Documentation
¶
Index ¶
- Constants
- Variables
- func LoadForRate(rate int32, maxrate *int32) (load int32)
- type Limiter
- type Ticker
- func (ticker *Ticker) Close()
- func (ticker *Ticker) Count() (n int64)
- func (ticker *Ticker) Drain() (drained int64)
- func (ticker *Ticker) IsClosed() (yes bool)
- func (ticker *Ticker) Load() (load int32)
- func (ticker *Ticker) MaxRate() (n int32)
- func (ticker *Ticker) Rate() (n int32)
- func (ticker *Ticker) Wait() (ok bool)
- func (ticker *Ticker) Worker(f func()) (ok bool)
- func (ticker *Ticker) WorkerCount() int32
Examples ¶
Constants ¶
const SleepGranularity = 500
SleepGranularity 500 implies that the time.Sleep() granularity is at least 2ms
Variables ¶
var TickerTimerInterval = time.Second
TickerTimerInterval is how often new Tickers update rate and load metrics. The value is snapped by NewTicker(); changing it only affects future Tickers. Do not change TickerTimerInterval while NewTicker() may run; set it before starting tickers.
Functions ¶
func LoadForRate ¶ added in v1.14.0
LoadForRate calculates the load given a rate and maxrate.
Types ¶
type Limiter ¶
type Limiter struct {
CloseCh <-chan struct{} // may be nil, if set can break Wait() before waiting is complete
// contains filtered or unexported fields
}
Limiter provides efficient rate limiting. The zero value is immediately usable.
A Limiter is not safe to use from multiple goroutines simultaneously.
func (*Limiter) Wait ¶
Wait sleeps at least long enough to ensure that Wait cannot be called more than `*maxrate` times per second.
A nil `maxrate` or a `*maxrate` of zero or less doesn't wait at all.
Example ¶
package main
import (
"fmt"
"time"
"github.com/linkdata/rate"
)
func main() {
var limiter rate.Limiter
maxrate := int32(1000)
now := time.Now()
// This doesn't wait at all since we haven't waited for anything yet.
limiter.Wait(&maxrate)
noneElapsed := time.Since(now)
// Instead of calling now = time.Now(), which can be slow, just add noneElapsed.
now = now.Add(noneElapsed)
// This waits at least 1ms because the maxrate is 1000.
limiter.Wait(&maxrate)
someElapsed := time.Since(now)
fmt.Println(noneElapsed < someElapsed, someElapsed >= time.Second/time.Duration(maxrate))
}
Output: true true
type Ticker ¶ added in v1.3.0
type Ticker struct {
C <-chan struct{} // Sends a struct{}{} at most maxrate times per second
// WorkerMax is the maximum number of workers that may be started with Worker(), default 10000.
// Do not change WorkerMax while Worker() may run; synchronize externally if you must update it at runtime.
WorkerMax int32
// WorkerLoad is the load at which we stop starting new workers, default 1000.
// Do not change WorkerLoad while Worker() may run; synchronize externally if you must update it at runtime.
WorkerLoad int32
// WorkerRatio is the ratio of max workers to max rate, default 1.
// Do not change WorkerRatio while Worker() may run; synchronize externally if you must update it at runtime.
WorkerRatio int32
// contains filtered or unexported fields
}
func NewTicker ¶
NewTicker returns a Ticker that reads ticks from a parent Ticker and sends a `struct{}{}` at most `*maxrate` times per second.
The effective max rate is thus the lower of the parent Tickers maxrate and this Tickers `*maxrate`.
A nil `parent` Ticker means tick rate is only limited by `maxrate`. If the parent Ticker is closed, this Ticker will stop sending ticks.
A nil `maxrate` or a `*maxrate` of zero or less sends as quickly as possible, so only limited by the parent channel.
func (*Ticker) Close ¶ added in v1.3.0
func (ticker *Ticker) Close()
Close stops the Ticker and frees resources.
It is safe to call multiple times or concurrently. Once Close() returns, no more ticks will be delivered, and if you passed a non-nil ticker counter to NewTicker(), it will be correct.
func (*Ticker) Drain ¶ added in v1.14.0
Drain consumes ticks from the Ticker until it is closed. Returns the number of ticks drained.
func (*Ticker) Load ¶ added in v1.8.0
Load returns the current load in permille.
Load is rounded up, and is only zero if the rate is zero. If the Ticker has parent Ticker(s), the highest load is returned. Closed Tickers have a load of 1000.
func (*Ticker) Rate ¶ added in v1.8.0
Rate returns an advisory estimate of the current rate of ticks per second.
The value is sampled periodically, so it may lag behind sudden rate changes and can momentarily exceed MaxRate().
func (*Ticker) Wait ¶ added in v1.5.0
Wait delays until the next tick is available, then adds a "free tick" back to the Ticker.
Returns true if we waited successfully, or false if the Ticker is closed.
func (*Ticker) Worker ¶ added in v1.13.0
Worker starts f when the current load allows. Returns true if the goroutine was started. Returns false if the ticker is no longer running.
It limits the number of goroutines started this way to MaxRate() * WorkerRatio, with a hard cap of WorkerMax. If MaxRate() * WorkerRatio is less than one, WorkerMax is used.
Example ¶
package main
import (
"fmt"
"sync/atomic"
"time"
"github.com/linkdata/rate"
)
func main() {
const numTasks = 100
const wantRate = numTasks / 10
var result int64
var workerCount int32
var highestWorkerCount int32
maxrate := int32(wantRate)
ticker := rate.NewTicker(nil, &maxrate)
// make a task channel and spawn a goroutine sending to it
taskCh := make(chan int)
go func() {
defer close(taskCh)
for i := range numTasks {
taskCh <- i
}
}()
// define a worker function that just adds and sleeps for a bit
workerFn := func(i int) {
for j := 0; j <= i; j++ {
time.Sleep(time.Millisecond)
atomic.AddInt64(&result, int64(j))
}
}
// process all the tasks
for task := range taskCh {
// make sure to not alias variables you use in the lambda
// in case you use Go versions prior to 1.22.
if !ticker.Worker(func() {
// let's keep track of the highest number of concurrent worker goroutines
defer atomic.AddInt32(&workerCount, -1)
if count := atomic.AddInt32(&workerCount, 1); count > atomic.LoadInt32(&highestWorkerCount) {
atomic.StoreInt32(&highestWorkerCount, count)
}
// call the worker.
workerFn(task)
}) {
// if ticker.Worker() fails to start the worker, it means the Ticker is closed.
break
}
}
// wait for the workers to be done
for ticker.WorkerCount() != 0 {
time.Sleep(time.Millisecond)
}
// calculate the expected result
var wantResult int64
for i := range int64(numTasks) {
for j := int64(0); j <= i; j++ {
wantResult += j
}
}
fmt.Println(result == wantResult, atomic.LoadInt32(&highestWorkerCount) < numTasks/2)
}
Output: true true
func (*Ticker) WorkerCount ¶ added in v1.14.1
WorkerCount returns the number of currently running workers.