GeXiangDong

精通Java、SQL、Spring的拼写,擅长Linux、Windows的开关机

0%

Spring Boot Websocket

spring boot中 websocet 默认和 stomp 结合在一起,使用起来比较简单,有些规则汇总整理一下。

websocket 的配置

写一个配置类实现 WebSocketMessageBrokerConfigurer 接口, 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {

// 只有用enableSimpleBroker打开的地址前缀才可以在程序中使用,使用没设置enable的前缀时不会出错,但无法传递消息
// 服务端发送,客户端接收
config.enableSimpleBroker("/topic");

// @MessageMapping注解的设置的地址的会和这个前缀一起构成客户端需要声明的地址(stompClient.send()方法的第一个参数)
// 客户端发送,服务端接收
config.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//客户端调用的URL;
registry.addEndpoint("/socket");
}


}

WebSocketMessageBrokerConfigurer 有8个方法,都是 default 方法,因此仅仅实现自己需要的即可。

WebSocketMessageBrokerConfigurer 的官方文档在 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurer.html

接收客户端发来的消息

客户端发送来的消息,可以通过 Controller 来接收,像 SpringMVC 一样,写个单独的 Controller 接收, 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Controller
public class SocketController {


/**
* MessageMapping注解中配置的接收地址和WebScoketConfig中setApplicationDestinationPrefixes()设置的地址前缀
* 一起构成了客户端向服务器端发送消息时使用地址
*/
@MessageMapping("/chat")
public void toAll(ChatMessage message, Principal principal) throws Exception {
// principal 是当前用户; 如果没有设置用户,则为null
// message 是发送过来的消息,可以是POJO,也可以用Map接收
}

}

给客户端发送消息

给客户发端发送消息有2种方式:

通过注解 @SendTo 和 @SendToUser

这两个注解使用有些局限,只能在上面 “接收客户端发来的消息” 用的 Controller 上,而且必须是被 @ MessageMapping 注解修饰的方法上,且仅当客户端触发(客户端给对应的地址发消息后,spring 框架调用这个方法,不是程序主动调用这个方法)时才起作用。

@SendTo 和 @SendToUser 注解有个参数,是发送给客户端的地址(客户端订阅消息的地址),发送的消息就是这个方法的返回值;

@SendTouser 是把消息发送给此方法触发时接收到的消息的发送者。也就是说:客户端给服务端发送一个消息,接受消息的是这个方法,这个方法的返回值作为回馈的消息又发回给了这个用户。(如果该用户没有订阅这个地址是不发的)

@SendTo 则是发送给所有用户(订阅了这个地址的所有用户,下同)

代码例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

@MessageMapping("/joinchat")
@ SendToUser("/topic/message")
public ChatMessage joinAndWelcome(ChatMessage message, Principal principal) throws Exception {
//由于增加了SendToUser注解,返回结果会被convertAndSend给特定用户,调用这个方法发消息的用户principal中的用户
return new ChatMessage("welcome, " + principal.getName() + ".");
}


@MessageMapping("/chat")
@SendTo("/topic/message")
public ClockMessage oneMessageToAll(ChatMessage message, Principal principal) throws Exception {
//发送给所有用户
return new ChatMessage("" + principal.getName() + ":" + message.getMessage() + " ");
}


使用组件 SimpMessagingTemplate

这个组件由 spring 创建,使用时只需要 Autowire 进来就好了。可以在程序任意地方使用,有发送给一个用户和所有用户的方法。

1
2
3
4
5
6
7
8
9
@Autowired
private SimpMessagingTemplate template;

public void sendMessage(){
// 发送给所有用户
this.template.convertAndSend("/topic/message", new ChatMessage("Hello, all!"));
// 发送给一个用户,用户ID 'xxx-yyy'
convertAndSendToUser("xxx-yyy", "/topic/message", new ChatMessage("Hello, xxx-yyy!"));
}

websocket 和 Filter

在 spring 中,我们可能使用/注册了一些 Filter (例如 spring 中的 OncePerRequestFilter), 普通的HTTP请求都先经过这些filter,然后交由具体的 controller 处理,filter的作用很多,例如最常见的权限验证。

每个 websocket 请求也会先经过这个filter,每个websocket请求仅仅是在最初连接的时候经过filter,以后每次发消息时,连接是已经建好的,自然也不会再过filter了。

因此如果有权限验证的 filter, websocket 可以和普通 HTTP 请求采用同样的权限验证方法,用同一个cookie或者同一个 Authentication Token 等。这仅仅帮助我们减少些代码,websocket 端还是需要写设置用户的,在下面讲。

设置websocket的用户

websocket中的用户是使用 Principal 类,和 Spring Security 有些差异。 为了获取当前用户并让后续的 controller 知道当前用户是谁,可以这么做

下面是扩充了的 WebSocketMessageBrokerConfigurer 配置,修改部分有注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 调用setHandshakeHandler方法设置一个自定义的HandShakeHandler
registry.addEndpoint("/socket/challenge").setHandshakeHandler(new MyHandshakeHandler());
}

/**
* 自定义的 HandshakeHandler, 覆盖了determineUser 方法,返回当前用户
*/
public class MyHandshakeHandler extends DefaultHandshakeHandler {

@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// 这里查询当前用户,并返回一个 Principal 类作为当前用户,以后 controller 参数Principal 就是获得这里的返回值
// 获取当前用户可以通过 request 获得cookie/ token 然后查询获得
// 如果项目中也使用了 Springsecurity 也可用 SecurityContextHolder.getContext().getAuthentication() 获取当前登录用户
// 需要把当前用户设置成 Principal 类型,这个接口只有一个方法 getName() 这个name需要是每个用户唯一的,可以使用系统中用户的ID作为此处的name
}
}
}

需要注意的是 Principal 类型,这个类型只有一个方法 getName(), websocket 中用这个name来唯一识别一个用户,所以不要用用户名字作为这里的name,用用户的ID比较合适。

拒绝某些用户的连接

有些时候,我们希望在 websocket 服务端拒绝某些连接,例如携带不正确的 token 的用户的连接,使用上面的设置当前用户的方法是行不通的,因为 determineUser 方法即使返回 null 也会继续,只是当前用户为null; 可以在这个方法里抛一个异常,但这不优雅,非常不优雅。客户端收到的服务器端错误而连接失败,不是权限不足。

我们可以通过增加一个 HandshakeInterceptor 来解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 通过addInterceptors(new MyHandshakeInterceptor()) 登记自定义的 HandshakeInterceptor
registry.addEndpoint("/socket/challenge").setHandshakeHandler(new MyHandshakeHandler()).addInterceptors(new MyHandshakeInterceptor());
}

public class MyHandshakeInterceptor implements HandshakeInterceptor {

@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
//返回false表示不接受此连接; 返回true表示接受此连接
return false;
}

@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

}
}
}

设置心跳包

stomp 协议可以设置心跳包,多长时间给客户端发一个心跳包,多长时间接收客户端一个心跳包。心跳包的作用简单说就是告诉对方“我还活着”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 要设置心跳包,需要先设置一个 TaskScheduler
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(1);
scheduler.setThreadNamePrefix("wss-heartbeat-thread-");
scheduler.initialize();

// 只有用enableSimpleBroker打开的地址前缀才可以在程序中使用,使用没设置enable的前缀时不会出错,但无法传递消息
// 服务端发送,客户端接收
// 通过setHeartbeatValue(new long[] {10000, 20000})设置心跳包频率,单位毫秒
config.enableSimpleBroker("/topic").setHeartbeatValue(new long[] {10000, 20000}).setTaskScheduler(scheduler);

// @MessageMapping注解的设置的地址的会和这个前缀一起构成客户端需要声明的地址(stompClient.send()方法的第一个参数)
// 客户端发送,服务端接收
config.setApplicationDestinationPrefixes("/app");
}
}

setHeartbeatvalue 的 long[] 数组参数长度是2,第一个是设置多长时间服务端向客户端发送一个心跳包,第二个值是多长时间收到客户端一个心跳包。默认都是0,也就是没有心跳包;但是如果设置了 setTaskScheduler,那么 hearbeatvalue 的默认值则是 10000,10000 (都是10秒钟)。

在程序中获取当前连接的用户

有些时候,我们希望知道当前有哪些用户连接,某个用户是否还在线等,可以通过组件 SimpUserRegistry 来获得,在需要的时候 Autowire 它即可。

例如:

1
2
3
4
5
6
7
8

@Autowired
private SimpUserRegistry userRegistry;

public void isUserOnline(String userId){
return userRegistry.getUsers().contains(userRegistry.getUser(userId));
}

userRegistry.getUsers()返回的是 Set<SimpUser> 类型,这个set包含所有的在线用户。 userRegistry.getUser(String id) 则是通过id(Principal类的name) 来查找当前用户,返回SimpUser类型。

ChannelInterceptor

可以给 websocket 注册多个不同的 ChannelInterceptor 来监视数据的往来,以及连接的建立、断开等,注册 interceptor 在 weboskcet 的配置中进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
private static final Logger logger = LoggerFactory.getLogger(WebsocketConfig.class);

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// 注册一个自定义的拦截器
registration.interceptors(new MyChannelInterceptorAdapter());
}

public class MyChannelInterceptorAdapter implements ChannelInterceptor {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

logger.trace("MyChannelInterceptorAdapter.preSend({}, {}) command is {}", message, channel,
(accessor.getCommand() == null ? "command is null" : accessor.getCommand().name()));


return message;
}
}
}

ApplicationListener 监听服务器websocket事件

监听服务端的各种websocket相关事件,可以通过实现 ApplicationListener 接口,然后把这个类标记为 Component 即可。

如果仅仅监听某种特定类型的事件,可以给 ApplicationListener 设定范型,标记自己需要监听的事件类型

1
2
3
4
5
6
7
8
9
@Component
public class SubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {
private static final Logger logger = LoggerFactory.getLogger(SubscribeEventListener.class);

@Override
public void onApplicationEvent(SessionSubscribeEvent event) {
logger.trace("SessionSubscribeEvent {}", event.getSource());
}
}

可以监听的事件很多,部分整理如下:

父类 子类
ApplicationContextEvent ContextClosedEvent
ContextRefreshedEvent
ContextStartedEvent
ContextStoppedEvent
AbstractSubProtocolEvent SessionConnectedEvent
SessionConnectEvent
SessionDisconnectEvent
SessionSubscribeEvent
SessionUnsubscribeEvent

同源问题

Spring websocket 默认加入了同源限制,如果客户端和服务端不是同源(协议、主机名、端口全部相同)会被禁止访问。当客户端是 APP、小程序、或不同源的HTML时,这就有问题了,可以通过 StompEndpointRegistrysetAllowedOrigins方法增加允许的源,例如 setAllowedOrigins("http://myhost.com:8081/,https://myhost.com:8082")等,也可以使用通配符 * 来表示所有。

1
2
3
4
5

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/socket/challenge").setHandshakeHandler(new MyHandshakeHandler()).setAllowedOrigins("*");
}

如果被同源禁止掉,会返回给客户端403