开始
对于 Kotlin,协程就是一个线程框架,将一段代码以挂起的方式运行在后台,这段代码称为协程
suspend 关键字:表示该函数是可挂起的,称为挂起函数,该函数需要在直接或间接协程内调用,因此一个挂起函数只能在协程或另一个挂起函数中调用
添加核心库和平台库
1 | // 公共API |
基本协程
创建协程
协程需要一个作用域对象创建协程,协程的作用域对象为 CoroutineScope,需要一个 CoroutineContext 参数,通过三个顶层函数获取 CoroutineScope
- runBlocking:默认为 EmptyCoroutineContext,传入 CoroutineScope 接收者闭包,可以使用 CoroutineScope 中的函数,会使当前线程阻塞
- GlobalScope:全局单例对象,全局存在
- CoroutineScope:传入 CoroutineContext 构造
launch 函数
调用 CoroutineScope 的 launch 函数开启一个协程,传入 CoroutineScope 接收者闭包,其中进行协程操作,该闭包是 suspend 的,没有返回值
launch 函数定义
1 | public fun CoroutineScope.launch( |
launch 函数中可以调用挂起函数也可以调用非挂起函数
将 Dispatcher 对象的 Main 字段、IO 字段等设置到 context 参数可以指定协程在哪个线程运行
1 | launch(Dispatcher.Main) { |
launch 函数返回一个 Job 对象,表示一个协程任务,可以通过该对象获取协程信息、控制等待等
- join:suspend 修饰,调用者协程等待该协程直到该协程完成
- cancel:取消该协程,同时递归地取消该协程调用的所有子协程
- cancelAndJoin:cancel 和 join 的连续调用
Job 是协程任务的抽象,表示该协程执行的任务,每个协程都有一个 Job 对象,当创建子协程时,子协程会继承父协程的 Job 对象,形成一个协程树,当父协程的 Job 被取消时,它的所有子协程也会被取消
协程取消
所有 kotlinx.coroutines
中的挂起函数都检查了取消行为,在取消时抛出 CancellationException,这些函数都是可取消的,若挂起函数中没有检查取消行为,则是不可取消的,会一直执行直到完成
cancel()
函数只是向协程发送一个取消信号,实际上协程并不会立即停止执行,而是需要在协程内部进行判断并处理取消请求
有两种方式可以使挂起函数可被取消
-
yield
yield 函数会暂停当前协程,允许同一调度器的协程的运行,当当前协程被取消时,调用 yield 函数会抛出 CancellationException,可以捕获该异常进行取消行为
-
isActive
在协程中可以使用 isActive 属性,当协程被取消时,isActive 为 false
当挂起函数对取消请求进行处理时需要抛出 CancellationException,外部进行捕获,可以在 finally 中进行关闭资源操作
不可取消的协程:使用 withContext 函数,传入 NonCancellable 对象作为 Context,开启一个不可取消的协程
协程超时
使用 withTimeout 函数开启一个超时协程,指定超时时间,协程在超时后会抛出 TimeoutCancellationException
使用 withTimeoutOrNull 开启超时协程,在超时后,withTimeoutOrNull 判断超时产生的异常属于协程是否为当前协程,若是当前协程,则返回 null,不抛出异常
超时协程是异步的,应该在捕获 TimeoutCancellationException 后进行资源关闭
协程挂起
协程的挂起和恢复是由 CoroutineDispatcher 调度器来实现,一个协程中的挂起函数称为挂起点,当协程遇到一个挂起点时,会保存协程当前状态,将控制权交回给调度器,协程则进入等待队列,当调度器恢复协程时,协程会从之前的状态继续执行
async 并发
基本使用
用 async 函数也可以创建一个协程,并且有返回值,相当于有返回值的 launch 函数,同样传入 context,start,block 三个参数
async 函数返回 Deferred 对象,表示一个在将来会返回结果的 promise,调用它的 await 函数,在协程结束后返回值
1 | launch { |
Deferred 继承了 Job 接口,同样可以进行取消和等待
惰性 Async
设置 async 函数的 start 参数为 CoroutineStart.LAZY,在调用 Deferred 的 await 函数或调用 Job 的 start 函数后才启动协程
若多个 Deferred不在协程中调用,则 async 协程将顺序执行
若多个 Deferred 中抛出了异常,则所有 async 协程和父协程都会被取消
CoroutineContext
CoroutineContext 是一个协程上下文,是一个接口类型,存储了一个协程的相关信息和组件,如调度器、异常处理器、协程名称等,CoroutineContext 与组件之间是继承关系,通过 CoroutineContext 的 get 操作符和组件子类的伴生对象实现 context 关联组件的索引
CoroutineContext 中有一个内部接口 Element,Element 接口中包含一个 Key 字段 (Key 是声明在 CoroutineContext 中的接口类型)
协程组件继承了 Element 接口,包含一个伴生对象 Key 实现了 CoroutineContext.Key
,Element 接口中提供了 get 操作符的默认实现
1 | public override operator fun <E : Element> get(key: Key<E>): E? = |
在 CoroutineContext 的 get 操作符中传入组件的伴生对象 Key,返回该 context 的组件对象,伴生对象 Key 可以通过类名引用
1 | CoroutineContext[Job] // 使用类名引用伴生对象,获取Job组件 |
CoroutineContext 对象支持 plus 操作符,可以将多个组件使用 plus 操作符组合,构成一个 CoroutineContext
1 | Job + Dispatcher.IO // 协程任务+调度器 |
CoroutineScope
CoroutineScope 是一个描述协程作用域的对象,是比 CoroutineContext 更基础的对象,提供了协程创建和管理的函数,一个 CoroutineScope 包含一个 CoroutineContext 字段
CoroutineScope.kt
中包含了 CoroutineScope 的定义以及一些重要的函数
1 | // CoroutineScope定义,其中包含一个CoroutineContext字段 |
Scopes.kt
中包含 ContextScope 类,实现了 CoroutineScope 接口,添加了构造函数
1 | internal class ContextScope(context: CoroutineContext) : CoroutineScope { |
Dispatcher
指定协程在指定调度器的协程中执行,当不指定调度器时,使用父协程的调度器
-
Main:主线程,依赖于平台库提供 MainDispatcher,在测试中,可以通过
kotlinx-coroutines-test
包中的扩展函数设置一个 Dispatcher 作为 MainDispatcher1
2// kotlinx-coroutines-test包中的扩展函数setMain
Dispatchers.setMain(Dispatchers.Default) -
IO:IO 线程
-
Default:由默认调度器调度,使用共享后台线程池
-
Unconfined:非受限调度器
非受限调度器:默认使用父协程的调度器,在协程执行过程中不会强制协程在某个线程或协程中工作,而是可以自由切换,在协程被挂起时,协程不保留当前线程的信息,当协程恢复时,调度器会在某个线程上重新调度协程
上下文切换
一个协程可以拥有不同的上下文,使用 CoroutineScope 的 withContext 函数切换协程的上下文,传入一个 CoroutineContext 和一个 CoroutineScope 接收者闭包
withContext 是一个挂起函数,因此协程会被挂起,调度器开启一个新的协程执行闭包,当新协程执行结束,原协程继续执行,闭包可以有返回值,返回到 withContext 函数
1 | suspend fun method() = withContext(Dispatchers.IO) { |
launch 函数不是挂起函数,不会使当前协程被挂起,withContext 是挂起函数,会使当前协程被挂起
子协程
子协程默认继承父协程的 context,当父协程被取消时,子协程将被递归取消,父协程默认等待所有的子协程结束
有两种方法可以使子协程独立于父协程
- 使用不同的 CoroutineContext 创建子协程
- 使用新的 Job 创建子协程
1 | fun main() { |
协程异常处理
异常传播:当子协程产生异常时,会向上抛出到父协程,直到根协程
- 通过 launch 和 actor 创建的根协程,异常会作为未捕获异常直接抛出
- 通过 async 和 produce 创建的根协程,当调用 await 函数时才抛出异常,依赖用户捕获
CoroutineExceptionHandler
用于处理未捕获异常,该组件继承了 CoroutineContext,可以直接作为 context 对象使用
- 向上传播异常的子协程和async 根协程不会使用该组件
- 调用 cancel 函数取消子协程时会抛出 CancellationException,父协程会自动忽略该类型的异常,不会取消父协程,当抛出了其他类型的异常,父协程会被同时取消,引起其他子协程的递归取消
- 当父协程的所有子协程全部结束后,父协程才会使用 ExceptionHandler 处理异常,处理异常并不会阻止父协程因为异常而被取消
- 若多个子协程都产生异常,则只抛出第一个异常,其他异常作为受抑制异常绑定到第一个异常
监督
父协程可以使用监督检测子协程的异常抛出,从而不影响其他的子协程,同时防止父协程因为异常而被取消
-
SupervisorJob:继承于 Job,可以使用 SupervisorJob 创建子协程或设置为父协程的 context,当子协程抛出异常时,同样会向上抛出到根协程,由根协程的 ExceptionHandler 处理,但不会引起父协程的取消
-
SupervisorScope:类似于 coroutineScope 函数,使用构造器构造一个 SupervisorCoroutine 对象,由该对象来执行闭包,SupervisorScope 中的子协程不会向上抛出异常,因此每个子协程应该设置 ExceptionHandler 来处理自身的异常
SupervisorCoroutine 继承了 ScopeCoroutine,重写了 childCancelled 方法
该方法的原始实现
1
2
3
4
5
6// 在子协程抛出异常时,父协程决定是否取消自己
public open fun childCancelled(cause: Throwable): Boolean {
// 当异常是CancellationException时取消自己
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}SupervisorCoroutine 重写了 childCancelled 方法,直接返回 false,表示当子协程抛出异常时,父协程不会取消自己
异步流
kotlin 使用 Flow 表示异步计算的流,类似 Sequence,Sequence 是同步计算,Flow 是异步计算,两者都是惰性计算,Flow 支持 Sequence 的集合操作
-
构造:flow 函数——sequence 函数,flow 函数的闭包是suspend的且可被取消
其他构造函数有 flowOf 和 asFlow
-
生成:emit 函数——yield 函数
-
收集:collect 函数——forEach 函数,collect 函数是 suspend 的
由于 flow 在收集时才进行计算,因此在构造时可以不进行挂起,在收集时才挂起
FLow 有冷流和热流两种,冷流必须包含消费者,热流不一定需要消费者,可以独立存在
- 冷流:FLow
- 热流:StateFlow,SharedFLow
- StateFlow:类似 LiveData,在状态改变时响应
- SharedFLow:类似消息总线,在值发送时响应
上下文
Flow 的协程上下文由 Flow 收集器所在的 context 决定,在构造 Flow 时所处的上下文也是收集器所在的上下文
flowOn 操作用于更改 Flow 的协程上下文,传入一个 CoroutineContext 对象,在 flowOn 操作之前的操作都将处于这个 context 对象表示的上下文中
扩展操作
Flow 扩展了一些集合操作,可以提高处理速度
-
缓冲
当生成值的速度快于处理值的速度时,可以在处理操作之前使用 buffer 操作缓存生成的值,提高整体速度
-
合并
当生成值的速度快于处理值的速度时,在处理操作之前使用 conflate 操作,可以使处理操作跳过中间值
可以理解为生成的值被压入一个栈中,当处理操作结束后会取栈顶元素处理,剩下的元素被丢弃
-
取最新值
对 Flow 的每个集合操作 xxx,都存在一个 xxxLatest 操作,当构造器生成新值时,会取消当前操作接收新值
展平流
由于 flow 是异步的,所以存在特殊的展平操作用于特殊处理
- flatMapConcat:将接收到的流串行
- flatMapMerge:将接收到的流并行
- flatMapLatest:当接收到新流时,取消当前的流操作
异常
在 Flow 的终止操作处捕获操作,可以捕获到整个操作过程中的所有异常,在捕获异常后,构造器停止生成值
使用 catch 操作符可以封装异常处理操作
1 | flow.catch { e -> |
catch 只会捕获 catch 之前的操作产生的异常,需要捕获所有异常时使用声明式捕获
1 | flow |
完成
使用 onCompletion 操作指定 Flow 完成时进行的操作,onCompletion 是一个中间操作,onCompletion 传入一个 cause 参数,表示 Flow 处理时产生的异常 (包括收集器中的异常),若 Flow 成功完成,则 cause 参数为 null
onCompletion 不处理异常,异常依然会传递到下游
1 | flow |
取消
在处于协程中的 Flow 操作中可以使用 cancel 函数取消当前 Flow 任务,cancel 函数是 CoroutineScope 对象的 cancel 函数
在取消前需要检测是否可以取消,使用 onEach 操作进行检查
1 | flow |
cancellable 操作符封装了上述检查
1 | flow.cancellable() |
通道
Channel 是一个并发安全的队列,可以实现多协程之间的通信

基本操作
- send:发送消息
- receive:接收消息
1 | val channel = Channel<Int>() // 创建channel时可以指定缓冲区大小 |
Channel 支持遍历管道中的数据,直到管道被关闭
1 | val channel = Channel<Int>() |
使用 Channel 可以将协程构造为生产者协程和消费者协程
- produce 函数:构造一个生产者协程,返回一个 ReceiveChannel
- actor 函数:构造一个消费者协程,返回一个 SendChannel
扇入、扇出
- 当 channel 有多个消费者时,每次发送一个消息,由其中一个消费者进行处理
- channel 支持多个生产者并发地发送消息