这篇主要分享一个Go协程池的简单实现...

package main

import (
    "fmt"
    "reflect"
    "sync"
    "time"
)

// test1 is a sample task handler. It prints the arguments it was given along
// with how many there were, then returns the fixed pair ("hello", 1).
func test1(args ...interface{}) (string, int) {
    const greeting, code = "hello", 1

    count := len(args)
    fmt.Println("args:", args, "args len:", count)

    return greeting, code
}
// args ...interface{} 可变参数包

// Task bundles a handler function with the arguments it will be called with.
type Task struct {
    // Args holds the values that are expanded into FnHandler's variadic
    // parameter when the task is executed.
    Args      []interface{}
    // FnHandler is the function invoked for this task. It must accept a
    // variadic list of arbitrary values and return a (string, int) pair.
    FnHandler func(args ...interface{}) (string, int)
}

// newTaskObj builds a Task that will call fn with the elements of A as its
// arguments. The slice A is stored as-is (not copied).
func newTaskObj(fn func(args ...interface{}) (string, int), A []interface{}) *Task {
    t := new(Task)
    t.FnHandler = fn
    t.Args = A
    return t
}

// workerPool holds the shared task queue and the bookkeeping for a set of
// worker goroutines.
type workerPool struct {
    // taskQueue carries pending tasks from AddTask to the workers running DoTask.
    taskQueue   chan *Task
    // MaxWorkers is set by InitWorkerPool but is not read by any code visible
    // in this file.
    MaxWorkers  uint
    // MinWorkers is the number of worker goroutines InitWorkerPool starts.
    MinWorkers  uint
    // activeCount is the number of handlers currently executing; guarded by mtx.
    activeCount uint
    // bufferSize is set by InitWorkerPool to the same value as the taskQueue
    // capacity; it is not read by any code visible in this file.
    bufferSize  uint
    // mtx protects activeCount.
    mtx         sync.Mutex
}

// Package-level singleton state for the pool.
var (
    // wp is the shared pool instance; it stays nil until InitWorkerPool runs.
    wp *workerPool
    // oncePool is used by main to run InitWorkerPool at most once.
    oncePool sync.Once
)

// InitWorkerPool creates the package-level pool if it does not exist yet and
// starts MinWorkers goroutines, each of which calls DoTask in an endless loop.
// When the pool already exists the call does nothing.
//
// The nil check is not synchronized here; main runs this function through
// oncePool.Do.
func InitWorkerPool() {
    if wp != nil {
        return
    }

    // One constant feeds both the channel capacity and the recorded size.
    const queueSize = 100

    wp = &workerPool{
        taskQueue:  make(chan *Task, queueSize),
        MaxWorkers: 10,
        MinWorkers: 2,
        bufferSize: queueSize,
    }

    // Each worker pulls and executes tasks forever; there is no stop signal.
    worker := func() {
        for {
            wp.DoTask()
        }
    }
    for started := uint(0); started < wp.MinWorkers; started++ {
        go worker()
    }

    fmt.Println("init worker pool...")
}

// AddTask enqueues task on the pool's queue. The send blocks while the
// queue's buffer is full.
func (wp *workerPool) AddTask(task *Task) {
    var queue chan<- *Task = wp.taskQueue
    queue <- task
}

// DoTask blocks until a task is available on the queue, runs its handler with
// the stored arguments, and then pauses for 300ms before returning.
//
// activeCount is incremented for the duration of the handler call; both
// updates happen under mtx. The handler's return values are discarded.
//
// A nil task, or a task whose FnHandler is nil, is skipped instead of being
// invoked.
func (wp *workerPool) DoTask() {
    task := <-wp.taskQueue

    // Dereferencing a nil *Task or calling a nil func value panics, and an
    // unrecovered panic in a worker goroutine terminates the whole program,
    // so invalid submissions are dropped here.
    if task != nil && task.FnHandler != nil {
        wp.mtx.Lock()
        wp.activeCount++
        wp.mtx.Unlock()

        task.FnHandler(task.Args...)

        wp.mtx.Lock()
        wp.activeCount--
        wp.mtx.Unlock()
    }

    // The pause applies to every call, skipped tasks included, so a caller
    // looping on DoTask keeps the same pacing and never spins.
    time.Sleep(300 * time.Millisecond)
}

// main initializes the pool once, submits a single demo task that calls test1
// with five arguments, and then sleeps for three seconds so the worker has
// time to run it before the program exits.
func main() {
    oncePool.Do(InitWorkerPool)

    demoArgs := []interface{}{1, 2, 3, "hello", "world"}
    demoTask := newTaskObj(test1, demoArgs)
    wp.AddTask(demoTask)

    time.Sleep(3 * time.Second)
}

// test1 is a sample task handler. It prints the arguments it was given along
// with how many there were, then returns the fixed pair ("hello", 1).
func test1(args ...interface{}) (string, int) {
    const greeting, code = "hello", 1

    count := len(args)
    fmt.Println("args:", args, "args len:", count)

    return greeting, code
}
  • args ...interface{} : 可变参数包,必须放在参数列表最后一个,和C/C++ 语法相同

  • 返回值不支持参数包


// Task bundles a handler function with the arguments it will be called with.
type Task struct {
    // Args holds the values that are expanded into FnHandler's variadic
    // parameter when the task is executed.
    Args      []interface{}
    // FnHandler is the function invoked for this task. It must accept a
    // variadic list of arbitrary values and return a (string, int) pair.
    FnHandler func(args ...interface{}) (string, int)
}

// newTaskObj builds a Task that will call fn with the elements of A as its
// arguments. The slice A is stored as-is (not copied).
func newTaskObj(fn func(args ...interface{}) (string, int), A []interface{}) *Task {
    t := new(Task)
    t.FnHandler = fn
    t.Args = A
    return t
}
  • Args []interface{} :存放函数对象 FnHandler 的输入参数

  • FnHandler 函数对象,相当于C/C++ 中的函数指针,C++ 中function 对象

  • newTaskObj 构建一个作业任务对象,调用示例如下


func newTaskObj(fn func(args ...interface{}) (string, int), A []interface{})
// 调用示例: newTaskObj(function_name, []interface{}{arg1, arg2, arg3, ...})
wp.AddTask(newTaskObj(test1, []interface{}{1, 2, 3, "hello", "world"}))

  • function_name :函数名

  • []interface{}{arg1, arg2, arg3, ...} :传入函数的参数

func (wp *workerPool) DoTask() {
    task := <-wp.taskQueue
    ...
    task.FnHandler(task.Args...)
    ...
    time.Sleep(300 * time.Millisecond)
}
  • task.Args... :将 Args 切片展开为参数包,作为 FnHandler 所指向函数的入参

  • task.FnHandler(task.Args...) 回调FnHandler 指向的函数