Documentation ¶
Overview ¶
Package sync implements synchronization facilities such as worker pools.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type PooledWorkerPool ¶
type PooledWorkerPool interface {
	// Init initializes the pool.
	Init()
	// Go assigns the Work to be executed by a goroutine. Whether it waits
	// for an existing goroutine to become available is determined by the
	// GrowOnDemand() option. If GrowOnDemand is not set, the call to Go()
	// blocks until a goroutine is available. If GrowOnDemand is set, the
	// pool of goroutines expands to accommodate the work. The newly
	// allocated goroutine temporarily participates in the pool in an effort
	// to amortize its allocation cost, but is eventually killed. This allows
	// the pool to respond dynamically to workloads without causing excessive
	// memory pressure. The pool grows when the workload exceeds its capacity
	// and shrinks back to its original size if and when the burst subsides.
	Go(work Work)
	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}
PooledWorkerPool provides a pool for goroutines, but unlike WorkerPool, the actual goroutines themselves are re-used. This can be useful from a performance perspective in scenarios where the allocation and growth of the new goroutine and its stack is a bottleneck. Specifically, if the work function being performed has a very deep call-stack, calls to runtime.morestack can dominate the workload. Re-using existing goroutines allows the stack to be grown once, and then re-used for many invocations.
To prevent abnormally large goroutine stacks from persisting over the life-cycle of an application, the PooledWorkerPool randomly kills an existing goroutine and spawns a new one in its place.
The PooledWorkerPool also implements sharding of its underlying worker channels to prevent excessive lock contention.
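The three ideas described above (goroutine re-use, random worker replacement, and sharded worker channels) can be illustrated with a minimal standard-library sketch. This is not the package's implementation: `reusePool`, `newReusePool`, `spawn`, `sumTo`, and the `work` type are hypothetical names invented for illustration, and the sketch assumes the pool size is at least the number of shards.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
)

// work mirrors the func() values passed to Go; the name is illustrative.
type work func()

// reusePool is a hypothetical, simplified pool: long-lived goroutines read
// work from sharded channels instead of being started once per task.
type reusePool struct {
	shards       []chan work
	killProb     float64 // chance that a worker exits after finishing a task
	growOnDemand bool
	next         uint64 // round-robin shard cursor
}

// newReusePool starts size workers spread across numShards channels.
// It assumes size >= numShards so that every shard has a worker.
func newReusePool(size, numShards int, killProb float64, grow bool) *reusePool {
	p := &reusePool{
		shards:       make([]chan work, numShards),
		killProb:     killProb,
		growOnDemand: grow,
	}
	for i := range p.shards {
		p.shards[i] = make(chan work)
	}
	for i := 0; i < size; i++ {
		p.spawn(p.shards[i%numShards], nil, true)
	}
	return p
}

// spawn starts a worker on ch. Permanent workers replace themselves when
// killed, which discards a grown stack; temporary workers just exit.
func (p *reusePool) spawn(ch chan work, first work, permanent bool) {
	go func() {
		if first != nil {
			first()
		}
		for w := range ch {
			w()
			if rand.Float64() < p.killProb {
				if permanent {
					p.spawn(ch, nil, true)
				}
				return
			}
		}
	}()
}

// Go hands w to a worker on the next shard. Without growOnDemand it blocks
// until a worker is free; with it, a temporary worker is started instead.
func (p *reusePool) Go(w work) {
	ch := p.shards[atomic.AddUint64(&p.next, 1)%uint64(len(p.shards))]
	if !p.growOnDemand {
		ch <- w
		return
	}
	select {
	case ch <- w:
	default:
		p.spawn(ch, w, false)
	}
}

// sumTo submits the numbers 1..n as separate tasks and returns their sum.
func sumTo(p *reusePool, n int) int64 {
	var (
		wg    sync.WaitGroup
		total int64
	)
	wg.Add(n)
	for i := 1; i <= n; i++ {
		i := i
		p.Go(func() {
			defer wg.Done()
			atomic.AddInt64(&total, int64(i))
		})
	}
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	fixed := newReusePool(4, 2, 0.01, false)
	growing := newReusePool(4, 2, 0.01, true)
	fmt.Println(sumTo(fixed, 100), sumTo(growing, 100)) // 5050 5050
}
```

Splitting the workers across several channels means callers of Go contend on different channels rather than a single one, which is the motivation the text gives for sharding. The kill probability trades stack re-use against the risk of keeping an unusually large stack alive indefinitely.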
func NewPooledWorkerPool ¶
func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPool, error)
NewPooledWorkerPool creates a new worker pool.
type PooledWorkerPoolOptions ¶
type PooledWorkerPoolOptions interface {
	// SetGrowOnDemand sets whether the GrowOnDemand feature is enabled.
	SetGrowOnDemand(value bool) PooledWorkerPoolOptions
	// GrowOnDemand returns whether the GrowOnDemand feature is enabled.
	GrowOnDemand() bool
	// SetNumShards sets the number of worker channel shards.
	SetNumShards(value int64) PooledWorkerPoolOptions
	// NumShards returns the number of worker channel shards.
	NumShards() int64
	// SetKillWorkerProbability sets the probability to kill a worker.
	SetKillWorkerProbability(value float64) PooledWorkerPoolOptions
	// KillWorkerProbability returns the probability to kill a worker.
	KillWorkerProbability() float64
	// SetNowFn sets the now function.
	SetNowFn(value NowFn) PooledWorkerPoolOptions
	// NowFn returns the now function.
	NowFn() NowFn
	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) PooledWorkerPoolOptions
	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options
}
PooledWorkerPoolOptions is the set of options for a PooledWorkerPool.
func NewPooledWorkerPoolOptions ¶
func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions
NewPooledWorkerPoolOptions returns a new PooledWorkerPoolOptions with default options.
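Because every setter returns a PooledWorkerPoolOptions, calls can be chained. The documentation does not say whether a setter mutates the receiver or returns a modified copy. The sketch below assumes copy-on-set, a common Go convention, and uses a hypothetical `poolOptions` struct with placeholder defaults that are not the package's actual values.

```go
package main

import "fmt"

// poolOptions is a hypothetical concrete options type for this sketch.
type poolOptions struct {
	growOnDemand bool
	numShards    int64
}

// newPoolOptions returns placeholder defaults; they are not the package's.
func newPoolOptions() poolOptions {
	return poolOptions{numShards: 8}
}

// Each setter has a value receiver, so it modifies and returns a copy and
// leaves the options it was called on unchanged.
func (o poolOptions) SetGrowOnDemand(v bool) poolOptions {
	o.growOnDemand = v
	return o
}

func (o poolOptions) SetNumShards(v int64) poolOptions {
	o.numShards = v
	return o
}

func (o poolOptions) GrowOnDemand() bool { return o.growOnDemand }

func (o poolOptions) NumShards() int64 { return o.numShards }

func main() {
	base := newPoolOptions()
	tuned := base.SetGrowOnDemand(true).SetNumShards(32)
	fmt.Println(base.GrowOnDemand(), base.NumShards())   // false 8
	fmt.Println(tuned.GrowOnDemand(), tuned.NumShards()) // true 32
}
```

Under this assumption a shared set of defaults can be passed around and specialized per pool without one caller's changes leaking into another's.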
type WorkerPool ¶
type WorkerPool interface {
	// Init initializes the pool.
	Init()
	// Go waits until the next worker becomes available and executes the
	// work on it.
	Go(work Work)
	// GoIfAvailable performs the work inside a worker if one is available and
	// returns true, or false otherwise.
	GoIfAvailable(work Work) bool
	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}
WorkerPool provides a pool for goroutines.
Example ¶
package main

import (
	"fmt"
	"log"
	"sync"

	xsync "github.com/m3db/m3/src/x/sync"
)

type response struct {
	a int
}

func main() {
	var (
		wg          sync.WaitGroup
		workers     = xsync.NewWorkerPool(3)
		errorCh     = make(chan error, 1)
		numRequests = 9
		responses   = make([]response, numRequests)
	)
	wg.Add(numRequests)
	workers.Init()
	for i := 0; i < numRequests; i++ {
		// Capture loop variable.
		i := i
		// Execute request on worker pool.
		workers.Go(func() {
			defer wg.Done()
			var err error
			// Perform some work which may fail.
			resp := response{a: i}
			if err != nil {
				// Return the first error that is encountered.
				select {
				case errorCh <- err:
				default:
				}
				return
			}
			// Can concurrently modify responses since each iteration updates a
			// different index.
			responses[i] = resp
		})
	}
	// Wait for all requests to finish.
	wg.Wait()
	close(errorCh)
	if err := <-errorCh; err != nil {
		log.Fatal(err)
	}
	var total int
	for _, r := range responses {
		total += r.a
	}
	fmt.Printf("Total is %v", total)
}
Output: Total is 36
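The three submission methods of WorkerPool differ only in how long they wait for a free worker: Go waits indefinitely, GoIfAvailable does not wait, and GoWithTimeout waits for a bounded time. One way to picture these semantics is a buffered channel of tokens, one per worker. The sketch below is a standard-library illustration under that assumption, not the package's implementation; `tokenPool`, `newTokenPool`, `run`, and the `work` type are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// work mirrors the func() values passed to Go; the name is illustrative.
type work func()

// tokenPool is a hypothetical pool that bounds concurrency with a buffered
// channel holding one token per worker slot.
type tokenPool struct {
	tokens chan struct{}
}

func newTokenPool(size int) *tokenPool {
	p := &tokenPool{tokens: make(chan struct{}, size)}
	for i := 0; i < size; i++ {
		p.tokens <- struct{}{}
	}
	return p
}

// run executes w on a new goroutine and returns the token when w finishes.
func (p *tokenPool) run(w work) {
	go func() {
		defer func() { p.tokens <- struct{}{} }()
		w()
	}()
}

// Go blocks until a token is free, then runs w.
func (p *tokenPool) Go(w work) {
	<-p.tokens
	p.run(w)
}

// GoIfAvailable runs w only if a token is free right now.
func (p *tokenPool) GoIfAvailable(w work) bool {
	select {
	case <-p.tokens:
		p.run(w)
		return true
	default:
		return false
	}
}

// GoWithTimeout waits up to timeout for a token before giving up.
func (p *tokenPool) GoWithTimeout(w work, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-p.tokens:
		p.run(w)
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	p := newTokenPool(1)
	release := make(chan struct{})
	done := make(chan struct{})
	p.Go(func() { <-release; close(done) }) // occupies the only worker

	fmt.Println(p.GoIfAvailable(func() {}))                      // false
	fmt.Println(p.GoWithTimeout(func() {}, 10*time.Millisecond)) // false

	close(release)
	<-done
	fmt.Println(p.GoWithTimeout(func() {}, time.Second)) // true
}
```

Unlike the PooledWorkerPool, this style of pool starts a fresh goroutine for each task and limits only how many run at once, which matches the contrast the PooledWorkerPool description draws with WorkerPool.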
func NewWorkerPool ¶
func NewWorkerPool(size int) WorkerPool
NewWorkerPool creates a new worker pool.