OkHttp 源码分析

前言

半年前阅读了 Volley 源码,但是现在主流网络请求都是使用 OkHttp + Retrofit + RxJava 甚至 Android 中 HttpUrlConnection 的具体实现都被替换成了 OkHttp,因此打算好好研究下 OkHttp 的源码,本文做为阅读笔记。

注:最近发现 OkHttp 使用 Kotlin 语言进行重写了,于是又重新阅读了一遍源码加深下理解,同时学习下 OkHttp 中 Kotlin 的使用方式,本文基于 v4.9.0 版本 官方仓库地址

简单使用

这里只例举基本的同步及异步 Get 请求,详细的请看 官方文档

val client = OkHttpClient()
fun syncGet(url: String) {
val request = Request.Builder().url(url).build()
val call = client.newCall(request)
val response = call.execute()
if (response.isSuccessful) {
println("Response is successful")
}
}
fun asyncGet(url: String) {
val request = Request.Builder().url(url).build()
val call = client.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {}
override fun onResponse(call: Call, response: Response) {
if (response.isSuccessful) {
println("Response is successful")
}
}
})
}

由上可知,发送一个基本的 get 请求需要如下几步:

  1. 创建 OkHttpClient 实例。
  2. 创建 Request 实例。
  3. 创建 Call 实例。
  4. 执行 Call.execute() 或者 Call.enqueue()

下面按照这四步探索下源码实现。

OkHttpClient 实例的创建

OkHttpClient 实例的创建主要有以下两种方式:

val client1 = OkHttpClient()
val client2 = OkHttpClient.Builder().build()

一种通过调用无参构造器,另一种通过 OkHttpClient.Builder 类构造。当不需要自定义 OkHttpClient 配置的时候可以采用第一种,如果需要自定义配置那么必须采用第二种。接着分别看看上述两种方式的源码实现。

open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory
constructor() : this(Builder())
// OkHttpClient.Builder
fun build(): OkHttpClient = OkHttpClient(this)

可以看到 OkHttpClient 提供了一个主构造器以及一个次构造器,虽然主构造器访问修饰为 internal 没法直接调用,但是上述两种创建 OkHttpClient 实例的方式其实都是调用了主构造器。主构造器对成员变量进行赋值以及执行 init 代码块,代码过多就不展开了,无非就是从 OkHttpClient.Builder 中拷贝对应的参数赋值给其成员变量。下面关注下 OkHttpClient.Builder 的构造器,看看默认的参数。

class Builder constructor() {
// 管理异步请求,限制单主机请求数,以及最大请求数
internal var dispatcher: Dispatcher = Dispatcher()
// 连接池,复用连接,提高网络传输效率
internal var connectionPool: ConnectionPool = ConnectionPool()
// 拦截器列表
internal val interceptors: MutableList<Interceptor> = mutableListOf()
// 网络拦截器列表,一般用于调试
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
// 事件监听工厂,默认无,设置后当相应事件触发后会回调如连接开始、连接结束等
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
// 网络连接失败是否进行重试,默认为 true
internal var retryOnConnectionFailure = true
// 认证器,当服务器返回 401 时,会执行其 authenticate 方法,需要用户获取最新凭证后返回一个新的 Request 实例并为其添加 Authorization 请求头,默认不处理认证
internal var authenticator: Authenticator = Authenticator.NONE
// 是否允许重定向
internal var followRedirects = true
// 是否允许协议切换类的重定向,如从 http 转换为 https,当 followRedirects 为 true 时才生效
internal var followSslRedirects = true
// Cookie 管理类,默认不处理 Cookie
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
// 缓存管理类,默认不处理缓存,OkHttp 提供了 Cache 类可以进行磁盘缓存,需要用可以设置
internal var cache: Cache? = null
// DNS,用于将域名转换为对应的 IP 地址列表,默认使用 InetAddress.getAllByName(HostName)
internal var dns: Dns = Dns.SYSTEM
// 代理,默认无
internal var proxy: Proxy? = null
// 代理选择器,默认系统默认
internal var proxySelector: ProxySelector? = null
// 代理认证器,连接代理服务器时会执行器 authenticate 方法,需要用户返回在请求中拼接认证信息
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
// Socket 工厂,默认实现直接创建 Socket 实例
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
// SslSocket 工厂,用于创建 SSLSocket(在 Socket 上又盖了一层),默认无
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
// Https 证书验证器,默认无
internal var x509TrustManagerOrNull: X509TrustManager? = null
// 连接规格,默认包括 MODERN_TLS(用于https)、CLEARTEXT(用于http)
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
// 协议列表,默认包括 Http1.1、Http2.0
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
// 主机名验证器,证书有效校验通过后需要其校验主机名,默认使用单例 OkHostnameVerifier
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
// 证书固定者,用于提高安全性,添加后证书的 Sha256 值必须包括在其内部 pins 中,否则报错,默认无
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
// 证书链清洁工,验证证书链,默认无
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0
// WebSocket 进行 deflate 压缩的最小消息字节数,默认 1k
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
// 路由数据库,存储连接失败的路由,下次不会再选
internal var routeDatabase: RouteDatabase? = null
internal constructor(okHttpClient: OkHttpClient) : this() {...}
}

注:我感觉如果使用 Kotlin 写 OkHttp 就可以去除建造者模式,改为使用默认参数的方式,不过为了考虑兼容 Java 还是得用建造者模式。

可以看到 OkHttpClient.Builder 同样提供了一个主构造器和一个次构造器,次构造器用于拷贝 OkHttpClient ,主构造器执行成员变量的赋值,相关变量的作用及默认值已经做了简单说明。接着来看看第二步 Request 实例的创建。

Request 实例的创建

Request 实例通过 Request.Builder 进行创建的,因此首先看看 Request.Builder 的构造器。

open class Builder {
constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}
internal constructor(request: Request) {
...
}
}

可以看到 Request.Builder 同样提供了一个主构造器和一个次构造器,次构造器用于拷贝 Request ,主构造器内部设置了默认请求方法为 GET,并且创建了一个 Headers.Builder 实例用于统一管理请求头。接着看看其 url 方法和 build 方法。

open fun url(url: String): Builder {
// 将 web socket urls 替换成 http urls,便于后续发起协议升级请求
val finalUrl: String = when {
url.startsWith("ws:", ignoreCase = true) -> {
"http:${url.substring(3)}"
}
url.startsWith("wss:", ignoreCase = true) -> {
"https:${url.substring(4)}"
}
else -> url
}
return url(finalUrl.toHttpUrl())
}
open fun url(url: HttpUrl): Builder = apply {
this.url = url
}
open fun build(): Request {
return Request(
checkNotNull(url) { "url == null" },
method,
headers.build(),
body,
tags.toImmutableMap()
)
}

注:Kotlin 中如果方法需要返回当前类对象直接使用 apply 方法包裹方法体

其中 url 方法主要是将请求地址封装成 HttpUrl 实例并赋值给成员 urlbuild 方法创建了 Request 实例。至此第二步结束了接着看看第三步 Call 实例的创建。

Call 实例的创建

通过调用 OkHttpClient 实例的 newCall 方法创建 Call 实例。

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

newCall 方法创建并返回了一个 RealCall 实例,forWebSocket 用于区分是否是 WebSocket 握手请求,这里为 false。至此第三步也结束了看看最后一步 call.execute 以及 call.enqueue 方法是如何进行网络请求的。

Call.execute 和 Call.enqueue

通过上文可知 Call 的具体实现为 RealCall,首先看看相对而言比较简单的同步请求方法 execute。

// RealCall.kt
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("body().close()")
eventListener.callStart(this)
}
// Dispatcher.kt
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}

方法内部执行顺序为:

  1. 校验该 RealCall 是否已经执行过,如果已经执行过就会抛出异常。
  2. 回调 EventListener.onStart 方法。
  3. 执行 Dispatcher.executed 方法,仅仅只把当前 RealCall 实例放入 runningSyncCalls 队列中。
  4. 执行 getResponseWithInterceptorChain 方法。

继续跟踪 getResponseWithInterceptorChain 方法

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

方法内部执行顺序为:

  1. 拼接所有的 Interceptor,包括 OkHttp 内置的 RetryAndFollowUpInterceptorBridgeInterceptorCacheInterceptorConnectInterceptorCallServerInterceptor 以及用户设置的 interceptorsnetworkInterceptors,其中 networkInterceptors 主要用于网络调试,因为其可以获取到服务端返回的原始数据,可用代码库有 HttpLoggingInterceptorStethoInterceptor
  2. 创建 RealInterceptorChain 实例,并调用其 proceed 方法,获取到 Response 实例返回给外界。

继续跟踪 proceed 方法,看看其是如何获取到响应的。

override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}

proceed 方法实现了 OkHttp 拦截器的链式调用,执行顺序为:

  1. 校验索引是否越界,如果越界抛出异常。
  2. 校验 networkInterceptors 是否修改了主机及端口号。注:exchange 会在 ConnectInterceptor 中赋值。
  3. 校验 networkInterceptors 是否没调用 proceed 方法或者调用了多次。
  4. 拷贝 RealChainInterceptor 实例,并将其 index 属性加 1。注:必须拷贝的原因是可以进行重试
  5. 获取指定 index 处的 Interceptor 实例,执行其 intercept 方法,将拷贝后的 RealChainInterceptor 实例传入。
  6. 校验每个拦截器返回的响应体是否为空。
  7. 将调用 intercept 方法获取到的响应进行返回。

根据上述逻辑,默认 index 为 0,因此会首先执行 RetryAndFollowInterceptor.intercept 方法,并将 index 为 1 的 RealChainInterceptor 实例传入。

RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor 主要负责失败重试,以及重定向。

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
// 每执行一轮,表示进行一次重试
while (true) {
// 创建 ExchangeFinder 实例,第一次一定会创建
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
// 如果已经取消了,抛出异常
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
// 执行下一个 interceptor.intercept 方法
response = realChain.proceed(request)
// 请求成功,但是可能是一个重定向响应
newExchangeFinder = true
} catch (e: RouteException) {
// 尝试路由连接异常,如果可恢复,那么重试,否则抛出异常
if (!recover(...)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// 试图与服务器通信异常,请求可能已经发送,如果可恢复,那么重试,否则抛出异常
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// 如果上个响应存在(表示上个响应是一个重定向响应,其不会拥有响应体),构建一个新的Response实例将上个响应赋值给priorResponse属性
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// 根据响应构建下次请求的 Request,如果服务端返回 401,则回调 authenticator.authenticate,并将返回的 Request 实例返回
// 如果服务端返回 301、302 等重定向响应码,则取出 Location 响应头,将其值当做下次请求的 url
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
// 表示不需要重定向和认证,直接返回
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
// 如果限制只发送一次,那也直接返回响应
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// 重定向请求太多了,就抛出异常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 赋值新建的请求,保存上一个响应,最终的响应会包含所有前面重定向的响应
request = followUp
priorResponse = response
} finally {
// 出现异常或者进行了重试关闭连接
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
}

大致流程是在一个死循环中,调用传入的 RealChainInterceptor 实例的 proceed 方法获取到响应,如果在获取响应图中发生了异常判断是否需要进行重试,需要则进行下次循环重试,如果成功获取到响应,判断响应码是否为 301、302 等重定向响应码,如是则取出 Location 响应头,将其值当做下次请求的 url,如果响应码为 401,那么执行 Authenticator.authenticate 方法获取新的 Request 实例。

这里也不是真正请求网络的地方,其也是通过 RealChainInterceptor 实例的 proceed 方法获取的响应,不过该 RealChainInterceptor 的 index 字段已经是 1 了,也就是执行索引为 1 的 Interceptor,也就是 BridgeInterceptor

BridgeInterceptor

BridgeInterceptor 做为应用程序代码和网络代码之间的桥梁,其添加了若干个请求头,并会对网络响应进行 gzip 解压。

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
// 如果有请求体,并且请求头如果没有 Content-Type、Content-length
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
// 添加 Host 请求头
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
// 添加 Connection 请求头
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// 添加 Accept-Encoding 请求头,表示接收 gzip 压缩后的响应
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
// 从 CookieJar 中获取该 url 的 cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
// 如果有 Cookie,那么添加 Cookie 请求头
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
// 添加 UA 请求头
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
// 保存 Cookie 到 CookieJar 中
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 如果进行了 gzip 压缩那么进行解压
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
}

大体上就是添加了一些常用的请求头,Host、Connection、User-Agent、Accept-Encoding、Cookie、Content-Type、Content-Length ,然后处理了下 gzip 压缩。

这里也不是真正请求网络的地方,其也是通过 RealChainInterceptor 实例的 proceed 方法获取的响应,不过该 RealChainInterceptor 的 index 字段已经是 2 了,也就是执行索引为 2 的 Interceptor,也就是 CacheInterceptor

CacheInterceptor

CacheInterceptor 用于从缓存中获取响应和写响应到缓存。

class CacheInterceptor(internal val cache: Cache?) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
// 如果给 OkHttpClient 设置了缓存,那么从里面获取缓存的响应
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 这个里面主要是根据请求和缓存的响应判断缓存是否命中
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
// 客户端设置了 only-if-cached,表示只使用缓存而缓存又没有命中因此直接构建一个 Response 返回
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// 缓存命中,构造一个Response实例并将去掉了body 的 cacheResponse 赋值给该实例的 cacheResponse属性
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// 发生了异常需要将缓存响应体关闭
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 如果有缓存响应并且响应码是 304,就根据返回的响应和缓存的响应构造一个新的响应并且更新下缓存
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
// 响应码不是304,则构造一个新的 Response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
return response
}
}

大体上就是从 Cache 中取出保存的响应,然后根据请求和缓存的响应判断缓存是否命中,命中就会直接构建一个新的响应返回,如果没命中(由于响应过期),则会根据缓存响应的 ETag(对应请求头 if-none-match)、LastModify(对应请求头 if-modified-since) 等响应头去构造当前的请求头,这样当服务器判断资源没变化时可以直接返回 304,框架也只需要更新下缓存的响应头就可以直接返回了。

默认配置的 OkHttpClient 不带任何缓存,但是其提供了一个 Cache 类,如果需要缓存可以进行如下配置

val client = OkHttpClient.Builder().cache(Cache(cacheFile, 50 * 1000)).build()

这里也不是真正请求网络的地方,其也是通过 RealChainInterceptor 实例的 proceed 方法获取的响应,不过该 RealChainInterceptor 的 index 字段已经是 3 了,也就是执行索引为 3 的 Interceptor,也就是 ConnectIntercept

ConnectIntercept

ConnectIntercept 用于与目标主机创建 TCP 连接,并且进行必要的握手,这也是 OkHttp 最难的地方。

object ConnectInterceptor : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}

方法内部主要是调用了 RealCall.initExchange 方法,获取到一个 Exchange 实例,来跟踪下 initExchange

internal fun initExchange(chain: RealInterceptorChain): Exchange {
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
if (canceled) throw IOException("Canceled")
return result
}

方法内部会通过调用 ExchangeFinder.find 获取到一个 ExchangeCodec 实例,然后在用它构建一个 Exchange 实例,这里涉及到以下几个类

  1. ExchangeFinder 用于寻找连接,在 RetryAndFollowUpInterceptor 中创建实例。
  2. ExchangeCodec 用于生成和解析报文。拥有两个子类 Http1ExchangeCodecHttp2ExchangeCodec
  3. Exchange 用于传输单个 HTTP 请求和响应,内部借助 ExchangeCodec 生成和解析报文。
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
throw e
} catch (e: IOException) {
throw RouteException(e)
}
}

方法内部通过调用 findHealthyConnection 寻找可用连接,然后调用 newCodec 返回一个 ExchangeCodec 实例。首先看看 findHealthyConnection

private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
candidate.noNewExchanges()
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
}
}

方法内部是个死循环,又通过调用 findConnection 获取到连接,然后检查是否可用,可用就返回,如果不可用,判断是否有路由还没有尝试,如果全尝试过了就抛出异常,如果还有待尝试的则继续循环。继续跟踪 findConnection

private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
// 如果取消了抛出异常
if (call.isCanceled()) throw IOException("Canceled")
val callConnection = call.connection
// 表示是重试请求
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
// 该连接如果不可用,或者再次请求的主机名和端口号与原先的不同,那么需要准备关闭连接
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// 连接仍然可用,那么直接返回
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// 关闭不可用的连接
toClose?.closeQuietly()
// 回调 eventListener.connectionReleased
eventListener.connectionReleased(call, callConnection)
}
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// 尝试从连接池中获取一个连接
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
// 回调 eventListener.connectionAcquired
eventListener.connectionAcquired(call, result)
return result
}
... // 下面分析
}

如果请求不是重试或者重定向那么 Call.connectionnull ,那么尝试从连接池中获取一个连接,先看看 callAcquirePooledConnection 是如何获取的。

// RealConnectionPool.kt
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
// RealConnection.kt
fun acquireConnectionNoEvents(connection: RealConnection) {
this.connection = connection
connection.calls.add(CallReference(this, callStackTrace))
}

遍历所有连接,由于首次 requireMultiplexed (是否需要多路复用) 为 false,因此执行 RealConnection.isEligible 判断连接是否可用

  1. 如果可用那么执行 RealCall.acquireConnectionNoEvents 将该连接赋值给 RealCall.connection 字段并返回 true
  2. 如果所有连接都不可用那么返回 false

接下来看看到底是如何判断连接是否可用的

// RealConnection.kt
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
if (calls.size >= allocationLimit || noNewExchanges) return false
if (!this.route.address.equalsNonHost(address)) return false
if (address.url.host == this.route().address.url.host) {
return true
}
if (http2Connection == null) return false
if (routes == null || !routeMatchesAny(routes)) return false
if (address.hostnameVerifier !== OkHostnameVerifier) return false
if (!supportsUrl(address.url)) return false
try {
address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
} catch (_: SSLPeerUnverifiedException) {
return false
}
return true
}
internal fun equalsNonHost(that: Address): Boolean {
return this.dns == that.dns &&
this.proxyAuthenticator == that.proxyAuthenticator &&
this.protocols == that.protocols &&
this.connectionSpecs == that.connectionSpecs &&
this.proxySelector == that.proxySelector &&
this.proxy == that.proxy &&
this.sslSocketFactory == that.sslSocketFactory &&
this.hostnameVerifier == that.hostnameVerifier &&
this.certificatePinner == that.certificatePinner &&
this.url.port == that.url.port
}

根据代码一个有用的连接需要满足以下几个条件:

  1. 请求数没有达到上限,Http1 为 1,Http2 为 4。
  2. 连接还可以被使用,还没有关闭。
  3. 本次请求的端口号、支持协议、代理、代理选择器、连接规格(密钥算法套件 + TLS 版本)、主机名校验器等需要与连接的一致。注:使用同一个 OkHttpClient 进行请求也只有主机名和端口号会不同
  4. 本次请求的主机名与连接的目标主机名一致。

对于 Http1 连接必须同时满足上述 4 个条件才能进行复用,对于 Http2 主机名可以不同,但是需要额外满足以下几个条件:

  1. 本次请求的所有路由中包括该连接的路由(有相同的 IP 地址和端口号)。
  2. 该连接上次请求是 Https 请求,并且当前请求的主机名必须在上次请求的服务端证书中。
  3. 通过 CertificatePinner 校验。

下面继续分析 RealConnectionPool.findConnection 的剩余代码

private fun findConnection(...): RealConnection {
... 前面已分析
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
}
else if (routeSelection != null && routeSelection!!.hasNext()) {
routes = null
route = routeSelection!!.next()
} else {
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
// 创建 RouteSelector
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
// 从 RouteSelector 中获取 RouteSelection
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// 已经拿到了一系列的 IP 地址,再次尝试从连接池中取出一个连接
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// 获取不到连接,那么自己创建一个连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
// 与目标服务端进行连接
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
// 将成功连接的路由进行记录(只是个内存缓存)
call.client.routeDatabase.connected(newConnection.route())
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
// 将新建的连接放入到连接池中
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
// 回调 eventListener.connectionAcquired
eventListener.connectionAcquired(call, newConnection)
return newConnection
}

首次进入时 nextRouteToTryrouteSelection 都为 false,因此首先会去创建 RouteSelector 实例,然后从中获取到 RouteSelection 实例,再从中获取到路由列表。然后再次执行 callAcquirePooledConnection 传入路由列表获取连接,如果还是获取不到自己创建一个 RealConnection 并进行连接,连接成功后再次执行 callAcquirePooledConnection 获取连接,目的是尽量少的维护连接,假设 A、B 两个 Http2 请求同时发送,且这两个请求目标主机名端口号都一样,A 先创建了连接,将连接放入了连接池,B 也创建了连接,这时候从连接池获取到了 A 创建的连接,那么就会抛弃 B 自己创建的连接,并将当前成功连接的路由赋值给 nextRouteTry,方便下次重试使用。如果获取不到,将本次创建的连接放入连接池然后返回。

三次调用 callAcquirePooledConnection 的目的是:

  1. 获取 Http1 连接
  2. 获取 Http1、Http2 连接
  3. 获取 Http2 连接

看到这里,应该还存在以下两个问题

  1. RouteSelector、RouteSelector.Selection、Route 这三者之间是什么关系,都是做什么用的?
  2. RealConnection 是如何与目标服务器建立 TCP 连接,并进行握手?

首先分析第一个问题:

RouteSelector 构造器会调用 resetNextProxy 用来初始化代理列表

// RouteSelector.kt
private fun resetNextProxy(url: HttpUrl, proxy: Proxy?) {
fun selectProxies(): List<Proxy> {
if (proxy != null) return listOf(proxy)
val uri = url.toUri()
if (uri.host == null) return immutableListOf(Proxy.NO_PROXY)
val proxiesOrNull = address.proxySelector.select(uri)
if (proxiesOrNull.isNullOrEmpty()) return immutableListOf(Proxy.NO_PROXY)
return proxiesOrNull.toImmutableList()
}
eventListener.proxySelectStart(call, url)
proxies = selectProxies()
nextProxyIndex = 0
eventListener.proxySelectEnd(call, url, proxies)
}

如果没有设置 Proxy,那么所有的代理由 ProxySelector 决定。其默认实现为 DefaultProxySelector ,如果没有为系统设置代理,那么返回一个类型为直连的 Proxy 实例,否则返回配置的系统代理。

这里又提出两个疑问

  1. 系统代理是在哪设置的?
  2. 如何才能绕过代理?

对于第一个疑问,应用程序进程启动后,就会通过反射调用 ActivityThread.main 方法,而在该方法中又会调用 attach 方法进而跨进程调用 ActivityManagerService.attachApplication 而在其内部又会跨进度调用 bindApplication 以及 scheduleLaunchActivity,这两个方法都只是发送了一个消息就返回了,等到 ActivityManagerService 执行完毕后,客户端进程执行到 Looper.loop 后,就开始执行第一个消息调用 handleBindApplication 方法,内部有如下代码:

// ActivityThread.java
final IConnectivityManager service = IConnectivityManager.Stub.asInterface(b);
try {
Proxy.setHttpProxySystemProperty(service.getProxyForNetwork(null));
} catch (RemoteException e) {
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
throw e.rethrowFromSystemServer();
}

getProxyForNetwork 中系统会读取网络配置中的代理设置,这个就不展开了。直接看一看 setHttpProxySystemProperty 方法时如何设置代理的。

// Proxy.java
public static final void setHttpProxySystemProperty(ProxyInfo p) {
String host = null;
String port = null;
String exclList = null;
Uri pacFileUrl = Uri.EMPTY;
if (p != null) {
host = p.getHost();
port = Integer.toString(p.getPort());
exclList = p.getExclusionListAsString();
pacFileUrl = p.getPacFileUrl();
}
setHttpProxySystemProperty(host, port, exclList, pacFileUrl);
}
public static final void setHttpProxySystemProperty(String host, String port, String exclList,
Uri pacFileUrl) {
if (exclList != null) exclList = exclList.replace(",", "|");
if (host != null) {
System.setProperty("http.proxyHost", host);
System.setProperty("https.proxyHost", host);
} else {
System.clearProperty("http.proxyHost");
System.clearProperty("https.proxyHost");
}
if (port != null) {
System.setProperty("http.proxyPort", port);
System.setProperty("https.proxyPort", port);
} else {
System.clearProperty("http.proxyPort");
System.clearProperty("https.proxyPort");
}
if (exclList != null) {
System.setProperty("http.nonProxyHosts", exclList);
System.setProperty("https.nonProxyHosts", exclList);
} else {
System.clearProperty("http.nonProxyHosts");
System.clearProperty("https.nonProxyHosts");
}
if (!Uri.EMPTY.equals(pacFileUrl)) {
ProxySelector.setDefault(new PacProxySelector());
} else {
ProxySelector.setDefault(sDefaultProxySelector);
}
}

方法内部逻辑非常简单,解析代理信息,设置或清除以下六个系统属性:

  1. http.proxyHost http请求代理主机
  2. https.proxyHost https请求代理主机
  3. http.proxyPort http请求代理端口
  4. https.proxyPort https请求代理端口
  5. http.nonProxyHosts http请求不进行代理的主机列表
  6. https.nonProxyHosts https请求不进行代理的主机列表

如果存在 PAC 脚本那么设置默认代理选择为 PacProxySelector,如果不存在那么设置 DefaultProxySelector 注:PAC 脚本其实就是一段 JS,指定 URL 对应的代理。

默认代理选择器会读取上述几个参数,如果能找到就组装成一个 Proxy 实例返回,如果找不到那么返回 Proxy.NO_PROXY。至此第一个疑问解决了,再来看看如何绕过代理。其实绕过代理很简单,只需要为 OkHttpClient 设置 Proxy ,代码如下。

val client = OkHttpClient.Builder()
.proxy(Proxy.NO_PROXY)
.build()

继续回到 RouteSelector,通过调用其 next 方法获取到 RouteSelection 实例,看看具体实现:

// RouteSelector.kt
fun next(): Selection {
val routes = mutableListOf<Route>()
while (hasNextProxy()) {
val proxy = nextProxy()
for (inetSocketAddress in inetSocketAddresses) {
val route = Route(address, proxy, inetSocketAddress)
if (routeDatabase.shouldPostpone(route)) {
postponedRoutes += route
} else {
routes += route
}
}
if (routes.isNotEmpty()) {
break
}
}
if (routes.isEmpty()) {
routes += postponedRoutes
postponedRoutes.clear()
}
return Selection(routes)
}
private fun nextProxy(): Proxy {
val result = proxies[nextProxyIndex++]
resetNextInetSocketAddress(result)
return result
}
private fun resetNextInetSocketAddress(proxy: Proxy) {
val mutableInetSocketAddresses = mutableListOf<InetSocketAddress>()
inetSocketAddresses = mutableInetSocketAddresses
val socketHost: String
val socketPort: Int
if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
socketHost = address.url.host
socketPort = address.url.port
} else {
val proxyAddress = proxy.address()
socketHost = proxyAddress.socketHost
socketPort = proxyAddress.port
}
eventListener.dnsStart(call, socketHost)
val addresses = address.dns.lookup(socketHost)
if (addresses.isEmpty()) {
throw UnknownHostException("${address.dns} returned no addresses for $socketHost")
}
eventListener.dnsEnd(call, socketHost, addresses)
for (inetAddress in addresses) {
mutableInetSocketAddresses += InetSocketAddress(inetAddress, socketPort)
}
}

方法内部首先会获取当前索引处的代理,然后访问 DNS 服务器获取该代理地址对应的 IP 地址列表,并将其放入 inetSocketAddresses 成员变量中,接着为每一个 inetSocketAddress 创建一个 Route 实例,如果该路由已经在数据库的失败路由中,那么将其放入 postponedRoutes 列表中,否则将其放入 routes 列表中。当该代理的所有路由都处理完毕后,将 postponedRoutes 拼接到 routes 后面,然后构建一个 Selection 实例然后。

到这里为止 RouteSelectorRouteSelector.SelectionRoute 这三者的作用已经很明了,

RouteSelector 获取每个代理对应的路由并以 Selection 的形式返回。

维护了一个代理下所有的路由信息。

```Route``` 单个路由,包括目标 IP 地址及端口号。

接着看看第二个问题,到底 `RealConnection` 是如何建立连接的。

```kotlin
fun connect(...) {
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
if (route.address.sslSocketFactory == null) {
if (ConnectionSpec.CLEARTEXT !in connectionSpecs) {
throw RouteException(UnknownServiceException(
"CLEARTEXT communication not enabled for client"))
}
val host = route.address.url.host
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw RouteException(UnknownServiceException(
"CLEARTEXT communication to $host not permitted by network security policy"))
}
} else {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
throw RouteException(UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"))
}
}
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
break
}
} else {
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
...
eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)
if (routeException == null) {
routeException = RouteException(e)
} else {
routeException.addConnectException(e)
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}
}

首先判断是否是 Http 请求(虽然 OkHttpClientsslSocketFactory 不做特殊配置一定不为空,但是如果是 Http 请求 Address 中的 sslSocketFactory 就为空)

  1. 如果是 Http 请求,并且明文传输规格不在连接规格列表中或者应用不允许明文传输则抛出异常。
  2. 如果是 Https 请求,并且支持的协议列表中含有 Protocol.H2_PRIOR_KNOWLEDGE (表示预先知道服务端支持明文 Http2 协议)则抛出异常。

最终调用 connectSocket 建立连接,建立完毕后,接着调用 establishProtocol 进行协议的处理。

private fun establishProtocol(...) {
if (route.address.sslSocketFactory == null) {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
socket = rawSocket
protocol = Protocol.H2_PRIOR_KNOWLEDGE
startHttp2(pingIntervalMillis)
return
}
socket = rawSocket
protocol = Protocol.HTTP_1_1
return
}
eventListener.secureConnectStart(call)
connectTls(connectionSpecSelector)
eventListener.secureConnectEnd(call, handshake)
if (protocol === Protocol.HTTP_2) {
startHttp2(pingIntervalMillis)
}
}

如果进行明文传输并且协议为 Http2,那么执行 startHttp2 启动。

如果不进行明文传输执行 connectTls 进行握手,如果协议是 Http2 接着执行 startHttp2 启动。

private fun startHttp2(pingIntervalMillis: Int) {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
socket.soTimeout = 0
val http2Connection = Http2Connection.Builder(client = true, taskRunner = TaskRunner.INSTANCE)
.socket(socket, route.address.url.host, source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
.build()
this.http2Connection = http2Connection
this.allocationLimit = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
http2Connection.start()
}
fun start(sendConnectionPreface: Boolean = true, taskRunner: TaskRunner = TaskRunner.INSTANCE) {
if (sendConnectionPreface) {
writer.connectionPreface()
writer.settings(okHttpSettings)
val windowSize = okHttpSettings.initialWindowSize
if (windowSize != DEFAULT_INITIAL_WINDOW_SIZE) {
writer.windowUpdate(0, (windowSize - DEFAULT_INITIAL_WINDOW_SIZE).toLong())
}
}
taskRunner.newQueue().execute(name = connectionName, block = readerRunnable)
}

大致上就是发送了 connectionPreface 以及 setting至于更详细的有待后续理解了 Http2 原理后再来补充

private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
val address = route.address
val sslSocketFactory = address.sslSocketFactory
var success = false
var sslSocket: SSLSocket? = null
try {
sslSocket = sslSocketFactory!!.createSocket(
rawSocket, address.url.host, address.url.port, true /* autoClose */) as SSLSocket
val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
if (connectionSpec.supportsTlsExtensions) {
Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
}
sslSocket.startHandshake()
val sslSocketSession = sslSocket.session
val unverifiedHandshake = sslSocketSession.handshake()
if (!address.hostnameVerifier!!.verify(address.url.host, sslSocketSession)) {
val peerCertificates = unverifiedHandshake.peerCertificates
if (peerCertificates.isNotEmpty()) {
val cert = peerCertificates[0] as X509Certificate
throw SSLPeerUnverifiedException(...)
} else {
throw SSLPeerUnverifiedException(...)
}
}
val certificatePinner = address.certificatePinner!!
handshake = Handshake(unverifiedHandshake.tlsVersion, unverifiedHandshake.cipherSuite,
unverifiedHandshake.localCertificates) {
certificatePinner.certificateChainCleaner!!.clean(unverifiedHandshake.peerCertificates,
address.url.host)
}
certificatePinner.check(address.url.host) {
handshake!!.peerCertificates.map { it as X509Certificate }
}
val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
Platform.get().getSelectedProtocol(sslSocket)
} else {
null
}
socket = sslSocket
source = sslSocket.source().buffer()
sink = sslSocket.sink().buffer()
protocol = if (maybeProtocol != null) Protocol.get(maybeProtocol) else Protocol.HTTP_1_1
success = true
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket)
}
if (!success) {
sslSocket?.closeQuietly()
}
}
}
  1. 创建 SSLSocket。
  2. 获取 ConnectionSpec。
  3. 配置 TLS 扩展信息。
  4. 进行握手这个过程会校验证书的合法性,但是没校验证书是否是本次请求期望的。
  5. 验证当前请求主机名是否在服务器证书备用名称列表中(Subject Alternative Name)。

CallServerInterceptor

接着再看看 CallServerInterceptor。

public final class CallServerInterceptor implements Interceptor {
private final boolean forWebSocket;
public CallServerInterceptor(boolean forWebSocket) {
this.forWebSocket = forWebSocket;
}
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Exchange exchange = realChain.exchange();
// 根据Request生成对应的字节数组并且写入到Buffer中
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
exchange.writeRequestHeaders(request);
boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
// 如果请求包含请求体,写入请求体
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
responseBuilder = exchange.readResponseHeaders(true);
}
if (responseBuilder == null) {
if (request.body().isDuplex()) {
exchange.flushRequest();
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, true));
request.body().writeTo(bufferedRequestBody);
} else {
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
exchange.noNewExchangesOnConnection();
}
}
} else {
exchange.noRequestBody();
}
if (request.body() == null || !request.body().isDuplex()) {
// 将Buffer中的数据写给服务端
exchange.finishRequest();
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart();
}
if (responseBuilder == null) {
// 获取响应头
responseBuilder = exchange.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}
exchange.responseHeadersEnd(response);
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(exchange.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
}

CallServerInterceptor 真正的进行了网络请求,会根据 Request 实例构建出 Http 请求,获取到 Http 响应后再构建出 HttpResponse,网络请求成功后会接着执行前几个 Interceptor 的剩余代码,这里就不看了。直接回到RealCall.execute。

@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
client.dispatcher().finished(this);
}
}
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}

可以看出当一次同步请求结束后,会将 RealCall 中队列中移除,然后启动正在等待的异步请求,如果没有异步请求会回调 IdleCallback 。接着看看异步请求过程。

Call.enqueue

enqueue 方法用于执行异步请求。

public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) {
throw new IllegalStateException("Already Executed");
}
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

这里都和 execute 一样只是最后调用了 Dispatcher 的 enqueue 方法,不过传入的是 AsyncCall 实例。

void enqueue(AsyncCall call) {
synchronized (this) {
// 将call加入到准备队列中去
readyAsyncCalls.add(call);
if (!call.get().forWebSocket) {
// 刚刚创建的call不是使用webSocket所以进入这里
AsyncCall existingCall = findExistingCallWithHost(call.host());
// 目的只是为了统计每个Host有几个AsyncCall
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
// 从正在执行或者等待执行的call队列中取出host属性为host的AsyncCall实例
private AsyncCall findExistingCallWithHost(String host) {
for (AsyncCall existingCall : runningAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
for (AsyncCall existingCall : readyAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
return null;
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
// 如果已经达到最大请求数64就停止执行
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
// 如果每个Host达到了最大请求数5个就跳过该call
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
// 每个端口请求计数加1
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
// 创建一个线程池,然后执行AsyncCall的execute方法
asyncCall.executeOn(executorService());
}
return isRunning;
}
final class AsyncCall extends NamedRunnable {
...
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
...
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
@Override
protected void execute() {
// 在线程池中执行
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 端口请求计数减1
client.dispatcher().finished(this);
}
}
}

如果调用了 enqueue 发送网络请求,那么最终会在线程池中执行 AsyncCall 的 execute 方法,其内部实现与同步执行基本类似,注意最后会在子线程中直接调用 onResponse ,因此我们不能在 onResponse 里面直接更新 UI 。我们可以写一个 WrapCall 将 Call 进行包装这样就能实现回调在主线程了,代码如下。

class WrapCall(private val call: Call) : Call by call {
override fun enqueue(responseCallback: Callback) {
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
handler.post {
responseCallback.onFailure(call, e)
}
}
override fun onResponse(call: Call, response: Response) {
handler.post {
responseCallback.onResponse(call, response)
}
}
})
}
// 保证 handler 只会创建一次
companion object {
private val handler = Handler(Looper.getMainLooper())
}
}
// 外界使用,只要包装下
val call = WrapCall(client.newCall(request))

源码分析到这网络流程基本已经清晰,下面再来看看 OkHttp 的连接复用。

ConnectionPoll

首先需要连接复用需要设置请求头 Connection: Keep-Alive ,这个已经在 BridgeInterceptor 里面设置了,当然如果响应头 Connection: false ,那么连接还是不能复用,连接池的具体实现是 RealConnectionPool ,每次在 ConnectInterceptor 的 intercepte 方法都会尝试着先从连接池中取出一个连接,取不到满足条件的才会新建一个连接。

class RealConnectionPool(
taskRunner: TaskRunner,
private val maxIdleConnections: Int, // 最大空闲连接数 默认5个
keepAliveDuration: Long, // 最大保存存活的空闲连接时间 默认5分钟
timeUnit: TimeUnit
) {
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
private val cleanupQueue: TaskQueue = taskRunner.newQueue()
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
override fun runOnce() = cleanup(System.nanoTime())
}
private val connections = ConcurrentLinkedQueue<RealConnection>() // 双端队列存储所有的连接
// 获取当前空闲连接的数量
fun idleConnectionCount(): Int {
return connections.count {
synchronized(it) { it.calls.isEmpty() }
}
}
// 获取当前所有连接的数量
fun connectionCount(): Int {
return connections.size
}

// 尝试从连接池中获取一个连接,如果 routes 不为空,那么可以获取到 http2.0 连接,否则只能获取到 http1.0 连接
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
if (!connection.isEligible(address, routes)) return@synchronized // 判断合法性
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}

// 放入一个连接到连接池中
fun put(connection: RealConnection) {
connection.assertThreadHoldsLock()
connections.add(connection)
cleanupQueue.schedule(cleanupTask)
}

// 连接空闲视情况决定是否把它移除
fun connectionBecameIdle(connection: RealConnection): Boolean {
connection.assertThreadHoldsLock()
return if (connection.noNewExchanges || maxIdleConnections == 0) {
connection.noNewExchanges = true
connections.remove(connection)
if (connections.isEmpty()) cleanupQueue.cancelAll()
true
} else {
cleanupQueue.schedule(cleanupTask)
false
}
}

// 移除并关闭所有的连接
fun evictAll() {
val i = connections.iterator()
while (i.hasNext()) {
val connection = i.next()
val socketToClose = synchronized(connection) {
if (connection.calls.isEmpty()) {
i.remove()
connection.noNewExchanges = true
return@synchronized connection.socket()
} else {
return@synchronized null
}
}
socketToClose?.closeQuietly()
}
if (connections.isEmpty()) cleanupQueue.cancelAll()
}

// 清理连接防止超出最大空闲连接数,以及最大空闲时间
fun cleanup(now: Long): Long {
...
}
companion object {
fun get(connectionPool: ConnectionPool): RealConnectionPool = connectionPool.delegate
}
}

总结

不管是同步请求还是异步请求都是通过 Dispatcher 类进行分发,然后经过从上到下5个 Interceptor 才能发起请求,获取到响应后还会经过这5个拦截器然后才将结果返回到外界,典型的责任链模式与 Android 事件分发差不多 。因此 OkHttp 核心就是 Dispatcher、Interceptor。

Dispatcher:

内部维护了三个双端队列(同步请求队列、异步请求队列、异步准备队列)、一个线程池(同 CacheThreadPoll )。

  1. 执行同步调用时加入到同步请求队列,请求完毕后移除,然后看看异步准备队列是否为空,不为空就请求。
  2. 执行异步调用时判断是否达到最大请求数量,以及最大每个 Host 请求数量,如果都不满,那么加入异步请求队列,如果某个满了,那么加入异步准备队列,当执行完毕后异步准备队列是否为空,不为空就请求。

Interceptor:

  1. RetryAndFollowUpInterceptor 用于错误重试,以及重定向。
  2. BridgeInterceptor 用于添加请求头(User-Agent、Connection等等),收到响应的时候可能会进行 GZip 解压。
  3. CacheInterceptor 进行缓存管理,默认不带缓存,如果需要缓存可以给 OkHttpClient 设置 cache 属性,可以使用 OkHttp 内置的 Cache 类。
  4. ConnectInterceptor 进行连接,首先从连接池中取出可以复用的连接,取不到就新建一个然后通过InetAddress 获取到域名对于的IP地址,然后创建 Socket 与服务端进行连接,连接成功后如果是Https请求还会进行握手验证证书操作。
  5. CallServerInterceptor 用于真正的发起请求,从 Socket 获取的输出流写入请求数据,从输入流中读取到响应数据。
0%