GeXiangDong

精通Java、SQL、Spring的拼写,擅长Linux、Windows的开关机

0%

小程序中使用STOMP over Websocket

小程序提供了 websocket 功能,STOMP 是在websocket之上的一个通讯协议, 使用起来比较简单容易。特别是后段的 spring boot 等支持STOMP over Websocket很简单。

可以通过在小程序段按照以下步骤加入 STOMP,

下载stomp.js 并加入到小程序中

可以从https://stomp-js.github.io/stomp-websocket/codo/extra/docs-src/Usage.md.html#地址下载最新的stomp.js, 这是官方地址,也有stomp.js的文档可供参考。

下载到stomp.js后,放入小程序的 /utils/ 目录下(也可以是其他目录)。

修改app.js,并包装websocket/stomp

javascript大部分方法都是异步,每个方法调用后,都需要在成功的回调函数里进行下一个,例如连接成功的回调函数里,才能发送/订阅stomp消息,否则就出错了,每个页面都这样写就比较麻烦了,因此我推荐在 app.js 里写两个公共函数,分别做发送和订阅功能,函数内处理连接的问题。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
const BASEURL = 'http://localhost:8070/'
const SOCKETURL = 'ws://localhost:8070/socket'


App({
_websocket:{
client: null, // stomp client
subscriptions: [], //订阅都在此保存一份, websocket断掉后重连需要重新订阅
messagesQueue: [], // STOMP 消息队列 stomp连接建立前,调用发送会暂存于此
subscriptionQueue: [], // STOMP 订阅队列 stomp连接建立前,调用订阅会暂存于此
clientAviable: false // client是否可用,不可用包含连接建立前和建立后连接由于网络原因断开在尝试重连成功前
},

/**
* 发送STOMP消息的函数,供外部使用, 调用方式
* getApp().sendStomp('/app/something', message, header)
* message 是Object格式,此方法会转成JSON string
* header 可选,通常不需要
*/
sendStomp: function(dest, msg, header){
if (this._websocket.clientAviable){
// 连接已经建立起来了,直接发送
var res = this._websocket.client.send(dest, header, JSON.stringify(msg))
}else{
// 连接尚未建立,加入到消息队列,待连接成功建立后,自动发送
this._websocket.messagesQueue.push({
destination: dest,
header: header,
body: JSON.stringify(msg)
})
}
},

/**
* 订阅STOMP消息,供外部使用, 调用方式:
* getApp().subscribeStomp('/topic/news', function(){....收到消息后的回调函数...})
*/
subscribeStomp: function (dest, callback, header) {
// 先保存一份订阅;以便连接断掉重连时重新订阅
this._websocket.subscriptions.push({
destination: dest,
header: header,
callback: callback
})
if (this._websocket.clientAviable) {
// 连接已经建立,直接订阅
this._websocket.client.subscribe(dest, callback, header)
} else {
// 连接尚未建立,加入到订阅队列,待连接成功建立后,一起订阅
this._websocket.subscriptionQueue.push({
destination: dest,
header: header,
callback: callback
})
}
},

_sendSocketMessage: function(msg) {
console.log('send msg:', msg);
wx.sendSocketMessage({
data: msg
})
},

/**
* 连接weboscket,在onLunch里会调用,如果不是整个小程序都需要连接,也可在某个页面内调用
* 此方法会首先判断当前是否已经获取到了token,如果没token,会先获取token后连接
*/
connectWebsocket: function(param){
if (this.globalData.token) {
this._connectWebsocketWithToken(param)
} else {
//login
let that = this
this.login({ tokenGot: function () { that._connectWebsocketWithToken(param) } })
}
},

_closeSocket: function(pram){
wx.closeSocket({
success: function(res){},
fail: function(res){}
})
},

_connectSocket: function(){
let that = this
wx.connectSocket({
url: SOCKETURL,
header: {
"Authorization": "Bearer " + that.globalData.token
},
success: function(res){

},
fail:function(res){
}
})
},

_processQueue: function(client){
for (var i = 0; i < this._websocket.subscriptionQueue.length; i++) {
var subs = this._websocket.subscriptionQueue[i]
client.subscribe(subs.destination, subs.callback, subs.header)
}
this._websocket.subscriptionQueue = []
for (var i = 0; i < this._websocket.messagesQueue.length; i++) {
var msg = this._websocket.messagesQueue[i]
client.send(msg.destination, msg.header, msg.body)
}
this._websocket.messagesQueue = []
},

// 获取token后,连接websocket
_connectWebsocketWithToken: function(param){
let that = this
var ws = {
send: this._sendSocketMessage,
close: this._closeSocket,
onopen: null,
onmessage: null
}

this._connectSocket()

wx.onSocketOpen(function (res) {
console.log('websocket连接成功')

if (that._websocket.client) {
// 断掉自动重连的情形
// 首先重新订阅【订阅在重连后会丢失】
that._websocket.client.subscriptions = [] //清空以前的订阅
for (var i = 0; i < that._websocket.subscriptions.length; i++) {
var subs = that._websocket.subscriptions[i]
var result = that._websocket.client.subscribe(subs.destination, subs.callback, subs.header)
console.log('重新订阅', subs, result)
}
that._processQueue(that._websocket.client)
that._websocket.clientAviable = true
}

ws.onopen && ws.onopen()
})

wx.onSocketMessage(function (res) {
console.log('onSocketMessage事件:', res)
ws.onmessage && ws.onmessage(res)
})

//STOMP.js的断掉重连不起作用了,在这里设置断掉重连
wx.onSocketClose(function(res){
console.log('onSocketClose 事件,websocket断掉了:', res)
that._websocket.clientAviable = false
if(that._websocket.client != null){
//自动重连
setTimeout(that._connectSocket, that._websocket.client.reconnect_delay)
}
})

var Stomp = require('./utils/stomp.js').Stomp;

var client = Stomp.over(ws);

var headers = {}
var connectCallback = function (frame) {
console.log('STOMP client connect successed', frame)

that._processQueue(client)

that._websocket.client = client
that._websocket.clientAviable = true

if (param && param.success){
//调用 回调函数
param.success()
}
}

var errorCallback = function (e) {
console.log('STOMP connect error callback ', e)
}
client.debug = function(str){
console.log(str)
}
client.heartbeat.outgoing = 20000 // client will send heartbeats every 20000ms
client.heartbeat.incoming = 20000 // 0 meant disable
client.reconnect_delay = 3000 // stomp.js的断掉重连在这里不工作,这个设置给_connectSocket用
client.connect(headers, connectCallback, errorCallback)

},

/**
* onLaunch 方法会在小程序启动时调用
**/
onLaunch: function () {
let that = this
this._login({
tokenGot: function(){
console.log("login successed.", that.globalData.token)
//连接webosocket
that.connectWebsocket()
// 获取用户信息
wx.getSetting({
success: res => {
if (res.authSetting['scope.userInfo']) {
// 已经授权,可以直接调用 getUserInfo 获取头像昵称,不会弹框
wx.getUserInfo({
success: res => {
// 可以将 res 发送给后台解码出 unionId
that.globalData.userInfo = res.userInfo
}
})
}
}
})

}
})
},

_login: function (param){
let that = this
wx.login({
success(res) {
if (res.code) {
//发起网络请求
wx.request({
url: BASEURL + '/auth/login',
method: 'POST',
data: {
code: res.code
},
success: function (res) {
console.log("after post login data to server", res.data)
that.globalData.token = res.data.token
that.globalData.user = res.data.user
if (param && param.tokenGot) {
console.log("after set token")
param.tokenGot(res.data.token)
}
}
})
} else {
console.log('登录失败!' + res.errMsg)
}
}
})
},

/**
* 重新封装的 request, 请求普通的url,增加token
*/
request: function(param) {
param.url = BASEURL + param.url
if (this.globalData.token) {
this._requestWithToken(param)
} else {
//login
let that = this
this._login({ tokenGot: function () { that._requestWithToken(param) } })
}
},

_requestWithToken: function (param) {
var header = param.header
if (header) {

} else {
header = {}
}
header['Authorization'] = 'Bearer ' + this.globalData.token
param['header'] = header
let successCallback = param.success
var resultHandler = function (res) {
//console.log(param, res)
if (res.statusCode >= 200 && res.statusCode < 300) {
if (successCallback) {
successCallback(res)
}
} else {
console.log("server error", res)
var error = {}
error.code = res.statusCode
if (res.statusCode >= 400 && res.statusCode < 500) {
error.message = "没找到资源,可能已删除"
} else if (res.statusCode >= 500 && res.statusCode < 600) {
error.message = "服务器出错了,请稍后再试"
} else {
error.message = "未知错误"
}
if (param.invalidResponse) {
param.invalidResponse(error, res)
}
}
}
param['success'] = resultHandler
console.log("request " + param.url + " with token " + this.globalData.token)
wx.request(param)
},

globalData: {
token: null
}
})

其他

取消订阅

这个例子没做取消订阅;取消订阅需要传递 header 的 id 参数,用这个 id 来取消订阅;如果没传递此参数,用 stompClient.subuscribe 的返回值内的 id 取消订阅。

setInterval clearInterval

网络上有些文章会在调用stomp时,设置两个方法,代码如下:

1
2
Stomp.setInterval = function () { ... }
Stomp.clearInterval = function () { ... }

这是因为他们使用的是旧版本的Stomp.js, 从官方网站下载的 stomp.js 不再需要这些代码。

后端

如果用spring boot的websocket,需要注意默认spring 禁止了非同源的访问,需要禁止掉,否则小程序端会抱403错误,无法连接。

客户端连接 spring websocket 返回403错误

心跳包需要后端也支持,可以 websocket 的配置中打开,下面例子的 setHeartbeatValue(new long[] {20000, 20000}) 是设置心跳包的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void configureMessageBroker(MessageBrokerRegistry config) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(1);
scheduler.setThreadNamePrefix("wss-heartbeat-thread-");
scheduler.initialize();

// 只有用enableSimpleBroker打开的地址前缀才可以在程序中使用,使用没设置enable的前缀时不会出错,但无法传递消息
// 服务端发送,客户端接收
config.enableSimpleBroker("/topic").setHeartbeatValue(new long[] {20000, 20000})
.setTaskScheduler(scheduler);

// @MessageMapping注解的设置的地址的会和这个前缀一起构成客户端需要声明的地址(stompClient.send()方法的第一个参数)
// 客户端发送,服务端接收
config.setApplicationDestinationPrefixes("/app");
}