ZBLOG

golang 语言 实现带有context 超时机制的 协程池

以下是一个简单的带有context超时机制的协程池实现:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Pool struct {
	wg sync.WaitGroup
	ch chan int
}

func NewPool(size int) *Pool {
	return &Pool{
		ch: make(chan int, size),
	}
}

func (p *Pool) AddTask(ctx context.Context, f func()) error {
	select {
	case p.ch <- 1:
	default:
		return fmt.Errorf("pool is full")
	}

	p.wg.Add(1)
	go func() {
		defer func() {
			p.wg.Done()
			<-p.ch
		}()

		f()
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	pool := NewPool(10)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	for i := 0; i < 20; i++ {
		err := pool.AddTask(ctx, func() {
			fmt.Println("task start")
			time.Sleep(time.Second)
			fmt.Println("task end")
		})

        // 这里可以根据实际需求自定义错误处理逻辑
        if err != nil {
            fmt.Printf("add task error: %v\n", err)
        }
    }

	pool.wg.Wait()
}

在上述代码中,我们首先定义了一个Pool结构体来管理协程池。其中包含了一个缓冲通道ch,它用于限制同时运行的协程数量。当需要执行任务时,我们首先尝试向通道中写入一个值,如果通道已满,则说明当前协程池已经达到了最大并发数,无法再继续添加新的任务。当有协程完成任务时,它会从通道中取出一个值来释放资源。

AddTask方法中,我们首先启动一个新的协程来执行指定的函数,并将其加入等待组中。然后,在函数执行完毕之后,我们从通道中取出一个值来释放资源。同时,在函数执行期间,我们使用select语句监听context.Done()通道的状态。如果超时或者被取消,则返回相应的错误信息。否则返回nil。

在main函数中,我们首先创建了一个带有5秒超时时间的context,并往协程池中添加了20个任务。其中只有10个能够被同时执行,其余的需要等待其他任务结束后才能开始执行。如果某个任务超时或者被取消,则会在AddTask方法中返回相应的错误信息。

需要注意的是,在调用WaitGroup.Wait()方法之前必须保证所有任务都已经完成,否则程序会一直阻塞在这里。因此,在调用AddTask方法之后不能立即退出程序,而是要等待所有任务完成之后再退出。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=4787

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?