Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var RejectWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error { ctx, cancel := orerr.CancelWithError(ctx) select { case <-ctx.Done(): return r.Run(ctx) case queue <- unit{Context: ctx, Runner: r}: return nil default: cancel(orerr.LimitExceededError{ Kind: "PoolQueue", }) return r.Run(ctx) } })
RejectWhenFull tries to schedule the async.Runner while the context is alive. When the underlying buffered channel is full, it cancels the context with orerr.LimitExceededError and runs the runner immediately with that cancelled context.
var WaitWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error { select { case <-ctx.Done(): return r.Run(ctx) case queue <- unit{Context: ctx, Runner: r}: return nil } })
WaitWhenFull tries to schedule the async.Runner while the context is alive. It blocks when the underlying buffered channel is full; if the context becomes done first, the runner is invoked with that done context.
Functions ¶
This section is empty.
Types ¶
type Option ¶
// Option is one configuration step for a pool (functional options pattern).
type Option interface {
// Apply receives the Options value that is being configured.
Apply(*Options)
}
Option enables the functional options pattern for configuring a pool.
type OptionFunc ¶
// OptionFunc is a plain function over *Options; its Apply method lets such a
// function be used as an Option.
type OptionFunc func(*Options)
OptionFunc helps to implement the Option interface.
func BufferLength ¶
func BufferLength(size int) OptionFunc
BufferLength sets the BufferLength option.
func ResizeEvery ¶
func ResizeEvery(d time.Duration) OptionFunc
ResizeEvery sets the ResizeEvery option.
func (OptionFunc) Apply ¶
func (of OptionFunc) Apply(opts *Options)
Apply implements the Option interface.
type Options ¶
// Options holds the configuration of a pool.
type Options struct {
// Size dynamically resolves the number of workers that should be spawned.
Size SizeFunc
// ResizeEvery defines the interval at which the pool is resized (shrunk or grown).
ResizeEvery time.Duration
// ScheduleBehavior defines how exactly the Schedule method behaves.
// WaitWhenFull is used by default if no value is provided.
ScheduleBehavior ScheduleBehavior
// BufferLength defines the size of the buffered channel queue.
BufferLength int
// Name is the pool name, used for logging.
Name string
}
Options provides the pool configuration.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool structure
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/getoutreach/gobox/pkg/async"
"github.com/getoutreach/gobox/pkg/async/pool"
)
// main shows the typical life cycle of a pool: create it, wrap it with a
// schedule timeout and a wait helper, schedule a batch of tasks, wait for
// them, and print whatever they produced.
func main() {
	const (
		workers  = 5
		total    = 10
		workTime = 5 * time.Millisecond
	)

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Start a pool with a constant number of workers.
	workerPool := pool.New(
		ctx,
		pool.ConstantSize(workers),
		pool.ResizeEvery(5*time.Minute),
		pool.BufferLength(256),
		pool.WaitWhenFull,
	)
	defer workerPool.Close()

	// Bound every Schedule call by a timeout ...
	timed := pool.WithTimeout(5*time.Millisecond, workerPool)
	// ... and make it possible to wait for everything scheduled from here on.
	scheduler, wait := pool.WithWait(timed)

	results := make(chan string, total)
	start := time.Now()

	// newTask builds the runner for one item; the item id, the start time and
	// the results channel are all captured by the closure.
	newTask := func(id int) async.Runner {
		return async.Func(func(ctx context.Context) error {
			// Checking the context error first is essential, because the
			// runner is also invoked when:
			//   - the given context is done,
			//   - the underlying buffered channel is full,
			//   - the pool is in its shutdown phase.
			if err := ctx.Err(); err != nil {
				return err
			}

			time.Sleep(workTime)
			batch := time.Since(start) / workTime
			results <- fmt.Sprintf("task_%d_%d", batch, id)

			// The returned error is logged but not returned by Schedule.
			return nil
		})
	}

	for i := 0; i < total; i++ {
		scheduler.Schedule(ctx, newTask(i))
	}

	wait()
	close(results)

	for line := range results {
		fmt.Println(line)
	}

	// The printed lines are not asserted as example output because they are
	// not deterministic. One possible run:
	// task_1_3
	// task_1_4
	// task_1_0
	// task_1_1
	// task_1_2
	// task_2_6
	// task_2_9
	// task_2_5
	// task_2_7
	// task_2_8
}
func New ¶
New creates a new instance of Pool and starts a goroutine that will spawn the workers. Call Close() to release the pool's resources.
func (*Pool) Close ¶
func (p *Pool) Close()
Close blocks until all workers finish their current items and terminate.
func (*Pool) Schedule ¶
Schedule tries to schedule the runner for processing in the pool. It is required to check the provided context for an error. The async.Runner interface will be called in all cases:
- When the item gets successfully scheduled and withdrawn by a worker.
- When the given context is done and the item is not scheduled (timeout, buffered queue full).
- When the pool is in its shutdown phase.
type ScheduleBehavior ¶
ScheduleBehavior defines the behavior of pool Schedule method
func (ScheduleBehavior) Apply ¶
func (sb ScheduleBehavior) Apply(opts *Options)
Apply implements the Option interface.
type Scheduler ¶
// Scheduler accepts an async.Runner for processing.
type Scheduler interface {
// Schedule submits r for processing in the pool, using ctx for the attempt.
Schedule(ctx context.Context, r async.Runner) error
}
func WithTimeout ¶
WithTimeout creates an enqueuer that cancels enqueueing after the given timeout.
type SchedulerFunc ¶
type SizeFunc ¶
// SizeFunc returns an int; Options.Size uses it to resolve the number of
// workers that should be spawned.
type SizeFunc func() int
SizeFunc tells the pool whether it should increase or decrease the number of workers.