协程的难点

  1. 对于Java程序员而言,这是一个新概念。当然协程也不是Kotlin特有的,除了Java之外,有一些语言也有协程。
  2. 概念模糊。不同的语言对协程的实现或衍生都不同。

协程是什么?

协程基于线程,它是轻量级线程。

协程是一种非抢占式或者说协作式的计算机程序并发调度的实现,程序可以主动挂起或者恢复执行。这里还是需要有点儿操作系统的知识的,我们在 Java 虚拟机上所认识到的线程大多数的实现是映射到内核的线程的,也就是说线程当中的代码逻辑在线程抢到 CPU 的时间片的时候才可以执行,否则就得歇着,当然这对于我们开发者来说是透明的;而经常听到所谓的协程更轻量的意思是,协程并不会映射成内核线程或者其他这么重的资源,它的调度在用户态就可以搞定,任务之间的调度并非抢占式,而是协作式的。

协程的作用

异步逻辑(任务)同步化,杜绝回调地狱。

函数或者一段程序能够挂起(suspend),任务处理完成后,又在挂起处恢复程序执行。

以上在除Kotlin以外的编程语言也有相关的概率。

理解

协程在我学习汇编语言的中断很像,比如保存寄存器信息等等,协程也可以这么理解,反正就是保存上下文。

Coroutine的翻译

Coroutine 中 前缀co指的是cooperation,合作,与单字“协”相同意思,而routine中文意思常规,故作协程。

协程的基础层和业务层

协程的启动

在学习协程的启动之前,我们对线程的启动稍微了解一下吧。

koltin对线程的启动

Kotlin对Java线程的启动。

1
2
3
4
5
6
7
val thread = object : Thread(){
override fun run() {
super.run()
//do what you want to do.
}
}
thread.start()

可以看到这种方式和Java启动线程差不多,还要使用start()方法,没有体现Kotlin的优势。

对于这个问题,Kotlin为我们提供了更好的方式,如下:

乍一看,这是一个高阶函数。我们可以看一下它的源码,很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public fun thread(
start: Boolean = true,
isDaemon: Boolean = false,
contextClassLoader: ClassLoader? = null,
name: String? = null,
priority: Int = -1,
block: () -> Unit
): Thread {
val thread = object : Thread() {
public override fun run() {
block()
}
}
if (isDaemon)
thread.isDaemon = true
if (priority > 0)
thread.priority = priority
if (name != null)
thread.name = name
if (contextClassLoader != null)
thread.contextClassLoader = contextClassLoader
if (start)
thread.start()
return thread
}

可以看到,默认是start=true的,省了再次调用start方法。

kotlin协程的启动

协程的 API 设计其实也与线程简直是一脉相承。

1
2
3
CoroutineScope.launch {
// todo
}

显而言之,又是一个高阶函数,可以看一下构造器源码。

1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

通过源码我们可以看到launch函数有三个参数,context是上下文,start是启动模式,block是协程体。协程体可以理解成Thread.run的代码。

这三个参数中,只有启动模式值得一说。

启动模式

根据上面的源码,我们找到了CoroutineStart枚举类:

1
2
3
4
5
6
7
8
public enum class CoroutineStart {
DEFAULT,
LAZY,
@ExperimentalCoroutinesApi
ATOMIC,
@ExperimentalCoroutinesApi
UNDISPATCHED;
}
模式 功能
DEFAULT 立即执行协程体
ATOMIC 立即执行协程体,但在开始运行之前无法取消
UNDISPATCHED 立即在当前线程执行协程体,直到第一个 suspend 调用
LAZY 只有在需要的情况下运行

四个启动模式当中我们最常用的其实是 DEFAULTLAZY

1
2
3
4
5
6
7
val dateFormat = SimpleDateFormat("HH:mm:ss:SSS")

val now = {
dateFormat.format(Date(System.currentTimeMillis()))
}

fun log(msg: Any?) = println("${now()} [${Thread.currentThread().name}] $msg")

DEFAULT

DEFAULT 是饿汉式启动,launch 调用后,会立即进入待调度状态,一旦调度器 OK 就可以开始执行。

1
2
3
4
5
6
7
8
9
suspend fun main() {
log(1)
val job = GlobalScope.launch {
log(2)
}
log(3)
job.join()
log(4)
}

这段程序采用默认的启动模式,由于我们也没有指定调度器,因此调度器也是默认的,在 JVM 上,默认调度器的实现与其他语言的实现类似,它在后台专门会有一些线程处理异步任务,所以上述程序的运行结果可能是:

1
2
3
4
19:51:08:160 [main] 1
19:51:08:603 [main] 3
19:51:08:606 [DefaultDispatcher-worker-1] 2
19:51:08:624 [main] 4

也可能是:

1
2
3
4
20:19:06:367 [main] 1
20:19:06:541 [DefaultDispatcher-worker-1] 2
20:19:06:550 [main] 3
20:19:06:551 [main] 4

这取决于 CPU 对于当前线程与后台线程的调度顺序,不过不要担心,很快你就会发现这个例子当中 2 和 3 的输出顺序其实并没有那么重要。

JVM 上默认调度器的实现也许你已经猜到,没错,就是开了一个线程池,但区区几个线程足以调度成千上万个协程,而且每一个协程都有自己的调用栈,这与纯粹的开线程池去执行异步任务有本质的区别。

LAZY

LAZY 是懒汉式启动,launch 后并不会有任何调度行为,协程体也自然不会进入执行状态,直到我们需要它执行的时候。这其实就有点儿费解了,什么叫我们需要它执行的时候呢?就是需要它的运行结果的时候, launch 调用后会返回一个 Job 实例,对于这种情况,我们可以:

  • 调用 Job.start,主动触发协程的调度执行
  • 调用 Job.join,隐式的触发协程的调度执行

所以这个所谓的”需要“,其实是一个很有趣的措辞,后面你还会看到我们也可以通过 await 来表达对 Deferred 的需要。这个行为与 Thread.join 不一样,后者如果没有启动的话,调用 join 不会有任何作用。

1
2
3
4
5
6
7
log(1)
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
log(2)
}
log(3)
job.start()
log(4)

基于此,对于上面的示例,输出的结果可能是:

1
2
3
4
14:56:28:374 [main] 1
14:56:28:493 [main] 3
14:56:28:511 [main] 4
14:56:28:516 [DefaultDispatcher-worker-1] 2

当然如果你运气够好,也可能出现 2 比 4 在前面的情况。而对于 join,

1
2
3
4
...
log(3)
job.join()
log(4)

因为要等待协程执行完毕,因此输出的结果一定是:

1
2
3
4
14:47:45:963 [main] 1
14:47:46:054 [main] 3
14:47:46:069 [DefaultDispatcher-worker-1] 2
14:47:46:090 [main] 4

ATOMIC

ATOMIC 只有涉及 cancel 的时候才有意义,cancel 本身也是一个值得详细讨论的话题,在这里我们就简单认为 cancel 后协程会被取消掉,也就是不再执行了。那么调用 cancel 的时机不同,结果也是有差异的,例如协程调度之前、开始调度但尚未执行、已经开始执行、执行完毕等等。

为了搞清楚它与 DEFAULT 的区别,我们来看一段例子:

1
2
3
4
5
6
log(1)
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
log(2)
}
job.cancel()
log(3)

我们创建了协程后立即 cancel,但由于是 ATOMIC 模式,因此协程一定会被调度,因此 1、2、3 一定都会输出,只是 2 和 3 的顺序就难说了。

1
2
3
20:42:42:783 [main] 1
20:42:42:879 [main] 3
20:42:42:879 [DefaultDispatcher-worker-1] 2

对应的,如果是 DEFAULT 模式,在第一次调度该协程时如果 cancel 就已经调用,那么协程就会直接被 cancel 而不会有任何调用,当然也有可能协程开始时尚未被 cancel,那么它就可以正常启动了。所以前面的例子如果改用 DEFAULT 模式,那么 2 有可能会输出,也可能不会。

需要注意的是,cancel 调用一定会将该 job 的状态置为 cancelling,只不过ATOMIC 模式的协程在启动时无视了这一状态。为了证明这一点,我们可以让例子稍微复杂一些:

1
2
3
4
5
6
7
8
9
log(1)
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
log(2)
delay(1000)
log(3)
}
job.cancel()
log(4)
job.join()

我们在 2 和 3 之间加了一个 delay,delay 会使得协程体的执行被挂起,1000ms 之后再次调度后面的部分,因此 3 会在 2 执行之后 1000ms 时输出。对于 ATOMIC 模式,我们已经讨论过它一定会被启动,实际上在遇到第一个挂起点之前,它的执行是不会停止的,而 delay 是一个 suspend 函数,这时我们的协程迎来了自己的第一个挂起点,恰好 delay 是支持 cancel 的,因此后面的 3 将不会被打印。

我们使用线程的时候,想要让线程里面的任务停止执行也会面临类似的问题,但遗憾的是线程中看上去与 cancel 相近的 stop 接口已经被废弃,因为存在一些安全的问题。不过随着我们不断地深入探讨,你就会发现协程的 cancel 某种意义上更像线程的 interrupt。

UNDISPATCHED

有了前面的基础,UNDISPATCHED 就很容易理解了。协程在这种模式下会直接开始在当前线程下执行,直到第一个挂起点,这听起来有点儿像前面的 ATOMIC,不同之处在于 UNDISPATCHED 不经过任何调度器即开始执行协程体。当然遇到挂起点之后的执行就取决于挂起点本身的逻辑以及上下文当中的调度器了。

1
2
3
4
5
6
7
8
9
log(1)
val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
log(2)
delay(100)
log(3)
}
log(4)
job.join()
log(5)

我们还是以这样一个例子来认识下 UNDISPATCHED 模式,按照我们前面的讨论,协程启动后会立即在当前线程执行,因此 1、2 会连续在同一线程中执行,delay 是挂起点,因此 3 会等 100ms 后再次调度,这时候 4 执行,join 要求等待协程执行完,因此等 3 输出后再执行 5。以下是运行结果:

1
2
3
4
5
22:00:31:693 [main] 1
22:00:31:782 [main @coroutine#1] 2
22:00:31:800 [main] 4
22:00:31:914 [DefaultDispatcher-worker-1 @coroutine#1] 3
22:00:31:916 [DefaultDispatcher-worker-1 @coroutine#1] 5

方括号当中是线程名,我们发现协程执行时会修改线程名来让自己显得颇有存在感。运行结果看上去还有一个细节可能会让人困惑,join 之后的 5 的线程与 3 一样,这是为什么?我们在前面提到我们的示例都运行在 suspend main 函数当中,所以 suspend main 函数会帮我们直接启动一个协程,而我们示例的协程都是它的子协程,所以这里 5 的调度取决于这个最外层的协程的调度规则了。关于协程的调度,我们后面再聊。

协程调度

协程上下文

我们回顾launch函数的源码:

1
2
3
4
5
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
)

launch 函数有三个参数,第一个参数叫 上下文,它的接口类型是 CoroutineContext,通常我们见到的上下文的类型是 CombinedContext 或者 EmptyCoroutineContext,一个表示上下文的组合,另一个表示什么都没有。我们来看下 CoroutineContext 的接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package kotlin.coroutines
@SinceKotlin("1.3")
public interface CoroutineContext {

public operator fun <E : Element> get(key: Key<E>): E?

public fun <R> fold(initial: R, operation: (R, Element) -> R): R

public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}


public fun minusKey(key: Key<*>): CoroutineContext


public interface Key<E : Element>


public interface Element : CoroutineContext {

public val key: Key<*>

public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null

public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)

public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}

异常处理

协程取消

协程挂起

序列生成器

协程作用域

挂起和阻塞

协程的调度器

任务泄露

简单描述就是:人死了,钱没花完。。。

解决方案:结构化并发

结构化并发

  1. 取消任务
  2. 追踪任务
  3. 发出错误信号

参考文献

  1. Kotlin中文社区 https://www.jianshu.com/u/a324daa6fa19
  2. https://www.bilibili.com/video/BV1uo4y1y7ZF