spring boot中 websocet 默认和 stomp 结合在一起,使用起来比较简单,有些规则汇总整理一下。
websocket 的配置
写一个配置类实现 WebSocketMessageBrokerConfigurer 接口, 例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Override public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app"); }
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/socket"); }
}
|
WebSocketMessageBrokerConfigurer 有8个方法,都是 default 方法,因此仅仅实现自己需要的即可。
WebSocketMessageBrokerConfigurer 的官方文档在 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.html
接收客户端发来的消息
客户端发送来的消息,可以通过 Controller 来接收,像 SpringMVC 一样,写个单独的 Controller 接收, 例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Controller public class SocketController {
@MessageMapping("/chat") public void toAll(ChatMessage message, Principal principal) throws Exception { }
}
|
给客户端发送消息
给客户发端发送消息有2种方式:
通过注解 @SendTo 和 @SendToUser
这两个注解使用有些局限,只能在上面 “接收客户端发来的消息” 用的 Controller 上,而且必须是被 @ MessageMapping 注解修饰的方法上,且仅当客户端触发(客户端给对应的地址发消息后,spring 框架调用这个方法,不是程序主动调用这个方法)时才起作用。
@SendTo 和 @SendToUser 注解有个参数,是发送给客户端的地址(客户端订阅消息的地址),发送的消息就是这个方法的返回值;
@SendTouser 是把消息发送给此方法触发时接收到的消息的发送者。也就是说:客户端给服务端发送一个消息,接受消息的是这个方法,这个方法的返回值作为回馈的消息又发回给了这个用户。(如果该用户没有订阅这个地址是不发的)
@SendTo 则是发送给所有用户(订阅了这个地址的所有用户,下同)
代码例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @MessageMapping("/joinchat") @ SendToUser("/topic/message") public ChatMessage joinAndWelcome(ChatMessage message, Principal principal) throws Exception { return new ChatMessage("welcome, " + principal.getName() + "."); }
@MessageMapping("/chat") @SendTo("/topic/message") public ClockMessage oneMessageToAll(ChatMessage message, Principal principal) throws Exception { return new ChatMessage("" + principal.getName() + ":" + message.getMessage() + " "); }
|
使用组件 SimpMessagingTemplate
这个组件由 spring 创建,使用时只需要 Autowire 进来就好了。可以在程序任意地方使用,有发送给一个用户和所有用户的方法。
1 2 3 4 5 6 7 8 9
| @Autowired private SimpMessagingTemplate template;
public void sendMessage(){ this.template.convertAndSend("/topic/message", new ChatMessage("Hello, all!")); convertAndSendToUser("xxx-yyy", "/topic/message", new ChatMessage("Hello, xxx-yyy!")); }
|
websocket 和 Filter
在 spring 中,我们可能使用/注册了一些 Filter (例如 spring 中的 OncePerRequestFilter), 普通的HTTP请求都先经过这些filter,然后交由具体的 controller 处理,filter的作用很多,例如最常见的权限验证。
每个 websocket 请求也会先经过这个filter,每个websocket请求仅仅是在最初连接的时候经过filter,以后每次发消息时,连接是已经建好的,自然也不会再过filter了。
因此如果有权限验证的 filter, websocket 可以和普通 HTTP 请求采用同样的权限验证方法,用同一个cookie或者同一个 Authentication Token 等。这仅仅帮助我们减少些代码,websocket 端还是需要写设置用户的,在下面讲。
设置websocket的用户
websocket中的用户是使用 Principal 类,和 Spring Security 有些差异。 为了获取当前用户并让后续的 controller 知道当前用户是谁,可以这么做
下面是扩充了的 WebSocketMessageBrokerConfigurer 配置,修改部分有注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/socket/challenge").setHandshakeHandler(new MyHandshakeHandler()); }
public class MyHandshakeHandler extends DefaultHandshakeHandler { @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { } } }
|
需要注意的是 Principal 类型,这个类型只有一个方法 getName(), websocket 中用这个name来唯一识别一个用户,所以不要用用户名字作为这里的name,用用户的ID比较合适。
拒绝某些用户的连接
有些时候,我们希望在 websocket 服务端拒绝某些连接,例如携带不正确的 token 的用户的连接,使用上面的设置当前用户的方法是行不通的,因为 determineUser 方法即使返回 null 也会继续,只是当前用户为null; 可以在这个方法里抛一个异常,但这不优雅,非常不优雅。客户端收到的服务器端错误而连接失败,不是权限不足。
我们可以通过增加一个 HandshakeInterceptor 来解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/socket/challenge").setHandshakeHandler(new MyHandshakeHandler()).addInterceptors(new MyHandshakeInterceptor()); } public class MyHandshakeInterceptor implements HandshakeInterceptor {
@Override public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception { return false; }
@Override public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
} } }
|
设置心跳包
stomp 协议可以设置心跳包,多长时间给客户端发一个心跳包,多长时间接收客户端一个心跳包。心跳包的作用简单说就是告诉对方“我还活着”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Override public void configureMessageBroker(MessageBrokerRegistry config) { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(1); scheduler.setThreadNamePrefix("wss-heartbeat-thread-"); scheduler.initialize();
config.enableSimpleBroker("/topic").setHeartbeatValue(new long[] {10000, 20000}).setTaskScheduler(scheduler);
config.setApplicationDestinationPrefixes("/app"); } }
|
setHeartbeatvalue 的 long[] 数组参数长度是2,第一个是设置多长时间服务端向客户端发送一个心跳包,第二个值是多长时间收到客户端一个心跳包。默认都是0,也就是没有心跳包;但是如果设置了 setTaskScheduler,那么 hearbeatvalue 的默认值则是 10000,10000 (都是10秒钟)。
在程序中获取当前连接的用户
有些时候,我们希望知道当前有哪些用户连接,某个用户是否还在线等,可以通过组件 SimpUserRegistry 来获得,在需要的时候 Autowire 它即可。
例如:
1 2 3 4 5 6 7 8
| @Autowired private SimpUserRegistry userRegistry;
public void isUserOnline(String userId){ return userRegistry.getUsers().contains(userRegistry.getUser(userId)); }
|
userRegistry.getUsers()
返回的是 Set<SimpUser>
类型,这个set包含所有的在线用户。 userRegistry.getUser(String id)
则是通过id(Principal类的name) 来查找当前用户,返回SimpUser类型。
ChannelInterceptor
可以给 websocket 注册多个不同的 ChannelInterceptor 来监视数据的往来,以及连接的建立、断开等,注册 interceptor 在 weboskcet 的配置中进行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig implements WebSocketMessageBrokerConfigurer { private static final Logger logger = LoggerFactory.getLogger(WebsocketConfig.class);
@Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(new MyChannelInterceptorAdapter()); } public class MyChannelInterceptorAdapter implements ChannelInterceptor {
@Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
logger.trace("MyChannelInterceptorAdapter.preSend({}, {}) command is {}", message, channel, (accessor.getCommand() == null ? "command is null" : accessor.getCommand().name()));
return message; } } }
|
ApplicationListener 监听服务器websocket事件
监听服务端的各种websocket相关事件,可以通过实现 ApplicationListener 接口,然后把这个类标记为 Component 即可。
如果仅仅监听某种特定类型的事件,可以给 ApplicationListener 设定范型,标记自己需要监听的事件类型
1 2 3 4 5 6 7 8 9
| @Component public class SubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> { private static final Logger logger = LoggerFactory.getLogger(SubscribeEventListener.class);
@Override public void onApplicationEvent(SessionSubscribeEvent event) { logger.trace("SessionSubscribeEvent {}", event.getSource()); } }
|
可以监听的事件很多,部分整理如下:
父类 |
子类 |
ApplicationContextEvent |
ContextClosedEvent |
|
ContextRefreshedEvent |
|
ContextStartedEvent |
|
ContextStoppedEvent |
AbstractSubProtocolEvent |
SessionConnectedEvent |
|
SessionConnectEvent |
|
SessionDisconnectEvent |
|
SessionSubscribeEvent |
|
SessionUnsubscribeEvent |
同源问题
Spring websocket 默认加入了同源限制,如果客户端和服务端不是同源(协议、主机名、端口全部相同)会被禁止访问。当客户端是 APP、小程序、或不同源的HTML时,这就有问题了,可以通过 StompEndpointRegistry
的 setAllowedOrigins
方法增加允许的源,例如 setAllowedOrigins("http://myhost.com:8081/,https://myhost.com:8082")
等,也可以使用通配符 * 来表示所有。
1 2 3 4 5
| @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/socket/challenge").setHandshakeHandler(new MyHandshakeHandler()).setAllowedOrigins("*"); }
|
如果被同源禁止掉,会返回给客户端403