以下是一个简单的带有context超时机制的协程池实现:
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Pool struct {
wg sync.WaitGroup
ch chan int
}
func NewPool(size int) *Pool {
return &Pool{
ch: make(chan int, size),
}
}
func (p *Pool) AddTask(ctx context.Context, f func()) error {
select {
case p.ch <- 1:
default:
return fmt.Errorf("pool is full")
}
p.wg.Add(1)
go func() {
defer func() {
p.wg.Done()
<-p.ch
}()
f()
}()
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
func main() {
pool := NewPool(10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for i := 0; i < 20; i++ {
err := pool.AddTask(ctx, func() {
fmt.Println("task start")
time.Sleep(time.Second)
fmt.Println("task end")
})
// 这里可以根据实际需求自定义错误处理逻辑
if err != nil {
fmt.Printf("add task error: %v\n", err)
}
}
pool.wg.Wait()
}
在上述代码中,我们首先定义了一个Pool结构体来管理协程池。其中包含了一个缓冲通道ch,它用于限制同时运行的协程数量。当需要执行任务时,我们首先尝试向通道中写入一个值,如果通道已满,则说明当前协程池已经达到了最大并发数,无法再继续添加新的任务。当有协程完成任务时,它会从通道中取出一个值来释放资源。
在AddTask方法中,我们首先启动一个新的协程来执行指定的函数,并将其加入等待组中。然后,在函数执行完毕之后,我们从通道中取出一个值来释放资源。同时,在函数执行期间,我们使用select语句监听context.Done()通道的状态。如果超时或者被取消,则返回相应的错误信息。否则返回nil。
在main函数中,我们首先创建了一个带有5秒超时时间的context,并往协程池中添加了20个任务。其中只有10个能够被同时执行,其余的需要等待其他任务结束后才能开始执行。如果某个任务超时或者被取消,则会在AddTask方法中返回相应的错误信息。
需要注意的是,在调用WaitGroup.Wait()方法之前必须保证所有任务都已经完成,否则程序会一直阻塞在这里。因此,在调用AddTask方法之后不能立即退出程序,而是要等待所有任务完成之后再退出。




