前言
最近在写交易虎的消息模块,需要进行买家与客服或者卖家与客服之间的即时通信,因为以前看过 OkHttp 的源码,了解其支持 WebSocket,于是就尝试使用 OkHttp 来实现。
基本概念
什么是 WebSocket ,它解决了什么问题?WebSocket 协议是为了解决 Http 协议的无状态、短连接和服务端无法主动给客户端推送数据等问题而开发的协议,其通信基础也是基于 TCP。
WebSocket 协议分为握手和数据传输两部分。
握手
握手基于 Http 协议实现,请求格式如下:
GET /chat HTTP/1.1 |
响应格式如下:
HTTP/1.1 101 Switching Protocols |
数据传输
在传输过程中,一条消息有一个或者多个帧组成,帧又分为数据帧以及控制帧,而每一个帧又都包含两个字节的头部信息,结构如下所示:
第一字节 | 第二字节 | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
FIN | RSV1 | RSV2 | RSV3 | OPCODE | MASK | LENGTH |
FIN 位如果被设置为 1,表示该帧是消息的最后一帧。
RSV1 位为保留位,一般为 0。
RSV2 位为保留位,一般为 0。
RSV3 位为保留位,一般为 0。
OPCODE 占用 4 位。
- 0x0 表示一个持续帧。
- 0x1 表示一个文本帧。
- 0x2 表示一个二进制帧。
- 0x3 ~ 0x7 预留给以后的非控制帧。
- 0x8 表示一个连接关闭包。
- 0x9 ping 表示一个 ping 包。
- 0xa pong 表示一个 pong 包。
- 0xb ~ 0xf 预留给以后的控制帧。
MASK 位如果被设置为 1,表示数据必须经过掩码处理,数据长度后 4 个字节为掩码,客户端发送的数据必须经过掩码处理,服务端发送的数据必须不经过掩码处理。
LENGTH 占用 7 位,含义需要分成三类:
- 如果值为 0 ~ 125 则是数据的真实长度。
- 如果值为 126 ,则数据的真实长度由后续 2 个字节记录。
- 如果值为 127 ,则数据的真实长度由后续 8 个字节记录。
完整的数据帧格式如下:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
基本使用
基本的使用流程为:
- 创建 OkHttpClient 实例。
- 创建 Request 实例。
- 创建 WebSocket 实例。
fun main() { |
以上就是 OkHttp 实现 WebSocket 通信的基本使用方式。下面分析下源码。
源码分析
Request 实例的创建与普通 http 请求没什么不同,只需注意执行 url 方法时会将 ws: 替换为 http: ,将 wss: 替换为 https:。
open fun url(url: String): Builder { |
OkHttpClient.newWebSocket()
源码如下:
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket { |
内部创建一个 RealWebSocket 实例,然后调用其 connect 方法(因此外界不需要手动调用)。
RealWebSocket()
构造器一共有 7 个参数分别为:
- taskRunner: TaskRunner 内部包含若干工作线程,多个任务队列共享。
- originalRequest: Request 客户端传入的 Request 实例。
- listener: WebSocketListener 客户端传入的 WebSocketListener 实例。
- random: Random 随机值生成器。
- pingIntervalMillis: Long 心跳包发送间隔。
- extensions: WebSocketExtensions? 客户端该字段永远为 null。
- minimumDeflateSize: Long 最小压缩大小,低于该大小不进行压缩。
newWebSocket 方法默认心跳包间隔为 0 ms (也就是不发送 ping),最小压缩大小为 1024 字节。
接着看看其 init 代码块:
init { |
内部强制要求请求方法为 GET(默认就是 GET),然后创建随机数校验值(创建一个 16 字节随机数数组对其进行 base 64 编码)。
RealWebSocket.connect()
主要逻辑全部在 connect 方法中,源码如下:
fun connect(client: OkHttpClient) { |
内部首先强制要求不能带请求头 Sec-WebSocket-Extensions 该请求头用于请求扩展,接着基于外界传入新建一个 OkHttpClient 实例,修改协议为 HTTP/1.1,基于外界传入新建一个 Request 实例,添加以下 5 个请求头。
- Upgrade: webSocket 表示客户端想要升级协议为 webSocket 。
- Connection: Upgrade 是给代理看的,如果最终服务端没有收到该请求头就会当做普通请求来处理。
- Sec-WebSocket-Key 用于校验,只有当该字段经过固定算法加密后与响应头 Sec-WebSocket-Accept 返回的内容相同时,连接才会被认可。
- Sec-WebSocket-Version 标识了客户端支持的 WebSocket 协议版本目前也只有 13。
- Sec-WebSocket-Extensions 可以有多个该请求头,根据偏好决定先后顺序,但是目前 OkHttp 只支持 permessage-deflate ,表示与对等方协商是否进行 deflate 压缩( Java 中提供了 java.util.zip.deflater 以及 java.util.zip.inflater ) 对等方如果支持该压缩,那么就返回响应头 Sec-WebSocket-Extensions: permessage-deflate 如果不支持可以不返回该响应头,后续双方都不进行压缩。
总结下:其中 Upgrade、Connection、Sec-WebSocket-Version、Sec-WebSocket-Key 这 4 个请求头必须存在,并且前三个值都是固定值。
接着和普通 Http 请求一样,都是创建 RealCall ,然后执行 RealCall.enqueue,不过需要注意创建 RealCall 时 forWebSocket = true,这表示不会添加 networkInterceptors。当请求成功后就会执行 onResponse。源码如下:
override fun onResponse(call: Call, response: Response) { |
RealWebSocket.checkUpgradeSuccess()
当请求成功后检查下响应,检查协议升级是否成功,源码如下:
internal fun checkUpgradeSuccess(response: Response, exchange: Exchange?) { |
只有当响应码为 101,含有响应头 Connection: Upgrade、Upgrade: webSocket ,并且响应头 Sec-WebSocket-Accept 的值通过校验,才不会抛出异常,一旦抛出异常那么直接回调 WebSocketListener.onFailure 。
Exchange.newWebSocketStreams()
当通过响应校验后,创建 WebSocketSteam,源码如下:
fun newWebSocketStreams(): RealWebSocket.Streams { |
内部设置 Socket 超时时间为永久后创建了 Stream 实例返回。
WebSocketExtensions.parse()
创建 WebSocketSteams 完毕后,会解析响应头 Sec-WebSocket-Extensions,如果服务端支持压缩,那么该响应头为 permessage-deflate 并带上以下四个选项中若干个选项。
- client_max_window_bits
- client_no_context_takeover
- server_max_window_bits
- server_no_context_takeover
这些是一些压缩选项,至此与服务端的握手就已经完成了,下面来看看客户端是如何发消息给服务端的。
RealWebSocket.send()
一共有两个 send 方法供外界调用,参数类型分别为 String 以及 ByteString,分别用于传输文本数据和二进制数据(如图片),源码如下:
override fun send(text: String): Boolean { |
由于内部进行了最大队列限制,因此基于 OkHttp 实现的 WebSocket 不能传输大于等于 16 MB 的文件,这也是正常的,因为 ByteString 会将所有数组保存到内存中,如果过大那么可能导致 OOM,因此在消息模块开发过程中,对于图片消息不考虑直接将二进制数据进行传输,而是采用客户端先将消息上传,然后再将图片地址发送给服务端。
private inner class WriterTask : Task("$name writer") { |
上述代码省略了心跳包,先直接看下 WebSocketWriter 是如何将消息发送出去的。
WebSocketWriter.writeMessageFrame()
WebSocketWriter 用于发送数据给对等方,源码如下:
fun writeMessageFrame(formatOpcode: Int, data: ByteString) { |
大体流程为:
- 将需要传入的数据写入 messageBuffer 中。
- 拼接第一个字节的头部信息(FIN 、 OPCODE),写入 sinkBuffer 中。可以看出 OkHttp 没有分帧,只发送一帧。
- 如果需要压缩,并且当前数据大小大于最小压缩大小,那么先对数据进行压缩,并且将 RSV1 置为 1 ,虽然 该位通常为 0,但是 OkHttp 使用该位表示是压缩消息。
- 拼接第二个字节的头部信息(MASK 、 LENGTH), 写入 sinkBuffer 中。其中 MASK 位必定为 1,LENGTH 根据规则指定。
- 如果数据长度大于 125 那么填充 2 或 8 个字节用于表示数据长度。
- 生成随机 4 位字节数组做为掩码,写入 sinkBuffer 中。
- 将数据进行掩码处理(数据按序与 4 字节数组进行按位异或),并写入 sinkBuffer 中。
发送消息逻辑分析完毕了,下面分析下客户端是如何接收消息的。
RealWebSocket.loadReader()
读取消息应该是一个不间断操作,在 RealWebConnect.connect 中当握手完毕后会执行 loopRead 方法。代码如下:
fun loopReader() { |
只要对等方没有关闭连接,就会一直尝试读取下一帧,注意该方法运行在子线程,不会阻塞主线程。
WebSocketReader.processNextFrame()
WebSocketReader 用于从对等方读取数据,源码如下:
fun processNextFrame() { |
WebSocketRead.readHeader()
根据先前介绍的帧结构,每一个帧都会有两个字节的头部信息,因此首先需要读取头部信息,源码如下:
private fun readHeader() { |
如果连接已经关闭了,那么抛出异常。
从输入流中读取一个字节(可能阻塞)。
从那个字节中获取操作码、是否是最后一帧、是否是控制帧。
控制帧必须是最后一帧,如果不是抛出异常。
判断是否是压缩消息,虽然 RSV1 通常为 0,但是 OkHttp 使用该位表示是压缩消息。
不允许使用 RSV2、RSV3。
从输入流中读取一个字节(头部第二个字节)。
服务端发送的帧数据不能经过掩码处理。
读取该帧数据长度,最多为 Long 的最大值。
控制帧数据长度必须小于等于 125 字节。
WebSocketReader.readControlFrame()
如果头部信息中操作码为 0x8、0x9、0xa 分别表示连接关闭、ping、pong,那么就会执行该方法,源码如下:
private fun readControlFrame() { |
- 将该帧的数据部分读取进入 controlFrameBuffer 中。
- 如果是服务端那么需要对数据进行掩码处理。
- 收到了对等方发来的 ping 控制帧,RealWebSocket 会通知 WebSocketWriter 发送 pong 进行回复。
- 收到了对等方发来的 pong 控制帧,RealWebSocket 会标志已收到回复,如果在下次发送 ping 前没收到回复,那么就会回调 onFailure。
- 收到了对等方发来的 close 控制帧,拆解关闭码以及原因,关闭连接,回调 onClosing、onClosed。
WebSocketReader.readMessageFrame()
如果头部信息中操作码为 0x1、0x2 那么就会执行该方法,源码如下:
private fun readMessageFrame() { |
- 读取完整的消息,由于数据帧可能有若干帧,因此需要不停的读取直到最终帧。
- 如果是服务端,还会对每次收到的数据帧进行掩码处理获取到源码。
- 在一系列数据帧中间可能还会夹杂一些控制帧(防止心跳超时),读到后立即处理。
- 读取最后一帧后退出循环,根据操作码回调 onReadMessage(String / ByteString)。
- 非首帧的帧必须是延续帧,也就是操作码必须要是 0x0。
至此数据接收流程也分析完毕了,下面看看连接关闭流程。
RealWebSocket.close()
连接双方都可以关闭 WebSocket 连接,如果客户端主动关闭连接,需要调用该方法,源码如下:
fun close( |
使用 WebSocketWriter 发送一个关闭控制帧给服务端,当接收到服务端的关闭控制帧后关闭连接回调 onClosing、onClosed。
实例使用
实际使用场景也就是买家与客服以及卖家与客服之间的聊天,主要逻辑也就是进入页面,拉取历史聊天记录,然后连接 WebSocket ,连接成功后用户就可以发送消息了,用户可以发文本以及图片消息,文本消息那么直接传递文本给服务端,图片消息就先上传到七牛云,然后将图片连接发送给服务端就行。如果异常情况导致连接断开,那么尝试几次重连,此外监听网络变化,如果当前网络从无连接到已连接那么也尝试下重连。