ZBLOG

#include #include template class LockFreeQueue { public: LockFreeQueue() : head_(new Node), tail_(head_.load()) {} void push(const T &item) { std::unique_ptr new_node(new Node(item)); ...

使用环形缓冲区可以减少动态内存分配和释放的次数,从而提高效率。以下是使用环形缓冲区优化后的LockFreeQueue实现:

#include #include

template class LockFreeQueue { public:

LockFreeQueue(std::size_t capacity) : buffer_(new Node[capacity]), capacity_(capacity)
{
    for (std::size_t i = 0; i < capacity_; ++i)
    {
        buffer_[i].sequence.store(i, std::memory_order_relaxed);
    }
    head_.store(0, std::memory_order_relaxed);
    tail_.store(0, std::memory_order_relaxed);
}

~LockFreeQueue()
{
    delete[] buffer_;
}

void push(const T &item)
{
    std::unique_ptr<Node> new_node(new Node(item));
    std::uint64_t pos = 0;
    while (true)
    {
        std::uint64_t tail = tail_.load(std::memory_order_relaxed);
        Node *node = &buffer_[tail % capacity_];
        std::uint64_t seq = node->sequence.load(std::memory_order_acquire);
        std::int64_t dif = static_cast<std::int64_t>(seq - tail);
        if (dif == 0)
        {
            if (tail_.compare_exchange_weak(tail, tail + 1, std::memory_order_relaxed))
            {
                node->data = item;
                node->sequence.store(tail + 1, std::memory_order_release);
                return;
            }
        }
        else if (dif < 0)
        {
            continue;
        }
        else
        {
            tail = tail_.load(std::memory_order_relaxed);
            pos = (tail - tail % capacity_) / capacity_ * capacity_ + seq % capacity_;
            if (pos >= tail && pos < tail + capacity_)
            {
                continue;
            }
            else
            {
                continue;
            }
        }
    }
}

bool try_pop(T &item)
{
    std::uint64_t pos = 0;
    while (true)
    {
        std::uint64_t head = head_.load(std::memory_order_relaxed);
        Node *node = &buffer_[head % capacity_];
        std::uint64_t seq = node->sequence.load(std::memory_order_acquire);
        std::int64_t dif = static_cast<std::int64_t>(seq - (head + 1));
        if (dif == 0)
        {
            if (head_.compare_exchange_weak(head, head + 1, std::memory_order_relaxed))
            {
                item = node->data;
                node->sequence.store(head + capacity_, std::memory_order_release);
                return true;
            }
        }
        else if (dif < 0)
        {
            continue;
        }
        else
        {
            head = head_.load(std::memory_order_relaxed);
            pos = (head - head % capacity_) / capacity_ * capacity_ + (head + 1) % capacity_;
            if (pos >= head && pos < head + capacity_)
            {
                continue;
            }
            else
            {
                return false;
            }
        }
    }
}

private:

struct Node
{
    T data;
    std::atomic<std::uint64_t> sequence;
    Node() {}
    Node(const T &item) : data(item) {}
    Node(const Node &) = delete;
    Node &operator=(const Node &) = delete;
};

Node *buffer_;
std::size_t capacity_;
std::atomic<std::uint64_t> head_;
std::atomic<std::uint64_t> tail_;

};

在环形缓冲区中,使用序列号标识每个节点的位置。当生产者将一个节点放入队列时,先获取当前尾节点的序列号tail和该位置上的节点node的序列号seq。如果dif等于0,则表示此时该位置空闲,可以放入新节点,并更新尾节点和节点的序列号;否则,需要判断是否有其他线程在该位置放置过节点,若是则继续循环,否则需要在环中找到比当前尾节点位置大的第一个空闲位置,并放入新节点。

当消费者从队列中弹出一个节点时,同样需要获取当前头节点的序列号head和该位置上的节点node的序列号seq。如果dif等于0,则表示该节点为当前头节点,可以弹出节点并更新头节点和节点的序列号;否则,需要判断是否有其他线程在该位置放置过节点,若是则继续循环,否则说明该位置已经被其他线程弹出,返回false。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=1404

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?