ZBLOG

go etcd 分布式锁

etcd是一个分布式的键值存储系统,支持watch、ttl等特性,同时提供了分布式锁的实现。使用etcd分布式锁可以实现多个节点之间的协同工作,避免因为并发操作而导致的数据不一致问题。

实现分布式锁的基本思路如下:

  1. 在etcd中创建一个临时有序节点,代表当前请求正在竞争锁。

  2. 判断自己是否得到锁(是否是最小的节点)。

  3. 如果没有得到锁,则监听前面的节点,等待其释放锁。

  4. 如果得到锁,则开始执行任务,在任务完成后删除自己创建的节点,释放锁。

具体代码实现如下:

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

type DistributedLock struct {
	cli  *clientv3.Client
	name string
}

func NewDistributedLock(endpoints []string, name string) (*DistributedLock, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}
	return &DistributedLock{
		cli:  cli,
		name: name,
	}, nil
}

func (dl *DistributedLock) Lock() error {
	resp, err := dl.cli.Grant(context.Background(), 5)
	if err != nil {
		return err
	}

	leaseID := resp.ID
	key := fmt.Sprintf("/locks/%s", dl.name)

	_, err = dl.cli.Txn(context.Background()).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, "", clientv3.WithLease(leaseID))).
		Commit()
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := dl.cli.Watch(ctx, key)

	for {
		select {
		case <-ch:
			resp, _ := dl.cli.Get(context.Background(), key)
			if len(resp.Kvs) == 0 {
				return nil
			}
			minKey := resp.Kvs[0]
			for _, kv := range resp.Kvs {
				if kv.Lease < minKey.Lease {
					minKey = kv
				}
			}
			if string(minKey.Key) == key {
				return nil
			}
		case <-time.After(2 * time.Second):
			return fmt.Errorf("Lock timeout")
		}
	}
}

func (dl *DistributedLock) Unlock() error {
	key := fmt.Sprintf("/locks/%s", dl.name)
	_, err := dl.cli.Delete(context.Background(), key)
	if err != nil {
		return err
	}
	return nil
}

func main() {
	dl1, err := NewDistributedLock([]string{"localhost:2379"}, "test")
	if err != nil {
		panic(err)
	}
	err = dl1.Lock()
	if err != nil {
		panic(err)
	}
	fmt.Println("Got lock 1")

	dl2, err := NewDistributedLock([]string{"localhost:2379"}, "test")
	if err != nil {
		panic(err)
	}
	err = dl2.Lock()
	if err == nil {
		panic("Should not get the lock")
	}
	fmt.Println("Can't get lock 2")

	err = dl1.Unlock()
	if err != nil {
		panic(err)
	}
	fmt.Println("Released lock 1")

	err = dl2.Lock()
	if err != nil {
		panic(err)
	}
	fmt.Println("Got lock 2")

	err = dl2.Unlock()
	if err != nil {
		panic(err)
	}
	fmt.Println("Released lock 2")
}

以上代码实现了一个简单的分布式锁。首先创建了两个DistributedLock对象,它们都使用名为”test”的锁。然后dl1成功获取了锁,并在控制台输出”Got lock 1”,接着dl2尝试获取锁并失败,最后dl1释放锁,并dl2再次成功获取锁。

这里的实现只是一个简单的示例,还有很多需要完善的地方。例如,如果获取锁超时,应该返回错误;如果某个节点意外崩溃,应该能够自动释放其持有的锁等等。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=167

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?