Documentation
¶
Index ¶
- func WithCoreGo(n int32) option.Option[OnDemandBlockTaskPool]
- func WithMaxGo(n int32) option.Option[OnDemandBlockTaskPool]
- func WithMaxIdleTime(d time.Duration) option.Option[OnDemandBlockTaskPool]
- func WithQueueBacklogRate(rate float64) option.Option[OnDemandBlockTaskPool]
- type OnDemandBlockTaskPool
- func (b *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error)
- func (b *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error)
- func (b *OnDemandBlockTaskPool) Start() error
- func (b *OnDemandBlockTaskPool) States(ctx context.Context, interval time.Duration) (<-chan State, error)
- func (b *OnDemandBlockTaskPool) Submit(ctx context.Context, task Task) error
- type State
- type Task
- type TaskFunc
- type TaskPool
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithCoreGo ¶
func WithCoreGo(n int32) option.Option[OnDemandBlockTaskPool]
func WithMaxIdleTime ¶
func WithMaxIdleTime(d time.Duration) option.Option[OnDemandBlockTaskPool]
func WithQueueBacklogRate ¶
func WithQueueBacklogRate(rate float64) option.Option[OnDemandBlockTaskPool]
Types ¶
type OnDemandBlockTaskPool ¶
type OnDemandBlockTaskPool struct {
// contains filtered or unexported fields
}
OnDemandBlockTaskPool 按需创建goroutine的并发阻塞的任务池
func NewOnDemandBlockTaskPool ¶
func NewOnDemandBlockTaskPool(initGo int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error)
NewOnDemandBlockTaskPool 创建一个新的 OnDemandBlockTaskPool initGo 是初始协程数 queueSize 是队列大小,即最多有多少个任务在等待调度 使用相应的Option选项可以动态扩展协程数
Example ¶
p, _ := NewOnDemandBlockTaskPool(10, 100)
_ = p.Start()
// wg 只是用来确保任务执行的,你在实际使用过程中是不需要的
var wg sync.WaitGroup
wg.Add(1)
_ = p.Submit(context.Background(), TaskFunc(func(ctx context.Context) error {
fmt.Println("hello, world")
wg.Done()
return nil
}))
wg.Wait()
Output: hello, world
func (*OnDemandBlockTaskPool) Shutdown ¶
func (b *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error)
Shutdown 将会拒绝提交新的任务,但是会继续执行已提交任务 当执行完毕后,会往返回的 chan 中丢入信号 Shutdown 会负责关闭返回的 chan Shutdown 无法中断正在执行的任务
func (*OnDemandBlockTaskPool) ShutdownNow ¶
func (b *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error)
ShutdownNow 立刻关闭任务池,并且返回所有剩余未执行的任务(不包含正在执行的任务)
func (*OnDemandBlockTaskPool) Start ¶
func (b *OnDemandBlockTaskPool) Start() error
Start 开始调度任务执行 Start 之后,调用者可以继续使用 Submit 提交任务
func (*OnDemandBlockTaskPool) States ¶
func (b *OnDemandBlockTaskPool) States(ctx context.Context, interval time.Duration) (<-chan State, error)
Example ¶
p, _ := NewOnDemandBlockTaskPool(10, 100)
var wg sync.WaitGroup
wg.Add(1)
_ = p.Submit(context.Background(), TaskFunc(func(ctx context.Context) error {
wg.Done()
return nil
}))
_ = p.Start()
ch, err := p.States(context.Background(), time.Second*10)
if err == nil {
fmt.Println("get ch")
}
state := <-ch
fmt.Println(state.PoolState)
fmt.Println(state.RunningTasksCnt)
fmt.Println(state.WaitingTasksCnt)
fmt.Println(state.GoCnt)
fmt.Println(state.QueueSize)
wg.Wait()
Output: get ch 2 0 0 10 100
type Task ¶
type Task interface {
// Run 执行任务
// 如果 ctx 设置了超时时间,那么实现者需要自己决定是否进行超时控制
Run(ctx context.Context) error
}
Task 代表一个任务
type TaskPool ¶
type TaskPool interface {
// Submit 执行一个任务
// 如果任务池提供了阻塞的功能,那么如果在 ctx 过期都没有提交成功,那么应该返回错误
// 调用 Start 之后能否继续提交任务,则取决于具体的实现
// 调用 Shutdown 或者 ShutdownNow 之后提交任务都会返回错误
Submit(ctx context.Context, task Task) error
// Start 开始调度任务执行。在调用 Start 之前,所有的任务都不会被调度执行。
// Start 之后,能否继续调用 Submit 提交任务,取决于具体的实现
Start() error
// Shutdown 关闭任务池。如果此时尚未调用 Start 方法,那么将会立刻返回。
// 任务池将会停止接收新的任务,但是会继续执行剩下的任务,
// 在所有任务执行完毕之后,用户可以从返回的 chan 中得到通知
// 任务池在发出通知之后会关闭 chan struct{}
Shutdown() (<-chan struct{}, error)
// ShutdownNow 立刻关闭线程池
// 任务池能否中断当前正在执行的任务,取决于 TaskPool 的具体实现,以及 Task 的具体实现
// 该方法会返回所有剩下的任务,剩下的任务是否包含正在执行的任务,也取决于具体的实现
ShutdownNow() ([]Task, error)
// States 暴露 TaskPool 生命周期内的运行状态
// ctx 是让用户来控制什么时候退出采样。那么最基本的两个退出机制:一个是 ctx 被 cancel 了或者超时了,一个是TaskPool 被关闭了
// error 仅仅表示创建 chan state 是否成功
// interval 表示获取TaskPool运行期间内部状态的周期/时间间隔
States(ctx context.Context, interval time.Duration) (<-chan State, error)
}
TaskPool 任务池
Click to show internal directories.
Click to hide internal directories.