package main
import (
"bytes"
"crypto/sha1"
"encoding/binary"
"fmt"
)
const (
BucketSize = 3 // 每个 bucket 的最大容量
IdSize = 20 // ID 的大小,字节数
PeerSize = 100 // 每个节点中的 Peer 数量
BucketNum = 160 // 每个 DHT 中桶的数量
KeySize = 20 // 键的大小,字节数
ValueSize = 1024 // 值的大小,字节数
)
type Node struct {
id [IdSize]byte // 节点的 ID,长度为 IdSize
data string // 节点存储的数据
dht *DHT // DHT 结构体指针
active bool // 是否激活状态(已加入网络)
}
type Bucket struct {
nodes []Node // 节点列表
}
type KBucket struct {
buckets [BucketNum]*Bucket // K-Bucket 中存放 bucket 的数组
selfId [IdSize]byte // 自身节点的 ID
maxNodes int // 每个 bucket 的最大节点数量
dhts []*DHT // 所有节点中保存了自己这个节点信息的 DHT 结构体指针数组
k int // 最近邻居数目(算法参数)
alpha int // RPC 并发度(算法参数)
}
// 执行 SHA1 Hash 算法并返回结果。 func getHash(s string) []byte {
h := sha1.New()
h.Write([]byte(s))
return h.Sum(nil)
}
func NewBucket() *Bucket {
return &Bucket{
nodes: make([]Node, 0, BucketSize), // 初始化节点列表,容量为 BUCKET_SIZE
}
}
func (b *Bucket) Len() int {
return len(b.nodes) // 返回节点列表的长度
}
// 将新节点插入到当前 bucket 中,并返回是否添加成功。 func (b *Bucket) insertNode(n Node) bool {
if len(b.nodes) >= BucketSize { // 超过容量,无法添加节点
return false
}
for i, x := range b.nodes { // 节点已存在,则更新数据
if bytes.Equal(x.id[:], n.id[:]) {
b.nodes[i].data = n.data
b.nodes[i].active = true
return true
}
}
b.nodes = append(b.nodes, n) // 添加新节点
n.active = true // 新加入的节点设置为激活状态(已加入网络)
return true
}
// 更新指定 ID 的节点的数据。 func (b *Bucket) UpdateNode(n Node) {
for i, x := range b.nodes { // 更新节点数据
if bytes.Equal(x.id[:], n.id[:]) {
b.nodes[i].data = n.data
b.nodes[i].active = true
}
}
}
// 删除指定 ID 的节点。 func (b *Bucket) RemoveNode(id [IdSize]byte) bool {
for i, x := range b.nodes { // 删除节点
if bytes.Equal(x.id[:], id[:]) {
copy(b.nodes[i:], b.nodes[i+1:])
b.nodes[len(b.nodes)-1] = Node{}
b.nodes = b.nodes[:len(b.nodes)-1]
return true
}
}
return false // 节点不存在,无法删除
}
// 查找指定 ID 的节点。 func (b *Bucket) FindNode(id [IdSize]byte) (Node, bool) {
for _, x := range b.nodes {
if bytes.Equal(x.id[:], id[:]) { // 查找节点
return x, true
}
}
return Node{}, false // 节点不存在
}
type Peer struct {
kBuckets *KBucket // 所属的 K-Bucket
selfID [IdSize]byte // 自身节点的 ID
dht []*DHT // DHT 结构体列表,每个元素表示一个节点对应的 DHT 结构体指针。
buckets [BucketNum][]Node // 桶中存储的节点信息(Peer 对象)
lastIndex int // 上一次返回值的索引号(用于迭代查询)
}
// 创建一个 Peer 对象。 func NewPeer(kb *KBucket, id [IdSize]byte) *Peer {
p := &Peer{
kBuckets: kb,
selfID: id,
dht: make([]*DHT, 0, PeerSize),
}
for i := 0; i < BucketNum; i++ { // 初始化桶中存储的节点信息
p.buckets[i] = make([]Node, 0)
}
return p
}
// 计算目标 ID 和当前节点 ID 的距离。 func (p *Peer) getDistance(id [IdSize]byte) []byte {
var distance [KeySize]byte
for i := 0; i < KeySize; i++ {
distance[i] = p.selfID[i] ^ id[i]
}
return distance[:]
}
// 获取指定 ID 应该放置的桶的索引值。 func (p *Peer) calcBucketIndex(id [IdSize]byte) int {
distance := p.getDistance(id)
zeros := 0
for i := 0; i < KeySize; i++ {
for j := 7; j >= 0; j-- {
if (distance[i]>>uint(j))&0x01 != 0 {
return zeros // 返回 ID 中前导零的个数
}
zeros++
}
}
return zeros
}
// 将新节点插入到当前 Peer 的对应桶中,并返回是否添加成功。 func (p *Peer) insertNode(n Node) bool {
if bytes.Equal(n.id[:], p.selfID[:]) { // 自身节点不需要添加
return true
}
pos := p.calcBucketIndex(n.id) // 计算节点应该放置的桶的索引值
bucket := &p.buckets[pos] // 获取对应的桶
if len(*bucket) < BucketSize { // 直接添加节点到 bucket 中
for _, x := range *bucket { // 节点已存在,则更新数据并设置为激活状态(已加入网络)
if bytes.Equal(x.id[:], n.id[:]) {
x.data = n.data
x.dht = n.dht
x.active = true
return true
}
}
n.active = true // 新加入的节点设置为激活状态(已加入网络)
n.dht.insert(p) // 将当前节点添加到新节点的 DHT 中
*bucket = append(*bucket, n) // 添加新节点
return true
}
if pos == BucketNum-1 { // 节点与自身节点相同,无法添加
return false
}
var nodes []Node // 存储所有距离目标 ID 更近的节点(包括当前 bucket 中的和其它 bucket 中的)
for i := 0; i < BucketNum; i++ {
if i != pos { // 不需要遍历当前 bucket,因为已经在上面进行过判断了。
nodes = append(nodes, p.buckets[i]...)
}
}
if len(nodes) == 0 { // 如果没有更近的节点,则无法添加新节点。
return false
}
closestNodes := getClosestNodes(p.getDistance(n.id), nodes, p.kBuckets.k) // 获取最近的 k 个节点
for _, node := range closestNodes {
node.dht[0].SetValue(n.id[:], []byte(n.data)) // 将数据存储到最近邻居对应的桶中
}
n.active = true // 新加入的节点设置为激活状态(已加入网络)
n.dht.insert(p) // 将当前节点添加到新节点的 DHT 中
bucket = &p.buckets[pos+1] // 获取下一个桶(由于上面已经判断过是否是最后一个桶,所以这里直接获取下一个桶即可)
if len(*bucket) >= BucketSize { // 如果下一个桶已满,则需要新建一个桶
newBucket := make([]Node, 0)
p.buckets[pos+1] = newBucket // 将新桶添加到 Peer 中
bucket = &newBucket
}
*bucket = append(*bucket, n) // 将节点添加到下一个桶中
return true
}
// 删除指定 ID 的节点。 func (p *Peer) RemoveNode(id [IdSize]byte) bool {
pos := p.calcBucketIndex(id)
bucket := &p.buckets[pos]
for i, x := range *bucket { // 删除节点
if bytes.Equal(x.id[:], id[:]) {
x.active = false // 设置为非激活状态(已离开网络)
copy((*bucket)[i:], (*bucket)[i+1:])
(*bucket)[len(*bucket)-1] = Node{}
*bucket = (*bucket)[:len(*bucket)-1]
return true
}
}
return false
}
// 查找




