pool

package
v0.10.0
Published: May 27, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package pool provides common pool patterns for concurrency control.

Types

type FixedPool

type FixedPool struct {
	// contains filtered or unexported fields
}

FixedPool implements a fixed-size blocking pool.

Example
type JobKey string
var JobKeyID = JobKey("job_id")

l := 1000 // limit to 1000 concurrent requests.
// create a new pool
pool, err := NewFixedPool(
	"protected_resource_pool",
	OrderingRandom,
	l,
	100,
	time.Millisecond*250,
	time.Millisecond*500,
	time.Millisecond*10,
	0,
	time.Second,
	limit.BuiltinLimitLogger{},
	nil,
)
if err != nil {
	panic(err)
}

wg := sync.WaitGroup{}
wg.Add(l * 3)
// spawn 3000 concurrent requests that would normally be too much load for the protected resource.
// the loop must start exactly l*3 goroutines to match wg.Add above; one extra Done would panic.
for i := 0; i < l*3; i++ {
	go func(c int) {
		defer wg.Done()
		ctx := context.WithValue(context.Background(), JobKeyID, c)
		// this will block until timeout or token was acquired.
		listener, ok := pool.Acquire(ctx)
		if !ok {
			log.Printf("was not able to acquire lock for id=%d\n", c)
			return
		}
		log.Printf("acquired lock for id=%d\n", c)
		// do something...
		time.Sleep(time.Millisecond * 10)
		listener.OnSuccess()
		log.Printf("released lock for id=%d\n", c)
	}(i)
}

// wait for completion
wg.Wait()
log.Println("Finished")

func NewFixedPool

func NewFixedPool(
	name string,
	ordering Ordering,
	fixedLimit int,
	windowSize int,
	minWindowTime time.Duration,
	maxWindowTime time.Duration,
	minRTTThreshold time.Duration,
	maxBacklog int,
	timeout time.Duration,
	logger limit.Logger,
	metricRegistry core.MetricRegistry,
) (*FixedPool, error)

NewFixedPool creates a named fixed pool resource. You can use this to guard another resource from too many concurrent requests.

Use values < 0 to select the defaults; fixedLimit and name are required.

func (*FixedPool) Acquire

func (p *FixedPool) Acquire(ctx context.Context) (core.Listener, bool)

Acquire acquires a token for the protected resource. It blocks until a token is acquired or the configured timeout expires. The returned bool is false when no token could be acquired.

func (*FixedPool) Limit

func (p *FixedPool) Limit() int

Limit returns the configured limit.

func (*FixedPool) Ordering added in v0.5.0

func (p *FixedPool) Ordering() Ordering

Ordering returns the ordering strategy configured for this pool.

type Ordering added in v0.5.0

type Ordering int

Ordering defines the pattern for ordering requests on a Pool.

const (
	OrderingRandom Ordering = iota
	OrderingFIFO
	OrderingLIFO
)

The available ordering options.
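As a rough illustration of what the three options conventionally mean, the sketch below picks the next waiting request from a backlog kept in arrival order. It assumes the usual reading of the names (FIFO serves the oldest waiter, LIFO the newest, Random any waiter); the lowercase ordering type and the next function are hypothetical and do not reflect how the package stores or schedules its backlog.

```go
package main

import (
	"fmt"
	"math/rand"
)

// ordering mirrors the shape of the Ordering type for illustration only.
type ordering int

const (
	orderingRandom ordering = iota
	orderingFIFO
	orderingLIFO
)

// next removes and returns the waiter to serve next.
// backlog is in arrival order (oldest first) and must not be empty.
// The input slice is left unmodified; the remaining waiters are returned as a copy.
func next(backlog []int, o ordering) (int, []int) {
	var i int
	switch o {
	case orderingFIFO:
		i = 0 // oldest waiter first
	case orderingLIFO:
		i = len(backlog) - 1 // newest waiter first
	default:
		i = rand.Intn(len(backlog)) // any waiter
	}
	picked := backlog[i]
	rest := append(append([]int{}, backlog[:i]...), backlog[i+1:]...)
	return picked, rest
}

func main() {
	backlog := []int{1, 2, 3} // request IDs in arrival order

	first, _ := next(backlog, orderingFIFO)
	last, _ := next(backlog, orderingLIFO)
	fmt.Println(first, last) // prints "1 3"
}
```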

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool implements a generic blocking pool pattern.

Example
type JobKey string
var JobKeyID = JobKey("job_id")

l := 1000 // start with a limit of 1000 concurrent requests; the AIMD limit can adjust it.
delegateLimit := limit.NewDefaultAIMDLimit(
	"aimd_limiter",
	nil,
)
// wrap with a default limiter and simple strategy
// you could of course get very complicated with this.
delegateLimiter, err := limiter.NewDefaultLimiter(
	delegateLimit,
	(time.Millisecond * 250).Nanoseconds(),
	(time.Millisecond * 500).Nanoseconds(),
	(time.Millisecond * 10).Nanoseconds(),
	100,
	strategy.NewSimpleStrategy(l),
	limit.BuiltinLimitLogger{},
	nil,
)
if err != nil {
	panic(err)
}

// create a new pool
pool, err := NewPool(
	delegateLimiter,
	OrderingRandom,
	0,
	time.Second,
	limit.BuiltinLimitLogger{},
	nil,
)
if err != nil {
	panic(err)
}

wg := sync.WaitGroup{}
wg.Add(l * 3)
// spawn 3000 concurrent requests that would normally be too much load for the protected resource.
// the loop must start exactly l*3 goroutines to match wg.Add above; one extra Done would panic.
for i := 0; i < l*3; i++ {
	go func(c int) {
		defer wg.Done()
		ctx := context.WithValue(context.Background(), JobKeyID, c)
		// this will block until timeout or token was acquired.
		listener, ok := pool.Acquire(ctx)
		if !ok {
			log.Printf("was not able to acquire lock for id=%d\n", c)
			return
		}
		log.Printf("acquired lock for id=%d\n", c)
		// do something...
		time.Sleep(time.Millisecond * 10)
		listener.OnSuccess()
		log.Printf("released lock for id=%d\n", c)
	}(i)
}

// wait for completion
wg.Wait()
log.Println("Finished")

func NewPool

func NewPool(
	delegateLimiter core.Limiter,
	ordering Ordering,
	maxBacklog int,
	timeout time.Duration,
	logger limit.Logger,
	metricRegistry core.MetricRegistry,
) (*Pool, error)

NewPool creates a pool resource. You can use this to guard another resource from too many concurrent requests.

Use values < 0 to select the defaults; delegateLimiter is required.

func (*Pool) Acquire

func (p *Pool) Acquire(ctx context.Context) (core.Listener, bool)

Acquire acquires a token for the protected resource. It blocks until a token is acquired or the configured timeout expires. The returned bool is false when no token could be acquired.
