executor

package module
v0.0.3
Published: Mar 1, 2023 | License: MIT | Imports: 6 | Imported by: 0

README

executor

goroutine pool

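1. Features

  • A single task can be cancelled, and so can all tasks in a goroutine pool
Future.Cancel()
ExecutorService.Cancel()

You can also use context.Context to cancel a task or a pool.

  • Several types of goroutine pool can be created (SingleGPool | FixedGPool | DynamicGPool)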
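
As a rough illustration of context-based cancellation, the sketch below shows a task body that returns early once its context is cancelled. It uses only the standard library; `squareOrCancel` and `demoCancelled` are hypothetical names and not part of executor. With the library, the same `select` would presumably sit inside a `Call(ctx context.Context)` method.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// squareOrCancel simulates a slow task: it waits for delay and returns n*n,
// unless ctx is cancelled first, in which case it returns ctx.Err().
func squareOrCancel(ctx context.Context, n int, delay time.Duration) (int, error) {
	select {
	case <-time.After(delay):
		return n * n, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// demoCancelled runs the task with an already-cancelled context and reports
// whether the task observed the cancellation.
func demoCancelled() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := squareOrCancel(ctx, 3, time.Hour)
	return errors.Is(err, context.Canceled)
}

func main() {
	v, err := squareOrCancel(context.Background(), 3, 10*time.Millisecond)
	fmt.Println(v, err)
	fmt.Println(demoCancelled())
}
```

A task that never checks `ctx.Done()` cannot be interrupted this way, so long-running tasks should poll the context at reasonable intervals.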

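2. Types of goroutine pool

Type          Description                                                    Remarks
SingleGPool   Goroutine pool with a single worker
FixedGPool    Goroutine pool with a fixed number of workers
DynamicGPool  Goroutine pool whose number of workers can change dynamically  min: minimum number of workers; max: maximum number of workers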
2.1 SingleGPool
NewSingleGPool(ctx context.Context, opts ...option) ExecutorService
2.2 FixedGPool
NewFixedGPool(ctx context.Context, size int, opts ...option) ExecutorService
2.3 DynamicGPool
NewDynamicGPool(ctx context.Context, min int, max int, opts ...dynamicOption) ExecutorService
2.3.1 Scale-up rule

When a task is submitted and the task queue is already full, the pool tries to add a worker to execute the task.
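
The scale-up decision can be sketched with a non-blocking channel send. This is a standard-library illustration under stated assumptions, not the library's code: `tryEnqueue` and `workersAfterSubmit` are hypothetical names, tasks are plain `int` values, and the cap at `max` is assumed from the `max` parameter of NewDynamicGPool.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the task was queued.
func tryEnqueue(queue chan int, task int) bool {
	select {
	case queue <- task:
		return true
	default:
		return false // the queue is full
	}
}

// workersAfterSubmit returns the worker count after one submit attempt:
// if the queue is full and the pool is below max, one worker is added.
// The sketch only makes the sizing decision; a real pool would still have
// to enqueue the task afterwards.
func workersAfterSubmit(queue chan int, task, workers, max int) int {
	if !tryEnqueue(queue, task) && workers < max {
		return workers + 1
	}
	return workers
}

func main() {
	queue := make(chan int, 1)
	fmt.Println(workersAfterSubmit(queue, 1, 2, 4)) // queue has room: count unchanged
	fmt.Println(workersAfterSubmit(queue, 2, 2, 4)) // queue is full: one worker added
}
```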

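2.3.2 Scale-down rules
  • Condition: the number of busy workers is less than 1/4 of the total number of workers
  • The condition is checked meetCondNum consecutive times, detectInterval apart. If it holds every time, scale-down is triggered.
  • The scale-down action tries to remove half of the workers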
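
The rules above can be sketched as three small functions. This is an illustration only: the function names are hypothetical, each element of `busySamples` stands for one check taken detectInterval apart, and keeping the result at or above `min` is an assumption based on the `min` parameter of NewDynamicGPool.

```go
package main

import "fmt"

// meetsShrinkCond reports whether fewer than 1/4 of the workers are busy.
func meetsShrinkCond(busy, total int) bool {
	return busy*4 < total
}

// shouldShrink reports whether the last meetCondNum samples of the busy
// count all satisfy the condition.
func shouldShrink(busySamples []int, total, meetCondNum int) bool {
	if meetCondNum <= 0 || len(busySamples) < meetCondNum {
		return false
	}
	for _, busy := range busySamples[len(busySamples)-meetCondNum:] {
		if !meetsShrinkCond(busy, total) {
			return false
		}
	}
	return true
}

// shrinkTarget halves the worker count but never goes below min (assumed).
func shrinkTarget(total, min int) int {
	target := total / 2
	if target < min {
		return min
	}
	return target
}

func main() {
	samples := []int{1, 0, 1} // busy workers seen in three consecutive checks
	if shouldShrink(samples, 8, 3) {
		fmt.Println("shrink to", shrinkTarget(8, 2))
	}
}
```

Requiring several consecutive matches keeps a short idle gap from shrinking a pool that is about to be busy again.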

3. Notes

Because executor uses a channel as its task queue, submitting a task may block.

Submit(task Callable) (Future, error)

If the goroutine pool runs in the background for a long time, we strongly recommend monitoring the usage of the task queue.

TaskQueueCap() int
TaskQueueLength() int
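
One way to act on this advice is to compute the fill ratio of the queue from the two methods above. The sketch below is a minimal example under stated assumptions: `queueStats`, `fakePool`, `queueUsage` and `isCongested` are hypothetical names, and `fakePool` only stands in for a real pool so that the code runs on its own. Any value with the two documented methods satisfies `queueStats`.

```go
package main

import "fmt"

// queueStats is the subset of pool methods needed for monitoring.
type queueStats interface {
	TaskQueueCap() int
	TaskQueueLength() int
}

// fakePool is a stand-in so the sketch runs without a real pool.
type fakePool struct{ capacity, length int }

func (f fakePool) TaskQueueCap() int    { return f.capacity }
func (f fakePool) TaskQueueLength() int { return f.length }

// queueUsage returns the fraction of the task queue in use (0 to 1).
func queueUsage(q queueStats) float64 {
	if q.TaskQueueCap() == 0 {
		return 0
	}
	return float64(q.TaskQueueLength()) / float64(q.TaskQueueCap())
}

// isCongested reports whether usage has reached the given threshold.
func isCongested(q queueStats, threshold float64) bool {
	return queueUsage(q) >= threshold
}

func main() {
	p := fakePool{capacity: 10, length: 9}
	fmt.Printf("usage=%.2f congested=%v\n", queueUsage(p), isCongested(p, 0.8))
}
```

In a long-running service, a check like this could be called from a ticker loop and exported as a metric; a queue that stays near full suggests that Submit calls are blocking.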

4. Example

More examples

package main

import (
	"context"
	"fmt"
	"github.com/vearne/executor"
	"time"
)

type MyCallable struct {
	param int
}

func (m *MyCallable) Call(ctx context.Context) *executor.GPResult {
	time.Sleep(1 * time.Second)
	r := executor.GPResult{}
	r.Value = m.param * m.param
	r.Err = nil
	return &r
}

func main() {
	// Fixed pool with 10 workers. executor.WithTaskQueueCap sets the capacity
	// of the task queue; the option may be omitted.
	pool := executor.NewFixedGPool(context.Background(), 10, executor.WithTaskQueueCap(10))
	futureList := make([]executor.Future, 0, 100)

	for i := 0; i < 100; i++ {
		task := &MyCallable{param: i}
		// Submit may block while the task queue is full.
		f, err := pool.Submit(task)
		if err != nil {
			fmt.Println("submit failed", i, err)
			continue
		}
		fmt.Println("add task", i)
		futureList = append(futureList, f)
	}

	// Stop accepting new tasks, then collect the results of the submitted ones.
	pool.Shutdown()
	for _, f := range futureList {
		result := f.Get()
		fmt.Println(result.Err, result.Value)
	}
	// Wait for all tasks to be completed.
	pool.WaitTerminate()
}

5. Debugging

Set the log level. Allowed values: debug | info | warn | error

export SIMPLE_LOG_LEVEL=debug

6. Acknowledgements

This project was inspired by Java's Executors.

Documentation

Constants

const (
	SIZE int = 50
)

Variables

var (
	ErrTaskCanceled = errors.New("task has been canceled")
	ErrPoolShutdown = errors.New("pool has been shutdown")
)

Functions

func WithDetectInterval

func WithDetectInterval(detectInterval time.Duration) dynamicOption

func WithDynamicTaskQueueCap

func WithDynamicTaskQueueCap(taskQueueCap int) dynamicOption

Optional parameters

func WithMeetCondNum

func WithMeetCondNum(meetCondNum int) dynamicOption

func WithTaskQueueCap

func WithTaskQueueCap(taskQueueCap int) option

Optional parameters

Types

type AtomicBool

type AtomicBool struct {
	// contains filtered or unexported fields
}

AtomicBool is a wrapper around uint32 that serves as a boolean value with atomic access.

func NewAtomicBool

func NewAtomicBool(flag bool) *AtomicBool

func (*AtomicBool) IsSet

func (ab *AtomicBool) IsSet() bool

IsSet returns whether the current boolean value is true

func (*AtomicBool) IsTrue

func (ab *AtomicBool) IsTrue() bool

func (*AtomicBool) Set

func (ab *AtomicBool) Set(value bool)

Set sets the value of the bool regardless of the previous value
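
For readers unfamiliar with the pattern, a boolean stored in a uint32 and accessed through sync/atomic can be sketched as follows. The `flag` type and `newFlag` are hypothetical stand-ins written for this illustration; the actual fields of AtomicBool are unexported and may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flag is an illustrative stand-in for AtomicBool: a uint32 that is only
// read and written through atomic operations (1 means true, 0 means false).
type flag struct{ v uint32 }

// newFlag returns a flag initialised to the given value.
func newFlag(initial bool) *flag {
	f := &flag{}
	f.Set(initial)
	return f
}

// Set stores the value regardless of the previous one.
func (f *flag) Set(value bool) {
	var n uint32
	if value {
		n = 1
	}
	atomic.StoreUint32(&f.v, n)
}

// IsSet reports whether the current value is true.
func (f *flag) IsSet() bool {
	return atomic.LoadUint32(&f.v) == 1
}

func main() {
	running := newFlag(false)
	running.Set(true)
	fmt.Println(running.IsSet())
}
```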

type Callable

type Callable interface {
	Call(ctx context.Context) *GPResult
}

type DynamicGPool

type DynamicGPool struct {

	// task queue
	TaskChan chan *FutureTask
	// contains filtered or unexported fields
}

func (*DynamicGPool) Cancel

func (p *DynamicGPool) Cancel() bool

func (*DynamicGPool) CurrentGCount

func (p *DynamicGPool) CurrentGCount() int

func (*DynamicGPool) IsShutdown

func (p *DynamicGPool) IsShutdown() bool

func (*DynamicGPool) Shutdown

func (p *DynamicGPool) Shutdown()

New tasks may be added even after shutdown

func (*DynamicGPool) Submit

func (p *DynamicGPool) Submit(task Callable) (Future, error)

When submitting tasks, blocking may occur

func (*DynamicGPool) TaskQueueCap

func (p *DynamicGPool) TaskQueueCap() int

func (*DynamicGPool) TaskQueueLength

func (p *DynamicGPool) TaskQueueLength() int

func (*DynamicGPool) WaitTerminate

func (p *DynamicGPool) WaitTerminate()

type DynamicGPoolOption

type DynamicGPoolOption struct {
	// contains filtered or unexported fields
}

type ExecutorService

type ExecutorService interface {
	// no longer accept new tasks
	Shutdown()
	Submit(task Callable) (Future, error)
	IsShutdown() bool
	// Wait for all the tasks to be completed
	WaitTerminate()
	TaskQueueCap() int
	TaskQueueLength() int
	Cancel() bool
	CurrentGCount() int
}

func NewDynamicGPool

func NewDynamicGPool(ctx context.Context, min int, max int, opts ...dynamicOption) ExecutorService

func NewFixedGPool

func NewFixedGPool(ctx context.Context, size int, opts ...option) ExecutorService

func NewSingleGPool

func NewSingleGPool(ctx context.Context, opts ...option) ExecutorService

type FixedGPool

type FixedGPool struct {
	Size int
	// task queue
	TaskChan chan *FutureTask
	// contains filtered or unexported fields
}

func (*FixedGPool) Cancel

func (p *FixedGPool) Cancel() bool

func (*FixedGPool) Consume

func (p *FixedGPool) Consume()

func (*FixedGPool) CurrentGCount

func (p *FixedGPool) CurrentGCount() int

func (*FixedGPool) IsShutdown

func (p *FixedGPool) IsShutdown() bool

func (*FixedGPool) Shutdown

func (p *FixedGPool) Shutdown()

New tasks may be added even after shutdown

func (*FixedGPool) Submit

func (p *FixedGPool) Submit(task Callable) (Future, error)

When submitting tasks, blocking may occur

func (*FixedGPool) TaskQueueCap

func (p *FixedGPool) TaskQueueCap() int

func (*FixedGPool) TaskQueueLength

func (p *FixedGPool) TaskQueueLength() int

func (*FixedGPool) WaitTerminate

func (p *FixedGPool) WaitTerminate()

type FixedGPoolOption

type FixedGPoolOption struct {
	// contains filtered or unexported fields
}

type Future

type Future interface {
	Get() *GPResult
	IsCancelled() bool
	Cancel() bool
	IsDone() bool
}
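
To make the blocking Get and the non-blocking IsDone concrete, here is one common way such a future can be backed by a channel. This is a sketch under assumptions, not the library's FutureTask: `result`, `future` and `start` are hypothetical names, cancellation is left out, and Get is assumed to block until the task finishes, as the README example suggests.

```go
package main

import "fmt"

// result mirrors the shape of GPResult (a value plus an error).
type result struct {
	Value any
	Err   error
}

// future is a simplified future: done is closed once res has been set.
type future struct {
	done chan struct{}
	res  *result
}

// start runs fn in its own goroutine and returns a future for its result.
func start(fn func() *result) *future {
	f := &future{done: make(chan struct{})}
	go func() {
		f.res = fn()
		close(f.done) // closing the channel publishes res to every waiter
	}()
	return f
}

// Get blocks until the task has finished, then returns its result.
func (f *future) Get() *result {
	<-f.done
	return f.res
}

// IsDone reports, without blocking, whether the task has finished.
func (f *future) IsDone() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

func main() {
	f := start(func() *result { return &result{Value: 6 * 7} })
	r := f.Get()
	fmt.Println(r.Value, r.Err, f.IsDone())
}
```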

type FutureTask

type FutureTask struct {
	// contains filtered or unexported fields
}

func NewFutureTask

func NewFutureTask(ctx context.Context, c Callable) *FutureTask

func (*FutureTask) Cancel

func (f *FutureTask) Cancel() bool

func (*FutureTask) Get

func (f *FutureTask) Get() *GPResult

func (*FutureTask) IsCancelled

func (f *FutureTask) IsCancelled() bool

func (*FutureTask) IsDone

func (f *FutureTask) IsDone() bool

type GPResult

type GPResult struct {
	Value any
	Err   error
}

type Runnable

type Runnable interface {
	Run(ctx context.Context)
}

type ShrinkWorker

type ShrinkWorker struct {
	RunningFlag *AtomicBool
	ExitedFlag  chan struct{}
	ExitChan    chan struct{}
	// contains filtered or unexported fields
}

func NewShrinkWorker

func NewShrinkWorker(pool *DynamicGPool, interval time.Duration, meetCondNum int) *ShrinkWorker

func (*ShrinkWorker) Start

func (w *ShrinkWorker) Start()

Shrinking rules:

Condition: the number of busy workers is less than 1/4 of the total number of workers.
The condition is checked meetCondNum consecutive times, detectInterval apart. If it holds every time,
the pool tries to reduce the number of workers by half.

func (*ShrinkWorker) Stop

func (worker *ShrinkWorker) Stop()

type Worker

type Worker struct {
	RunningFlag *AtomicBool
	ExitedFlag  chan struct{}
	ExitChan    chan struct{}
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(pool *DynamicGPool) *Worker

func (*Worker) IsBusy

func (worker *Worker) IsBusy() bool

func (*Worker) Start

func (worker *Worker) Start()

func (*Worker) Stop

func (worker *Worker) Stop()

Directories

Path Synopsis
example
background command
dynamic command
dynamic2 command
fixed command
largenumber command
single command
