Kotlin高级——协程
开始
对于Kotlin,协程就是一个线程框架,将一段代码以挂起的方式运行在后台,这段代码称为协程
suspend关键字:表示该函数是可挂起的,称为挂起函数,该函数需要在直接或间接协程内调用,因此一个挂起函数只能在协程或另一个挂起函数中调用
添加核心库和平台库
1
2
3
4
// 公共API
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2'
// 平台具体实现
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.2'
基本协程
创建协程
协程需要一个作用域对象创建协程,协程的作用域对象为CoroutineScope,需要一个CoroutineContext参数,通过三个顶层函数获取CoroutineScope
- runBlocking:默认为EmptyCoroutineContext,传入CoroutineScope接收者闭包,可以使用CoroutineScope中的函数,会使当前线程阻塞
- GlobalScope:全局单例对象,全局存在
- CoroutineScope:传入CoroutineContext构造
launch函数
调用CoroutineScope的launch函数开启一个协程,传入CoroutineScope接收者闭包,其中进行协程操作,该闭包是suspend的,没有返回值
launch函数定义
1
2
3
4
5
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
launch函数中可以调用挂起函数也可以调用非挂起函数
将Dispatcher对象的Main字段、IO字段等设置到context参数可以指定协程在哪个线程运行
1
2
3
4
5
6
launch(Dispatcher.Main) {
// 在主线程执行
}
launch(Dispatcher.IO) {
// 在IO线程执行
}
launch函数返回一个Job对象,表示一个协程任务,可以通过该对象获取协程信息、控制等待等
- join:suspend修饰,调用者协程等待该协程直到该协程完成
- cancel:取消该协程,同时递归地取消该协程调用的所有子协程
- cancelAndJoin:cancel和join的连续调用
Job是协程任务的抽象,表示该协程执行的任务,每个协程都有一个Job对象,当创建子协程时,子协程会继承父协程的Job对象,形成一个协程树,当父协程的Job被取消时,它的所有子协程也会被取消
协程取消
所有kotlinx.coroutines
中的挂起函数都检查了取消行为,在取消时抛出CancellationException,这些函数都是可取消的,若挂起函数中没有检查取消行为,则是不可取消的,会一直执行直到完成
cancel()
函数只是向协程发送一个取消信号,实际上协程并不会立即停止执行,而是需要在协程内部进行判断并处理取消请求
有两种方式可以使挂起函数可被取消
yield
yield函数会暂停当前协程,允许同一调度器的协程的运行,当当前协程被取消时,调用yield函数会抛出CancellationException,可以捕获该异常进行取消行为
isActive
在协程中可以使用isActive属性,当协程被取消时,isActive为false
当挂起函数对取消请求进行处理时需要抛出CancellationException,外部进行捕获,可以在finally中进行关闭资源操作
不可取消的协程:使用withContext函数,传入NonCancellable对象作为Context,开启一个不可取消的协程
协程超时
使用withTimeout函数开启一个超时协程,指定超时时间,协程在超时后会抛出TimeoutCancellationException
使用withTimeoutOrNull开启超时协程,在超时后,withTimeoutOrNull判断超时产生的异常属于协程是否为当前协程,若是当前协程,则返回null,不抛出异常
超时协程是异步的,应该在捕获TimeoutCancellationException后进行资源关闭
协程挂起
协程的挂起和恢复是由CoroutineDispatcher调度器来实现,一个协程中的挂起函数称为挂起点,当协程遇到一个挂起点时,会保存协程当前状态,将控制权交回给调度器,协程则进入等待队列,当调度器恢复协程时,协程会从之前的状态继续执行
async并发
基本使用
用async函数也可以创建一个协程,并且有返回值,相当于有返回值的launch函数,同样传入context,start,block三个参数
async函数返回Deferred对象,表示一个在将来会返回结果的promise,调用它的await函数,在协程结束后返回值
1
2
3
4
5
6
7
launch {
val async: Deferred<Int> = async {
delay(2000L)
5
}
val value = async.await() // 5
}
Deferred继承了Job接口,同样可以进行取消和等待
惰性Async
设置async函数的start参数为CoroutineStart.LAZY,在调用Deferred的await函数或调用Job的start函数后才启动协程
若多个Deferred不在协程中调用,则async协程将顺序执行
若多个Deferred中抛出了异常,则所有async协程和父协程都会被取消
CoroutineContext
CoroutineContext是一个协程上下文,是一个接口类型,存储了一个协程的相关信息和组件,如调度器、异常处理器、协程名称等,CoroutineContext与组件之间是继承关系,通过CoroutineContext的get操作符和组件子类的伴生对象实现context关联组件的索引
CoroutineContext中有一个内部接口Element,Element接口中包含一个Key字段(Key是声明在CoroutineContext中的接口类型)
协程组件继承了Element接口,包含一个伴生对象Key实现了CoroutineContext.Key
,Element接口中提供了get操作符的默认实现
1
2
3
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
在CoroutineContext的get操作符中传入组件的伴生对象Key,返回该context的组件对象,伴生对象Key可以通过类名引用
1
2
CoroutineContext[Job] // 使用类名引用伴生对象,获取Job组件
CoroutineContext[Job.Key] // 直接引用伴生对象
CoroutineContext对象支持plus操作符,可以将多个组件使用plus操作符组合,构成一个CoroutineContext
1
Job + Dispatcher.IO // 协程任务+调度器
CoroutineScope
CoroutineScope是一个描述协程作用域的对象,是比CoroutineContext更基础的对象,提供了协程创建和管理的函数,一个CoroutineScope包含一个CoroutineContext字段
CoroutineScope.kt
中包含了CoroutineScope的定义以及一些重要的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// CoroutineScope定义,其中包含一个CoroutineContext字段
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
// plus操作符,将当前context与参数context组合,通过ContextScope构造函数构造一个新的scope对象
public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
ContextScope(coroutineContext + context)
// 构造一个scope对象,运行在主线程且启动异常监督
@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// 查看Job组件是否在活动中
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public val CoroutineScope.isActive: Boolean
get() = coroutineContext[Job]?.isActive ?: true
// 全局scope对象
@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
// 使用构造器创建一个ScopeCoroutine对象,继承父协程的context但覆盖其Job组件,并使用该对象来执行闭包
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// 使用指定的context构造一个scope对象,若context中不存在Job组件,则添加一个默认的Job
@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
// 查看Job是否存在并取消当前context的Job,同时会取消它的所有子协程的Job
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
// cancel函数增加message参数
public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Unit = cancel(CancellationException(message, cause))
// 调用context对象的ensureActive函数,底层获取了Job组件,调用了Job的ensureActive
// Job的ensureActive判断isActive字段,若为false则抛出CancellationException
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()
// 获取当前context对象,可以避免与CoroutineScope.coroutineContext字段重名冲突
public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext
Scopes.kt
中包含ContextScope类,实现了CoroutineScope接口,添加了构造函数
1
2
3
4
5
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
// CoroutineScope is used intentionally for user-friendly representation
override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}
Dispatcher
指定协程在指定调度器的协程中执行,当不指定调度器时,使用父协程的调度器
Main:主线程,依赖于平台库提供MainDispatcher,在测试中,可以通过
kotlinx-coroutines-test
包中的扩展函数设置一个Dispatcher作为MainDispatcher1 2
// kotlinx-coroutines-test包中的扩展函数setMain Dispatchers.setMain(Dispatchers.Default)
IO:IO线程
Default:由默认调度器调度,使用共享后台线程池
Unconfined:非受限调度器
非受限调度器:默认使用父协程的调度器,在协程执行过程中不会强制协程在某个线程或协程中工作,而是可以自由切换,在协程被挂起时,协程不保留当前线程的信息,当协程恢复时,调度器会在某个线程上重新调度协程
上下文切换
一个协程可以拥有不同的上下文,使用CoroutineScope的withContext函数切换协程的上下文,传入一个CoroutineContext和一个CoroutineScope接收者闭包
withContext是一个挂起函数,因此协程会被挂起,调度器开启一个新的协程执行闭包,当新协程执行结束,原协程继续执行,闭包可以有返回值,返回到withContext函数
1
2
3
4
suspend fun method() = withContext(Dispatchers.IO) {
// statement
return 2 // returnValue
}
launch函数不是挂起函数,不会使当前协程被挂起,withContext是挂起函数,会使当前协程被挂起
子协程
子协程默认继承父协程的context,当父协程被取消时,子协程将被递归取消,父协程默认等待所有的子协程结束
有两种方法可以使子协程独立于父协程
- 使用不同的CoroutineContext创建子协程
- 使用新的Job创建子协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fun main() {
val coroutineScope = CoroutineScope(Dispatchers.Default)
val job = coroutineScope.launch {
println("这里是父协程")
CoroutineScope(Dispatchers.IO).launch {
println("这里是独立的子协程1")
delay(1000)
println("子协程1独立")
}
launch(Job()) {
println("这里是独立的子协程2")
delay(1000)
println("子协程2独立")
}
launch {
println("这里是继承父协程的子协程")
delay(1000)
println("子协程会不会执行") // 继承的子协程随着父协程取消,不会执行
}
}
Thread.sleep(500)
job.cancel()
println("cancel")
Thread.sleep(1000)
}
/* 运行结果
这里是父协程
这里是独立的子协程1
这里是独立的子协程2
这里是继承父协程的子协程
cancel
子协程2独立
子协程1独立
*/
协程异常处理
异常传播:当子协程产生异常时,会向上抛出到父协程,直到根协程
- 通过launch和actor创建的根协程,异常会作为未捕获异常直接抛出
- 通过async和produce创建的根协程,当调用await函数时才抛出异常,依赖用户捕获
CoroutineExceptionHandler
用于处理未捕获异常,该组件继承了CoroutineContext,可以直接作为context对象使用
- 向上传播异常的子协程和async根协程不会使用该组件
- 调用cancel函数取消子协程时会抛出CancellationException,父协程会自动忽略该类型的异常,不会取消父协程,当抛出了其他类型的异常,父协程会被同时取消,引起其他子协程的递归取消
- 当父协程的所有子协程全部结束后,父协程才会使用ExceptionHandler处理异常,处理异常并不会阻止父协程因为异常而被取消
- 若多个子协程都产生异常,则只抛出第一个异常,其他异常作为受抑制异常绑定到第一个异常
监督
父协程可以使用监督检测子协程的异常抛出,从而不影响其他的子协程,同时防止父协程因为异常而被取消
SupervisorJob:继承于Job,可以使用SupervisorJob创建子协程或设置为父协程的context,当子协程抛出异常时,同样会向上抛出到根协程,由根协程的ExceptionHandler处理,但不会引起父协程的取消
SupervisorScope:类似于coroutineScope函数,使用构造器构造一个SupervisorCoroutine对象,由该对象来执行闭包,SupervisorScope中的子协程不会向上抛出异常,因此每个子协程应该设置ExceptionHandler来处理自身的异常
SupervisorCoroutine继承了ScopeCoroutine,重写了childCancelled方法
该方法的原始实现
1 2 3 4 5 6
// 在子协程抛出异常时,父协程决定是否取消自己 public open fun childCancelled(cause: Throwable): Boolean { // 当异常是CancellationException时取消自己 if (cause is CancellationException) return true return cancelImpl(cause) && handlesException }
SupervisorCoroutine重写了childCancelled方法,直接返回false,表示当子协程抛出异常时,父协程不会取消自己
异步流
kotlin使用Flow表示异步计算的流,类似Sequence,Sequence是同步计算,Flow是异步计算,两者都是惰性计算,Flow支持Sequence的集合操作
构造:flow函数——sequence函数,flow函数的闭包是suspend的且可被取消
其他构造函数有flowOf和asFlow
生成:emit函数——yield函数
收集:collect函数——forEach函数,collect函数是suspend的
由于flow在收集时才进行计算,因此在构造时可以不进行挂起,在收集时才挂起
FLow有冷流和热流两种,冷流必须包含消费者,热流不一定需要消费者,可以独立存在
- 冷流:FLow
- 热流:StateFlow,SharedFLow
- StateFlow:类似LiveData,在状态改变时响应
- SharedFLow:类似消息总线,在值发送时响应
上下文
Flow的协程上下文由Flow收集器所在的context决定,在构造Flow时所处的上下文也是收集器所在的上下文
flowOn操作用于更改Flow的协程上下文,传入一个CoroutineContext对象,在flowOn操作之前的操作都将处于这个context对象表示的上下文中
扩展操作
Flow扩展了一些集合操作,可以提高处理速度
缓冲
当生成值的速度快于处理值的速度时,可以在处理操作之前使用buffer操作缓存生成的值,提高整体速度
合并
当生成值的速度快于处理值的速度时,在处理操作之前使用conflate操作,可以使处理操作跳过中间值
可以理解为生成的值被压入一个栈中,当处理操作结束后会取栈顶元素处理,剩下的元素被丢弃
取最新值
对Flow的每个集合操作xxx,都存在一个xxxLatest操作,当构造器生成新值时,会取消当前操作接收新值
展平流
由于flow是异步的,所以存在特殊的展平操作用于特殊处理
- flatMapConcat:将接收到的流串行
- flatMapMerge:将接收到的流并行
- flatMapLatest:当接收到新流时,取消当前的流操作
异常
在Flow的终止操作处捕获操作,可以捕获到整个操作过程中的所有异常,在捕获异常后,构造器停止生成值
使用catch操作符可以封装异常处理操作
1
2
3
4
5
flow.catch { e ->
println(e) // 可以打印异常
throw e // 可以重新抛出异常
emit(value) // 可以重新生成值,但生成后流依然会停止,重新生成的值类型与流的值类型要相同
}.collect { value -> println(value) }
catch只会捕获catch之前的操作产生的异常,需要捕获所有异常时使用声明式捕获
1
2
3
4
5
6
7
8
9
10
flow
// 使用onEach代替collect,将collect的操作代码提前
.onEach { value ->
check(value <= 1) { "Collected $value" }
// collect操作
println(value)
}
// catch只捕获上游异常
.catch { e -> println("Caught $e") }
.collect() // collect只作为启动流计算的终止操作
完成
使用onCompletion操作指定Flow完成时进行的操作,onCompletion是一个中间操作,onCompletion传入一个cause参数,表示Flow处理时产生的异常(包括收集器中的异常),若Flow成功完成,则cause参数为null
onCompletion不处理异常,异常依然会传递到下游
1
2
3
4
5
6
7
8
9
10
flow
.catch { ... } // catch1
.onCompletion { cause ->
println("completion")
if (cause != null) println("Flow completed exceptionally")
}
.catch { ... } // catch2
.collect { ... }
// 传递顺序
// catch1 --> collect --> onCompletion --> catch2
取消
在处于协程中的Flow操作中可以使用cancel函数取消当前Flow任务,cancel函数是CoroutineScope对象的cancel函数
在取消前需要检测是否可以取消,使用onEach操作进行检查
1
2
3
4
5
flow
.onEach {
// 获取当前协程上下文进行检查
currentCoroutineContext().ensureActive()
}
cancellable操作符封装了上述检查
1
flow.cancellable()
通道
Channel是一个并发安全的队列,可以实现多协程之间的通信
基本操作
- send:发送消息
- receive:接收消息
1
2
3
4
5
6
7
8
val channel = Channel<Int>() // 创建channel时可以指定缓冲区大小
launch {
channel.send(2)
}
launch {
val i = channel.receive()
println(i)
}
Channel支持遍历管道中的数据,直到管道被关闭
1
2
3
4
5
6
7
8
9
10
11
12
val channel = Channel<Int>()
launch {
for (i in 1..5) {
channel.send(i)
}
channel.close() // close关闭管道
}
launch {
for (i in channel) {
println(i)
}
}
使用Channel可以将协程构造为生产者协程和消费者协程
- produce函数:构造一个生产者协程,返回一个ReceiveChannel
- actor函数:构造一个消费者协程,返回一个SendChannel
扇入、扇出
- 当channel有多个消费者时,每次发送一个消息,由其中一个消费者进行处理
- channel支持多个生产者并发地发送消息