1. 简介
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它实现了浏览器与服务器之间的实时双向数据传输,使得 Web 应用程序能够更加高效地使用网络。Gateway 可以将 WebSocket 消息分发到多微服务集群上,并根据特定条件将消息推送给指定在线用户。
本文将介绍如何使用 Redis 的 Pub/Sub 模式,实现 Gateway 与多微服务集群的 WebSocket 实现,并详细解释每个步骤的使用。
2. 环境准备
本文所需环境如下:
- JDK 1.8
- Maven 3.5.0+
- Spring Boot 2.2.1.RELEASE
- Spring Framework 5.2.1.RELEASE
- Redis 4.0+
3. 创建项目
首先创建一个空的 Maven 项目,并添加以下依赖到 pom.xml 文件中。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
4. 创建 WebSocket 配置类
创建一个 WebSocket 配置类,用于配置 WebSocket 相关的参数。
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private Environment env;
/**
* 注册WebSocketEndpoint实现类,使用SockJS协议传输 websocket协议需要使用SockJS,否则前端无法连接websocket服务。
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
String endpoint = env.getProperty("websocket.endpoint");
String allowedOrigins = env.getProperty("websocket.allowedOrigins");
registry.addEndpoint(endpoint)
.setAllowedOrigins(allowedOrigins)
.withSockJS();
}
/**
* 配置消息代理(Message Broker),
* 使用基于Redis的Pub/Sub模式实现消息的分发与推送
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
String destinationPrefix = env.getProperty("redis.destinationPrefix");
String channelPrefix = env.getProperty("redis.channelPrefix");
registry.enableStompBrokerRelay("/topic")
.setAutoStartup(true)
.setDestinationPrefixes(destinationPrefix)
.setRelayHost(env.getProperty("redis.host"))
.setRelayPort(Integer.parseInt(env.getProperty("redis.port")))
.setClientLogin(env.getProperty("redis.username"))
.setClientPasscode(env.getProperty("redis.password"))
.setSystemLogin(env.getProperty("redis.username"))
.setSystemPasscode(env.getProperty("redis.password"))
.setVirtualHost(env.getProperty("redis.virtualHost"))
.setUserDestinationBroadcast("/topic/unresolved.user.dest")
.setUserRegistryBroadcast("/topic/simp-user-registry")
.setApplicationDestinationPrefixes(channelPrefix);
}
}
在上面的代码中,我们使用 @EnableWebSocketMessageBroker
注解启用了 WebSocket 消息代理功能,并实现了 WebSocketMessageBrokerConfigurer
接口来配置 WebSocket 相关参数。
其中,registerStompEndpoints()
方法用于注册我们的 WebSocket Endpoint,这里使用了 SockJS 协议传输 WebSocket 消息,因为大部分浏览器都支持 SockJS。具体的 WebSocket Endpoint 实现可以参考 Spring Boot 官方文档。
configureMessageBroker()
方法则是用来配置消息代理,其中 destinationPrefixes
属性指定了 Destinations 的前缀,而 applicationDestinationPrefixes
属性则指定了应用程序订阅的频道的前缀。
5. 创建 WebSocket Service
接下来创建一个 WebSocketService 类,用于处理客户端的 WebSocket 连接和消息处理。
@Service
public class WebSocketService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public WebSocketService(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
/**
* 处理WebSocket连接请求
*/
@MessageMapping("/websocket/connect")
public void connect(StompHeaderAccessor accessor, Principal principal) {
String sessionId = accessor.getSessionId();
String username = principal.getName();
String destination = "/user/" + username + "/message";
accessor.setHeader("simpSessionId", sessionId);
accessor.setSessionId(sessionId);
// 注册用户到当前会话(Session)中
messagingTemplate.convertAndSendToUser(username, "/queue/connect", sessionId);
messagingTemplate.convertAndSend(destination, new ConnectMessage(username), createHeaders(sessionId));
}
/**
* 处理WebSocket断开连接请求
*/
@MessageMapping("/websocket/disconnect")
public void disconnect(StompHeaderAccessor accessor, Principal principal) {
String sessionId = accessor.getSessionId();
String username = principal.getName();
String destination = "/user/" + username + "/message";
// 从当前会话(Session)中注销用户
messagingTemplate.convertAndSend(destination, new DisconnectMessage(username), createHeaders(sessionId));
}
/**
* 处理WebSocket消息请求
*/
@MessageMapping("/websocket/message")
public void message(StompHeaderAccessor accessor, Principal principal, MessageDTO messageDto) {
String sessionId = accessor.getSessionId();
String username = principal.getName();
String destination = messageDto.getDestination();
// 将消息发送到指定的频道
messagingTemplate.convertAndSend(destination, new TextMessage(username, messageDto.getText()), createHeaders(sessionId));
}
/**
* 根据条件将消息推送给指定在线用户
*/
public void push(String username, MessageDTO messageDto) {
String destination = "/user/" + username + "/message";
messagingTemplate.convertAndSend(destination, new TextMessage(messageDto.getUsername(), messageDto.getText()), createHeaders(null));
}
/**
* 创建消息头
*/
private static MessageHeaders createHeaders(String sessionId) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
return headerAccessor.getMessageHeaders();
}
}
在上面的代码中,我们使用 @Service
注解将一个普通类标识为 Spring 的 Service 层组件,并使用 @MessageMapping
注解标记处理 WebSocket 消息的方法。
connect()
方法处理客户端连接请求,在连接成功后向客户端发送连接成功的消息,并将当前用户注册到会话中。
disconnect()
方法处理客户端断开连接请求,在断开连接后向客户端发送断开连接的消息,并将当前用户从会话中注销。
message()
方法处理客户端发送的普通消息,将消息发送到指定的频道。
push()
方法根据条件将消息推送给指定在线用户。
createHeaders()
方法用于创建消息头,其中 SimpMessageHeaderAccessor
是一个协助管理消息头的工具类。
6. 创建 WebSocket Controller
接下来创建一个 WebSocketController 类,用于处理 WebSocket 相关的请求。
@RestController
@RequestMapping("/api/v1")
public class WebSocketController {
@Autowired
private WebSocketService webSocketService;
/**
* 推送消息给指定用户
*/
@PostMapping("/websocket/push/{username}")
public void push(@PathVariable String username, @RequestBody MessageDTO messageDto) {
webSocketService.push(username, messageDto);
}
}
在上面的代码中,我们使用 @RestController
注解将一个普通类标识为 Spring 的 Web 层组件,并使用 @RequestMapping
注解指定请求路径。
push()
方法用于实现推送消息给指定用户的功能,其中 @PathVariable
和 @RequestBody
注解用于获取路径参数和请求体中的数据。
7. 部署应用程序
最后,我们可以使用 Maven 将应用程序打包成可执行的 jar 文件,并部署到服务器上。
$ mvn clean package
$ java -jar target/websocket-0.0.1-SNAPSHOT.jar
8. 测试
现在我们可以使用浏览器或者 WebSocket 客户端连接到 Gateway 上,并进行消息的交互。
var stompClient = null;
function connect() {
// 连接WebSocket服务
var socket = new SockJS('/websocket');
// 创建STOMP客户端实例
stompClient = Stomp.over(socket);
// STOMP客户端设置
stompClient.connect({}, function (frame) {
console.log('Connected: ' + frame);
// 订阅频道
stompClient.subscribe('/user/{username}/message', function (response) {
console.log(response);
});
// 发送连接请求
stompClient.send('/app/websocket/connect', {}, {});
});
}
function disconnect() {
// 关闭STOMP客户端连接
if (stompClient != null) {
stompClient.disconnect();
}
}
function sendMessage() {
// 发送消息
var destination = '/app/websocket/message';
var text = $("#text").val();
var messageDto = {destination: destination, text: text};
stompClient.send(destination, {}, JSON.stringify(messageDto));
}
9. 总结
本文介绍了如何使用 Redis 的 Pub/Sub 模式,实现 Gateway 与多微服务集群的 WebSocket 实现,并详细解释了每个步骤的使用。通过这种方式,我们可以轻松地实现根据条件将消息推送给指定在线用户,并与前端交互数据。