这个队列的实现有几个问题:
在try_pop中,使用memory_order_release在调用完memorypool.Free之后会导致内存泄漏。应该使用memory_order_acq_rel或者memory_order_consume。
在push中,如果内存池无法分配空间,直接返回,但是没有释放已经分配的内存。可以添加一个析构函数,在析构时将所有已分配但未使用的节点释放。
在try_pop中,如果队列为空,则返回false。但是此时可能存在其他线程正在进行push操作,因此需要添加一定的同步机制来保证正确性。
在push操作中,每次都需要从内存池中申请内存,这样会增加一定的开销。可以改为批量分配内存,并且使用一个指针来记录当前可用的内存位置,避免频繁调用malloc/free。
以下是优化后的代码:
#include
template
LockFreeQueue() : head_(new Node), tail_(head_.load()), memory_pool_(sizeof(Node) * 100) {}
~LockFreeQueue()
{
while (Node *const old_head = head_)
{
Node *next_node = old_head->value_.second;
old_head->~Node();
memory_pool_.Free(old_head, sizeof(Node));
head_ = next_node;
}
}
void push(const T &value)
{
Node* new_node = nullptr;
{
std::lock_guard<std::mutex> lock(memory_pool_mutex_);
if (free_list_.empty()) {
const int kBatchSize = 100;
void* addr[kBatchSize];
memory_pool_.Alloc(addr, sizeof(Node) * kBatchSize);
for (int i = 0; i < kBatchSize; ++i) {
free_list_.push_back(static_cast<Node*>(addr[i]));
}
}
new_node = free_list_.back();
free_list_.pop_back();
}
new (new_node) Node(std::make_pair(value, nullptr));
Node *tail = tail_.load(std::memory_order_relaxed);
tail->value_.second = new_node;
tail_.store(new_node, std::memory_order_release);
size_.fetch_add(1, std::memory_order_relaxed);
}
bool try_pop(T &value)
{
Node *const old_head = head_.load(std::memory_order_relaxed);
Node *const new_head = old_head->value_.second;
if (new_head == nullptr)
{
std::lock_guard<std::mutex> lock(memory_pool_mutex_);
if (free_list_.empty()) {
return false;
} else {
return try_pop(value);
}
}
value = new_head->value_.first;
head_.store(new_head, std::memory_order_release);
size_.fetch_sub(1, std::memory_order_relaxed);
{
std::lock_guard<std::mutex> lock(memory_pool_mutex_);
free_list_.push_back(old_head);
}
return true;
}
bool empty() const
{
return size_.load(std::memory_order_acquire) == 0;
}
private:
struct Node
{
std::pair<T, Node *> value_;
Node() : value_(std::make_pair(T(), nullptr)) {}
explicit Node(const std::pair<T, Node *> &value) : value_(value) {}
};
std::atomic<Node *> head_;
std::atomic<Node *> tail_;
std::atomic<size_t> size_{0};
terra_memory_pool memory_pool_;
std::mutex memory_pool_mutex_;
std::vector<Node*> free_list_;
};




