这篇主要分享一个Go协程池的简单实现...
package main
import (
"fmt"
"reflect"
"sync"
"time"
)
func test1(args ...interface{}) (string, int) {
fmt.Println("args:", args,"args len:", len(args))
return "hello", 1
}
// args ...interface{} 可变参数包
type Task struct {
Args []interface{}
FnHandler func(args ...interface{}) (string, int)
}
func newTaskObj(fn func(args ...interface{}) (string, int), A []interface{}) *Task {
return &Task{
Args: A,
FnHandler: fn,
}
}
type workerPool struct {
taskQueue chan *Task
MaxWorkers uint
MinWorkers uint
activeCount uint
bufferSize uint
mtx sync.Mutex
}
var wp *workerPool
var oncePool sync.Once
func InitWorkerPool() {
if wp == nil {
wp = &workerPool{
taskQueue: make(chan *Task, 100),
MaxWorkers: 10,
MinWorkers: 2,
activeCount: 0,
bufferSize: 100,
}
for i := 0; i < int(wp.MinWorkers); i++ {
go func() {
for {
wp.DoTask()
}
}()
}
fmt.Println("init worker pool...")
}
}
func (wp *workerPool) AddTask(task *Task) {
wp.taskQueue <- task
}
func (wp *workerPool) DoTask() {
task := <-wp.taskQueue
wp.mtx.Lock()
wp.activeCount++
wp.mtx.Unlock()
task.FnHandler(task.Args...)
wp.mtx.Lock()
wp.activeCount--
wp.mtx.Unlock()
time.Sleep(300 * time.Millisecond)
}
func main() {
oncePool.Do(InitWorkerPool)
wp.AddTask(newTaskObj(test1, []interface{}{1, 2, 3, "hello", "world"}))
time.Sleep(3 * time.Second)
}
func test1(args ...interface{}) (string, int) {
fmt.Println("args:", args,"args len:", len(args))
return "hello", 1
}
args ...interface{} : 可变参数包,必须放在参数列表最后一个,和C/C++ 语法相同
返回值不支持参数包
type Task struct {
Args []interface{}
FnHandler func(args ...interface{}) (string, int)
}
func newTaskObj(fn func(args ...interface{}) (string, int), A []interface{}) *Task {
return &Task{
Args: A,
FnHandler: fn,
}
}
Args []interface{} 存函数对象FuHandler 输入参数
FnHandler 函数对象,相当于C/C++ 中的函数指针,C++ 中function 对象
newTaskObj 构建一个作业任务对象,待用示例
func newTaskObj(fn func(args ...interface{}) (string, int), A []interface{})
// 调用示例 func newTaskObj(function_name, []interface{arg1,arg2,arg3,...})
wp.AddTask(newTaskObj(test1, []interface{}{1, 2, 3, "hello", "world"}))
function_name :函数名
[]interface{arg1,arg2,arg3,...} : 传入 函数的参数
func (wp *workerPool) DoTask() {
task := <-wp.taskQueue
...
task.FnHandler(task.Args...)
...
time.Sleep(300 * time.Millisecond)
}
task.Args... 待传入FnHandler 指向的函数 入参 的参数包
task.FnHandler(task.Args...) 回调FnHandler 指向的函数