在Go语言中,可以使用Redis的ZSet数据类型来实现延迟队列。ZSet是一个有序集合,每个元素都带有一个分数,在查询时按照分数从小到大排序。因此,我们可以将消息的触发时间作为分数存储在ZSet中,当消息到达触发时间时,再从ZSet中删除该消息并执行相应操作。
以下是一段示例代码:
package main
import (
"fmt"
"time"
"github.com/go-redis/redis"
)
func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
defer client.Close()
msg := "hello world"
delayTime := 10 // 延迟10秒执行
err := client.ZAdd("delay_queue", &redis.Z{Score: float64(time.Now().Unix() + int64(delayTime)), Member: msg}).Err()
if err != nil {
panic(err)
}
for {
currentTime := time.Now().Unix()
msgs, err := client.ZRangeByScore("delay_queue", &redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%d", currentTime),
Offset: 0,
Count: 1,
}).Result()
if len(msgs) == 0 || err != nil {
continue
}
for _, m := range msgs {
fmt.Println("execute message:", m)
// TODO 执行对应操作
// 删除已经处理过的消息
_, _ = client.ZRem("delay_queue", m).Result()
}
time.Sleep(time.Second)
}
}
在上述代码中,我们使用了go-redis库连接Redis,并通过ZAdd命令将消息添加到ZSet中。其中,分数为当前时间加上延迟时间。然后,通过ZRangeByScore命令获取所有分数小于等于当前时间的消息,并遍历执行相应操作。最后,删除已经处理过的消息。
需要注意的是,在实际生产环境中,可能需要考虑多个进程同时从队列中取出消息的情况。为了避免竞争条件和重复执行问题,可以使用Redis的WATCH、MULTI和EXEC命令来保证原子性操作。




