文章

Kotlin高级——协程

开始

对于Kotlin,协程就是一个线程框架,将一段代码以挂起的方式运行在后台,这段代码称为协程

suspend关键字:表示该函数是可挂起的,称为挂起函数,该函数需要在直接或间接协程内调用,因此一个挂起函数只能在协程或另一个挂起函数中调用

添加核心库和平台库

1
2
3
4
// 公共API
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2'
// 平台具体实现
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.2'

基本协程

创建协程

协程需要一个作用域对象创建协程,协程的作用域对象为CoroutineScope,需要一个CoroutineContext参数,通过三个顶层函数获取CoroutineScope

  • runBlocking:默认为EmptyCoroutineContext,传入CoroutineScope接收者闭包,可以使用CoroutineScope中的函数,会使当前线程阻塞
  • GlobalScope:全局单例对象,全局存在
  • CoroutineScope:传入CoroutineContext构造

launch函数

调用CoroutineScope的launch函数开启一个协程,传入CoroutineScope接收者闭包,其中进行协程操作,该闭包是suspend的,没有返回值

launch函数定义

1
2
3
4
5
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

launch函数中可以调用挂起函数也可以调用非挂起函数

将Dispatcher对象的Main字段、IO字段等设置到context参数可以指定协程在哪个线程运行

1
2
3
4
5
6
launch(Dispatcher.Main) {
    // 在主线程执行
}
launch(Dispatcher.IO) {
    // 在IO线程执行
}

launch函数返回一个Job对象,表示一个协程任务,可以通过该对象获取协程信息、控制等待等

  • join:suspend修饰,调用者协程等待该协程直到该协程完成
  • cancel:取消该协程,同时递归地取消该协程调用的所有子协程
  • cancelAndJoin:cancel和join的连续调用

Job是协程任务的抽象,表示该协程执行的任务,每个协程都有一个Job对象,当创建子协程时,子协程会继承父协程的Job对象,形成一个协程树,当父协程的Job被取消时,它的所有子协程也会被取消

协程取消

所有kotlinx.coroutines中的挂起函数都检查了取消行为,在取消时抛出CancellationException,这些函数都是可取消的,若挂起函数中没有检查取消行为,则是不可取消的,会一直执行直到完成

cancel() 函数只是向协程发送一个取消信号,实际上协程并不会立即停止执行,而是需要在协程内部进行判断并处理取消请求

有两种方式可以使挂起函数可被取消

  • yield

    yield函数会暂停当前协程,允许同一调度器的协程的运行,当当前协程被取消时,调用yield函数会抛出CancellationException,可以捕获该异常进行取消行为

  • isActive

    在协程中可以使用isActive属性,当协程被取消时,isActive为false

当挂起函数对取消请求进行处理时需要抛出CancellationException,外部进行捕获,可以在finally中进行关闭资源操作

不可取消的协程:使用withContext函数,传入NonCancellable对象作为Context,开启一个不可取消的协程

协程超时

使用withTimeout函数开启一个超时协程,指定超时时间,协程在超时后会抛出TimeoutCancellationException

使用withTimeoutOrNull开启超时协程,在超时后,withTimeoutOrNull判断超时产生的异常属于协程是否为当前协程,若是当前协程,则返回null,不抛出异常

超时协程是异步的,应该在捕获TimeoutCancellationException后进行资源关闭

协程挂起

协程的挂起和恢复是由CoroutineDispatcher调度器来实现,一个协程中的挂起函数称为挂起点,当协程遇到一个挂起点时,会保存协程当前状态,将控制权交回给调度器,协程则进入等待队列,当调度器恢复协程时,协程会从之前的状态继续执行

async并发

基本使用

用async函数也可以创建一个协程,并且有返回值,相当于有返回值的launch函数,同样传入context,start,block三个参数

async函数返回Deferred对象,表示一个在将来会返回结果的promise,调用它的await函数,在协程结束后返回值

1
2
3
4
5
6
7
launch {
    val async: Deferred<Int> = async {
        delay(2000L)
        5
    }
    val value = async.await() // 5
}

Deferred继承了Job接口,同样可以进行取消和等待

惰性Async

设置async函数的start参数为CoroutineStart.LAZY,在调用Deferred的await函数或调用Job的start函数后才启动协程

若多个Deferred不在协程中调用,则async协程将顺序执行

若多个Deferred中抛出了异常,则所有async协程和父协程都会被取消

CoroutineContext

CoroutineContext是一个协程上下文,是一个接口类型,存储了一个协程的相关信息和组件,如调度器、异常处理器、协程名称等,CoroutineContext与组件之间是继承关系,通过CoroutineContext的get操作符和组件子类的伴生对象实现context关联组件的索引

CoroutineContext中有一个内部接口Element,Element接口中包含一个Key字段(Key是声明在CoroutineContext中的接口类型)

协程组件继承了Element接口,包含一个伴生对象Key实现了CoroutineContext.Key,Element接口中提供了get操作符的默认实现

1
2
3
public override operator fun <E : Element> get(key: Key<E>): E? =
	@Suppress("UNCHECKED_CAST")
	if (this.key == key) this as E else null

在CoroutineContext的get操作符中传入组件的伴生对象Key,返回该context的组件对象,伴生对象Key可以通过类名引用

1
2
CoroutineContext[Job] // 使用类名引用伴生对象,获取Job组件
CoroutineContext[Job.Key] // 直接引用伴生对象

CoroutineContext对象支持plus操作符,可以将多个组件使用plus操作符组合,构成一个CoroutineContext

1
Job + Dispatcher.IO // 协程任务+调度器

CoroutineScope

CoroutineScope是一个描述协程作用域的对象,是比CoroutineContext更基础的对象,提供了协程创建和管理的函数,一个CoroutineScope包含一个CoroutineContext字段

CoroutineScope.kt中包含了CoroutineScope的定义以及一些重要的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// CoroutineScope定义,其中包含一个CoroutineContext字段
public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

// plus操作符,将当前context与参数context组合,通过ContextScope构造函数构造一个新的scope对象
public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
    ContextScope(coroutineContext + context)

// 构造一个scope对象,运行在主线程且启动异常监督
@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

// 查看Job组件是否在活动中
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public val CoroutineScope.isActive: Boolean
    get() = coroutineContext[Job]?.isActive ?: true

// 全局scope对象
@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

// 使用构造器创建一个ScopeCoroutine对象,继承父协程的context但覆盖其Job组件,并使用该对象来执行闭包
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        val coroutine = ScopeCoroutine(uCont.context, uCont)
        coroutine.startUndispatchedOrReturn(coroutine, block)
    }
}

// 使用指定的context构造一个scope对象,若context中不存在Job组件,则添加一个默认的Job
@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

// 查看Job是否存在并取消当前context的Job,同时会取消它的所有子协程的Job
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

// cancel函数增加message参数
public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Unit = cancel(CancellationException(message, cause))

// 调用context对象的ensureActive函数,底层获取了Job组件,调用了Job的ensureActive
// Job的ensureActive判断isActive字段,若为false则抛出CancellationException
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()

// 获取当前context对象,可以避免与CoroutineScope.coroutineContext字段重名冲突
public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext

Scopes.kt中包含ContextScope类,实现了CoroutineScope接口,添加了构造函数

1
2
3
4
5
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    // CoroutineScope is used intentionally for user-friendly representation
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}

Dispatcher

指定协程在指定调度器的协程中执行,当不指定调度器时,使用父协程的调度器

  • Main:主线程,依赖于平台库提供MainDispatcher,在测试中,可以通过kotlinx-coroutines-test包中的扩展函数设置一个Dispatcher作为MainDispatcher

    1
    2
    
    // kotlinx-coroutines-test包中的扩展函数setMain
    Dispatchers.setMain(Dispatchers.Default)
    
  • IO:IO线程

  • Default:由默认调度器调度,使用共享后台线程池

  • Unconfined:非受限调度器

非受限调度器:默认使用父协程的调度器,在协程执行过程中不会强制协程在某个线程或协程中工作,而是可以自由切换,在协程被挂起时,协程不保留当前线程的信息,当协程恢复时,调度器会在某个线程上重新调度协程

上下文切换

一个协程可以拥有不同的上下文,使用CoroutineScope的withContext函数切换协程的上下文,传入一个CoroutineContext和一个CoroutineScope接收者闭包

withContext是一个挂起函数,因此协程会被挂起,调度器开启一个新的协程执行闭包,当新协程执行结束,原协程继续执行,闭包可以有返回值,返回到withContext函数

1
2
3
4
suspend fun method() = withContext(Dispatchers.IO) {
    // statement
    return 2 // returnValue
}

launch函数不是挂起函数,不会使当前协程被挂起,withContext是挂起函数,会使当前协程被挂起

子协程

子协程默认继承父协程的context,当父协程被取消时,子协程将被递归取消,父协程默认等待所有的子协程结束

有两种方法可以使子协程独立于父协程

  • 使用不同的CoroutineContext创建子协程
  • 使用新的Job创建子协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fun main() {
    val coroutineScope = CoroutineScope(Dispatchers.Default)
    val job = coroutineScope.launch {
        println("这里是父协程")

        CoroutineScope(Dispatchers.IO).launch {
            println("这里是独立的子协程1")
            delay(1000)
            println("子协程1独立")
        }

        launch(Job()) {
            println("这里是独立的子协程2")
            delay(1000)
            println("子协程2独立")
        }

        launch {
            println("这里是继承父协程的子协程")
            delay(1000)
            println("子协程会不会执行") // 继承的子协程随着父协程取消,不会执行
        }
    }

    Thread.sleep(500)
    job.cancel()
    println("cancel")
    Thread.sleep(1000)
}

/* 运行结果
这里是父协程
这里是独立的子协程1
这里是独立的子协程2
这里是继承父协程的子协程
cancel
子协程2独立
子协程1独立
*/

协程异常处理

异常传播:当子协程产生异常时,会向上抛出到父协程,直到根协程

  • 通过launch和actor创建的根协程,异常会作为未捕获异常直接抛出
  • 通过async和produce创建的根协程,当调用await函数时才抛出异常,依赖用户捕获

CoroutineExceptionHandler

用于处理未捕获异常,该组件继承了CoroutineContext,可以直接作为context对象使用

  • 向上传播异常的子协程async根协程不会使用该组件
  • 调用cancel函数取消子协程时会抛出CancellationException,父协程会自动忽略该类型的异常,不会取消父协程,当抛出了其他类型的异常,父协程会被同时取消,引起其他子协程的递归取消
  • 当父协程的所有子协程全部结束后,父协程才会使用ExceptionHandler处理异常,处理异常并不会阻止父协程因为异常而被取消
  • 若多个子协程都产生异常,则只抛出第一个异常,其他异常作为受抑制异常绑定到第一个异常

监督

父协程可以使用监督检测子协程的异常抛出,从而不影响其他的子协程,同时防止父协程因为异常而被取消

  • SupervisorJob:继承于Job,可以使用SupervisorJob创建子协程或设置为父协程的context,当子协程抛出异常时,同样会向上抛出到根协程,由根协程的ExceptionHandler处理,但不会引起父协程的取消

  • SupervisorScope:类似于coroutineScope函数,使用构造器构造一个SupervisorCoroutine对象,由该对象来执行闭包,SupervisorScope中的子协程不会向上抛出异常,因此每个子协程应该设置ExceptionHandler来处理自身的异常

    SupervisorCoroutine继承了ScopeCoroutine,重写了childCancelled方法

    该方法的原始实现

    1
    2
    3
    4
    5
    6
    
    // 在子协程抛出异常时,父协程决定是否取消自己
    public open fun childCancelled(cause: Throwable): Boolean {
        // 当异常是CancellationException时取消自己
        if (cause is CancellationException) return true
        return cancelImpl(cause) && handlesException
    }
    

    SupervisorCoroutine重写了childCancelled方法,直接返回false,表示当子协程抛出异常时,父协程不会取消自己

异步流

kotlin使用Flow表示异步计算的流,类似Sequence,Sequence是同步计算,Flow是异步计算,两者都是惰性计算,Flow支持Sequence的集合操作

  • 构造:flow函数——sequence函数,flow函数的闭包是suspend的且可被取消

    其他构造函数有flowOf和asFlow

  • 生成:emit函数——yield函数

  • 收集:collect函数——forEach函数,collect函数是suspend的

    由于flow在收集时才进行计算,因此在构造时可以不进行挂起,在收集时才挂起

FLow有冷流和热流两种,冷流必须包含消费者,热流不一定需要消费者,可以独立存在

  • 冷流:FLow
  • 热流:StateFlow,SharedFLow
    • StateFlow:类似LiveData,在状态改变时响应
    • SharedFLow:类似消息总线,在值发送时响应

上下文

Flow的协程上下文由Flow收集器所在的context决定,在构造Flow时所处的上下文也是收集器所在的上下文

flowOn操作用于更改Flow的协程上下文,传入一个CoroutineContext对象,在flowOn操作之前的操作都将处于这个context对象表示的上下文中

扩展操作

Flow扩展了一些集合操作,可以提高处理速度

  • 缓冲

    当生成值的速度快于处理值的速度时,可以在处理操作之前使用buffer操作缓存生成的值,提高整体速度

  • 合并

    当生成值的速度快于处理值的速度时,在处理操作之前使用conflate操作,可以使处理操作跳过中间值

    可以理解为生成的值被压入一个栈中,当处理操作结束后会取栈顶元素处理,剩下的元素被丢弃

  • 取最新值

    对Flow的每个集合操作xxx,都存在一个xxxLatest操作,当构造器生成新值时,会取消当前操作接收新值

展平流

由于flow是异步的,所以存在特殊的展平操作用于特殊处理

  • flatMapConcat:将接收到的流串行
  • flatMapMerge:将接收到的流并行
  • flatMapLatest:当接收到新流时,取消当前的流操作

异常

在Flow的终止操作处捕获操作,可以捕获到整个操作过程中的所有异常,在捕获异常后,构造器停止生成值

使用catch操作符可以封装异常处理操作

1
2
3
4
5
flow.catch { e ->
    println(e) // 可以打印异常
    throw e // 可以重新抛出异常
    emit(value) // 可以重新生成值,但生成后流依然会停止,重新生成的值类型与流的值类型要相同
}.collect { value -> println(value) }

catch只会捕获catch之前的操作产生的异常,需要捕获所有异常时使用声明式捕获

1
2
3
4
5
6
7
8
9
10
flow
    // 使用onEach代替collect,将collect的操作代码提前
    .onEach { value ->
        check(value <= 1) { "Collected $value" }
        // collect操作
        println(value) 
    }
    // catch只捕获上游异常
    .catch { e -> println("Caught $e") }
    .collect()  // collect只作为启动流计算的终止操作

完成

使用onCompletion操作指定Flow完成时进行的操作,onCompletion是一个中间操作,onCompletion传入一个cause参数,表示Flow处理时产生的异常(包括收集器中的异常),若Flow成功完成,则cause参数为null

onCompletion不处理异常,异常依然会传递到下游

1
2
3
4
5
6
7
8
9
10
flow
    .catch { ... }  // catch1
    .onCompletion { cause ->
        println("completion")
        if (cause != null) println("Flow completed exceptionally")
    }
    .catch { ... }  // catch2
    .collect { ... }
// 传递顺序
// catch1 --> collect --> onCompletion --> catch2

取消

处于协程中的Flow操作中可以使用cancel函数取消当前Flow任务,cancel函数是CoroutineScope对象的cancel函数

在取消前需要检测是否可以取消,使用onEach操作进行检查

1
2
3
4
5
flow
    .onEach {
        // 获取当前协程上下文进行检查
        currentCoroutineContext().ensureActive()
    }

cancellable操作符封装了上述检查

1
flow.cancellable()

通道

Channel是一个并发安全的队列,可以实现多协程之间的通信

a4xq9-lj03q

基本操作

  • send:发送消息
  • receive:接收消息
1
2
3
4
5
6
7
8
val channel = Channel<Int>()  // 创建channel时可以指定缓冲区大小
launch {
    channel.send(2)
}
launch {
    val i = channel.receive()
    println(i)
}

Channel支持遍历管道中的数据,直到管道被关闭

1
2
3
4
5
6
7
8
9
10
11
12
val channel = Channel<Int>()
launch {
    for (i in 1..5) {
        channel.send(i)
    }
    channel.close() // close关闭管道
}
launch {
    for (i in channel) {
        println(i)
    }
}

使用Channel可以将协程构造为生产者协程和消费者协程

  • produce函数:构造一个生产者协程,返回一个ReceiveChannel
  • actor函数:构造一个消费者协程,返回一个SendChannel

扇入、扇出

  • 当channel有多个消费者时,每次发送一个消息,由其中一个消费者进行处理
  • channel支持多个生产者并发地发送消息
本文由作者按照 CC BY 4.0 进行授权