要实现并发处理,可以使用异步方式监听多个Redis队列。以下是一个示例伪代码,使用Redis的异步操作API(hiredis中的ae事件循环)实现并发处理:
#include <iostream>
#include <thread>
#include <vector>
#include <hiredis/hiredis.h>
void asyncCallback(redisAsyncContext* context, void* reply, void* data) {
redisReply* message = static_cast<redisReply*>(reply);
if (message->type == REDIS_REPLY_ARRAY && message->elements >= 3) {
std::string receivedQueueName = message->element[1]->str;
std::string data = message->element[2]->str;
// 处理接收到的消息
std::cout << "Received from queue [" << receivedQueueName << "]: " << data << std::endl;
// TODO: 根据业务需求进行相应处理
if (data == "quit") {
// 接收到特定消息(例如"quit"),停止事件循环
aeStop(static_cast<aeEventLoop*>(data));
}
}
}
void listenToQueue(const std::string& queueName) {
redisAsyncContext* context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到Redis服务器
if (context == nullptr || context->err) {
std::cerr << "Failed to connect to Redis: " << (context ? context->errstr : "nullptr") << std::endl;
return;
}
aeEventLoop* loop = aeCreateEventLoop(1024); // 创建事件循环
// 设置回调函数和传递额外数据
redisAsyncCommand(context, asyncCallback, loop, "SUBSCRIBE %s", queueName.c_str());
// 将异步连接添加到事件循环中
redisAeAttach(loop, context);
// 开始事件循环
aeMain(loop);
aeDeleteEventLoop(loop); // 销毁事件循环
}
int main() {
std::vector<std::thread> threads;
// 创建并启动五个线程,分别监听五个Redis队列
threads.emplace_back(std::thread(listenToQueue, "queue1"));
threads.emplace_back(std::thread(listenToQueue, "queue2"));
threads.emplace_back(std::thread(listenToQueue, "queue3"));
threads.emplace_back(std::thread(listenToQueue, "queue4"));
threads.emplace_back(std::thread(listenToQueue, "queue5"));
// 主线程等待所有子线程结束
for (auto& thread : threads) {
thread.join();
}
return 0;
}
通过使用异步操作API,并为每个队列创建独立的异步连接和事件循环,可以实现对多个队列的并发监听和处理。请注意,上述示例代码仅为演示目的,并未考虑错误处理、优雅退出等方面的完整性。实际使用时,请根据你的需求进行适当调整和扩展。
内容由零声教学AI助手提供,问题来源于学员提问