WebSocket 聊天

前言

最近在写交易虎的消息模块,需要进行买家与客服或者卖家与客服之间的即时通信,因为以前看过 OkHttp 的源码,了解其支持 WebSocket,于是就尝试使用 OkHttp 来实现。

基本概念

什么是 WebSocket ,它解决了什么问题?WebSocket 协议是为了解决 Http 协议的无状态、短连接和服务端无法主动给客户端推送数据等问题而开发的协议,其通信基础也是基于 TCP。

WebSocket 协议分为握手和数据传输两部分。

握手

握手基于 Http 协议实现,请求格式如下:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

响应格式如下:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

数据传输

在传输过程中,一条消息有一个或者多个帧组成,帧又分为数据帧以及控制帧,而每一个帧又都包含两个字节的头部信息,结构如下所示:

第一字节 第二字节
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
F
I
N
R
S
V
1
R
S
V
2
R
S
V
3
OPCODE M
A
S
K
LENGTH

FIN 位如果被设置为 1,表示该帧是消息的最后一帧。

RSV1 位为保留位,一般为 0。

RSV2 位为保留位,一般为 0。

RSV3 位为保留位,一般为 0。

OPCODE 占用 4 位。

  1. 0x0 表示一个持续帧。
  2. 0x1 表示一个文本帧。
  3. 0x2 表示一个二进制帧。
  4. 0x3 ~ 0x7 预留给以后的非控制帧。
  5. 0x8 表示一个连接关闭包。
  6. 0x9 ping 表示一个 ping 包。
  7. 0xa pong 表示一个 pong 包。
  8. 0xb ~ 0xf 预留给以后的控制帧。

MASK 位如果被设置为 1,表示数据必须经过掩码处理,数据长度后 4 个字节为掩码,客户端发送的数据必须经过掩码处理,服务端发送的数据必须不经过掩码处理。

LENGTH 占用 7 位,含义需要分成三类:

  1. 如果值为 0 ~ 125 则是数据的真实长度。
  2. 如果值为 126 ,则数据的真实长度由后续 2 个字节记录。
  3. 如果值为 127 ,则数据的真实长度由后续 8 个字节记录。

完整的数据帧格式如下:

 0                   1                   2                   3      
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

注:RFC 原文文档地址 翻译文档地址

基本使用

基本的使用流程为:

  1. 创建 OkHttpClient 实例。
  2. 创建 Request 实例。
  3. 创建 WebSocket 实例。
fun main() {
val client = createClient(5000)
val request = createRequest("ws://im.sdh-test.com?token=xxx")
val webSocket = createWebSocket(client, request, MyWebSocketListener())
}
private fun createClient(timeout: Long): OkHttpClient {
val builder = OkHttpClient.Builder()
.connectTimeout(timeout, TimeUnit.MILLISECONDS)
.readTimeout(timeout, TimeUnit.MILLISECONDS)
.writeTimeout(timeout, TimeUnit.MILLISECONDS)
return builder.build()
}
private fun createRequest(address: String): Request {
return Request.Builder().url(address).build()
}
private fun createWebSocket(
client: OkHttpClient, request: Request, listener: WebSocketListener): WebSocket {
return client.newWebSocket(request, listener)
}
class MyWebSocketListener: WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {}
override fun onMessage(webSocket: WebSocket, text: String) {}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {}
}

以上就是 OkHttp 实现 WebSocket 通信的基本使用方式。下面分析下源码。

源码分析

Request 实例的创建与普通 http 请求没什么不同,只需注意执行 url 方法时会将 ws: 替换为 http: ,将 wss: 替换为 https:。

open fun url(url: String): Builder {
val finalUrl: String = when {
url.startsWith("ws:", ignoreCase = true) -> {
"http:${url.substring(3)}"
}
url.startsWith("wss:", ignoreCase = true) -> {
"https:${url.substring(4)}"
}
else -> url
}
return url(finalUrl.toHttpUrl())
}

OkHttpClient.newWebSocket()

源码如下:

override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
val webSocket = RealWebSocket(
taskRunner = TaskRunner.INSTANCE,
originalRequest = request,
listener = listener,
random = Random(),
pingIntervalMillis = pingIntervalMillis.toLong(),
extensions = null, // Always null for clients.
minimumDeflateSize = minWebSocketMessageToCompress
)
webSocket.connect(this)
return webSocket
}

内部创建一个 RealWebSocket 实例,然后调用其 connect 方法(因此外界不需要手动调用)。

RealWebSocket()

构造器一共有 7 个参数分别为:

  1. taskRunner: TaskRunner 内部包含若干工作线程,多个任务队列共享。
  2. originalRequest: Request 客户端传入的 Request 实例。
  3. listener: WebSocketListener 客户端传入的 WebSocketListener 实例。
  4. random: Random 随机值生成器。
  5. pingIntervalMillis: Long 心跳包发送间隔。
  6. extensions: WebSocketExtensions? 客户端该字段永远为 null。
  7. minimumDeflateSize: Long 最小压缩大小,低于该大小不进行压缩。

newWebSocket 方法默认心跳包间隔为 0 ms (也就是不发送 ping),最小压缩大小为 1024 字节。

接着看看其 init 代码块:

init {
require("GET" == originalRequest.method) {
"Request must be GET: ${originalRequest.method}"
}
this.key = ByteArray(16).apply { random.nextBytes(this) }.toByteString().base64()
}

内部强制要求请求方法为 GET(默认就是 GET),然后创建随机数校验值(创建一个 16 字节随机数数组对其进行 base 64 编码)。

RealWebSocket.connect()

主要逻辑全部在 connect 方法中,源码如下:

fun connect(client: OkHttpClient) {
if (originalRequest.header("Sec-WebSocket-Extensions") != null) {
failWebSocket(ProtocolException(
"Request header not permitted: 'Sec-WebSocket-Extensions'"), null)
return
}
val webSocketClient = client.newBuilder()
.eventListener(EventListener.NONE)
.protocols(ONLY_HTTP1)
.build()
val request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Extensions", "permessage-deflate")
.build()
call = RealCall(webSocketClient, request, forWebSocket = true)
call!!.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
...
}
override fun onFailure(call: Call, e: IOException) {
...
}
})
}

内部首先强制要求不能带请求头 Sec-WebSocket-Extensions 该请求头用于请求扩展,接着基于外界传入新建一个 OkHttpClient 实例,修改协议为 HTTP/1.1,基于外界传入新建一个 Request 实例,添加以下 5 个请求头。

  1. Upgrade: webSocket 表示客户端想要升级协议为 webSocket 。
  2. Connection: Upgrade 是给代理看的,如果最终服务端没有收到该请求头就会当做普通请求来处理。
  3. Sec-WebSocket-Key 用于校验,只有当该字段经过固定算法加密后与响应头 Sec-WebSocket-Accept 返回的内容相同时,连接才会被认可。
  4. Sec-WebSocket-Version 标识了客户端支持的 WebSocket 协议版本目前也只有 13。
  5. Sec-WebSocket-Extensions 可以有多个该请求头,根据偏好决定先后顺序,但是目前 OkHttp 只支持 permessage-deflate ,表示与对等方协商是否进行 deflate 压缩( Java 中提供了 java.util.zip.deflater 以及 java.util.zip.inflater ) 对等方如果支持该压缩,那么就返回响应头 Sec-WebSocket-Extensions: permessage-deflate 如果不支持可以不返回该响应头,后续双方都不进行压缩。

总结下:其中 Upgrade、Connection、Sec-WebSocket-Version、Sec-WebSocket-Key 这 4 个请求头必须存在,并且前三个值都是固定值。

接着和普通 Http 请求一样,都是创建 RealCall ,然后执行 RealCall.enqueue,不过需要注意创建 RealCall 时 forWebSocket = true,这表示不会添加 networkInterceptors。当请求成功后就会执行 onResponse。源码如下:

override fun onResponse(call: Call, response: Response) {
val exchange = response.exchange
val streams: Streams
try {
checkUpgradeSuccess(response, exchange)
streams = exchange!!.newWebSocketStreams()
} catch (e: IOException) {
...
return
}
val extensions = WebSocketExtensions.parse(response.headers)
this@RealWebSocket.extensions = extensions
if (!extensions.isValid()) {
synchronized(this@RealWebSocket) {
messageAndCloseQueue.clear() // Don't transmit any messages.
close(1010, "unexpected Sec-WebSocket-Extensions in response header")
}
}
try {
val name = "$okHttpName WebSocket ${request.url.redact()}"
initReaderAndWriter(name, streams)
listener.onOpen(this@RealWebSocket, response)
loopReader()
} catch (e: Exception) {
failWebSocket(e, null)
}
}
RealWebSocket.checkUpgradeSuccess()

当请求成功后检查下响应,检查协议升级是否成功,源码如下:

internal fun checkUpgradeSuccess(response: Response, exchange: Exchange?) {
if (response.code != 101) {
throw ProtocolException(...)
}
val headerConnection = response.header("Connection")
if (!"Upgrade".equals(headerConnection, ignoreCase = true)) {
throw ProtocolException(...)
}
val headerUpgrade = response.header("Upgrade")
if (!"websocket".equals(headerUpgrade, ignoreCase = true)) {
throw ProtocolException(...)
}
val headerAccept = response.header("Sec-WebSocket-Accept")
val acceptExpected = (key + WebSocketProtocol.ACCEPT_MAGIC).encodeUtf8().sha1().base64()
if (acceptExpected != headerAccept) {
throw ProtocolException(...)
}
if (exchange == null) {
throw ProtocolException(...)
}
}

只有当响应码为 101,含有响应头 Connection: Upgrade、Upgrade: webSocket ,并且响应头 Sec-WebSocket-Accept 的值通过校验,才不会抛出异常,一旦抛出异常那么直接回调 WebSocketListener.onFailure 。

Exchange.newWebSocketStreams()

当通过响应校验后,创建 WebSocketSteam,源码如下:

fun newWebSocketStreams(): RealWebSocket.Streams {
return codec.connection.newWebSocketStreams(this)
}
// RealConnection.kt
internal fun newWebSocketStreams(exchange: Exchange): RealWebSocket.Streams {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
socket.soTimeout = 0
return object : RealWebSocket.Streams(true, source, sink) {
override fun close() {
exchange.bodyComplete<IOException?>(-1L, responseDone = true, requestDone = true, e = null)
}
}
}
abstract class Streams(
val client: Boolean,
val source: BufferedSource,
val sink: BufferedSink
) : Closeable

内部设置 Socket 超时时间为永久后创建了 Stream 实例返回。

WebSocketExtensions.parse()

创建 WebSocketSteams 完毕后,会解析响应头 Sec-WebSocket-Extensions,如果服务端支持压缩,那么该响应头为 permessage-deflate 并带上以下四个选项中若干个选项。

  1. client_max_window_bits
  2. client_no_context_takeover
  3. server_max_window_bits
  4. server_no_context_takeover

这些是一些压缩选项,至此与服务端的握手就已经完成了,下面来看看客户端是如何发消息给服务端的。

RealWebSocket.send()

一共有两个 send 方法供外界调用,参数类型分别为 String 以及 ByteString,分别用于传输文本数据和二进制数据(如图片),源码如下:

override fun send(text: String): Boolean {
return send(text.encodeUtf8(), OPCODE_TEXT)
}
override fun send(bytes: ByteString): Boolean {
return send(bytes, OPCODE_BINARY)
}
@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
if (failed || enqueuedClose) return false
if (queueSize + data.size > MAX_QUEUE_SIZE) {
close(CLOSE_CLIENT_GOING_AWAY, null)
return false
}
queueSize += data.size.toLong()
messageAndCloseQueue.add(Message(formatOpcode, data))
runWriter()
return true
}
private fun runWriter() {
taskQueue.schedule(writerTask)
}

由于内部进行了最大队列限制,因此基于 OkHttp 实现的 WebSocket 不能传输大于等于 16 MB 的文件,这也是正常的,因为 ByteString 会将所有数组保存到内存中,如果过大那么可能导致 OOM,因此在消息模块开发过程中,对于图片消息不考虑直接将二进制数据进行传输,而是采用客户端先将消息上传,然后再将图片地址发送给服务端。

private inner class WriterTask : Task("$name writer") {
override fun runOnce(): Long {
if (writeOneFrame()) return 0L
return -1L
}
}
internal fun writeOneFrame(): Boolean {
val message = messageOrClose as Message
writer!!.writeMessageFrame(message.formatOpcode, message.data)
synchronized(this) {
queueSize -= message.data.size.toLong()
}
}

上述代码省略了心跳包,先直接看下 WebSocketWriter 是如何将消息发送出去的。

WebSocketWriter.writeMessageFrame()

WebSocketWriter 用于发送数据给对等方,源码如下:

fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
messageBuffer.write(data)
var b0 = formatOpcode or B0_FLAG_FIN
if (perMessageDeflate && data.size >= minimumDeflateSize) {
val messageDeflater = this.messageDeflater
?: MessageDeflater(noContextTakeover).also { this.messageDeflater = it }
messageDeflater.deflate(messageBuffer)
b0 = b0 or B0_FLAG_RSV1
}
val dataSize = messageBuffer.size
sinkBuffer.writeByte(b0)
var b1 = 0
b1 = b1 or B1_FLAG_MASK
when {
dataSize <= PAYLOAD_BYTE_MAX -> {
b1 = b1 or dataSize.toInt()
sinkBuffer.writeByte(b1)
}
dataSize <= PAYLOAD_SHORT_MAX -> {
b1 = b1 or PAYLOAD_SHORT
sinkBuffer.writeByte(b1)
sinkBuffer.writeShort(dataSize.toInt())
}
else -> {
b1 = b1 or PAYLOAD_LONG
sinkBuffer.writeByte(b1)
sinkBuffer.writeLong(dataSize)
}
}
random.nextBytes(maskKey!!)
sinkBuffer.write(maskKey)
if (dataSize > 0L) {
messageBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(0L)
toggleMask(maskCursor, maskKey)
maskCursor.close()
}
sinkBuffer.write(messageBuffer, dataSize)
sink.emit()
}

大体流程为:

  1. 将需要传入的数据写入 messageBuffer 中。
  2. 拼接第一个字节的头部信息(FIN 、 OPCODE),写入 sinkBuffer 中。可以看出 OkHttp 没有分帧,只发送一帧。
  3. 如果需要压缩,并且当前数据大小大于最小压缩大小,那么先对数据进行压缩,并且将 RSV1 置为 1 ,虽然 该位通常为 0,但是 OkHttp 使用该位表示是压缩消息。
  4. 拼接第二个字节的头部信息(MASK 、 LENGTH), 写入 sinkBuffer 中。其中 MASK 位必定为 1,LENGTH 根据规则指定。
  5. 如果数据长度大于 125 那么填充 2 或 8 个字节用于表示数据长度。
  6. 生成随机 4 位字节数组做为掩码,写入 sinkBuffer 中。
  7. 将数据进行掩码处理(数据按序与 4 字节数组进行按位异或),并写入 sinkBuffer 中。

发送消息逻辑分析完毕了,下面分析下客户端是如何接收消息的。

RealWebSocket.loadReader()

读取消息应该是一个不间断操作,在 RealWebConnect.connect 中当握手完毕后会执行 loopRead 方法。代码如下:

fun loopReader() {
while (receivedCloseCode == -1) {
reader!!.processNextFrame()
}
}

只要对等方没有关闭连接,就会一直尝试读取下一帧,注意该方法运行在子线程,不会阻塞主线程。

WebSocketReader.processNextFrame()

WebSocketReader 用于从对等方读取数据,源码如下:

fun processNextFrame() {
readHeader()
if (isControlFrame) {
readControlFrame()
} else {
readMessageFrame()
}
}
WebSocketRead.readHeader()

根据先前介绍的帧结构,每一个帧都会有两个字节的头部信息,因此首先需要读取头部信息,源码如下:

private fun readHeader() {
if (closed) throw IOException("closed") // 1
val b0: Int
val timeoutBefore = source.timeout().timeoutNanos()
b0 = source.readByte() and 0xff // 2
opcode = b0 and B0_MASK_OPCODE // 3
isFinalFrame = b0 and B0_FLAG_FIN != 0
isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0
if (isControlFrame && !isFinalFrame) { // 4
throw ProtocolException("Control frames must be final.")
}
val reservedFlag1 = b0 and B0_FLAG_RSV1 != 0
when (opcode) { // 5
OPCODE_TEXT, OPCODE_BINARY -> {
if (reservedFlag1) {
if (!perMessageDeflate) throw ProtocolException("Unexpected rsv1 flag")
readingCompressedMessage = true
} else {
readingCompressedMessage = false
}
}
else -> {
if (reservedFlag1) throw ProtocolException("Unexpected rsv1 flag")
}
}
val reservedFlag2 = b0 and B0_FLAG_RSV2 != 0 // 6
if (reservedFlag2) throw ProtocolException("Unexpected rsv2 flag")
val reservedFlag3 = b0 and B0_FLAG_RSV3 != 0
if (reservedFlag3) throw ProtocolException("Unexpected rsv3 flag")
val b1 = source.readByte() and 0xff // 7
val isMasked = b1 and B1_FLAG_MASK != 0
if (isMasked == isClient) {
throw ProtocolException(if (isClient) { // 8
"Server-sent frames must not be masked."
} else {
"Client-sent frames must be masked."
})
}
frameLength = (b1 and B1_MASK_LENGTH).toLong()
if (frameLength == PAYLOAD_SHORT.toLong()) { // 9
frameLength = (source.readShort() and 0xffff).toLong()
} else if (frameLength == PAYLOAD_LONG.toLong()) {
frameLength = source.readLong()
if (frameLength < 0L) {
throw ProtocolException(
"Frame length 0x${frameLength.toHexString()} > 0x7FFFFFFFFFFFFFFF")
}
}
if (isControlFrame && frameLength > PAYLOAD_BYTE_MAX) { // 10
throw ProtocolException("Control frame must be less than ${PAYLOAD_BYTE_MAX}B.")
}
if (isMasked) {
source.readFully(maskKey!!)
}
}
  1. 如果连接已经关闭了,那么抛出异常。

  2. 从输入流中读取一个字节(可能阻塞)。

  3. 从那个字节中获取操作码、是否是最后一帧、是否是控制帧。

  4. 控制帧必须是最后一帧,如果不是抛出异常。

  5. 判断是否是压缩消息,虽然 RSV1 通常为 0,但是 OkHttp 使用该位表示是压缩消息。

  6. 不允许使用 RSV2、RSV3。

  7. 从输入流中读取一个字节(头部第二个字节)。

  8. 服务端发送的帧数据不能经过掩码处理。

  9. 读取该帧数据长度,最多为 Long 的最大值。

  10. 控制帧数据长度必须小于等于 125 字节。

WebSocketReader.readControlFrame()

如果头部信息中操作码为 0x8、0x9、0xa 分别表示连接关闭、ping、pong,那么就会执行该方法,源码如下:

private fun readControlFrame() {
if (frameLength > 0L) {
source.readFully(controlFrameBuffer, frameLength) // 1
if (!isClient) { // 2
controlFrameBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(0)
toggleMask(maskCursor, maskKey!!)
maskCursor.close()
}
}
when (opcode) {
OPCODE_CONTROL_PING -> {
frameCallback.onReadPing(controlFrameBuffer.readByteString()) // 3
}
OPCODE_CONTROL_PONG -> {
frameCallback.onReadPong(controlFrameBuffer.readByteString()) // 4
}
OPCODE_CONTROL_CLOSE -> { // 5
var code = CLOSE_NO_STATUS_CODE
var reason = ""
val bufferSize = controlFrameBuffer.size
if (bufferSize == 1L) {
throw ProtocolException("Malformed close payload length of 1.")
} else if (bufferSize != 0L) {
code = controlFrameBuffer.readShort().toInt()
reason = controlFrameBuffer.readUtf8()
val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
}
frameCallback.onReadClose(code, reason)
closed = true
}
else -> {
throw ProtocolException("Unknown control opcode: " + opcode.toHexString())
}
}
}
  1. 将该帧的数据部分读取进入 controlFrameBuffer 中。
  2. 如果是服务端那么需要对数据进行掩码处理。
  3. 收到了对等方发来的 ping 控制帧,RealWebSocket 会通知 WebSocketWriter 发送 pong 进行回复。
  4. 收到了对等方发来的 pong 控制帧,RealWebSocket 会标志已收到回复,如果在下次发送 ping 前没收到回复,那么就会回调 onFailure。
  5. 收到了对等方发来的 close 控制帧,拆解关闭码以及原因,关闭连接,回调 onClosing、onClosed。
WebSocketReader.readMessageFrame()

如果头部信息中操作码为 0x1、0x2 那么就会执行该方法,源码如下:

private fun readMessageFrame() {
val opcode = this.opcode
if (opcode != OPCODE_TEXT && opcode != OPCODE_BINARY) {
throw ProtocolException("Unknown opcode: ${opcode.toHexString()}")
}
readMessage() // 1
if (readingCompressedMessage) {
val messageInflater = this.messageInflater
?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
messageInflater.inflate(messageFrameBuffer)
}
if (opcode == OPCODE_TEXT) {
frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
} else {
frameCallback.onReadMessage(messageFrameBuffer.readByteString())
}
}
private fun readMessage() {
while (true) {
if (closed) throw IOException("closed")
if (frameLength > 0L) {
source.readFully(messageFrameBuffer, frameLength)
if (!isClient) { // 2
messageFrameBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(messageFrameBuffer.size - frameLength)
toggleMask(maskCursor, maskKey!!)
maskCursor.close()
}
}
if (isFinalFrame) break // 4
readUntilNonControlFrame()
if (opcode != OPCODE_CONTINUATION) { // 5
throw ProtocolException("Expected continuation opcode. Got: ${opcode.toHexString()}")
}
}
}
private fun readUntilNonControlFrame() {
while (!closed) {
readHeader()
if (!isControlFrame) {
break
}
readControlFrame() // 3
}
}
  1. 读取完整的消息,由于数据帧可能有若干帧,因此需要不停的读取直到最终帧。
  2. 如果是服务端,还会对每次收到的数据帧进行掩码处理获取到源码。
  3. 在一系列数据帧中间可能还会夹杂一些控制帧(防止心跳超时),读到后立即处理。
  4. 读取最后一帧后退出循环,根据操作码回调 onReadMessage(String / ByteString)。
  5. 非首帧的帧必须是延续帧,也就是操作码必须要是 0x0。

至此数据接收流程也分析完毕了,下面看看连接关闭流程。

RealWebSocket.close()

连接双方都可以关闭 WebSocket 连接,如果客户端主动关闭连接,需要调用该方法,源码如下:

@Synchronized fun close(
code: Int,
reason: String?,
cancelAfterCloseMillis: Long
): Boolean {
validateCloseCode(code)
if (failed || enqueuedClose) return false
enqueuedClose = true
messageAndCloseQueue.add(Close(code, reasonBytes, cancelAfterCloseMillis))
runWriter()
return true
}
override fun onReadClose(code: Int, reason: String) {
var toClose: Streams? = null
var readerToClose: WebSocketReader? = null
var writerToClose: WebSocketWriter? = null
synchronized(this) {
if (enqueuedClose && messageAndCloseQueue.isEmpty()) {
toClose = this.streams
this.streams = null
readerToClose = this.reader
this.reader = null
writerToClose = this.writer
this.writer = null
this.taskQueue.shutdown()
}
}
try {
listener.onClosing(this, code, reason)
if (toClose != null) {
listener.onClosed(this, code, reason)
}
} finally {
toClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}

使用 WebSocketWriter 发送一个关闭控制帧给服务端,当接收到服务端的关闭控制帧后关闭连接回调 onClosing、onClosed。

实例使用

实际使用场景也就是买家与客服以及卖家与客服之间的聊天,主要逻辑也就是进入页面,拉取历史聊天记录,然后连接 WebSocket ,连接成功后用户就可以发送消息了,用户可以发文本以及图片消息,文本消息那么直接传递文本给服务端,图片消息就先上传到七牛云,然后将图片连接发送给服务端就行。如果异常情况导致连接断开,那么尝试几次重连,此外监听网络变化,如果当前网络从无连接到已连接那么也尝试下重连。

0%