以下是一个简单的示例,展示如何使用gRPC构建C++观察者模式的分布式系统。
在这个示例中,我们假设存在一个名为”Sensor”的主题,它负责传递温度数据给注册的观察者。当温度发生变化时,所有观察者都会收到更新通知。
首先,定义 protobuf 文件(sensor.proto):
syntax = "proto3";
package sensor;
message Temperature {
float value = 1;
}
service SensorService {
rpc RegisterObserver(stream Temperature) returns (stream Temperature);
}
然后生成 C++ 代码库:
protoc -I=. --grpc_out=. --cpp_out=. sensor.proto
接下来,实现服务器端代码(sensor_server.cc):
#include <iostream>
#include <memory>
#include <string>
#include <unordered_set>
#include <grpcpp/grpcpp.h>
#include "sensor.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using sensor::Temperature;
using sensor::SensorService;
class SensorServiceImpl final : public SensorService::Service {
public:
Status RegisterObserver(ServerContext* context, grpc::ServerReader<Temperature>* reader,
grpc::ServerWriter<Temperature>* writer) override {
std::cout << "New observer registered." << std::endl;
while(reader->Read(&temperature_)) {
// 假设温度变化后通知所有观察者
for(auto& observer : observers_) {
observer->Write(temperature_);
}
}
return Status::OK;
}
void RegisterObserver(std::unique_ptr<grpc::ServerReaderWriter<Temperature, Temperature>> observer) {
observers_.insert(std::move(observer));
}
private:
Temperature temperature_;
std::unordered_set<std::unique_ptr<grpc::ServerReaderWriter<Temperature, Temperature>>> observers_;
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
SensorServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
int main(int argc, char** argv) {
RunServer();
return 0;
}
最后,实现客户端代码(sensor_client.cc):
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "sensor.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReaderWriter;
using grpc::Status;
using sensor::Temperature;
using sensor::SensorService;
class SensorClient {
public:
explicit SensorClient(std::shared_ptr<Channel> channel)
: stub_(SensorService::NewStub(channel)) {}
void RegisterAsObserver() {
ClientContext context;
std::shared_ptr<ClientReaderWriter<Temperature, Temperature>> stream(
stub_->RegisterObserver(&context));
Temperature temperature;
while(true) {
// 假设从传感器获取温度数据并发送到服务器
temperature.set_value(getTemperatureFromSensor());
stream->Write(temperature);
if(!stream->Read(&temperature)) {
break; // 读取失败,退出循环
}
std::cout << "Temperature update received: " << temperature.value() << std::endl;
}
stream->WritesDone();
Status status = stream->Finish();
if(!status.ok()) {
std::cout << "Observer disconnected with error: " << status.error_message() << std::endl;
}
}
private:
float getTemperatureFromSensor() {
// 假设从传感器获取温度数据的实现
// 这里只是返回一个随机数
return rand() % 100;
}
std::unique_ptr<SensorService::Stub> stub_;
};
int main(int argc, char** argv) {
SensorClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()));
client.RegisterAsObserver();
return 0;
}
这个示例演示了如何使用gRPC构建C++观察者模式的分布式系统。服务器端维护一组已注册的观察者(客户端),当温度发生变化时,通知所有观察者并更新温度数据。
请注意,在实际应用中还需要考虑更多因素,如错误处理、容错机制和高可用性等。以上示例仅供参考,并可以根据实际需求进行扩展和修改。
内容由零声教学AI助手提供,问题来源于学员提问