当然可以!下面是一个使用C++和librdkafka库将数据发送到Kafka并消费后写入MySQL数据库的简单示例。
首先,确保你已经安装了以下组件:
- Apache Kafka
- MySQL数据库
- librdkafka库(可在 https://github.com/confluentinc/librdkafka 下载,原地址 https://github.com/edenhill/librdkafka 会自动跳转)
步骤1: 创建Kafka主题
使用以下命令创建一个名为“test_topic”的Kafka主题:
bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
步骤2: 编写生产者代码
#include <iostream>
#include <string>
#include <cstdlib>
#include <librdkafka/rdkafkacpp.h>
/// Delivery report callback for the producer.
///
/// Prints to stderr when a message carries an error, otherwise prints the
/// topic name and partition of the delivered message to stdout.
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
  /// @param message The produced message whose delivery status is reported.
  void dr_cb(RdKafka::Message &message) override {
    if (message.err()) {
      std::cerr << "Failed to deliver message: " << message.errstr() << std::endl;
      return;
    }
    std::cout << "Message delivered to topic '" << message.topic_name()
              << "' partition " << message.partition() << std::endl;
  }
};
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test_topic";
// 生产者配置
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
// 创建生产者实例
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
delete conf;
return 1;
}
ExampleDeliveryReportCb delivery_report_cb;
// 创建消息
std::string message_str = "Hello, Kafka!";
RdKafka::Message *message = RdKafka::Message::create(producer, topic,
RdKafka::Topic::PARTITION_UA,
RdKafka::ProducerImpl::RK_MSG_COPY,
const_cast<char *>(message_str.c_str()),
message_str.size(),
NULL, 0, 0);
// 发送消息
producer->produce(message);
// 等待发送完成
producer->flush(10000);
// 清理资源
delete message;
delete producer;
delete conf;
return 0;
}
步骤3: 编写消费者代码
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include <mysql/mysql.h>
#include <librdkafka/rdkafkacpp.h>
/// Consume callback that prints each successfully received Kafka message and
/// inserts its payload into a MySQL table.
///
/// Messages whose error code is not ERR_NO_ERROR are ignored. A new MySQL
/// connection is opened and closed for every message; this keeps the example
/// simple but is not suitable for high message rates.
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
  /// @param msg    The consumed message.
  /// @param opaque Unused.
  void consume_cb(RdKafka::Message &msg, void * /*opaque*/) override {
    if (msg.err() != RdKafka::ERR_NO_ERROR) {
      return;
    }

    // Guard against a null payload before building the string.
    const char *payload = static_cast<const char *>(msg.payload());
    const std::string data = payload ? std::string(payload, msg.len()) : std::string();
    std::cout << "Received message: " << data << std::endl;

    MYSQL *connection = mysql_init(nullptr);
    if (!connection) {
      std::cerr << "Failed to initialize MySQL connection" << std::endl;
      return;
    }
    if (!mysql_real_connect(connection, "localhost", "your_username", "your_password",
                            "your_database_name", 0, nullptr, 0)) {
      std::cerr << "Failed to connect to MySQL database: " << mysql_error(connection) << std::endl;
      mysql_close(connection);
      return;
    }

    // The payload is untrusted input: escape it before embedding it in the
    // SQL text. The MySQL C API requires a destination buffer of at least
    // 2 * length + 1 bytes.
    std::string escaped(data.size() * 2 + 1, '\0');
    const unsigned long escaped_len = mysql_real_escape_string(
        connection, &escaped[0], data.data(), static_cast<unsigned long>(data.size()));
    if (escaped_len == static_cast<unsigned long>(-1)) {
      std::cerr << "Failed to escape message payload: " << mysql_error(connection) << std::endl;
      mysql_close(connection);
      return;
    }
    escaped.resize(escaped_len);

    const std::string query =
        "INSERT INTO your_table (data_column) VALUES ('" + escaped + "')";
    // mysql_real_query takes an explicit length, so payloads containing
    // embedded NUL bytes are not truncated.
    if (mysql_real_query(connection, query.data(), static_cast<unsigned long>(query.size()))) {
      std::cerr << "Failed to execute MySQL query: " << mysql_error(connection) << std::endl;
    } else {
      std::cout << "Data inserted into MySQL database" << std::endl;
    }
    mysql_close(connection);
  }
};
int main() {
std::string brokers = "localhost:9092";
std::string group_id = "test_group";
std::string topic = "test_topic";
// 消费者配置
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
conf->set("group.id", group_id, errstr);
// 创建消费者实例
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
delete conf;
return 1;
}
ExampleConsumeCb consume_cb;
// 订阅主题
RdKafka::ErrorCode subscribe_err =
consumer->subscribe({topic}, consume_cb);
if (subscribe_err != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to subscribe to topic: "
<< RdKafka::err2str(subscribe_err) << std::endl;
delete consumer;
delete conf;
return 1;
}
// 消费消息
while (true) {
RdKafka::Message *msg = consumer->consume(1000);
consume_cb.consume_cb(*msg, nullptr);
delete msg;
}
// 清理资源
consumer->close();
delete consumer;
delete conf;
return 0;
}
请根据你自己的MySQL数据库配置和librdkafka库安装路径修改相关参数。以上示例演示了一个简单的流程,实际使用中可能需要更多的错误处理、数据转换和性能优化等。记得根据实际需求进行调整和扩展。
内容由零声教学AI助手提供,问题来源于学员提问