使用环形缓冲区可以减少动态内存分配和释放的次数,从而提高效率。以下是使用环形缓冲区优化后的LockFreeQueue实现:
#include
template
LockFreeQueue(std::size_t capacity) : buffer_(new Node[capacity]), capacity_(capacity)
{
for (std::size_t i = 0; i < capacity_; ++i)
{
buffer_[i].sequence.store(i, std::memory_order_relaxed);
}
head_.store(0, std::memory_order_relaxed);
tail_.store(0, std::memory_order_relaxed);
}
~LockFreeQueue()
{
delete[] buffer_;
}
void push(const T &item)
{
std::unique_ptr<Node> new_node(new Node(item));
std::uint64_t pos = 0;
while (true)
{
std::uint64_t tail = tail_.load(std::memory_order_relaxed);
Node *node = &buffer_[tail % capacity_];
std::uint64_t seq = node->sequence.load(std::memory_order_acquire);
std::int64_t dif = static_cast<std::int64_t>(seq - tail);
if (dif == 0)
{
if (tail_.compare_exchange_weak(tail, tail + 1, std::memory_order_relaxed))
{
node->data = item;
node->sequence.store(tail + 1, std::memory_order_release);
return;
}
}
else if (dif < 0)
{
continue;
}
else
{
tail = tail_.load(std::memory_order_relaxed);
pos = (tail - tail % capacity_) / capacity_ * capacity_ + seq % capacity_;
if (pos >= tail && pos < tail + capacity_)
{
continue;
}
else
{
continue;
}
}
}
}
bool try_pop(T &item)
{
std::uint64_t pos = 0;
while (true)
{
std::uint64_t head = head_.load(std::memory_order_relaxed);
Node *node = &buffer_[head % capacity_];
std::uint64_t seq = node->sequence.load(std::memory_order_acquire);
std::int64_t dif = static_cast<std::int64_t>(seq - (head + 1));
if (dif == 0)
{
if (head_.compare_exchange_weak(head, head + 1, std::memory_order_relaxed))
{
item = node->data;
node->sequence.store(head + capacity_, std::memory_order_release);
return true;
}
}
else if (dif < 0)
{
continue;
}
else
{
head = head_.load(std::memory_order_relaxed);
pos = (head - head % capacity_) / capacity_ * capacity_ + (head + 1) % capacity_;
if (pos >= head && pos < head + capacity_)
{
continue;
}
else
{
return false;
}
}
}
}
private:
struct Node
{
T data;
std::atomic<std::uint64_t> sequence;
Node() {}
Node(const T &item) : data(item) {}
Node(const Node &) = delete;
Node &operator=(const Node &) = delete;
};
Node *buffer_;
std::size_t capacity_;
std::atomic<std::uint64_t> head_;
std::atomic<std::uint64_t> tail_;
};
在环形缓冲区中,使用序列号标识每个节点的位置。当生产者将一个节点放入队列时,先获取当前尾节点的序列号tail和该位置上的节点node的序列号seq。如果dif等于0,则表示此时该位置空闲,可以放入新节点,并更新尾节点和节点的序列号;否则,需要判断是否有其他线程在该位置放置过节点,若是则继续循环,否则需要在环中找到比当前尾节点位置大的第一个空闲位置,并放入新节点。
当消费者从队列中弹出一个节点时,同样需要获取当前头节点的序列号head和该位置上的节点node的序列号seq。如果dif等于0,则表示该节点为当前头节点,可以弹出节点并更新头节点和节点的序列号;否则,需要判断是否有其他线程在该位置放置过节点,若是则继续循环,否则说明该位置已经被其他线程弹出,返回false。