1 Star 0 Fork 7

kyle / gobatch

forked from chararch / gobatch 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
taskpool.go 1.20 KB
一键复制 编辑 原始数据 按行查看 历史
chararch 提交于 2022-03-24 00:21 . format code & add comment
package gobatch
import (
"context"
"fmt"
"github.com/panjf2000/ants/v2"
)
type taskPool struct {
pool *ants.Pool
}
func newTaskPool(size int) *taskPool {
pool, _ := ants.NewPool(size)
return &taskPool{
pool: pool,
}
}
// Future get result in future
type Future interface {
Get() (interface{}, error)
}
type futureImpl struct {
ch <-chan interface{}
}
func (f *futureImpl) Get() (interface{}, error) {
result := <-f.ch
err := <-f.ch
if err == nil {
return result, nil
}
e, ok := err.(error)
if ok {
return result, e
}
return result, fmt.Errorf("future get err:%v", err)
}
func (pool *taskPool) Submit(ctx context.Context, task func() (interface{}, error)) Future {
result := make(chan interface{}, 2)
err := pool.pool.Submit(func() {
defer func() {
if err := recover(); err != nil {
//todo log
result <- nil
result <- fmt.Errorf("panic:%v", err)
close(result)
}
}()
val, err := task()
result <- val
result <- err
close(result)
})
if err != nil {
result <- nil
result <- err
close(result)
}
return &futureImpl{
ch: result,
}
}
func (pool *taskPool) Release() {
pool.pool.Release()
}
func (pool *taskPool) SetMaxSize(size int) {
pool.pool.Tune(size)
}
Go
1
https://gitee.com/workface/gobatch.git
git@gitee.com:workface/gobatch.git
workface
gobatch
gobatch
master

搜索帮助