executor

Version: v0.0.1
Published: Sep 23, 2022 License: MIT Imports: 6 Imported by: 0

README

executor

goroutine pool

1. Feature

  • Supports canceling a single task or all tasks in the goroutine pool.
Future.Cancel()
ExecutorService.Cancel()
  • Multiple types of goroutine pools can be created (SingleGPool | FixedGPool | DynamicGPool).

2. Multiple types of goroutine pools

category      explain                                                     remark
SingleGPool   A goroutine pool with a single worker
FixedGPool    A goroutine pool with a fixed number of workers
DynamicGPool  A goroutine pool whose number of workers changes dynamically  min: minimum number of workers; max: maximum number of workers
2.1 SingleGPool
NewSingleGPool(ctx context.Context, opts ...option) ExecutorService
2.2 FixedGPool
NewFixedGPool(ctx context.Context, size int, opts ...option) ExecutorService
2.3 DynamicGPool
NewDynamicGPool(ctx context.Context, min int, max int, opts ...dynamicOption) ExecutorService
2.3.1 Expansion rules

If the task queue is full, try to add workers to execute the task.
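
The expansion rule can be pictured as a non-blocking send on the task channel, followed by an attempt to start one more worker when that send fails. The following is a minimal sketch under that assumption, not the library's implementation; `pool`, `newPool`, `addWorker`, `submit`, and `workerCount` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, simplified stand-in for a dynamic goroutine pool.
type pool struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	tasks   chan func()
	workers int
	max     int
}

func newPool(min, max, queueCap int) *pool {
	p := &pool{tasks: make(chan func(), queueCap), max: max}
	for i := 0; i < min; i++ {
		p.addWorker()
	}
	return p
}

// addWorker starts one worker unless the pool is already at max.
// It reports whether a worker was started.
func (p *pool) addWorker() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.workers >= p.max {
		return false
	}
	p.workers++
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for task := range p.tasks {
			task()
		}
	}()
	return true
}

// submit enqueues a task. If the queue is full it first tries to add a
// worker, then falls back to a blocking send.
func (p *pool) submit(task func()) {
	select {
	case p.tasks <- task:
		return
	default:
		p.addWorker() // queue full: try to expand
	}
	p.tasks <- task
}

func (p *pool) workerCount() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.workers
}

// close stops accepting tasks and waits for the workers to drain the queue.
func (p *pool) close() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	release := make(chan struct{})
	p := newPool(1, 3, 1) // min 1, max 3, queue capacity 1
	for i := 0; i < 4; i++ {
		p.submit(func() { <-release }) // each task blocks until released
	}
	fmt.Println("workers after 4 blocking tasks:", p.workerCount())
	close(release)
	p.close()
}
```

In this sketch a fifth blocking task would make `submit` wait, because the pool is already at `max` and the queue is full. That is the same situation described in the Notice section.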

2.3.2 Shrinking rules
  • Condition: the number of busy workers is less than 1/4 of the total number of workers.

  • The pool performs meetCondNum consecutive checks, one every detectInterval. Shrinking is triggered only if the condition is met on every check.

  • The shrinking action tries to reduce the number of workers by half.
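
The shrinking rules amount to a small counter of consecutive checks. The sketch below only illustrates that decision logic; `shrinkDetector`, `conditionMet`, and `observe` are hypothetical names, and clamping the target to `min` is an assumption not stated in the README.

```go
package main

import "fmt"

// shrinkDetector counts consecutive checks in which the shrink condition held.
type shrinkDetector struct {
	meetCondNum int // consecutive hits required before shrinking
	hits        int
}

// conditionMet reports whether fewer than 1/4 of the workers are busy.
func conditionMet(busy, total int) bool {
	return total > 0 && busy*4 < total
}

// observe records one check (intended to run once per detectInterval).
// It returns the new worker target: total/2 when shrinking is triggered,
// otherwise total unchanged.
func (d *shrinkDetector) observe(busy, total, min int) int {
	if !conditionMet(busy, total) {
		d.hits = 0 // the streak is broken, start over
		return total
	}
	d.hits++
	if d.hits < d.meetCondNum {
		return total
	}
	d.hits = 0
	target := total / 2
	if target < min { // assumption: never shrink below min
		target = min
	}
	return target
}

func main() {
	d := &shrinkDetector{meetCondNum: 3}
	for i := 1; i <= 3; i++ {
		// 1 busy worker out of 8 satisfies the condition on every check.
		fmt.Println("check", i, "target workers:", d.observe(1, 8, 2))
	}
}
```

Requiring several consecutive hits keeps a short idle moment from halving the pool right before the next burst of tasks.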

3. Notice

Since the executor uses a channel as the task queue, submitting a task may block while the queue is full.

Submit(task Callable) (Future, error)
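
The blocking behavior follows from ordinary Go channel semantics: a send on a full buffered channel waits until a receiver makes room. The sketch below demonstrates this with a plain channel and a timeout. `enqueue` is a hypothetical helper, and nothing here claims that the package offers a timeout on Submit.

```go
package main

import (
	"fmt"
	"time"
)

// enqueue tries to put v on queue and gives up after timeout.
// It reports whether the value was accepted.
func enqueue(queue chan<- int, v int, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case queue <- v:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	queue := make(chan int, 2) // capacity 2, and nobody is consuming
	for i := 0; i < 3; i++ {
		ok := enqueue(queue, i, 50*time.Millisecond)
		fmt.Println("task", i, "accepted:", ok)
	}
}
```

The third send finds the queue full and times out. A plain `queue <- v` would wait there indefinitely.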

If the goroutine pool is running in the background for a long time, we strongly recommend monitoring the usage of the task queue.

TaskQueueCap() int
TaskQueueLength() int
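
One way to monitor the queue is to turn the two methods above into a usage ratio and alert on a threshold. This is a sketch only: `queueStats`, `queueUsage`, and `fakePool` are hypothetical names, and `fakePool` exists purely so the example runs without the library.

```go
package main

import "fmt"

// queueStats is the subset of ExecutorService needed for monitoring.
type queueStats interface {
	TaskQueueCap() int
	TaskQueueLength() int
}

// queueUsage returns the occupied fraction of the task queue, from 0 to 1.
func queueUsage(q queueStats) float64 {
	c := q.TaskQueueCap()
	if c <= 0 {
		return 0
	}
	return float64(q.TaskQueueLength()) / float64(c)
}

// fakePool is a stand-in with fixed numbers.
type fakePool struct{ length, capacity int }

func (f fakePool) TaskQueueCap() int    { return f.capacity }
func (f fakePool) TaskQueueLength() int { return f.length }

func main() {
	p := fakePool{length: 9, capacity: 10}
	// 0.8 is an arbitrary alert threshold chosen for this example.
	if u := queueUsage(p); u >= 0.8 {
		fmt.Printf("task queue is %.0f%% full\n", u*100)
	}
}
```

Any value returned by NewSingleGPool, NewFixedGPool, or NewDynamicGPool should satisfy `queueStats`, since ExecutorService declares both methods.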

4. Example

More examples are available in the example directory.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/vearne/executor"
)

// MyCallable squares its parameter after a simulated 3-second delay.
type MyCallable struct {
	param int
}

func (m *MyCallable) Call(ctx context.Context) *executor.GPResult {
	time.Sleep(3 * time.Second)
	r := executor.GPResult{}
	r.Value = m.param * m.param
	r.Err = nil
	return &r
}

func main() {
	// Without options: executor.NewFixedGPool(context.Background(), 10)
	// executor.WithTaskQueueCap sets the capacity of the task queue.
	pool := executor.NewFixedGPool(context.Background(), 10, executor.WithTaskQueueCap(10))

	// futureList is shared with the submitting goroutine, so guard it with a mutex.
	var mu sync.Mutex
	futureList := make([]executor.Future, 0)
	go func() {
		for i := 0; i < 1000; i++ {
			task := &MyCallable{param: i}
			f, err := pool.Submit(task) // may block while the task queue is full
			if err == nil {
				fmt.Println("add task", i)
				mu.Lock()
				futureList = append(futureList, f)
				mu.Unlock()
			}
		}
	}()

	time.Sleep(10 * time.Second)
	pool.Shutdown()

	// Snapshot the futures collected so far before reading their results.
	mu.Lock()
	futures := append([]executor.Future(nil), futureList...)
	mu.Unlock()
	for _, f := range futures {
		result := f.Get()
		fmt.Println(result.Err, result.Value)
	}
	pool.WaitTerminate()
}

5. Debug

Set the log level with the SIMPLE_LOG_LEVEL environment variable. Optional values: debug | info | warn | error

export SIMPLE_LOG_LEVEL=debug

6. Thanks

Inspired by Java Executors

Documentation

Constants

const (
	SIZE int = 50
)

Variables

var (
	ErrTaskCanceled = errors.New("task has been canceled")
	ErrPoolShutdown = errors.New("pool has been shutdown")
)
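
Sentinel errors like these are normally compared with `errors.Is`, which also matches them when they have been wrapped with `%w`. The sketch below declares local copies of the two variables so that it is self-contained. Which calls return which error is not stated here, so the mapping in `describe` (a hypothetical helper) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors, for illustration only.
var (
	ErrTaskCanceled = errors.New("task has been canceled")
	ErrPoolShutdown = errors.New("pool has been shutdown")
)

// describe maps an error to a short hint for the caller.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrPoolShutdown):
		return "pool closed: stop submitting"
	case errors.Is(err, ErrTaskCanceled):
		return "task canceled: skip result"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	wrapped := fmt.Errorf("submit task 7: %w", ErrPoolShutdown)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(ErrTaskCanceled))
}
```

With the real package, the comparison would be against `executor.ErrTaskCanceled` and `executor.ErrPoolShutdown` instead of the local copies.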

Functions

func WithDetectInterval

func WithDetectInterval(detectInterval time.Duration) dynamicOption

func WithDynamicTaskQueueCap

func WithDynamicTaskQueueCap(taskQueueCap int) dynamicOption

Optional parameters

func WithMeetCondNum

func WithMeetCondNum(meetCondNum int) dynamicOption

func WithTaskQueueCap

func WithTaskQueueCap(taskQueueCap int) option

Optional parameters

Types

type AtomicBool

type AtomicBool struct {
	// contains filtered or unexported fields
}

AtomicBool is a wrapper around uint32 for use as a boolean value with atomic access.

func NewAtomicBool

func NewAtomicBool(flag bool) *AtomicBool

func (*AtomicBool) IsSet

func (ab *AtomicBool) IsSet() bool

IsSet returns whether the current boolean value is true

func (*AtomicBool) IsTrue

func (ab *AtomicBool) IsTrue() bool

func (*AtomicBool) Set

func (ab *AtomicBool) Set(value bool)

Set sets the value of the bool regardless of the previous value
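
To make the "wrapper around uint32" description concrete, here is a minimal sketch of such a type built on `sync/atomic`. `atomicFlag` and `newAtomicFlag` are hypothetical names. The package's unexported fields are not shown in this documentation, so this is an illustration of the idea, not the actual source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicFlag stores a boolean as 0 or 1 in a uint32.
type atomicFlag struct {
	v uint32
}

func newAtomicFlag(initial bool) *atomicFlag {
	f := &atomicFlag{}
	f.Set(initial)
	return f
}

// Set stores the value regardless of the previous one.
func (f *atomicFlag) Set(value bool) {
	var n uint32
	if value {
		n = 1
	}
	atomic.StoreUint32(&f.v, n)
}

// IsSet reports whether the current value is true.
func (f *atomicFlag) IsSet() bool {
	return atomic.LoadUint32(&f.v) == 1
}

func main() {
	flag := newAtomicFlag(false)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			flag.Set(true) // concurrent writers need no extra locking
		}()
	}
	wg.Wait()
	fmt.Println(flag.IsSet())
}
```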

type Callable

type Callable interface {
	Call(ctx context.Context) *GPResult
}
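
Because Callable has a single method, a small function adapter can save declaring a struct for every task. The sketch below mirrors the documented `GPResult` and `Callable` declarations locally so that it compiles on its own. `CallableFunc`, `square`, and `run` are hypothetical and not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// GPResult and Callable mirror the declarations documented for this package.
type GPResult struct {
	Value any
	Err   error
}

type Callable interface {
	Call(ctx context.Context) *GPResult
}

// CallableFunc adapts an ordinary function to the Callable interface.
type CallableFunc func(ctx context.Context) (any, error)

func (f CallableFunc) Call(ctx context.Context) *GPResult {
	// Skip the work if the context is already canceled or expired.
	if err := ctx.Err(); err != nil {
		return &GPResult{Err: err}
	}
	v, err := f(ctx)
	return &GPResult{Value: v, Err: err}
}

// square returns a task that computes n*n.
func square(n int) Callable {
	return CallableFunc(func(ctx context.Context) (any, error) {
		return n * n, nil
	})
}

// run calls the task directly, which is roughly what a worker would do.
func run(c Callable) *GPResult {
	return c.Call(context.Background())
}

func main() {
	r := run(square(7))
	fmt.Println(r.Value, r.Err)
}
```

Checking `ctx.Err()` before doing the work is a design choice of this sketch. Whether the pool itself checks the context before invoking `Call` is not documented here.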

type DynamicGPool

type DynamicGPool struct {

	// task queue
	TaskChan chan *FutureTask
	// contains filtered or unexported fields
}

func (*DynamicGPool) Cancel

func (p *DynamicGPool) Cancel() bool

func (*DynamicGPool) CurrentGCount

func (p *DynamicGPool) CurrentGCount() int

func (*DynamicGPool) IsShutdown

func (p *DynamicGPool) IsShutdown() bool

func (*DynamicGPool) Shutdown

func (p *DynamicGPool) Shutdown()

New tasks may be added even after shutdown

func (*DynamicGPool) Submit

func (p *DynamicGPool) Submit(task Callable) (Future, error)

When submitting tasks, blocking may occur

func (*DynamicGPool) TaskQueueCap

func (p *DynamicGPool) TaskQueueCap() int

func (*DynamicGPool) TaskQueueLength

func (p *DynamicGPool) TaskQueueLength() int

func (*DynamicGPool) WaitTerminate

func (p *DynamicGPool) WaitTerminate()

type DynamicGPoolOption

type DynamicGPoolOption struct {
	// contains filtered or unexported fields
}

type ExecutorService

type ExecutorService interface {
	// no longer accept new tasks
	Shutdown()
	Submit(task Callable) (Future, error)
	IsShutdown() bool
	// Wait for all the tasks to be completed
	WaitTerminate()
	TaskQueueCap() int
	TaskQueueLength() int
	Cancel() bool
	CurrentGCount() int
}

func NewDynamicGPool

func NewDynamicGPool(ctx context.Context, min int, max int, opts ...dynamicOption) ExecutorService

func NewFixedGPool

func NewFixedGPool(ctx context.Context, size int, opts ...option) ExecutorService

func NewSingleGPool

func NewSingleGPool(ctx context.Context, opts ...option) ExecutorService

type FixedGPool

type FixedGPool struct {
	Size int
	// task queue
	TaskChan chan *FutureTask
	// contains filtered or unexported fields
}

func (*FixedGPool) Cancel

func (p *FixedGPool) Cancel() bool

func (*FixedGPool) Consume

func (p *FixedGPool) Consume()

func (*FixedGPool) CurrentGCount

func (p *FixedGPool) CurrentGCount() int

func (*FixedGPool) IsShutdown

func (p *FixedGPool) IsShutdown() bool

func (*FixedGPool) Shutdown

func (p *FixedGPool) Shutdown()

New tasks may be added even after shutdown

func (*FixedGPool) Submit

func (p *FixedGPool) Submit(task Callable) (Future, error)

When submitting tasks, blocking may occur

func (*FixedGPool) TaskQueueCap

func (p *FixedGPool) TaskQueueCap() int

func (*FixedGPool) TaskQueueLength

func (p *FixedGPool) TaskQueueLength() int

func (*FixedGPool) WaitTerminate

func (p *FixedGPool) WaitTerminate()

type FixedGPoolOption

type FixedGPoolOption struct {
	// contains filtered or unexported fields
}

type Future

type Future interface {
	Get() *GPResult
	IsCancelled() bool
	Cancel() bool
	IsDone() bool
}
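
The four methods can be backed by a channel that is closed exactly once. The sketch below shows that pattern. `future`, `result`, `complete`, and `errCanceled` are hypothetical names, and the choice that a canceled future also counts as done follows Java's Future rather than anything this documentation states.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCanceled = errors.New("task has been canceled")

type result struct {
	Value any
	Err   error
}

// future is a minimal channel-based implementation of the four methods.
type future struct {
	once     sync.Once
	done     chan struct{}
	res      *result
	canceled bool
}

func newFuture() *future { return &future{done: make(chan struct{})} }

// finish stores the outcome exactly once and reports whether this call won.
func (f *future) finish(r *result, canceled bool) bool {
	won := false
	f.once.Do(func() {
		f.res, f.canceled = r, canceled
		close(f.done) // wakes every goroutine blocked in Get
		won = true
	})
	return won
}

// complete is what a worker would call when the task returns.
func (f *future) complete(v any) bool { return f.finish(&result{Value: v}, false) }

// Cancel reports false if the task had already finished or been canceled.
func (f *future) Cancel() bool { return f.finish(&result{Err: errCanceled}, true) }

// Get blocks until the task completes or is canceled.
func (f *future) Get() *result {
	<-f.done
	return f.res
}

func (f *future) IsDone() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

func (f *future) IsCancelled() bool { return f.IsDone() && f.canceled }

func main() {
	f := newFuture()
	go f.complete(42)
	fmt.Println(f.Get().Value)

	g := newFuture()
	g.Cancel()
	fmt.Println(g.Get().Err, g.IsCancelled())
}
```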

type FutureTask

type FutureTask struct {
	// contains filtered or unexported fields
}

func NewFutureTask

func NewFutureTask(ctx context.Context, c Callable) *FutureTask

func (*FutureTask) Cancel

func (f *FutureTask) Cancel() bool

func (*FutureTask) Get

func (f *FutureTask) Get() *GPResult

func (*FutureTask) IsCancelled

func (f *FutureTask) IsCancelled() bool

func (*FutureTask) IsDone

func (f *FutureTask) IsDone() bool

type GPResult

type GPResult struct {
	Value any
	Err   error
}

type Runnable

type Runnable interface {
	Run(ctx context.Context)
}

type ShrinkWorker

type ShrinkWorker struct {
	RunningFlag *AtomicBool
	ExitedFlag  chan struct{}
	ExitChan    chan struct{}
	// contains filtered or unexported fields
}

func NewShrinkWorker

func NewShrinkWorker(pool *DynamicGPool, interval time.Duration, meetCondNum int) *ShrinkWorker

func (*ShrinkWorker) Start

func (w *ShrinkWorker) Start()

Shrinking rules:

Condition: If the number of workers in a busy state is less than 1/4 of the total number of workers,
try to reduce the number of workers by 1/2. Execute meetCondNum consecutive checks,
with detectInterval every time, and perform shrinking if the conditions are met each time.

func (*ShrinkWorker) Stop

func (worker *ShrinkWorker) Stop()

type Worker

type Worker struct {
	RunningFlag *AtomicBool
	ExitedFlag  chan struct{}
	ExitChan    chan struct{}
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(pool *DynamicGPool) *Worker

func (*Worker) IsBusy

func (worker *Worker) IsBusy() bool

func (*Worker) Start

func (worker *Worker) Start()

func (*Worker) Stop

func (worker *Worker) Stop()

Directories

Path                Synopsis
example/background  command
example/dynamic     command
example/dynamic2    command
example/fixed       command
example/single      command
