ZBLOG

go etcd 分布式锁

etcd是一个分布式的键值存储系统,支持watch、ttl等特性,同时提供了分布式锁的实现。使用etcd分布式锁可以实现多个节点之间的协同工作,避免因为并发操作而导致的数据不一致问题。

实现分布式锁的基本思路如下:

  1. 在etcd中创建一个临时有序节点,代表当前请求正在竞争锁。

  2. 判断自己是否得到锁(是否是最小的节点)。

  3. 如果没有得到锁,则监听前面的节点,等待其释放锁。

  4. 如果得到锁,则开始执行任务,在任务完成后删除自己创建的节点,释放锁。

具体代码实现如下:

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "go.etcd.io/etcd/clientv3"
  6. "time"
  7. )
  8. type DistributedLock struct {
  9. cli *clientv3.Client
  10. name string
  11. }
  12. func NewDistributedLock(endpoints []string, name string) (*DistributedLock, error) {
  13. cli, err := clientv3.New(clientv3.Config{
  14. Endpoints: endpoints,
  15. DialTimeout: 5 * time.Second,
  16. })
  17. if err != nil {
  18. return nil, err
  19. }
  20. return &DistributedLock{
  21. cli: cli,
  22. name: name,
  23. }, nil
  24. }
  25. func (dl *DistributedLock) Lock() error {
  26. resp, err := dl.cli.Grant(context.Background(), 5)
  27. if err != nil {
  28. return err
  29. }
  30. leaseID := resp.ID
  31. key := fmt.Sprintf("/locks/%s", dl.name)
  32. _, err = dl.cli.Txn(context.Background()).
  33. If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
  34. Then(clientv3.OpPut(key, "", clientv3.WithLease(leaseID))).
  35. Commit()
  36. if err != nil {
  37. return err
  38. }
  39. ctx, cancel := context.WithCancel(context.Background())
  40. defer cancel()
  41. ch := dl.cli.Watch(ctx, key)
  42. for {
  43. select {
  44. case <-ch:
  45. resp, _ := dl.cli.Get(context.Background(), key)
  46. if len(resp.Kvs) == 0 {
  47. return nil
  48. }
  49. minKey := resp.Kvs[0]
  50. for _, kv := range resp.Kvs {
  51. if kv.Lease < minKey.Lease {
  52. minKey = kv
  53. }
  54. }
  55. if string(minKey.Key) == key {
  56. return nil
  57. }
  58. case <-time.After(2 * time.Second):
  59. return fmt.Errorf("Lock timeout")
  60. }
  61. }
  62. }
  63. func (dl *DistributedLock) Unlock() error {
  64. key := fmt.Sprintf("/locks/%s", dl.name)
  65. _, err := dl.cli.Delete(context.Background(), key)
  66. if err != nil {
  67. return err
  68. }
  69. return nil
  70. }
  71. func main() {
  72. dl1, err := NewDistributedLock([]string{"localhost:2379"}, "test")
  73. if err != nil {
  74. panic(err)
  75. }
  76. err = dl1.Lock()
  77. if err != nil {
  78. panic(err)
  79. }
  80. fmt.Println("Got lock 1")
  81. dl2, err := NewDistributedLock([]string{"localhost:2379"}, "test")
  82. if err != nil {
  83. panic(err)
  84. }
  85. err = dl2.Lock()
  86. if err == nil {
  87. panic("Should not get the lock")
  88. }
  89. fmt.Println("Can't get lock 2")
  90. err = dl1.Unlock()
  91. if err != nil {
  92. panic(err)
  93. }
  94. fmt.Println("Released lock 1")
  95. err = dl2.Lock()
  96. if err != nil {
  97. panic(err)
  98. }
  99. fmt.Println("Got lock 2")
  100. err = dl2.Unlock()
  101. if err != nil {
  102. panic(err)
  103. }
  104. fmt.Println("Released lock 2")
  105. }

以上代码实现了一个简单的分布式锁。首先创建了两个DistributedLock对象,它们都使用名为”test”的锁。然后dl1成功获取了锁,并在控制台输出”Got lock 1”,接着dl2尝试获取锁并失败,最后dl1释放锁,并dl2再次成功获取锁。

这里的实现只是一个简单的示例,还有很多需要完善的地方。例如,如果获取锁超时,应该返回错误;如果某个节点意外崩溃,应该能够自动释放其持有的锁等等。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=167

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?